Heron processing topologies can be written using an API called the Heron Functional API. The Heron Functional API is currently available for the following languages:

Although the original topology API can still be used with Heron (which means that all of your older topologies will still run) we strongly recommend creating all new topologies using the Heron Functional API, for reasons outlined in the section below.

The Heron Functional API vs. the topology API

When Heron was first released, all Java topologies needed to be written using an API based on the Storm topology API. Although this API is quite powerful (and can still be used), the Heron Functional API enables you to create topologies without needing to implement spouts and bolts or connect spouts and bolts together.

Here are some crucial differences between the two APIs:

Domain Original topology API Heron Functional API
Programming style Procedural, processing component based Functional
Abstraction level Low level. Developers must think in terms of "physical" spout and bolt implementation logic. High level. Developers can write processing logic in an idiomatic fashion in the language of their choice, without needing to write and connect spouts and bolts.
Processing model Spout and bolt logic must be created explicitly, and connecting spouts and bolts is the responsibility of the developer Spouts and bolts are created for you automatically on the basis of the processing graph that you build

The two APIs also have a few things in common:

  • Topologies' logical and physical plans are automatically created by Heron
  • Topologies are managed in the same way using the heron CLI tool

Getting started

In order to use the Heron Functional API for Java, you'll need to install the heron-api library.

Maven setup

In order to use the heron-api library, add this to the dependencies block of your pom.xml configuration file:

<dependency>
    <groupId>com.twitter.heron</groupId>
    <artifactId>heron-api</artifactId>
    <version>0.17.0</version>
</dependency>

Compiling a JAR with dependencies

In order to run a Java Functional API topology in a Heron cluster, you'll need to package your topology as a "fat" JAR with dependencies included. You can use the Maven Assembly Plugin to generate JARs with dependencies. To install the plugin and add a Maven goal for a single JAR, add this to the plugins block in your pom.xml:

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <mainClass></mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Once your pom.xml is properly set up, you can compile the JAR with dependencies using this command:

$ mvn assembly:assembly

By default, this will add a JAR in your project's target folder with the name PROJECT-NAME-VERSION-jar-with-dependencies.jar. Here's an example topology submission command using a compiled JAR:

$ mvn assembly:assembly
$ heron submit local \
  target/my-project-1.2.3-jar-with-dependencies.jar \
  com.example.Main \
  MyTopology arg1 arg2

Java Streamlet API starter project

If you'd like to up and running quickly with the Heron Streamlet API for Java, you can clone this repository, which includes an example topology built using the Functional API as well as the necessary Maven configuration. To build a JAR with dependencies of this example topology:

