The current version of py_heron is 0.0.1.

Support for developing Heron topologies in Python is provided by a Python library called heronpy.

Python API docs

You can find API docs for the heronpy library here.

Setup

First, you need to install the heronpy library using pip, EasyInstall, or an analogous tool:

$ pip install heronpy
$ easy_install heronpy

Then you can include heronpy in your project files. Here’s an example:

from heronpy import Bolt, Spout, Topology

Writing topologies in Python

Heron topologies are networks of spouts that pull data into a topology and bolts that process that ingested data.

You can see how to create Python spouts in the Implementing Python Spouts guide and how to create Python bolts in the Implementing Python Bolts guide.

Once you’ve defined spouts and bolts for a topology, you can then compose the topology in one of two ways:

  • You can use the TopologyBuilder class inside of a main function.

    Here’s an example:

    from heronpy import TopologyBuilder
    
    if __name__ == '__main__':
        builder = TopologyBuilder("MyTopology")
        # Add spouts and bolts
        builder.build_and_submit()
    
  • You can subclass the Topology class.

    Here’s an example:

    class MyTopology(Topology):
        my_spout = MySpout.spec(par=2)
        my_bolt = MyBolt.spec(par=3,
                              inputs={
                                spout: Grouping.fields('some-input-field')
                              })
    

Defining topologies using the TopologyBuilder class

If you create a Python topology using a TopologyBuilder, you need to instantiate a TopologyBuilder inside of a standard Python main function, like this:

if __name__ == '__main__':
    builder = TopologyBuilder("MyTopology")

Once you’ve created a TopologyBuilder object, you can add bolts using the add_bolt method and spouts using the add_spout method. Here’s an example:

builder = TopologyBuilder("MyTopology")
builder.add_bolt("my_bolt", MyBolt, par=3)
builder.add_spout("my_spout", MySpout, par=2)

Both the add_bolt and add_spout methods return the corresponding HeronComponentSpec object.

The add_bolt method takes four arguments and an optional config parameter:

Argument Data type Description Default
name str The unique identifier assigned to this bolt
bolt_cls class The subclass of Bolt that defines this bolt
par int The number of instances of this bolt in the topology
config dict Specifies the configuration for this spout None

The add_spout method takes three arguments and an optional config parameter:

Argument Data type Description Default
name str The unique identifier assigned to this spout
spout_cls class The subclass of Spout that defines this spout
par int The number of instances of this spout in the topology
inputs dict or list Either a dict mapping from HeronComponentSpec to Grouping or a list of HeronComponentSpecs, in which case the shuffle grouping is used
config dict Specifies the configuration for this spout None

Example

The following is an example implementation of a word count topology in Python that subclasses TopologyBuilder.

from heronpy import TopologyBuilder
from your_spout import WordSpout
from your_bolt import CountBolt

if __name__ == "__main__":
    builder = TopologyBuilder("WordCountTopology")
    word_spout = builder.add_spout("word_spout", WordSpout, par=2)

    count_bolt_input =
    count_bolt = builder.add_bolt("count_bolt", CountBolt, par=2,
                                  inputs={word_spout: Grouping.fields('word')})
    builder.build_and_submit()

Note that arguments to the main method can be passed by providing them in the heron submit command.

Topology-wide configuration

If you’re building a Python topology using a TopologyBuilder, you can specify configuration for the topology using the set_config method. A topology’s config is a dict in which the keys are a series constants from the api_constants module and values are configuration values for those parameters.

Here’s an example:

from heronpy import api_constants, TopologyBuilder

if __name__ == '__main__':
    topology_config = {
        api_constants.TOPOLOGY_ENABLE_ACKING: True,
        api_constants.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS: True
    }
    builder = TopologyBuilder("MyTopology")
    builder.set_config(topology_config)
    # Add bolts and spouts, etc.

Launching the topology

If you want to submit Python topologies to a Heron cluster, they need to be packaged as a PEX file. In order to produce PEX files, we recommend using a build tool like Pants or Bazel.

If you defined your topology by subclassing the TopologyBuilder class and built a word_count.pex file for that topology in the ~/topology folder. You can submit the topology to a cluster called local like this:

$ heron submit local \
  ~/topology/word_count.pex \
  - # No class specified

Note the - in this submission command. If you define a topology by subclassing TopologyBuilder you do not need to instruct Heron where your main method is located.

Example topologies buildable as PEXes

  • See this repo for an example of a Heron topology written in Python and deployable as a Pants-packaged PEX.
  • See this repo for an example of a Heron topology written in Python and deployable as a Bazel-packaged PEX.

Defining a topology by subclassing the Topology class

If you create a Python topology by subclassing the Topology class, you need to create a new topology class, like this:

from heronpy import Grouping, Topology
from my_spout import MySpout
from my_bolt import MyBolt

class MyTopology(Topology):
    my_spout = MySpout.spec(par=2)
    my_bolt_inputs = {
        my_spout: Grouping.fields('some-input-field')
    }
    my_bolt = MyBolt.spec(par=3, inputs=my_bolt_inputs)

All you need to do is place HeronComponentSpecs as the class attributes of your topology class, which are returned by the spec() method of your spout or bolt class. You do not need to run a build method or anything like that; the Topology class will automatically detect which spouts and bolts are included in the topology.

