Concurrent Programming with Futures

Finagle uses futures [1] to encapsulate and compose concurrent operations such as network RPCs. Futures are directly analogous to threads (they provide independent and overlapping threads of control) and can be thought of as featherweight threads. They are cheap to construct, so the economies of traditional threads do not apply: it is no problem to maintain millions of outstanding concurrent operations when they are represented by futures.

Futures also decouple Finagle from the operating system and runtime thread schedulers. This is used in important ways; for example, Finagle uses thread biasing to reduce context switching costs.

The harmony of this analogy has one discordant caveat: don’t perform blocking operations in a Future. Futures aren’t preemptive; they must yield control via flatMap. Blocking operations disrupt this, halting the progress of other asynchronous operations, and cause your application to experience unexpected slowness, a decrease in throughput, and potentially deadlocks. But of course it’s possible for blocking operations to be combined safely with Futures as we’ll see.

Blocking or synchronous work

When you have work that is blocking, say I/O or a library not written in an asynchronous style, you should use a com.twitter.util.FuturePool. FuturePool manages a pool of threads that don’t do any other work, which means that blocking operations won’t halt other asynchronous work.

In the code below, someIO is an operation that waits for I/O and returns a string (e.g., reading from a file). Wrapping someIO(): String in FuturePool.unboundedPool returns a Future[String], which allows us to combine this blocking operation with other Futures in a safe way.

Scala:

import com.twitter.util.{Future, FuturePool}

def someIO(): String =
  ??? // placeholder: does some blocking I/O and returns a string

val futureResult: Future[String] = FuturePool.unboundedPool {
  someIO()
}

Java:

import com.twitter.util.Future;
import com.twitter.util.FuturePools;
import static com.twitter.util.Function.func0;

Future<String> futureResult = FuturePools.unboundedPool().apply(
  func0(() -> someIO())
);
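
As a minimal sketch of how the pooled result composes with other Futures, the following runs a blocking call on the pool and then transforms its result. The function readGreeting is a hypothetical stand-in for someIO, and Await is used only so that this standalone snippet can observe the value.

```scala
import com.twitter.util.{Await, Future, FuturePool}

// Hypothetical stand-in for someIO(): sleeps to simulate blocking, then returns.
def readGreeting(): String = {
  Thread.sleep(50)
  "hello"
}

// The blocking call runs on a FuturePool thread, not on the caller's thread.
val greeting: Future[String] = FuturePool.unboundedPool { readGreeting() }

// From here on it composes like any other Future.
val shouted: Future[String] = greeting.map(_.toUpperCase)

// Await is used only to observe the result in this standalone sketch.
val result: String = Await.result(shouted)
```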

Synchronized work

Synchronization is different from synchronous behavior. Synchronous calls wait on some work to complete before executing the next statement within the same thread. Synchronized sections, in contrast, allow one thread to execute statements and block all other callers until the initial thread is finished with the enclosed statements.

An example that could fail without synchronization would be:

def incrementAndReturn(): Integer = {
  counter += 1;
  counter
}

If two threads are executing the incrementAndReturn function concurrently on the same object, it is possible that both execute the statement counter += 1 before either executes the return statement. If that happens, both threads would get the same value (e.g. if counter was 45 before either thread ran, 47 would be returned to both threads).

A simple way to guarantee that each thread sees a unique counter value without skipping would be to enclose the critical statements in a synchronized block. Syntactically it looks like this:

def incrementAndReturn(): Integer = {
  this.synchronized {
    counter += 1;
    counter
  }
}
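
To see the guarantee in action, here is a small self-contained sketch that hammers a synchronized counter from several threads and then checks the values that were handed out. The Counter class, the thread count, and the number of calls are illustrative assumptions, not part of Finagle.

```scala
class Counter {
  private[this] var counter: Int = 0

  def incrementAndReturn(): Int = this.synchronized {
    counter += 1
    counter
  }
}

val shared = new Counter
val numThreads = 4
val callsPerThread = 1000
// Each thread records the values it was handed in its own row.
val seen = Array.ofDim[Int](numThreads, callsPerThread)

val threads = (0 until numThreads).map { t =>
  new Thread(new Runnable {
    def run(): Unit =
      for (i <- 0 until callsPerThread) seen(t)(i) = shared.incrementAndReturn()
  })
}
threads.foreach(_.start())
threads.foreach(_.join())

val all = seen.flatten
// With the synchronized block, no value is handed out twice and none is skipped.
val allUnique = all.distinct.length == numThreads * callsPerThread
val highest = all.max
```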

Scoping synchronization

In the previous section a simple introduction and example were given for synchronization wherein multiple threads have a handle to an object and can experience unexpected behavior when executing concurrently. The simple remedy was to add a synchronized block on the this object within the function body. What does that mean though?

The syntax for creating a synchronized block requires the user to define a lock object which grants access to the block that follows. Only one thread at a time can be in control of a lock object. In the example the this object was the lock, meaning that within the class anywhere a this.synchronized {…} block occurs, it is tied to the same object, this. For example:

def incrementAndReturn(): Integer = {
  this.synchronized {
    counter += 1;
    counter
  }
}

def decrementAndReturn(): Integer = {
  this.synchronized {
    counter -= 1;
    counter
  }
}

Both functions above are now gated on the same object, this, so not only will multiple threads with the same object handle serially execute incrementAndReturn, they will also be waiting in line for this when calling decrementAndReturn.

Other methods within the class could be free to execute without waiting by omitting the synchronized block:

def incrementAndReturn(): Integer = {
  this.synchronized {
    counter += 1;
    counter
  }
}

def decrementAndReturn(): Integer = {
  this.synchronized {
    counter -= 1;
    counter
  }
}

def readCounter(): Integer = {
  counter
}

Here, any thread may call readCounter without waiting to control this.

Furthermore, it may be useful for readability and segmenting logic to define objects whose only purpose is as a synchronization lock rather than blocking at the granularity of the whole instance. For example:

private[this] var counter: Integer = 0
// A dedicated lock object. Locking on `counter` itself would be unsafe: it is a
// boxed Integer that is replaced on every update and may be shared with other code.
private[this] val lock: Object = new Object

def incrementAndReturn(): Integer = {
  lock.synchronized {
    counter += 1;
    counter
  }
}

def decrementAndReturn(): Integer = {
  lock.synchronized {
    counter -= 1;
    counter
  }
}

def readCounter(): Integer = {
  counter
}

This gives us flexibility as we evolve the class: new operations that need synchronization can be covered under the umbrella of the lock object. At the moment a single lock guards the counter, but we may later discover some other useful member to use as the lock, or introduce separate locks for different sets of entangled state.
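
As a rough illustration of separate locks for different sets of state, the sketch below guards two independent counters with their own lock objects, so a thread updating one never waits on a thread updating the other. The class RequestStats and its members are hypothetical names chosen for this example.

```scala
class RequestStats {
  // Hypothetical fields: two pieces of state that never need to change together.
  private[this] var hits: Int = 0
  private[this] var errors: Int = 0

  // One dedicated lock per set of state.
  private[this] val hitsLock = new Object
  private[this] val errorsLock = new Object

  def recordHit(): Int = hitsLock.synchronized { hits += 1; hits }
  def recordError(): Int = errorsLock.synchronized { errors += 1; errors }
}

val stats = new RequestStats
stats.recordHit()
val hitsSoFar = stats.recordHit()
val errorsSoFar = stats.recordError()
```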

Synchronization risks

Synchronization is a very useful language feature for defining critical sections and letting the runtime manage blocking, scheduling, and handing off control between threads. It can be a very efficient technique to ensure predictable mutation of internal state (like ‘counter’ above) or other simple actions.

However, synchronization can also expose a developer to a new class of bugs wherein threads are indefinitely waiting on a data change or on acquiring the lock object. Two common types of locking problems are livelocks and deadlocks.

A livelock occurs when threads are alive, but the code is waiting for some data change in order to proceed. The system will wake up a thread, the thread checks if the data is in the correct state, and then goes back to sleep when it sees no changes have occurred. If the responsible process or thread is unable to make its update, the system is in livelock.

A deadlock occurs when two or more threads are mutually blocked/halted on a statement that is synchronized on a lock object held by the other thread. For example, imagine a process exists where a thread acquires exclusive access to a Person, then queries the system for their siblings in order to update them together. If two threads are each tasked with doing this work for a pair of siblings, a deadlock can occur. Thread A acquires the lock for Person A. Thread B acquires the lock for Person B, sibling to Person A. Each now wishes to query and lock the siblings of their person. Thread A will be waiting for Thread B to be “finished” with Person B, and Thread B is likewise waiting for Thread A to be “finished” with Person A. Deadlock. A detailed example of a deadlock will follow below.
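
The siblings scenario can be reproduced in miniature. The sketch below is an illustration under stated assumptions: it uses java.util.concurrent.locks.ReentrantLock in place of synchronized blocks so that the second acquisition can time out instead of hanging forever, and the names personA, personB, and updater are hypothetical. Each thread holds its own lock until both have tried for the other's, so neither attempt can succeed.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock

val personA, personB = new ReentrantLock()
val bothHoldOwn = new CountDownLatch(2) // each thread holds its own person
val bothTried = new CountDownLatch(2)   // each thread has tried for the sibling
val siblingLocksAcquired = new AtomicInteger(0)

def updater(own: ReentrantLock, sibling: ReentrantLock): Thread =
  new Thread(new Runnable {
    def run(): Unit = {
      own.lock()
      try {
        bothHoldOwn.countDown()
        bothHoldOwn.await()
        // A plain lock() here would wait forever; tryLock gives up instead.
        if (sibling.tryLock(100, TimeUnit.MILLISECONDS)) {
          siblingLocksAcquired.incrementAndGet()
          sibling.unlock()
        }
        bothTried.countDown()
        bothTried.await()
      } finally own.unlock()
    }
  })

val threadA = updater(personA, personB)
val threadB = updater(personB, personA)
threadA.start(); threadB.start()
threadA.join(); threadB.join()
```

With synchronized blocks there is no timeout, so the same interleaving would leave both threads blocked indefinitely.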

Futures as containers

Common examples of operations that are represented by futures are:

  • an RPC to a remote host

  • a long computation in another thread

  • reading from disk

Note that these operations are all fallible: remote hosts could crash, computations might throw an exception, disks could fail, etc. A Future[T], then, occupies exactly one of three states:

  • Empty (pending)

  • Succeeded (with a result of type T)

  • Failed (with a Throwable)

While it is possible to directly query this state, this is rarely useful. Instead, a callback may be registered to receive the results once they are made available:

import com.twitter.util.Future

val f: Future[Int] = ???

f.onSuccess { res: Int =>
  println("The result is " + res)
}

which will be invoked only on success. Callbacks may also be registered to account for failures:

import com.twitter.util.Future

val f: Future[Int] = ???

f.onFailure { cause: Throwable =>
  println("f failed with " + cause)
}
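
For completeness, here is a small sketch of querying the state directly with poll, which returns an Option[Try[T]]. A Promise is used as a Future that we can satisfy by hand; the values are arbitrary.

```scala
import com.twitter.util.{Future, Promise, Return}

val pending = new Promise[Int]
val stateBefore = pending.poll // None: still empty (pending)

pending.setValue(42)
val stateAfter = pending.poll  // Some(Return(42)): succeeded

val failed: Future[Int] = Future.exception(new IllegalStateException("boom"))
val failedState = failed.poll  // Some(Throw(...)): failed
```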

Sequential composition

Registering callbacks is useful but presents a cumbersome API. The power of Futures lies in how they compose. Most operations can be broken up into smaller operations which in turn constitute the composite operation. Futures make it easy to create such composite operations.

Consider the simple example of fetching a representative thumbnail from a website (à la Pinterest). This typically involves:

  1. Fetching the homepage

  2. Parsing that page to find the first image link

  3. Fetching the image link

This is an example of sequential composition: in order to do the next step, we must have successfully completed the previous one. With Futures, this is called flatMap [3]. The result of flatMap is a Future representing the result of this composite operation. Given some helper methods (fetchUrl fetches the given URL, findImageUrls parses an HTML page to find image links), we can implement our Pinterest-style thumbnail extractor like this:

import com.twitter.util.Future

def fetchUrl(url: String): Future[Array[Byte]] = ???
def findImageUrls(bytes: Array[Byte]): Seq[String] = ???

val url = "https://www.google.com"

val f: Future[Array[Byte]] = fetchUrl(url).flatMap { bytes =>
  val images = findImageUrls(bytes)
  if (images.isEmpty)
    Future.exception(new Exception("no image"))
  else
    fetchUrl(images.head)
}

f.onSuccess { image =>
  println("Found image of size " + image.size)
}

f represents the composite operation. It is the result of first retrieving the web page, and then the first image link. If either of the smaller operations fails (the first or second fetchUrl, or if findImageUrls doesn’t successfully find any images), the composite operation also fails.

The astute reader may have noticed something peculiar: this is typically the job of the semicolon! That is not far from the truth: semicolons sequence two statements, and with traditional I/O operations, have the same effect as flatMap does above (the exception mechanism takes the role of a failed future). Futures are much more versatile, however, as we’ll see.
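
The semicolon analogy is perhaps easiest to see with a for-comprehension, which the compiler rewrites into flatMap and map calls. The following is only a sketch: fetchPage and firstImage are hypothetical stubs that complete immediately, standing in for real network calls.

```scala
import com.twitter.util.{Await, Future}

// Hypothetical stubs that complete immediately, in place of real network calls.
def fetchPage(url: String): Future[String] =
  Future.value(s"<img src='$url/logo.png'>")
def firstImage(html: String): Future[String] =
  Future.value(html.split("'")(1))

// Each `<-` sequences a step: the next line runs only after the previous
// future has succeeded, much like statements separated by semicolons.
val image: Future[String] = for {
  html <- fetchPage("https://example.com")
  link <- firstImage(html)
} yield link
```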

Concurrent composition

It is also possible to compose Futures concurrently. We can extend our above example to demonstrate: let’s fetch all the images. Concurrent composition is provided by Future.collect:

import com.twitter.util.Future

val collected: Future[Seq[Array[Byte]]] =
  fetchUrl(url).flatMap { bytes =>
    val fetches = findImageUrls(bytes).map { url => fetchUrl(url) }
    Future.collect(fetches)
  }

Here we have combined both concurrent and sequential composition: first we fetch the web page, then we collect the results of fetching all of the underlying images.

As with sequential composition, concurrent composition propagates failures: the future collected will fail if any of the underlying futures do [2].
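
A short sketch of this failure propagation, contrasted with Future.collectToTry, which reports each outcome individually. The futures here are already-completed stand-ins for real fetches.

```scala
import com.twitter.util.{Await, Future, Try}

val ok: Future[Int] = Future.value(1)
val bad: Future[Int] = Future.exception(new RuntimeException("fetch failed"))

// collect fails as a whole because one constituent failed.
val all: Future[Seq[Int]] = Future.collect(Seq(ok, bad))
val allFailed: Boolean = Await.result(all.liftToTry).isThrow

// collectToTry succeeds and carries a Try for each input, in order.
val each: Seq[Try[Int]] = Await.result(Future.collectToTry(Seq(ok, bad)))
```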

It is also simple to write your own combinators that operate over Futures. This is quite useful, and gives rise to a great amount of modularity in distributed systems as common patterns can be cleanly abstracted.
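
As one possible example of a custom combinator, the sketch below applies an asynchronous function to each item strictly one after another, in contrast to the concurrent Future.collect. The name sequentially is hypothetical; it is built only from flatMap and map.

```scala
import com.twitter.util.{Await, Future}

// Hypothetical combinator: apply f to each item one at a time, in order.
def sequentially[A, B](items: Seq[A])(f: A => Future[B]): Future[Seq[B]] =
  items.foldLeft(Future.value(Vector.empty[B])) { (accF, item) =>
    // Wait for everything so far, then start the next operation.
    accF.flatMap { acc => f(item).map(b => acc :+ b) }
  }

val doubled: Future[Seq[Int]] =
  sequentially(Seq(1, 2, 3)) { n => Future.value(n * 2) }
```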

Parallel composition

Future.collect is specialized for the case where you do the same operation many times, each returning the same type of result, and want to know when they’re all complete. However, we often trigger different operations that we can do in parallel, and that may return different kinds of results. When we need to use the results of a few different kinds of computations, it can be useful to wait for all of the results to come back before continuing. From a classical thread programming model, the analogous idea would be calling join on a forked thread. This is where Future.join comes into play!

There are actually four different modes of Future.join. Although they were originally written for Scala, the methods on the Future object also have Java-friendly versions at Futures.join. The method on the Future instance should be usable from Java without any problem.

There’s Future#join, a method directly on the Future class. It accepts another Future as an argument and returns a Future that is satisfied once both this and the argument are satisfied, containing the contents of both Futures.

import com.twitter.util.Future

val numFollowers: Future[Int] = ???
val profileImageURL: Future[String] = ???

val userProfileData: Future[(Int, String)] = numFollowers.join(profileImageURL)

There’s also Future.join, which can combine many different results. It is overloaded to support different numbers of futures to be joined.

import com.twitter.util.Future

val numFollowers: Future[Int] = ???
val profileImageURL: Future[String] = ???
val followersYouKnow: Future[Seq[User]] = ???

val userProfileData: Future[(Int, String, Seq[User])] =
  Future.join(numFollowers, profileImageURL, followersYouKnow)

A common thing to do after calling Future#join is to immediately transform the result. As a minor optimization, we can avoid allocating the Tuple2 instance by using Future#joinWith.

import com.twitter.util.Future

val numFollowers: Future[Int] = ???
val profileImageURL: Future[String] = ???
val constructUserProfile: (Int, String) => UserProfile = ???

val userProfile: Future[UserProfile] =
  numFollowers.joinWith(profileImageURL)(constructUserProfile)

The last Future.join is a bit of an odd one out. Like Future.collect, it operates on a Seq[Future[A]], but it returns only a Future[Unit], which tells you whether all of the component pieces succeeded or not. This method is used for implementing the other Future.join methods, and is exposed as a minor optimization for use cases where all you need to know is success or failure, not what the actual results were.

import com.twitter.util.Future

val numFollowers: Future[Int] = ???
val profileImageURL: Future[String] = ???
val followersYouKnow: Future[Seq[User]] = ???

val profileDataIsReady: Future[Unit] =
  Future.join(Seq(numFollowers, profileImageURL, followersYouKnow))
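
The join variants can be tried out with already-satisfied futures. This is a sketch with made-up values rather than real profile lookups.

```scala
import com.twitter.util.{Await, Future}

val followers: Future[Int] = Future.value(42)
val imageUrl: Future[String] = Future.value("https://example.com/me.png")

// Future#join pairs the two results in a tuple.
val tupled: Future[(Int, String)] = followers.join(imageUrl)

// Future#joinWith hands both results straight to a function.
val summary: Future[String] =
  followers.joinWith(imageUrl) { (n, url) => s"$n followers, avatar at $url" }

// The Seq-based Future.join reports only success or failure.
val failing: Future[Unit] =
  Future.join(Seq(followers, Future.exception(new Exception("lookup failed"))))
```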

Synchronization within composition

As teased above, synchronization can introduce a new class of bugs in a concurrent environment. A real world example of a deadlock can be found here: https://github.com/twitter/util/commit/b3b6…

Before the patch, the methods fail(..), release(), and the interrupt handler are all synchronized on this while completing a Promise. This can result in deadlocks if we have two threads interacting with two separate AsyncSemaphores. Here is a toy example that sets up cross-semaphore interaction. It will look a bit too obviously-broken to really happen, but isolates the misbehavior that could reasonably happen by accident:

import com.twitter.concurrent.AsyncSemaphore
import com.twitter.util.{Await, Future}

val semaphore1, semaphore2 = new AsyncSemaphore(1)
// The semaphores have already been taken:
val permitForSemaphore1 = Await.result(semaphore1.acquire())
val permitForSemaphore2 = Await.result(semaphore2.acquire())

// The semaphores have had continuations attached as follows:
semaphore1.acquire().flatMap { permit =>
  val otherWaiters = semaphore2.numWaiters // synchronizing method
  permit.release()
  Future.value(otherWaiters)
}

semaphore2.acquire().flatMap { permit =>
  val otherWaiters = semaphore1.numWaiters // synchronizing method
  permit.release()
  Future.value(otherWaiters)
}

Now we can trigger a deadlock.

val threadOne = new Thread {
  override def run(): Unit = {
    permitForSemaphore1.release()
  }
}

val threadTwo = new Thread {
  override def run(): Unit = {
    permitForSemaphore2.release()
  }
}

threadOne.start()
threadTwo.start()

In this situation threadOne and threadTwo may deadlock. It isn’t obvious from the release() calls that this is possible. The reason is that the acquire() method returns a Promise, and we’ve attached continuations to those Promises. When a thread calls Permit.release(), the AsyncSemaphore implementation synchronizes on its lock object (this) and gives the permit to the next waiting Promise before exiting the synchronized block. Satisfying that Promise runs its continuation, which calls a method on the other AsyncSemaphore, and that method attempts to synchronize in turn. This is akin to the siblings example described above: each thread locks its own AsyncSemaphore and then blocks until it can acquire the lock on the other one.

The full resolution is in the patch linked above; what follows is a succinct description. Promises provide some methods which, used together, give semantics similar to compareAndSet (a well-known, safe pattern). The new structure of the release() method on the Permit is:

// old implementation
// def release(): Unit = self.synchronized {
//   val next = waitq.pollFirst()
//   if (next != null) next.setValue(this) // <- nogo: still synchronized
//   else availablePermits += 1
// }

// new implementation

// Here we define a specific lock object, rather than use the `self` or `this`
// reference. It is synonymous with the internal Queue as that is what is
// driving our need for synchronization, but using this special-purpose reference
// gives us an opportunity in the future to refactor more easily.
private[this] final def lock: Object = waitq

@tailrec def release(): Unit = {
  // we pass the Promise outside of the lock
  val waiter = lock.synchronized {
    val next = waitq.pollFirst()
    if (next == null) {
      availablePermits += 1
    }
    next
  }

  if (waiter != null) {
    // since we are no longer synchronized with the interrupt handler
    // we leverage the atomic state of the Promise to do the right
    // thing if we race.
    if (!waiter.updateIfEmpty(Return(this))) {
      release()
    }
  }
}

The new implementation is more complex and no longer synchronizes with the interrupt handler. This exposes the developer to a new consideration: a race between interrupting the Promise and giving it the Permit.

Using synchronization and Promises together requires a significant amount of care to ensure a program is not at risk of deadlock. Some low risk uses of synchronization are:

  • Mutate or access a field.

  • Push or pop an element from a private ArrayDeque.

Some examples of risky actions to take in a synchronized block are:

  • Calling a function injected by a caller; the function could block, acquire locks of its own, compute pi to a billion digits, etc.

  • Calling methods on a trait of unknown origin: this is essentially the same thing as calling a user-injected function.

  • Completing a Promise: its continuations run as part of the completion, as in the AsyncSemaphore example above.
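
The guidance above can be condensed into a small pattern: touch the queue under the lock, and complete the Promise only after the lock has been released. The Mailbox class below is a hypothetical sketch of that pattern, not code from Finagle or util.

```scala
import com.twitter.util.{Await, Future, Promise, Return}
import java.util.ArrayDeque

// Hypothetical mailbox: readers wait for a value handed over by a writer.
class Mailbox[A] {
  private[this] val waiters = new ArrayDeque[Promise[A]]

  def receive(): Future[A] = {
    val p = new Promise[A]
    waiters.synchronized { waiters.addLast(p) } // low risk: queue push only
    p
  }

  // Returns true if a waiting reader took the value.
  def deliver(value: A): Boolean = {
    // Only the queue pop happens under the lock.
    val next = waiters.synchronized { waiters.pollFirst() }
    // The Promise is completed after the lock is released, so continuations
    // attached by callers never run while we hold it.
    next != null && next.updateIfEmpty(Return(value))
  }
}

val box = new Mailbox[String]
val pendingRead: Future[String] = box.receive()
val delivered = box.deliver("hi")
val received = Await.result(pendingRead)
val undelivered = box.deliver("nobody is waiting")
```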

Recovering from failure

Composed futures fail whenever any of their constituent futures fail. However, it is often useful to recover from such failures. The rescue combinator on Future is the dual to flatMap: whereas flatMap operates over values, rescue operates over exceptions. They are otherwise identical. It is often desirable to handle only a subset of possible exceptions. To accommodate this, rescue accepts a PartialFunction, mapping a Throwable to a Future:

trait Future[A] {
  ..
  def rescue[B >: A](f: PartialFunction[Throwable, Future[B]]): Future[B]
  ..
}

The following retries a request infinitely should it fail with a TimeoutException:

import com.twitter.util.Future
import com.twitter.finagle.http
import com.twitter.finagle.TimeoutException

def fetchUrl(url: String): Future[http.Response] = ???

def fetchUrlWithRetry(url: String): Future[http.Response] =
  fetchUrl(url).rescue {
    case exc: TimeoutException => fetchUrlWithRetry(url)
  }
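
A bounded variation on the same idea can be sketched as follows. Everything here is illustrative: withRetries and flaky are hypothetical names, and java.util.concurrent.TimeoutException is used in place of Finagle's TimeoutException so that the snippet needs only the util library.

```scala
import com.twitter.util.{Await, Future}
import java.util.concurrent.TimeoutException

// Hypothetical helper: retry `op` on TimeoutException, at most `retries` more times.
def withRetries[A](retries: Int)(op: () => Future[A]): Future[A] =
  op().rescue {
    case _: TimeoutException if retries > 0 => withRetries(retries - 1)(op)
  }

// Stub that times out twice before succeeding.
var attempts = 0
def flaky(): Future[String] = {
  attempts += 1
  if (attempts < 3) Future.exception(new TimeoutException("too slow"))
  else Future.value("ok")
}

val result = Await.result(withRetries(5)(() => flaky()))
```

Because rescue takes a PartialFunction, any other exception, or a timeout after the retries are used up, passes through unchanged.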

Racing Futures

Sometimes we don’t care which Future finishes first. There are three common cases for this:

  1. Backup or hedged requests, where we send two identical requests in the hope that if one of them is slow, the second one won’t be. This is usually a good opportunity to use backup requests.

  2. Concurrent work where you want to queue up more work as the work is completed.

  3. Concurrent work where there’s processing that needs to be done after each future is satisfied, and which future is satisfied first doesn’t matter.

For cases where backup requests aren’t easy to use, Future.select is often the right tool.

There are three modes of Future.select. Although they were originally written for Scala, there is also a Java-friendly implementation, Futures.select. The methods on the Future instance should be usable from Java without any problem.

The simplest are the methods on the Future instance, Future#select and Future#or, which have identical behavior: each returns whichever of the two futures is satisfied first.

import com.twitter.util.Future

val original: Future[Tweet] = ???
val hedged: Future[Tweet] = ???
// Future#select[U >: A](Future[U]): Future[U]
val fasterTweet = original.select(hedged)
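
A tiny sketch of the race, using a Promise that is never satisfied as the slow contender. The values are placeholders.

```scala
import com.twitter.util.{Await, Future, Promise}

val slow = new Promise[String] // never satisfied in this sketch
val fast: Future[String] = Future.value("hedged")

// Both methods yield whichever future is satisfied first.
val winner: Future[String] = slow.or(fast)
val viaSelect: Future[String] = slow.select(fast)
```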

The more powerful one is Future.select or Future.selectIndex. Future.select accepts a collection of Futures. It then returns a Future which will contain a tuple of the contents of the first satisfied Future, and a collection of the rest of the futures. It can be quite useful to have the rest of the futures. You can interrupt the rest if they’re unneeded, or you can inspect whether other futures have also been satisfied and can be processed immediately without yielding, or you can select again on the futures and yield until one of them is satisfied. Processing the results of Future.select typically takes advantage of Future recursion. Here are some examples of the different modes:

Interrupt the rest:

import com.twitter.util.Future
import com.twitter.util.Try
import java.util.concurrent.CancellationException

val doWork: () => Future[Tweet] = ???
val tweets: Seq[Future[Tweet]] = Seq.fill(10)(doWork())
// Future.select[A](Seq[Future[A]]): Future[(Try[A], Seq[Future[A]])]
val first: Future[Tweet] = Future.select(tweets).flatMap {
  case (first, rest) =>
    val cancelEx = new CancellationException("lost the race")
    rest.foreach { f => f.raise(cancelEx) }
    Future.const(first)
}

Process as much as you can as eagerly as you can:

import com.twitter.util.Future
import com.twitter.util.{Return, Try}

val doWork: () => Future[Tweet] = ???
val tweets: Seq[Future[Tweet]] = Seq.fill(10)(doWork())
def tweetSentiment(tweet: Tweet): Int = ???
// Takes the tuple produced by Future.select, so it can be passed to flatMap.
def aggregateTweetSentiment(selected: (Try[Tweet], Seq[Future[Tweet]])): Future[Seq[Int]] = selected match {
  case (first, rest) =>
    val (finished, unfinished) = (Future.const(first) +: rest).foldLeft((Seq[Tweet](), Seq[Future[Tweet]]())) {
      case ((complete, incomplete), f) => f.poll match {
        case Some(Return(tweet)) => (complete :+ tweet, incomplete)
        case None => (complete, incomplete :+ f)
        case _ => (complete, incomplete) // failed future
      }
    }
    val sentiments = finished.map(tweetSentiment)
    if (unfinished.isEmpty) Future.value(sentiments)
    else Future.select(unfinished).flatMap(aggregateTweetSentiment).map(sentiments ++ _)
}
// Future.select[A](Seq[Future[A]]): Future[(Try[A], Seq[Future[A]])]
val avgTweetSentiment: Future[Int] = Future.select(tweets).flatMap(aggregateTweetSentiment).map { seq =>
  if (seq.isEmpty) 0 else (seq.sum / seq.length)
}

Keep going until the first successful result:

import com.twitter.util.Future
import com.twitter.util.{Throw, Try}

val doWork: () => Future[Tweet] = ???
val tweets: Seq[Future[Tweet]] = Seq.fill(10)(doWork())
// Takes the tuple produced by Future.select, so it can be passed to flatMap.
def raceTheTweets(selected: (Try[Tweet], Seq[Future[Tweet]])): Future[Tweet] = selected match {
  case (Throw(_), rest) if rest.length > 1 => // There was a failure, but there are more futures to await
    Future.select(rest).flatMap(raceTheTweets _)
  case (Throw(_), Seq(last)) => // Only one remaining, we will return it regardless of success
    last
  case (result, _) => // This is either successful or the last Future has failed
    Future.const(result)
}
// Future.select[A](Seq[Future[A]]): Future[(Try[A], Seq[Future[A]])]
val first: Future[Tweet] = Future.select(tweets).flatMap(raceTheTweets _)

An even more powerful, but slightly more cumbersome, API is Future.selectIndex. Given an IndexedSeq[Future[A]], it simply returns the index of the future that was satisfied first. This has two benefits. The first is that if you have additional information about the order of the collection you passed in, you can exploit that in deciding what to do. In comparison, you don’t know which Future Future.select returns. The second benefit is that it avoids returning a complex type, and simply returns an index into the sequence.

import com.twitter.util.Future
import com.twitter.util.Try
import java.util.concurrent.CancellationException

val doWork: () => Future[Tweet] = ???
val tweets: IndexedSeq[Future[Tweet]] = IndexedSeq.fill(10)(doWork())
// Future.selectIndex[A](IndexedSeq[Future[A]]): Future[Int]
val first: Future[Tweet] = Future.selectIndex(tweets).flatMap { idx =>
  val cancelEx = new CancellationException("lost the race")
  for (i <- 0 until 10 if i != idx) tweets(i).raise(cancelEx)
  tweets(idx)
}

Other resources

Footnotes

1

Finagle uses its own Future implementation for a variety of reasons (fewer context switches, interruptibility, support for continuation-local variables, tail-call elimination), but mostly because it preceded SIP-14 by over a year.

2

Use Future.collectToTry to concurrently collect a sequence of futures while accumulating errors instead of failing fast.

3

The name flatMap may seem strange and unrelated to our present discussion, but its etymology is impeccable: it derives from a deeper relationship between the sort of sequential composition we do with futures and a similar sort of composition we can perform over collections. See this page for more details.