The integration tests serve as a good collection of example Finatra Kafka Streams servers.

Word Count Server

We can build a lightweight server that counts the occurrences of each unique word read from an input topic and stores the results in RocksDB.

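class WordCountRocksDbServer extends KafkaStreamsTwitterServer {

  override val name = "wordcount"
  private val countStoreName = "CountsStore"

  override protected def configureKafkaStreams(builder: StreamsBuilder): Unit = {
    builder.asScala
      // Read text lines from the input topic.
      .stream[Bytes, String]("TextLinesTopic")(Consumed.`with`(Serdes.Bytes, Serdes.String))
      // Split each line into words and re-key every record by its word.
      .flatMapValues(_.split(' '))
      .groupBy((_, word) => word)(Serialized.`with`(Serdes.String, Serdes.String))
      // Count per word, materializing the totals in the named state store.
      .count()(Materialized.as(countStoreName))
      .toStream
      .to("WordsWithCountsTopic")(Produced.`with`(Serdes.String, ScalaSerdes.Long))
  }
}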
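
To make the semantics of the split, group, and count steps concrete, here is a minimal in-memory sketch using only plain Scala collections. It is not Kafka Streams code: the object name WordCountSketch and its methods are hypothetical, and unlike a streaming count, which emits an updated total for every incoming record, this version only computes the final totals for a fixed batch of lines.

```scala
// In-memory illustration only: plain Scala collections, no Kafka Streams involved.
// WordCountSketch and its methods are hypothetical names chosen for this sketch.
object WordCountSketch {

  // Mirrors flatMapValues(_.split(' ')): each line becomes zero or more words.
  def words(lines: Seq[String]): Seq[String] =
    lines.flatMap(_.split(' '))

  // Mirrors groupBy(word) followed by count: one total per distinct word.
  def counts(lines: Seq[String]): Map[String, Long] =
    words(lines).groupBy(identity).map { case (word, occurrences) =>
      word -> occurrences.size.toLong
    }
}
```

For example, WordCountSketch.counts(Seq("hello world", "hello kafka")) yields a map with "hello" mapped to 2 and both "world" and "kafka" mapped to 1.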

Queryable State

We can then expose a Thrift endpoint enabling clients to directly query the state via interactive queries.

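class WordCountRocksDbServer extends KafkaStreamsTwitterServer with QueryableState {

  // name, countStoreName, and configureKafkaStreams as in the previous example

  final override def configureThrift(router: ThriftRouter): Unit = {
    router
      .add(
        new WordCountQueryService(
          // Typed, read-only view over the counts state store.
          queryableFinatraKeyValueStore[String, Long](
            storeName = countStoreName,
            primaryKeySerde = Serdes.String
          )
        )
      )
  }
}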
In this example, WordCountQueryService is an underlying Thrift service.
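
The general shape of such a query service can be sketched without any Finatra or Thrift dependencies. Everything below is an assumption made for illustration: CountLookup is a hypothetical stand-in for the store handle passed to the service, WordCountQuerySketch is a hypothetical name, and the real WordCountQueryService implements a generated Thrift interface rather than a plain method.

```scala
// Hypothetical stand-in for the read-only store view handed to the query service.
trait CountLookup {
  def get(word: String): Option[Long]
}

// Hypothetical service logic: look up a word and answer with its current count.
class WordCountQuerySketch(store: CountLookup) {
  // An absent key means the word has not been counted in this store, so report zero.
  def count(word: String): Long = store.get(word).getOrElse(0L)
}

object WordCountQuerySketch {
  // Builds a lookup backed by an in-memory map, for illustration and tests.
  def fromMap(counts: Map[String, Long]): CountLookup =
    new CountLookup {
      def get(word: String): Option[Long] = counts.get(word)
    }
}
```

Keeping the lookup behind a small trait like this lets the query logic be exercised against an in-memory map, independently of a running Kafka Streams topology.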