$ git clone https://github.com/streamlio/heron-java-streamlet-api-example
$ cd heron-java-streamlet-api-example
$ mvn assembly:assembly
$ ls target/*.jar
target/heron-java-streamlet-api-example-latest-jar-with-dependencies.jar
target/heron-java-streamlet-api-example-latest.jar

If you're running a local Heron cluster, you can submit the built example topology like this:

$ heron submit local target/heron-java-streamlet-api-example-latest-jar-with-dependencies.jar \
  io.streaml.heron.streamlet.IntegerProcessingTopology \
  IntegerProcessingTopology

Selecting delivery semantics

Heron enables you to apply one of three delivery semantics to any Heron topology. For the example topology above, you can select the delivery semantics when you submit the topology with the topology's second argument. This command, for example, would apply effectively-once to the example topology:

$ heron submit local target/heron-java-streamlet-api-example-latest-jar-with-dependencies.jar \
  io.streaml.heron.streamlet.IntegerProcessingTopology \
  IntegerProcessingTopology \
  effectively-once

The other options are at-most-once and at-least-once. If you don't explicitly select the delivery semantics, at-least-once semantics will be applied.

Streamlet API topology configuration

Every Streamlet API topology needs to be configured using a Config object. Here's an example baseline configuration:

import com.twitter.heron.streamlet.Config;
import com.twitter.heron.streamlet.Runner;

Config topologyConfig = new Config();

// Apply topology configuration using the topologyConfig object

Runner topologyRunner = new Runner();
topologyRunner.run("name-for-topology", conf, topologyBuilder);

You can set the following in the configuration: the topology's delivery semantics, the number of containers to be used by the topology, and the total CPU and RAM resources for the topology.

Delivery semantics

You can apply delivery semantics to a Functional API topology like this:

topologyConfig
        .setDeliverySemantics(Config.DeliverySemantics.EFFECTIVELY_ONCE);

The other available options are ATMOST_ONCE and ATLEAST_ONCE.

Containers and resources

You can set the number of containers to be used by a Functional API topology (as an integer) like so:

topologyConfig.setNumContainers(2);

You can set the CPU per container (as a float) and RAM per container (as a long expressing the number of bytes) like this:

import com.twitter.heron.streamlet.Resources;

Resources resources = new Resources();
resources.withCpu(2.0f);
resources.withRam(104857600);

topologyConfig.setContainerResources(resources);

The default CPU is 1.0f while the default RAM is 104857600 (100 megabytes).

There's also a ByteAmount helper class that you can use to convert byte formats:

import com.twitter.heron.common.basics.ByteAmount;

resources.withRam(ByteAmount.fromMegabytes(192).asMegabytes());

Streamlets

In the Heron Functional API for Java, processing graphs consist of streamlets. One or more supplier streamlets inject data into your graph to be processed by downstream operators.

Operations

Operation Description Example
map Create a new streamlet by applying the supplied mapping function to each element in the original streamlet Add 1 to each element in a streamlet of integers
flatMap Like a map operation but with the important difference that each element of the streamlet is flattened Flatten a sentence into individual words
join Create a new streamlet by combining two separate key-value streamlets into one on the basis of each element's key Combine key-value pairs listing current scores (e.g. ("h4x0r", 127)) for each user into a single per-user stream
filter Create a new streamlet containing only the elements that satisfy the supplied filtering function Remove all inappropriate words from a streamlet of strings
repartition Create a new streamlet by applying a new parallelism level to the original streamlet Increase the parallelism of a streamlet from 5 to 10
mapToKV Create a new key-value streamlet by converting each element in the original streamlet into a KeyValue object Convert each element of a string streamlet into a KeyValue object where the current time is the key and the original string is the value
log Logs the final results of a processing graph to stdout. This must be the last step in the graph.

Map operations

Map operations create a new streamlet by applying the supplied mapping function to each element in the original streamlet. Here's an example:

builder.newSource(() -> 1)
    .map(i -> i + 12);

In this example, a supplier streamlet emits an indefinite series of 1s. The map operation then adds 12 to each incoming element, producing a streamlet of 13s.

FlatMap operations

FlatMap operations are like map operations but with the important difference that each element of the streamlet is "flattened" into another type. In this example, a supplier streamlet emits the same sentence over and over again; the flatMap operation transforms each sentence into a Java List of individual words:

builder.newSource(() -> "I have nothing to declare but my genius")
    .flatMap((sentence) -> Arrays.asList(sentence.split("\\s+")));

The effect of this operation is to transform the Streamlet<String> into a Streamlet<List<String>>.

One of the core differences between map and flatMap operations is that flatMap operations typically transform non-collection types into collection types.

Join operations

Join operations unify two streamlets on a key (join operations thus require KV streamlets). Each KeyValue object in a streamlet has, by definition, a key. When a join operation is added to a processing graph,

import com.twitter.heron.streamlet.WindowConfig;

Builder builder = Builder.CreateBuilder();

KVStreamlet<String, String> streamlet1 =
        builder.newKVSource(() -> new KeyValue<>("heron-api", "topology-api"));

builder.newSource(() -> new KeyValue<>("heron-api", "functional-api"))
    .join(streamlet1, WindowConfig.TumblingCountWindow(10), KeyValue::create);

In this case, the resulting streamlet would consist of an indefinite stream with two KeyValue objects with the key heron-api but different values (topology-api and functional-api).

The effect of a join operation is to create a new streamlet for each key.

Filter operations

Filter operations retain elements in a streamlet, while potentially excluding some or all elements, on the basis of a provided filtering function. Here's an example:

Builder builder = Builder.CreateBuilder();

builder.newSource(() -> ThreadLocalRandom.current().nextInt(1, 11))

Repartition operations

When you assign a number of partitions to a processing step, each step that comes after it inherits that number of partitions. Thus, if you assign 5 partitions to a map operation, then any mapToKV, flatMap, filter, etc. operations that come after it will also be assigned 5 partitions. But you can also change the number of partitions for a processing step (as well as the number of partitions for downstream operations) using repartition. Here's an example:

import java.util.concurrent.ThreadLocalRandom;

Builder builder = Builder.CreateBuilder();

builder.newSource(() -> ThreadLocalRandom.current().nextInt(1, 11))
        .setNumPartitions(5)
        .map(i -> i + 1)
        .repartition(2)
        .filter(i -> i > 7)
        .filter(i -> i < 2)
        .log();

In this example, the supplier streamlet emits random integers between one and ten. That operation is assigned 5 partitions. After the map operation, the repartition function is used to assign 2 partitions to all downstream operations.

Map-to-KV operations

Map-to-KV operations enable you to transform a streamlet into a KV streamlet, for example a Streamlet<String> into a Streamlet<KeyValue<String, Boolean>>, using a mapping function. Here's an example:

Builder builder = Builder.CreateBuilder();

builder.newSource(() -> randomIntGenerator(1, 100))
        .mapToKV((num) -> new KeyValue<>(num, num > 50));

In this example processing graph, the supplier streamlet is a function that continuously emits random integers between 1 and 100. The mapToKV operation then returns a KeyValue<Integer, Boolean> object, for each integer in the streamlet, in which the key is the number and the value is whether the integer is even.