Examples¶
The integration tests serve as a good collection of example Finatra Kafka Streams servers.
Word Count Server¶
We can build a lightweight server which counts the unique words from an input topic, storing the results in RocksDB.
class WordCountRocksDbServer extends KafkaStreamsTwitterServer {
override val name = "wordcount"
private val countStoreName = "CountsStore"
override protected def configureKafkaStreams(builder: StreamsBuilder): Unit = {
builder.asScala
.stream[Bytes, String]("TextLinesTopic")(Consumed.`with`(Serdes.Bytes, Serdes.String))
.flatMapValues(_.split(' '))
.groupBy((_, word) => word)(Serialized.`with`(Serdes.String, Serdes.String))
.count()(Materialized.as(countStoreName))
.toStream
.to("WordsWithCountsTopic")(Produced.`with`(Serdes.String, ScalaSerdes.Long))
}
}
Queryable State¶
We can then expose a Thrift endpoint enabling clients to directly query the state via interactive queries.
class WordCountRocksDbServer extends KafkaStreamsTwitterServer with QueryableState {
...
final override def configureThrift(router: ThriftRouter): Unit = {
router
.add(
new WordCountQueryService(
queryableFinatraKeyValueStore[String, Long](
storeName = countStoreName,
primaryKeySerde = Serdes.String
)
)
)
}
}
In this example, WordCountQueryService
is an underlying Thrift service.