Protocols

The core of Finagle is protocol-agnostic, meaning its internals provide an extensible RPC subsystem without defining any details of specific client-server protocols. To provide usable APIs for clients and servers, a number of Finagle subprojects implement common protocols. A few of these protocol implementations are documented below.

Thrift

Apache Thrift is an interface definition language. With its associated code generator(s) and binary communication protocol, Thrift facilitates the development of scalable RPC systems. By “scalable”, we specifically mean that IDLs can be shared, allowing developers to define schemas and expose network services that clients access using code generated for their preferred programming language.

The IDL is the core of Thrift. IDLs provide clear service specifications that we can use to implement clients and servers. This means that different implementations of servers can be swapped out transparently, since they all expose the same interface, regardless of language.

Thrift was originally built at Facebook. For more details on the original design, check out the whitepaper.

finagle-thrift is used extensively within Twitter, but to meet the needs of our service-oriented architecture we had to extend the Thrift protocol. We introduced the notion of “Twitter-upgraded Thrift”, or TTwitter, which augments the protocol with support for our internal infrastructure. Specifically, we tack on a request header containing Zipkin tracing info, Finagle ClientId strings, and Wily delegations. In order to maintain backwards compatibility, TTwitter clients perform protocol negotiation upon connection and will downgrade to raw TBinary Thrift if servers are not using the upgraded protocol. By default, finagle-thrift uses the Thrift framed codec and the binary protocol for serialization.

Note

Some Thrift server implementations do not handle this protocol negotiation well. If your client should run into this, you can disable it by calling Thrift.client.withNoAttemptTTwitterUpgrade.
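
The negotiate-then-downgrade behavior described above can be pictured with a small toy model. This is only a sketch under stated assumptions: NegotiationSketch, Protocol, and the probe function are hypothetical names invented for illustration, and none of this is finagle-thrift's actual implementation.

```scala
object NegotiationSketch {
  sealed trait Protocol
  case object TTwitter extends Protocol    // upgraded protocol with the extra request header
  case object PlainThrift extends Protocol // raw TBinary Thrift

  // `probe` is a hypothetical stand-in for sending the upgrade request on a new
  // connection; it returns true only when the server understands the upgrade.
  def negotiate(attemptUpgrade: Boolean)(probe: () => Boolean): Protocol =
    if (attemptUpgrade && probe()) TTwitter else PlainThrift
}
```

In this model, passing attemptUpgrade = false means the probe is never sent at all, which is the situation a server that mishandles negotiation needs.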

Using finagle-thrift

At Twitter, we use Scrooge, our open-source Thrift code generator. Scrooge is written in Scala and can generate source code in Scala or Java. Given the following IDL:

service LoggerService {
  string log(1: string message, 2: i32 logLevel) throws (1: WriteException writeEx);
  i32 getLogSize() throws (1: ReadException readEx);
}

Scrooge will generate code that can be used by finagle-thrift with the following rich APIs:

Serving the IDL:

val server = Thrift.server.serveIface(
  "localhost:1234",
  new LoggerService[Future] {
    def log(message: String, logLevel: Int): Future[String] = {
      println(s"[$logLevel] Server received: '$message'")
      Future.value(s"You've sent: ('$message', $logLevel)")
    }

    var counter = 0
    // getLogSize throws ReadExceptions every other request.
    def getLogSize(): Future[Int] = {
      counter += 1
      if (counter % 2 == 1) {
        println(s"Server: getLogSize ReadException")
        Future.exception(new ReadException())
      } else {
        println(s"Server: getLogSize Success")
        Future.value(4)
      }
    }
  })

Construct a client:

val clientServiceIface: LoggerService.ServiceIface =
  Thrift.client.newServiceIface[LoggerService.ServiceIface]("localhost:1234", "thrift_client")

A ServiceIface is a collection of Services, one for each Thrift method. Call the log method:

val result: Future[Log.SuccessType] = clientServiceIface.log(Log.Args("hello", 1))

Thrift services can be combined with Filters.

val uppercaseFilter = new SimpleFilter[Log.Args, Log.SuccessType] {
  def apply(req: Log.Args, service: Service[Log.Args, Log.SuccessType]): Future[Log.SuccessType] = {
    val uppercaseRequest = req.copy(message = req.message.toUpperCase)
    service(uppercaseRequest)
  }
}

def timeoutFilter[Req, Rep](duration: Duration) = {
  val exc = new IndividualRequestTimeoutException(duration)
  val timer = DefaultTimer.twitter
  new TimeoutFilter[Req, Rep](duration, exc, timer)
}
val filteredLog = timeoutFilter(2.seconds)
  .andThen(uppercaseFilter)
  .andThen(clientServiceIface.log)

filteredLog(Log.Args("hello", 2))
// [2] Server received: 'HELLO'
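
To make the composition above more concrete, here is a minimal, self-contained sketch of how a filter wraps a service. It deliberately uses the Scala standard library's Future instead of com.twitter.util.Future, and ToyService, ToyFilter, and andThenService are hypothetical stand-ins, not Finagle's types or method names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FilterSketch {
  // Hypothetical stand-in for a Service: an asynchronous function.
  type ToyService[Req, Rep] = Req => Future[Rep]

  // Hypothetical stand-in for a Filter whose request and response types
  // match those of the service it wraps.
  trait ToyFilter[Req, Rep] { self =>
    def apply(req: Req, next: ToyService[Req, Rep]): Future[Rep]

    // Chain another filter after this one.
    def andThen(other: ToyFilter[Req, Rep]): ToyFilter[Req, Rep] =
      new ToyFilter[Req, Rep] {
        def apply(req: Req, next: ToyService[Req, Rep]): Future[Rep] =
          self.apply(req, r => other.apply(r, next))
      }

    // Terminate the chain with a service, which yields a new service.
    def andThenService(service: ToyService[Req, Rep]): ToyService[Req, Rep] =
      req => self.apply(req, service)
  }

  // Rewrites the request before it reaches the service.
  val uppercase: ToyFilter[String, String] = new ToyFilter[String, String] {
    def apply(req: String, next: ToyService[String, String]): Future[String] =
      next(req.toUpperCase)
  }

  // Rewrites the response on the way back to the caller.
  val exclaim: ToyFilter[String, String] = new ToyFilter[String, String] {
    def apply(req: String, next: ToyService[String, String]): Future[String] =
      next(req).map(_ + "!")
  }

  val echo: ToyService[String, String] = msg => Future.successful(s"received: $msg")

  val filtered: ToyService[String, String] =
    uppercase.andThen(exclaim).andThenService(echo)

  def run(msg: String): String = Await.result(filtered(msg), 1.second)
}
```

Calling FilterSketch.run("hello") yields "received: HELLO!": the request passes through the filters from left to right, and the response travels back through them in the opposite order.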

Here’s an example of a retry policy that retries on Thrift exceptions:

val retryPolicy = RetryPolicy.tries[Try[GetLogSize.Result]](3,
{
  case Throw(ex: ReadException) => true
})

val retriedGetLogSize =
  new RetryExceptionsFilter(retryPolicy, DefaultTimer.twitter)
    .andThen(clientServiceIface.getLogSize)

retriedGetLogSize(GetLogSize.Args())
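
The idea behind a retry policy of this shape can be sketched without any Finagle dependency. The following is an illustrative model only: retrying, ToyReadException, and flakyGetLogSize are hypothetical names, the standard library's Future replaces Twitter's, and the sketch retries immediately with no backoff, which is an assumption rather than a description of RetryExceptionsFilter.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object RetrySketch {
  // Hypothetical exception standing in for the generated ReadException.
  final class ToyReadException extends Exception("read failed")

  // Re-issue `call` until it succeeds, the outcome is not retryable,
  // or `tries` attempts have been used.
  def retrying[A](tries: Int)(shouldRetry: PartialFunction[Try[A], Boolean])(
      call: () => Future[A]): Future[A] =
    call().transformWith { result =>
      val retry = tries > 1 && shouldRetry.applyOrElse(result, (_: Try[A]) => false)
      if (retry) retrying(tries - 1)(shouldRetry)(call)
      else Future.fromTry(result)
    }

  // Mirrors the example server: odd-numbered calls fail, even-numbered calls succeed.
  def flakyGetLogSize(): () => Future[Int] = {
    val counter = new AtomicInteger(0)
    () =>
      if (counter.incrementAndGet() % 2 == 1) Future.failed(new ToyReadException)
      else Future.successful(4)
  }

  // Retry only on the toy read exception; every other outcome is final.
  val policy: PartialFunction[Try[Int], Boolean] = {
    case Failure(_: ToyReadException) => true
  }

  def demo(): Int =
    Await.result(retrying(3)(policy)(flakyGetLogSize()), 1.second)
}
```

Here the first attempt fails with the toy exception, the policy matches, and the second attempt returns 4.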

Another way to construct Thrift clients is using the method interface:

val client: LoggerService.FutureIface = Thrift.client.newIface[LoggerService.FutureIface]("localhost:1234")
client.log("message", 4) onSuccess { response =>
  println("Client received response: " + response)
}

To convert the Service interface to the method interface, use Thrift.client.newMethodIface:

val filteredMethodIface: LoggerService[Future] =
  Thrift.client.newMethodIface(clientServiceIface.copy(log = filteredLog))
Await.result(filteredMethodIface.log("ping", 3).map(println))

The complete example is at ThriftServiceIfaceExample.scala. Check out the finagle-thrift API for more info.

Mux

What is Mux?

At its core, Mux is a generic RPC multiplexing protocol. Although its primary implementation is a Finagle subproject, Mux is not Finagle-specific. In the same way that HTTP is an application-layer protocol with numerous implementations in a variety of languages, Mux is a session-layer protocol with a Scala implementation in the finagle-mux package. Also, because it is purely a session-layer protocol, Mux can be used in conjunction with protocols from other layers of the OSI model. For example, Finagle currently has an implementation of the Thrift protocol on top of Mux, available in the finagle-thriftmux package.

Much of the future work on Finagle will involve improvements to Mux and feature development targeting services that support it. The wire format and semantics of the Mux protocol are documented in its source code.

Why is RPC multiplexing important?

Some important consequences of multiplexing at the RPC level:

  • One network connection per client-server session
  • Maximization of available bandwidth without incurring the cost of opening additional sockets
  • Elimination of head-of-line blocking
  • Explicit queue management
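
As a rough illustration of why multiplexing removes head-of-line blocking, the sketch below tags each request and matches responses by tag, so a fast response does not have to wait behind a slow one on the same connection. The Request and Response frames and the Session class are hypothetical simplifications; the real Mux wire format is the one documented in the finagle-mux source.

```scala
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

object MuxSketch {
  // Hypothetical frames, much simpler than the real Mux messages.
  final case class Request(tag: Int, payload: String)
  final case class Response(tag: Int, payload: String)

  // One session multiplexes many in-flight requests over a single connection.
  final class Session(send: Request => Unit) {
    private val pending = mutable.Map.empty[Int, Promise[String]]
    private var nextTag = 0

    // Assign a fresh tag, remember the promise, and write the frame.
    def dispatch(payload: String): Future[String] = synchronized {
      nextTag += 1
      val p = Promise[String]()
      pending(nextTag) = p
      send(Request(nextTag, payload))
      p.future
    }

    // Responses may arrive in any order; the tag identifies the caller.
    def receive(rep: Response): Unit = synchronized {
      pending.remove(rep.tag).foreach(_.success(rep.payload))
    }

    def outstanding: Int = synchronized(pending.size)
  }

  // The second request completes while the first is still outstanding.
  def demo(): (Boolean, Boolean) = {
    val session = new Session(_ => ())
    val slow = session.dispatch("slow query")
    val fast = session.dispatch("fast query")
    session.receive(Response(2, "fast result"))
    (slow.isCompleted, fast.isCompleted)
  }
}
```

In this model, demo() returns (false, true): the response tagged 2 is delivered even though the request tagged 1 has not been answered yet.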

Mux as a pure session-layer protocol

In OSI terminology, Mux is a pure session-layer protocol. As such, it provides a rich set of session control features:

Session Liveness Detection

In the past, Finagle has relied on mechanisms like failure accrual to detect the health of an endpoint. These mechanisms require configuration that tends to vary across disparate services and endpoints. Mux introduces a principled way to gauge session health via ping messages. This allows Finagle to provide more generic facilities for liveness detection that require little-to-no configuration.
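
One simple way to picture ping-based liveness is a detector that records when the last ping reply arrived and considers the session dead once replies stop for too long. The sketch below is an assumption-laden illustration: LivenessDetector and its single fixed timeout are hypothetical and do not describe how finagle-mux actually decides that a session is unhealthy.

```scala
object PingSketch {
  // Times are passed in explicitly (in milliseconds) to keep the sketch deterministic.
  final class LivenessDetector(timeoutMillis: Long, startMillis: Long) {
    private var lastReplyMillis = startMillis

    // Record that the peer answered a ping at `nowMillis`.
    def onPingReply(nowMillis: Long): Unit = synchronized {
      lastReplyMillis = nowMillis
    }

    // The session counts as live while the last reply is recent enough.
    def isAlive(nowMillis: Long): Boolean = synchronized {
      nowMillis - lastReplyMillis <= timeoutMillis
    }
  }
}
```

Because the signal comes from the session itself rather than from per-request failures, the only knob in this model is the timeout.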

Request Cancellation

Without proper protocol support, request cancellation in Finagle has historically been difficult to implement efficiently. For example, something as simple as timing out a request requires closing its TCP connection. Mux gives us granular control over request cancellation without having to go so far as to terminate a session.
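
With tagged requests, cancelling one request can be modeled as failing it locally and telling the peer to stop working on that tag, while the session and every other request stay untouched. The following sketch assumes hypothetical Dispatch and Discard frames and a hypothetical Session class; it is not the finagle-mux implementation.

```scala
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

object CancelSketch {
  sealed trait Frame
  final case class Dispatch(tag: Int, payload: String) extends Frame
  // Hypothetical "stop working on this tag" message.
  final case class Discard(tag: Int) extends Frame

  final class CancelledException(tag: Int) extends Exception(s"request $tag cancelled")

  final class Session(send: Frame => Unit) {
    private val pending = mutable.Map.empty[Int, Promise[String]]
    private var nextTag = 0

    // Returns the tag so that the caller can cancel this request later.
    def dispatch(payload: String): (Int, Future[String]) = synchronized {
      nextTag += 1
      val p = Promise[String]()
      pending(nextTag) = p
      send(Dispatch(nextTag, payload))
      (nextTag, p.future)
    }

    // Cancel one request: fail it locally and notify the peer,
    // but keep the session and all other requests alive.
    def cancel(tag: Int): Unit = synchronized {
      pending.remove(tag).foreach { p =>
        p.failure(new CancelledException(tag))
        send(Discard(tag))
      }
    }

    def outstanding: Int = synchronized(pending.size)
  }

  // Dispatch two requests, cancel the first, and report what was written.
  def demo(): (Vector[Frame], Int) = {
    val sent = Vector.newBuilder[Frame]
    val session = new Session(frame => { sent += frame; () })
    val (tagA, _) = session.dispatch("a")
    session.dispatch("b")
    session.cancel(tagA)
    (sent.result(), session.outstanding)
  }
}
```

After demo() runs, three frames have been written (two dispatches and one discard) and one request is still outstanding, so no connection had to be closed to time out the first request.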

Ability to advertise session windows

Mux enables servers to advertise availability on a per-window basis. This is useful for establishing explicit queueing policies, leading the way to intelligent back-pressure, slow start, and GC-avoidance.
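
To illustrate how an advertised window could drive an explicit queueing policy, the sketch below has the client hold back requests whenever the number in flight reaches the window the server last advertised. It assumes, purely for illustration, that a window is a cap on outstanding requests; WindowedClient and its methods are hypothetical and are not part of finagle-mux.

```scala
import scala.collection.mutable

object WindowSketch {
  // Client-side admission control driven by a server-advertised window.
  // Every method returns the requests that were released to the wire.
  final class WindowedClient(initialWindow: Int) {
    private var window = initialWindow
    private var inFlight = 0
    private val queue = mutable.Queue.empty[String]

    // The server advertises a new window, for example 0 ahead of a GC pause.
    def advertise(newWindow: Int): Vector[String] = synchronized {
      window = newWindow
      drain()
    }

    def submit(req: String): Vector[String] = synchronized {
      queue.enqueue(req)
      drain()
    }

    // A response arrived, which frees one slot in the window.
    def complete(): Vector[String] = synchronized {
      if (inFlight > 0) inFlight -= 1
      drain()
    }

    def queued: Int = synchronized(queue.size)

    // Release queued requests while the window has room.
    private def drain(): Vector[String] = {
      val sent = Vector.newBuilder[String]
      while (inFlight < window && queue.nonEmpty) {
        inFlight += 1
        sent += queue.dequeue()
      }
      sent.result()
    }
  }
}
```

Shrinking the window to zero pauses new dispatches without failing them, which is one way back-pressure could be expressed explicitly instead of through timeouts.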

MySQL

finagle-mysql is an asynchronous implementation of the MySQL protocol built on top of Finagle. The project provides a simple query API with support for prepared statements and transactions while taking advantage of Finagle’s client stack for connection pooling. The implementation supports both the MySQL binary and text (string) protocols.

A client can be constructed using the Mysql protocol object:

val client = Mysql.client
  .withCredentials("<user>", "<password>")
  .withDatabase("test")
  .configured(DefaultPool.Param(
    low = 0, high = 10,
    idleTime = 5.minutes,
    bufferSize = 0,
    maxWaiters = Int.MaxValue))
  .newClient("127.0.0.1:3306")

We configure the client’s connection pool to be compatible with our MySQL server. The constructor returns a Finagle ServiceFactory from mysql.Request to mysql.Result, which we can use to query the database:

val product = client().flatMap { service =>
  // `service` is checked out from the pool.
  service(QueryRequest("SELECT 5*5 AS `product`")) map {
    case rs: ResultSet => rs.rows.map(processRow)
    case _ => Seq.empty
  } ensure {
    // put `service` back into the pool.
    service.close()
  }
}

A ResultSet makes it easy to extract Values based on column names. For example, we can implement the above processRow as a pattern match on expected values:

def processRow(row: Row): Option[Long] =
  row("product").flatMap {
    case LongValue(l) => Some(l)
    case IntValue(i) => Some(i.toLong)
    case _ => None
  }

The ServiceFactory API gives you more fine-grained control over the pool, but that level of control isn’t always necessary. For the simpler cases, finagle-mysql offers a rich API that wraps the ServiceFactory returned from newClient:

val richClient = Mysql.client
  .withCredentials("<user>", "<password>")
  .withDatabase("test")
  .configured(DefaultPool.Param(
    low = 0, high = 10,
    idleTime = 5.minutes,
    bufferSize = 0,
    maxWaiters = Int.MaxValue))
  .newRichClient("127.0.0.1:3306")

We can then issue a select:

val product = richClient.select("SELECT 5*5 AS `product`")(processRow)

Note that select takes care of checking out the service and returning it to the pool. select and other useful methods are available on mysql.Client, which is returned from the call to newRichClient.
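
The checkout-and-return bookkeeping that the rich client hides can be sketched as a loan pattern over a toy pool. ToyPool and withResource below are hypothetical and synchronous; they only illustrate the shape of the idea and are not finagle-mysql's pooling code.

```scala
object PoolSketch {
  final class ToyPool[A](resources: List[A]) {
    private var idle = resources

    // Take one resource out of the pool, if any is idle.
    def checkout(): Option[A] = synchronized {
      idle match {
        case head :: tail =>
          idle = tail
          Some(head)
        case Nil => None
      }
    }

    // Put a resource back into the pool.
    def release(a: A): Unit = synchronized {
      idle = a :: idle
    }

    def available: Int = synchronized(idle.size)

    // Loan pattern: the caller never sees checkout or release, and the
    // resource is returned even when `f` throws.
    def withResource[B](f: A => B): Option[B] =
      checkout().map { a =>
        try f(a)
        finally release(a)
      }
  }
}
```

A caller of withResource cannot forget to return the resource, which is the same convenience the select method provides over the ServiceFactory version shown earlier.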

For a more involved example see the Finagle example project.