If you use this method to define a new Python topology, you do not need to have a main function.

For bolts, the spec method for spouts takes three optional arguments::

Argument Data type Description Default
name str The unique identifier assigned to this bolt or None if you want to use the variable name of the return HeronComponentSpec as the unique identifier for this bolt
par int The number of instances of this bolt in the topology
config dict Specifies the configuration for this bolt None

For spouts, the spec method takes four optional arguments:

Argument Data type Description Default
name str The unique identifier assigned to this spout or None if you want to use the variable name of the return HeronComponentSpec as the unique identifier for this spout None
inputs dict or list Either a dict mapping from HeronComponentSpec to Grouping or a list of HeronComponentSpecs, in which case the shuffle grouping is used
par int The number of instances of this spout in the topology 1
config dict Specifies the configuration for this spout None

Example

Here’s an example topology definition with one spout and one bolt:

from heronpy import Topology
from your_spout import WordSpout
from your_bolt import CountBolt

class WordCount(Topology):
    word_spout = WordSpout.spec(par=2)
    count_bolt = CountBolt.spec(par=2, inputs={word_spout: Grouping.fields('word')})

Launching

If you defined your topology by subclassing the Topology class, your main Python file should not contain a main method. You will, however, need to instruct Heron which class contains your topology definition.

Let’s say that you’ve defined a topology by subclassing Topology and built a PEX stored in ~/topology/dist/word_count.pex. The class containing your topology definition is topology.word_count.WordCount. You can submit the topology to a cluster called local like this:

$ heron submit local \
  ~/topology/dist/word_count.pex \
  topology.word_count.WordCount \ # Specifies the topology class definition
  WordCountTopology

Topology-wide configuration

If you’re building a Python topology by subclassing Topology, you can specify configuration for the topology using the set_config method. A topology’s config is a dict in which the keys are a series constants from the api_constants module and values are configuration values for those parameters.

Here’s an example:

from heronpy import api_constants, Topology

class MyTopology(Topology):
    config = {
        api_constants.TOPOLOGY_ENABLE_ACKING: True,
        api_constants.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS: True
    }
    # Add bolts and spouts, etc.

Multiple streams

To specify that a component has multiple output streams, instead of using a list of strings for outputs, you can specify a list of Stream objects, in the following manner.

class MultiStreamSpout(Spout):
  outputs = [Stream(fields=['normal', 'fields'], name='default'),
             Stream(fields=['error_message'], name='error_stream')]

To select one of these streams as the input for your bolt, you can simply use [] to specify the stream you want. Without any stream specified, the default stream will be used.

class MultiStreamTopology(Topology):
  spout = MultiStreamSpout.spec()
  error_bolt = ErrorBolt.spec(inputs={spout['error_stream']: Grouping.LOWEST})
  consume_bolt = ConsumeBolt.spec(inputs={spout: Grouping.SHUFFLE})

Declaring output fields using the spec() method

In Python topologies, the output fields of your spouts and bolts need to be declared by placing outputs class attributes, as there is no declareOutputFields() method. heronpy enables you to dynamically declare output fields as a list using the optional_outputs argument in the spec() method.

This is useful in a situation like below.

class IdentityBolt(Bolt):
  # Statically declaring output fields is not allowed
  class process(self, tup):
    emit([tup.values])

class DynamicOutputField(Topology):
  spout = WordSpout.spec()
  bolt = IdentityBolt.spec(inputs={spout: Grouping.ALL},
                           optional_outputs=['word'])

You can also declare outputs in the add_spout() and the add_bolt() method for the TopologyBuilder in the same way.

Example topologies

There are a number of example topologies that you can peruse in the heron/examples/src/python directory of the Heron repo:

Topology File Description
Word count word_count_topology.py The WordSpout spout emits random words from a list, while the CountBolt bolt counts the number of words that have been emitted.
Multiple streams multi_stream_topology.py The MultiStreamSpout emits multiple streams to downstream bolts.
Half acking half_acking_topology.py The HalfAckBolt acks only half of all received tuples.
Custom grouping custom_grouping_topology.py The SampleCustomGrouping class provides a custom field grouping.

You can build the respective PEXes for these topologies using the following commands:

$ bazel build heron/examples/src/python:word_count
$ bazel build heron/examples/src/python:multi_stream
$ bazel build heron/examples/src/python:half_acking
$ bazel build heron/examples/src/python:custom_grouping

All built PEXes will be stored in bazel-bin/heron/examples/src/python. You can submit them to Heron like so:

$ heron submit local \
  bazel-bin/heron/examples/src/python/word_count.pex - \
  WordCount
$ heron submit local \
  bazel-bin/heron/examples/src/python/multi_stream.pex \
  heron.examples.src.python.multi_stream_topology.MultiStream
$ heron submit local \
  bazel-bin/heron/examples/src/python/half_acking.pex - \
  HalfAcking
$ heron submit local \
  bazel-bin/heron/examples/src/python/custom_grouping.pex \
  heron.examples.src.python.custom_grouping_topology.CustomGrouping

By default, the submit command also activates topologies. To disable this behavior, set the --deploy-deactivated flag.