Extending Finagle

In this section we’ll examine the internals of Finagle by implementing a client and server for a simple newline-delimited, string-based protocol. We’ll build a client capable of sending simple strings to a server, which in turn may reply with another.

While we’re going to dig a bit deeper here than in the rest of the user’s guide — implementing even a simple protocol requires a deeper appreciation of Finagle’s internals — the material presented here is not necessary to understand how to use Finagle productively. However, expert users may find the material useful.

If you haven’t yet, first read the quickstart to understand some of Finagle’s core concepts (i.e. Futures, Services and Filters).

The entire example is available, together with a self-contained script to launch sbt, in the Finagle git repository:

$ git clone https://github.com/twitter/finagle.git
$ cd finagle/doc/src/sphinx/code/client-server-anatomy
$ ./sbt run

Stack

Finagle’s clients and servers comprise many relatively simple components, arranged together in a stack. Each component is a ServiceFactory that composes other service factories. We’ve seen this before: filters are a kind of component.

This allows us to create simple components that we arrange into a sophisticated whole. For example, we define timeout behavior as a separate module; we then arrange to insert this module into the composite stack wherever timeout behavior is required. Some of the components included in Finagle’s clients are discussed in the client documentation.

While we can imagine stacking such modules together manually (for example, by building the stack bottom-up, passing each ServiceFactory into the constructor of the next), this technique quickly becomes onerous; it becomes very difficult to:

  • parameterize each component, for example to set the sizes of connection pools
  • inject dependencies such as StatsReceiver implementations
  • rearrange any part of the stack, for example to remove an unused timeout filter
  • modify the stack for a specific use, for example to replace the connection pooling implementation or the load balancer

Traditionally, the ClientBuilder and ServerBuilder performed this duty in the manner just described. While it worked well for a while, it ultimately proved inflexible. With the needs and requirements of the various protocol implementations like Mux’s, as well as more sophisticated new features, the builders became unworkable monoliths.

Stack formalizes the concept of a stackable component and treats a sequence of stackable components as a first-class (immutable) value that may be manipulated like any other collection. For example, modules may be inserted or removed from the stack; you can map one stack to another, for example when parameterizing individual modules.

The stack abstraction also formalizes the concept of a parameter—i.e. a map of values used to construct an object. Stack.Params is a kind of type-safe map used to hold parameters.

Stack and Param work together like this: Stack represents the stack of modules. These modules aren’t materialized. Rather, they represent a constructor for a ServiceFactory that accepts a Params map. Thus, to extract the final ServiceFactory that represents the entirety of the stack, we simply call stack.make(params).
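The relationship between a stack and its parameters can be illustrated with a toy model. The sketch below is not Finagle’s Stack API: Module, Stack, Params and the two sample modules are hypothetical names built on plain functions, and a service is assumed to be nothing more than String => String. It only aims to show that modules stay unmaterialized until make(params) is called, and that the stack can be edited like any other immutable collection.

```scala
// Toy model only: these types are NOT Finagle's Stack API.
object StackSketch {
  // Assumption: a "service" is just a function from request to response.
  type Svc = String => String

  // Stand-in for Stack.Params: a plain map of named settings.
  type Params = Map[String, String]

  // A module has a role (its name) and a constructor that, given the
  // parameters and the next service down, builds the wrapped service.
  final case class Module(role: String, make: (Params, Svc) => Svc)

  // The stack is an immutable list of unmaterialized modules over a leaf service.
  final case class Stack(modules: List[Module], leaf: Svc) {
    def prepend(m: Module): Stack = copy(modules = m :: modules)

    def remove(role: String): Stack =
      copy(modules = modules.filterNot(_.role == role))

    // Materialize bottom-up: the last module wraps the leaf first.
    def make(params: Params): Svc =
      modules.foldRight(leaf)((m, next) => m.make(params, next))
  }

  // Upper-cases the request before passing it down.
  val upper: Module = Module("upper", (_, next) => req => next(req.toUpperCase))

  // Prefixes the response with the "label" parameter.
  val tag: Module =
    Module("tag", (ps, next) => req => ps.getOrElse("label", "?") + ":" + next(req))

  val echo: Svc = (s: String) => s

  // tag sits on top of upper, which sits on top of echo.
  val stack: Stack = Stack(Nil, echo).prepend(upper).prepend(tag)
}
```

With these definitions, StackSketch.stack.make(Map("label" -> "a")) produces a function that maps "hi" to "a:HI"; removing the "upper" module before calling make produces "a:hi" instead.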

Finagle defines default (polymorphic) stacks for both clients and servers.

We’ll now discuss the constituent parts of Finagle’s clients and servers.

Transport Layer

Finagle represents the OSI transport layer as a typed stream that may be read from and written to asynchronously. The noteworthy methods in the interface are defined as follows:

trait Transport[In, Out] {
  def read(): Future[Out]
  def write(req: In): Future[Unit]
  ...
}

Most Transports are implemented using Netty for I/O multiplexing and protocol codecs.
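To make the read/write contract concrete without involving Netty, here is a minimal in-memory sketch. It mirrors the shape of the interface above but uses scala.concurrent.Future rather than Finagle’s Future, and QueueTransport is a hypothetical name introduced only for illustration. A read that arrives before any data is parked as a Promise and completed by the next write.

```scala
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

object TransportSketch {
  // Same shape as the interface above, on the standard library's Future.
  trait Transport[In, Out] {
    def read(): Future[Out]
    def write(req: In): Future[Unit]
  }

  // Hypothetical in-memory transport: whatever is written becomes readable, in order.
  final class QueueTransport[A] extends Transport[A, A] {
    private val items = mutable.Queue.empty[A]
    private val readers = mutable.Queue.empty[Promise[A]]

    def read(): Future[A] = synchronized {
      if (items.nonEmpty) Future.successful(items.dequeue())
      else {
        // Nothing buffered yet: hand back a future that a later write completes.
        val p = Promise[A]()
        readers.enqueue(p)
        p.future
      }
    }

    def write(req: A): Future[Unit] = synchronized {
      // Satisfy the oldest waiting reader first, otherwise buffer the item.
      if (readers.nonEmpty) readers.dequeue().success(req)
      else items.enqueue(req)
      Future.unit
    }
  }
}
```

The point of the sketch is that read() never blocks: it returns immediately with a future, which is what lets a dispatcher layer request/response semantics on top.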

You can use either Netty 3 or Netty 4 for your protocol implementation. As we have migrated our existing protocols to Netty 4 (which is currently in beta and should be considered experimental), our best-practice recommendations have evolved; they are covered in the following section.

Decoding in the Transport

In the following sections, we use Netty-provided encoders/decoders in the Pipeline, but it would be possible to have an empty pipeline and do this work in a Transport.

If you write your own custom decoding code, we recommend putting that logic in a Transport. There are two motivations for this:

1. Being decoupled from an I/O multiplexer means that it can be easily swapped out (e.g., for a new version of Netty).

2. Finagle and Netty present different programming models, and it isn’t straightforward to reason about logic split between them.

In this case, the Pipeline contains only a handler that converts between the Netty buffer type (ChannelBuffer for Netty 3, ByteBuf for Netty 4) and the buffer type used in Finagle, Buf, plus a framer (framing could also happen in the Transport).

On the client side, the Pipeline would look like:

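import com.twitter.finagle.netty4.codec.BufCodec
import io.netty.channel.ChannelPipeline

// Netty 4 has no ChannelPipelineFactory; handlers are installed on the
// pipeline that is passed in. MyFramer is a placeholder for your own framer.
object ClientPipeline extends (ChannelPipeline => Unit) {
  def apply(pipeline: ChannelPipeline): Unit = {
    pipeline.addLast("bufCodec", new BufCodec)
    pipeline.addLast("framer", new MyFramer)
  }
}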

We’ll cover two different approaches to decoding in the Transport.

The first, and simplest approach, is best if your decoding logic is stateless and has a one-to-one mapping with the frames received. If your client or server can decode one frame into one message, without knowing anything about the previous or next frame, this approach will work for you.

On your message type object (in this case, Message), define encoding and decoding methods:

object Message {
  def encode(msg: Message): Buf = {
    ...
  }

  def decode(buf: Buf): Message = {
    ...
  }
}

class Message {
  ...
}

Then, on the client side, we can simply map over the Transport with the encoding and decoding functions to produce a new Transport that encodes and decodes for us (to do the same thing on the server side, we’d use a ServerDispatcher).

// The underlying transport carries Buf, matching Message.encode and Message.decode.
protected def newDispatcher(
    transport: Transport[Buf, Buf]): Service[Message, Message] =
  new SerialClientDispatcher(transport.map(Message.encode, Message.decode))
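The idea of mapping a transport can be sketched without Finagle on the classpath. Everything below is an assumption made for illustration: the Transport trait is redeclared on scala.concurrent.Future, mapTransport is a hypothetical helper standing in for the map call above, frames are modelled as Array[Byte] instead of Buf, and Loopback exists only so the example has something to read from.

```scala
import scala.concurrent.{ExecutionContext, Future}

object MapTransportSketch {
  // Same shape as the Transport interface shown earlier, on scala.concurrent.Future.
  trait Transport[In, Out] {
    def read(): Future[Out]
    def write(req: In): Future[Unit]
  }

  // Hypothetical helper: adapt a transport by encoding on write and decoding on read.
  def mapTransport[In, Out, In1, Out1](underlying: Transport[In, Out])(
    encode: In1 => In,
    decode: Out => Out1
  )(implicit ec: ExecutionContext): Transport[In1, Out1] =
    new Transport[In1, Out1] {
      def read(): Future[Out1] = underlying.read().map(decode)
      def write(req: In1): Future[Unit] = underlying.write(encode(req))
    }

  // Stand-in message type; one frame corresponds to exactly one message.
  final case class Message(text: String)

  def encode(msg: Message): Array[Byte] = msg.text.getBytes("UTF-8")
  def decode(frame: Array[Byte]): Message = Message(new String(frame, "UTF-8"))

  // Demonstration-only transport: read returns the last frame written.
  final class Loopback extends Transport[Array[Byte], Array[Byte]] {
    @volatile private var last: Array[Byte] = Array.emptyByteArray
    def read(): Future[Array[Byte]] = Future.successful(last)
    def write(req: Array[Byte]): Future[Unit] = { last = req; Future.unit }
  }

  // A Message-typed view over a byte-typed transport.
  def messages(implicit ec: ExecutionContext): Transport[Message, Message] =
    mapTransport[Array[Byte], Array[Byte], Message, Message](new Loopback)(encode, decode)
}
```

Because encode and decode are pure functions of a single frame, the wrapper needs no state of its own, which is exactly the condition under which this first approach applies.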

The second approach to doing decoding in the Transport applies if your client or server decodes one frame into multiple messages (or multiple frames into a single message), or needs to maintain some state when decoding. In that case, you can use a TransportProxy to wrap the underlying Transport and implement your own read/write methods.

A custom client transport might look like the following, where a decoder class (MyDecoder) maintains state and returns a Message once sufficient frames have been read, or null otherwise.

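import com.twitter.finagle.transport.{Transport, TransportProxy}
import com.twitter.io.Buf
import com.twitter.util.Future

class ClientTransport(
  underlying: Transport[Buf, Buf]
) extends TransportProxy(underlying) {

  // MyDecoder and MyEncoder are placeholders for your protocol's codec.
  val decoder = new MyDecoder()
  val encoder = new MyEncoder()

  // Feed one frame to the decoder; keep reading until it yields a Message.
  private[this] val decode: Buf => Future[Message] = buf => {
    val msg: Message = decoder.decode(buf)
    if (msg != null) {
      Future.value(msg)
    } else {
      readLoop()
    }
  }

  private[this] def readLoop(): Future[Message] = underlying.read().flatMap(decode)

  def read(): Future[Message] = readLoop()

  def write(msg: Message): Future[Unit] = {
    val buf: Buf = encoder.encode(msg)
    underlying.write(buf)
  }
}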
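The read loop above can be exercised in isolation with a small stand-alone sketch. It is not Finagle code: LineDecoder and readMessage are hypothetical names, chunks are modelled as String rather than Buf, scala.concurrent.Future replaces Finagle’s Future, and Option replaces the null return. The decoder is also assumed to see at most one newline per chunk; a production decoder would have to drain several buffered messages.

```scala
import scala.concurrent.{ExecutionContext, Future}

object StatefulDecodeSketch {
  // Accumulates chunks and yields a line once a '\n' has arrived.
  // Not thread-safe: assumes a single reader, and at most one '\n' per chunk.
  final class LineDecoder {
    private val pending = new java.lang.StringBuilder

    def decode(chunk: String): Option[String] = {
      pending.append(chunk)
      val i = pending.indexOf("\n")
      if (i < 0) None
      else {
        val line = pending.substring(0, i)
        pending.delete(0, i + 1) // drop the line and its delimiter
        Some(line)
      }
    }
  }

  // Keep reading chunks until the decoder produces a complete message.
  def readMessage(readChunk: () => Future[String], decoder: LineDecoder)(
    implicit ec: ExecutionContext
  ): Future[String] =
    readChunk().flatMap { chunk =>
      decoder.decode(chunk) match {
        case Some(msg) => Future.successful(msg)
        case None      => readMessage(readChunk, decoder)
      }
    }
}
```

Feeding the chunks "hel", "lo\nwor" and "ld\n" through one decoder yields "hello" from the first readMessage call and "world" from the second, since the leftover "wor" is carried over as decoder state.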

Server Protocol

To frame data received over the network with respect to our protocol, we use a Netty Channel Pipeline. Our server pipeline defines a UTF-8 text-based newline delimited protocol (since Netty already provides a StringEncoder and StringDecoder, we’ll put this decoding in the Pipeline). The types have different namespaces but look much the same in Netty 3 and Netty 4.

Using a Netty 3 ChannelPipeline:

import org.jboss.netty.handler.codec.string.{StringEncoder, StringDecoder}
import org.jboss.netty.channel._
import org.jboss.netty.handler.codec.frame.{Delimiters, DelimiterBasedFrameDecoder}
import org.jboss.netty.util.CharsetUtil

object StringServerPipeline extends ChannelPipelineFactory {
  def getPipeline = {
    val pipeline = Channels.pipeline()
    pipeline.addLast("line", new DelimiterBasedFrameDecoder(100, Delimiters.lineDelimiter: _*))
    pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
    pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8))
    pipeline
  }
}

Using a Netty 4 ChannelPipeline:

import io.netty.channel.ChannelPipeline
import io.netty.handler.codec.{DelimiterBasedFrameDecoder, Delimiters}
import io.netty.handler.codec.string.{StringDecoder, StringEncoder}
import io.netty.util.CharsetUtil

// Netty 4 has no ChannelPipelineFactory or Channels.pipeline(); instead we
// install our handlers on the pipeline that is passed in.
object StringServerPipeline extends (ChannelPipeline => Unit) {
  def apply(pipeline: ChannelPipeline): Unit = {
    // Split the byte stream into frames of at most 100 bytes, one per line.
    pipeline.addLast("line", new DelimiterBasedFrameDecoder(100, Delimiters.lineDelimiter: _*))
    pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
    pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8))
  }
}

Listener

The mechanics of listening over a network socket and translating our pipeline into a typed transport are defined by the Netty Listener. Finagle provides a Netty3Listener and Netty4Listener.

Here we define a Netty4Listener in our server implementation:

protected def newListener(): Listener[String, String] =
  Netty4Listener(StringServerPipeline, params)

This implements the Listener interface that exposes a listen method:

def listen(addr: SocketAddress)(serveTransport: Transport[In, Out] => Unit)

That is, given a socket address to bind and listen, serveTransport is dispatched for each new connection established.

For example, here is a simple echo server using a Netty4Listener:

val address = new java.net.InetSocketAddress("localhost", 8080)
val listener = Netty4Listener(StringServerPipeline, StackServer.defaultParams)
val echoServer = listener.listen(address) { transport =>
  transport.read() flatMap { transport.write(_) } ensure transport.close()
}

We can now send requests over this socket and have them echoed back:

> echo "hello" | nc localhost 8080
> hello

The serveTransport function defined above is primitive. For example, it closes each connection after one read and write. Finagle provides tools to provision a transport with more sophisticated behavior.

Server Dispatcher

The server dispatcher queues concurrent incoming requests and serially dispatches them over a Transport. The data read from the Transport is funneled through a service object and the resulting value is written back to the Transport. Additionally, the server dispatcher drains existing requests before closing a Transport.
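The core of that behavior is a read, apply, write loop. The following is a minimal sketch of the loop only, assuming a Transport trait redeclared on scala.concurrent.Future and a service modelled as a plain function; serve is a hypothetical name, and the draining and cancellation handling that Finagle’s dispatcher performs are deliberately left out.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ServerDispatchSketch {
  // Same shape as the Transport interface shown earlier, on scala.concurrent.Future.
  trait Transport[In, Out] {
    def read(): Future[Out]
    def write(req: In): Future[Unit]
  }

  // Read one request, apply the service, write the reply, then repeat.
  // Requests are handled strictly one at a time. The returned future never
  // succeeds: it fails when a read, the service, or a write fails
  // (for example, when the peer disconnects).
  def serve[Req, Rep](
    transport: Transport[Rep, Req],
    service: Req => Future[Rep]
  )(implicit ec: ExecutionContext): Future[Unit] =
    transport.read()
      .flatMap(req => service(req))
      .flatMap(rep => transport.write(rep))
      .flatMap(_ => serve(transport, service))
}
```

Note the orientation of the type parameters: from the server’s point of view the transport is written with replies and read for requests, hence Transport[Rep, Req].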

We could translate our serveTransport function to use this facility.

Using Netty 4:

val service = new Service[String, String] {
  def apply(request: String) = Future.value(request)
}
val serveTransport = (t: Transport[String, String]) => new SerialServerDispatcher(t, service)
val listener = Netty4Listener[String, String](StringServerPipeline, StackServer.defaultParams)
val server = listener.listen(address) { serveTransport(_) }

A nice consequence of using a Service to process data received over the transport is the ability to furnish our server with additional behavior via Filters. This is exactly what Finagle’s default server implementation does.

StdStackServer

Finagle’s StdStackServer provides appropriate features for building a robust server. It puts together a Listener and a Dispatcher in much the same way we just did. StdStackServer also layers a Stack on top of it (e.g. to provide timeouts, stats, concurrency control, tracing, etc.) and takes care of graceful shutdown, so that outstanding requests are drained before a server exits. The resulting server is fully parameterized, providing a simple and standard way to receive parameters and dependencies.

Using the listener and dispatcher as above, we define our full server. The abstract type members In and Out are used when the type of Listener differs from the type of Server. This is common when some protocol processing is done in the Dispatcher.

We’ll create a server that uses Netty 4:

case class Server(
  stack: Stack[ServiceFactory[String, String]] = StackServer.newStack,
  params: Stack.Params = StackServer.defaultParams)
    extends StdStackServer[String, String, Server] {
  protected type In = String
  protected type Out = String

  protected def copy1(
    stack: Stack[ServiceFactory[String, String]] = this.stack,
    params: Stack.Params = this.params
  ): Server = copy(stack, params)

  protected def newListener(): Listener[String, String] =
    Netty4Listener(StringServerPipeline, params)

  protected def newDispatcher(
    transport: Transport[String, String],
    service: Service[String, String]
  ) =
    new SerialServerDispatcher(transport, service)
}

Finally, we make use of our service:

val service = new Service[String, String] {
  def apply(request: String) = Future.value(request)
}
val server = Echo.serve(":8080", service)
Await.result(server)

To create a server that uses Netty 3, use a Netty3Listener and a Netty 3 ChannelPipeline.

Client Protocol

Again, we’ll use a Netty Channel Pipeline to frame our network traffic. Our client pipeline defines a UTF-8 newline delimited protocol. As with the server pipeline, the types have different namespaces but look much the same in Netty 3 and Netty 4.

Netty 3 ChannelPipeline:

import org.jboss.netty.handler.codec.string.{StringEncoder, StringDecoder}
import org.jboss.netty.channel._
import org.jboss.netty.handler.codec.frame.{Delimiters, DelimiterBasedFrameDecoder}
import org.jboss.netty.util.CharsetUtil

object StringClientPipeline extends ChannelPipelineFactory {
  def getPipeline = {
    val pipeline = Channels.pipeline()
    pipeline.addLast("stringEncode", new StringEncoder(CharsetUtil.UTF_8))
    pipeline.addLast("stringDecode", new StringDecoder(CharsetUtil.UTF_8))
    pipeline.addLast("line", new DelimEncoder('\n'))
    pipeline
  }
}

class DelimEncoder(delim: Char) extends SimpleChannelHandler {
  override def writeRequested(ctx: ChannelHandlerContext, evt: MessageEvent) = {
    val newMessage = evt.getMessage match {
      case m: String => m + delim
      case m => m
    }
    Channels.write(ctx, evt.getFuture, newMessage, evt.getRemoteAddress)
  }
}

Netty 4 ChannelPipeline:

import io.netty.channel.{ChannelHandlerContext, ChannelPipeline}
import io.netty.handler.codec.MessageToMessageEncoder
import io.netty.handler.codec.string.{StringDecoder, StringEncoder}
import io.netty.util.CharsetUtil

// Netty 4 has no ChannelPipelineFactory or SimpleChannelHandler; we install
// handlers on the pipeline passed in and append the delimiter in an encoder.
object StringClientPipeline extends (ChannelPipeline => Unit) {

  // Appends the delimiter to every outgoing String.
  class DelimEncoder(delim: Char) extends MessageToMessageEncoder[String] {
    override def encode(
      ctx: ChannelHandlerContext,
      msg: String,
      out: java.util.List[AnyRef]
    ): Unit = {
      out.add(msg + delim)
    }
  }

  def apply(pipeline: ChannelPipeline): Unit = {
    pipeline.addLast("stringEncode", new StringEncoder(CharsetUtil.UTF_8))
    pipeline.addLast("stringDecode", new StringDecoder(CharsetUtil.UTF_8))
    pipeline.addLast("line", new DelimEncoder('\n'))
  }
}

Transporter

A Transporter is responsible for connecting a Transport to a peer; it establishes a session. Finagle provides a Netty3Transporter and a Netty4Transporter; however, the use of other Transporters is fully supported.

Using a Netty4Transporter:

protected def newTransporter(addr: SocketAddress): Transporter[String, String] =
  Netty4Transporter.rawTransporter(StringClientPipeline, addr, params)

Client Dispatcher

A client dispatcher turns a Transport (a stream of objects) into a Service (request-response pairs). It must manage all outstanding requests, pairing incoming responses to their respective requests. The simplest kind of dispatcher is called a SerialClientDispatcher, which allows only a single outstanding request (concurrent requests are queued) [1].

Our client will employ the SerialClientDispatcher.
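One way to picture serial dispatch is as a chain of futures in which each exchange starts only after the previous one has finished. The sketch below follows that idea under stated assumptions: it is not the SerialClientDispatcher implementation, SerialDispatcher is a hypothetical name, the Transport trait is redeclared on scala.concurrent.Future, and a service is modelled as a plain function.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ClientDispatchSketch {
  // Same shape as the Transport interface shown earlier, on scala.concurrent.Future.
  trait Transport[In, Out] {
    def read(): Future[Out]
    def write(req: In): Future[Unit]
  }

  // Turns a stream-like transport into request/response pairs, allowing
  // only one outstanding request; later requests wait their turn.
  final class SerialDispatcher[Req, Rep](transport: Transport[Req, Rep])(
    implicit ec: ExecutionContext
  ) extends (Req => Future[Rep]) {
    // The most recently queued exchange; guarded by `this`.
    private var tail: Future[Any] = Future.unit

    def apply(req: Req): Future[Rep] = synchronized {
      // Start once the previous exchange has completed, whether it
      // succeeded or failed, then write the request and read its reply.
      val rep = tail.transformWith { _ =>
        transport.write(req).flatMap(_ => transport.read())
      }
      tail = rep
      rep
    }
  }
}
```

Because the next write is chained behind the previous read, the reply read after a write can only belong to that request, which is how a dispatcher without request identifiers pairs responses with requests.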

[1] Note that Finagle also includes a dispatcher that can pipeline requests, i.e., allow more than one outstanding request. It’s possible to create a custom dispatcher as well. For example, Mux, which supports true multiplexing, defines a custom dispatcher.

A Basic Client

Given a defined transporter and request dispatching strategy, we can compose the two and create a client. We’ll create a client that uses Netty 4:

val addr = new java.net.InetSocketAddress("localhost", 8080)
val transporter =
  Netty4Transporter[String, String](StringClientPipeline, addr, StackClient.defaultParams)

val bridge: Future[Service[String, String]] =
  transporter(addr) map { transport => new SerialClientDispatcher(transport) }

val client = new Service[String, String] {
  def apply(req: String) = bridge flatMap { svc => svc(req) ensure svc.close() }
}

Finally, we can dispatch requests over our client.

val result = client("hello")
println(Await.result(result))

Assuming we have a server willing to listen, we can expect a response:

$ ./sbt run
> hello

To create a client that uses Netty 3, use a Netty3Transporter and a Netty 3 ChannelPipeline.

A Robust Client

Our client is a Service, so we can supply additional behavior to make our client more robust using filters:

val retry = new RetryExceptionsFilter[String, String](
  retryPolicy = RetryPolicy.tries(3),
  timer = DefaultTimer.twitter
)

val timeout = new TimeoutFilter[String, String](
  timeout = 3.seconds,
  timer = DefaultTimer.twitter
)

val maskCancel = new MaskCancelFilter[String, String]

Composing these filters [2] with our basic client demonstrates the composable components used throughout Finagle.

val newClient =
  retry andThen
    timeout andThen
    maskCancel andThen client

val result = newClient("hello")
println(Await.result(result))
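How andThen layers behavior around a service can be seen in a small self-contained model. The types below are simplified stand-ins, not Finagle’s Filter and Service: they use scala.concurrent.Future, a filter keeps the same request and response types as the service it wraps, and Retry, Prefix and Flaky are hypothetical names. The retry logic is a bare fixed-count retry without backoff or a timer.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object FilterSketch {
  trait Service[Req, Rep] { def apply(req: Req): Future[Rep] }

  trait Filter[Req, Rep] { self =>
    def apply(req: Req, next: Service[Req, Rep]): Future[Rep]

    // Filter andThen Filter is another Filter.
    def andThen(next: Filter[Req, Rep]): Filter[Req, Rep] =
      new Filter[Req, Rep] {
        def apply(req: Req, svc: Service[Req, Rep]): Future[Rep] =
          self.apply(req, next.andThen(svc))
      }

    // Filter andThen Service is a Service with the filter's behavior added.
    def andThen(svc: Service[Req, Rep]): Service[Req, Rep] =
      new Service[Req, Rep] {
        def apply(req: Req): Future[Rep] = self.apply(req, svc)
      }
  }

  // Re-issues a failed request until `tries` attempts have been made.
  final class Retry[Req, Rep](tries: Int)(implicit ec: ExecutionContext)
      extends Filter[Req, Rep] {
    def apply(req: Req, next: Service[Req, Rep]): Future[Rep] = {
      def attempt(left: Int): Future[Rep] =
        next(req).recoverWith { case NonFatal(_) if left > 1 => attempt(left - 1) }
      attempt(tries)
    }
  }

  // Decorates every successful reply with a prefix.
  final class Prefix(p: String)(implicit ec: ExecutionContext)
      extends Filter[String, String] {
    def apply(req: String, next: Service[String, String]): Future[String] =
      next(req).map(rep => p + rep)
  }

  // Echo service that fails its first `failures` calls.
  final class Flaky(failures: Int) extends Service[String, String] {
    val calls = new AtomicInteger(0)
    def apply(req: String): Future[String] =
      if (calls.incrementAndGet() <= failures) Future.failed(new RuntimeException("boom"))
      else Future.successful(req)
  }
}
```

In this model, composing new Retry(3), new Prefix("> ") and new Flaky(2) with andThen and sending "hello" completes with "> hello" after three calls to the underlying service. Order matters: because Retry is outermost, every attempt passes through Prefix and the service again.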

This client is a good start, but we cannot dispatch concurrent requests to a single host, nor load balance over multiple hosts. A typical Finagle client affords us the ability to dispatch a large number of concurrent requests.

[2] The use of the MaskCancelFilter in the example filter stack ensures that timeout exceptions don’t propagate to our bottommost service which, in this case, represents a dispatcher. Without this guarantee, the service would be closed after the first timeout exception. This becomes unnecessary when we use a StdStackClient because the semantics of Service#close() change with respect to Finagle’s connection pool.

StdStackClient

The StdStackClient combines a Transporter, a Dispatcher, and a Stack to provide a robust, load balanced, resource-managed client. The default stack includes many features including load balancing over multiple hosts and connection pooling per host. See the section on client modules for more details.

Putting together a StdStackClient is simple:

case class Client(
  stack: Stack[ServiceFactory[String, String]] = StackClient.newStack,
  params: Stack.Params = StackClient.defaultParams)
    extends StdStackClient[String, String, Client] {
  protected type In = String
  protected type Out = String

  protected def copy1(
    stack: Stack[ServiceFactory[String, String]],
    params: Stack.Params
  ): Client =
    copy(stack, params)

  protected def newTransporter(addr: SocketAddress): Transporter[String, String] =
    Netty4Transporter.rawTransporter(StringClientPipeline, addr, params)

  protected def newDispatcher(transport: Transport[String, String]): Service[String, String] =
    new SerialClientDispatcher(transport)
}

Armed with this new client, we can connect to a destination Name, representing multiple hosts:

val dest = Resolver.eval(
  "localhost:8080,localhost:8081,localhost:8082")

client.newClient(dest): ServiceFactory[String, String]

Requests sent to this client are load balanced across these hosts, and the client maintains a connection pool for each host, thus allowing concurrent dispatches.