How to compose two parallel Tasks to cancel one task if another one fails?

I would like to implement my asynchronous processing with scalaz.concurrent.Task. I need a function (Task[A], Task[B]) => Task[(A, B)] that returns a new task which works as follows:

run Task[A] and Task[B] in parallel and wait for the results;
if one of the tasks fails, cancel the other one and wait until it terminates;
return the results of both tasks. 

How would you implement such a function?

Solutions/Answers:

Answer 1:

As I mentioned above, if you don’t care about actually stopping the computation that has not failed, you can use Nondeterminism. For example:

import scalaz._, scalaz.Scalaz._, scalaz.concurrent._

def pairFailSlow[A, B](a: Task[A], b: Task[B]): Task[(A, B)] = a.tuple(b)

def pairFailFast[A, B](a: Task[A], b: Task[B]): Task[(A, B)] =
  Nondeterminism[Task].both(a, b)

val divByZero: Task[Int] = Task(1 / 0)
val waitALongTime: Task[String] = Task {
  Thread.sleep(10000)
  println("foo")
  "foo"
}

And then:

pairFailSlow(divByZero, waitALongTime).run // fails immediately
pairFailSlow(waitALongTime, divByZero).run // hangs while sleeping
pairFailFast(divByZero, waitALongTime).run // fails immediately
pairFailFast(waitALongTime, divByZero).run // fails immediately

In every case except the first, the side effect in waitALongTime will still happen. If you want to attempt to stop that computation, you’ll need to use something like Task’s runAsyncInterruptibly.
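A minimal sketch of what using runAsyncInterruptibly might look like, assuming scalaz 7.1 (where the method takes a callback and an AtomicBoolean). The object InterruptiblySketch, the method runAndCancel and its parameters are hypothetical names chosen for illustration. As far as I can tell, the flag is only consulted between the steps of a Task, so a step that is already running (here a Thread.sleep) is not interrupted; only the steps after it are skipped.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scalaz.concurrent.Task

object InterruptiblySketch {
  // Hypothetical helper: runs `steps` short Task steps in sequence, raises the
  // cancel flag after `cancelAfterMillis`, and reports how many steps finished.
  def runAndCancel(steps: Int, stepMillis: Long, cancelAfterMillis: Long): Int = {
    val cancel    = new AtomicBoolean(false)
    val completed = new AtomicInteger(0)

    // One unit of work; the sleep stands in for a real computation.
    def step: Task[Unit] = Task {
      Thread.sleep(stepMillis)
      completed.incrementAndGet()
      ()
    }

    // Chain the steps with flatMap so there are boundaries between them.
    val all: Task[Unit] =
      (1 to steps).foldLeft(Task.now(())) { (acc, _) => acc.flatMap(_ => step) }

    // Start the task without blocking; the callback is ignored in this sketch.
    all.runAsyncInterruptibly(_ => (), cancel)

    Thread.sleep(cancelAfterMillis)
    cancel.set(true)             // request cancellation
    Thread.sleep(stepMillis * 3) // give a step that is in flight time to finish
    completed.get
  }
}
```

Note that this only stops the task from making further progress. It does not by itself give you the "wait until it terminates" behaviour asked for in the question.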

Answer 2:

There is a strange belief among Java developers that you should not cancel parallel tasks. They condemn Thread.stop() and have marked it deprecated. Without Thread.stop() you cannot really cancel a future. All you can do is send some signal to the future, or modify a shared variable and have the code inside the future check it periodically. So every library that provides futures can offer only one way to cancel a future: cooperatively.
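The cooperative approach described above can be sketched with nothing but the standard library’s Future (Scala 2.12 or later, for transformWith). This is an illustration under stated assumptions, not a scalaz API: CooperativePair and pairCancelling are hypothetical names, and each computation is assumed to poll the shared flag it is given and return early once the flag is set.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object CooperativePair {
  // Hypothetical combinator: runs both computations in parallel, raises a
  // shared flag if either one fails, and completes only after both have ended.
  def pairCancelling[A, B](a: AtomicBoolean => A, b: AtomicBoolean => B)(
      implicit ec: ExecutionContext): Future[(A, B)] = {
    val cancelled = new AtomicBoolean(false)

    // Run a body on the pool; on failure, raise the flag before rethrowing.
    def start[T](body: AtomicBoolean => T): Future[T] = Future {
      try body(cancelled)
      catch { case NonFatal(e) => cancelled.set(true); throw e }
    }

    val fa = start(a) // both futures are created here, so they run in parallel
    val fb = start(b)

    // The outer callback fires once fa has ended, the inner one once fb has
    // ended, so the result is available only after both have terminated.
    fa.transformWith { ra =>
      fb.transformWith { rb =>
        Future.fromTry(for (x <- ra; y <- rb) yield (x, y))
      }
    }
  }
}
```

A computation passed to pairCancelling would typically loop with a condition such as while (!flag.get()) and do one small unit of work per iteration. If both computations fail, this sketch reports the failure of the first one.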

I’m facing the same problem now and am in the middle of writing my own library for futures that can be cancelled. There are some difficulties, but they can be solved. You simply cannot call Thread.stop() at an arbitrary point, because the thread may be in the middle of updating shared variables. Locks would be released normally, but an update may be stopped halfway, e.g. with only half of a double value written. So I’m introducing a lock of my own. If the thread is in a guarded state, it should not be killed by Thread.stop() but asked to stop by sending it a specific message. A guarded state is assumed to be short enough to wait for. The rest of the time, in the middle of a computation, the thread may be safely stopped and replaced with a new one.

So the answer is this: you are not supposed to want to cancel futures, otherwise you are a heretic and no one in the Java community will lend you a helping hand. You have to define your own execution context that can kill threads, and write your own futures library to run on top of that context.
