Kafka Streams on Heroku
Last updated November 28, 2022
Kafka Streams is a Java client library that uses underlying components of Apache Kafka to process streaming data. You can use Kafka Streams to easily develop lightweight, scalable, and fault-tolerant stream processing apps.
Kafka Streams is supported on Heroku with both dedicated and basic Kafka plans (with some additional setup required for basic plans).
Applications built using Kafka Streams produce and consume data from Streams, which are unbounded, replayable, ordered, and fault-tolerant sequences of events. A Stream is represented either as a Kafka topic (KStream) or materialized as compacted topics (KTable). By default, the library ensures that your application handles Stream events one at a time, while also providing the ability to handle late-arriving or out-of-order events.
Basic example
You can use Kafka Streams APIs to develop applications with just a few lines of code. The following sample illustrates the traditional use case of maintaining a word count:
words
    .groupBy((key, word) -> word)
    .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
    .count(Materialized.as("windowed-counts"))
    .toStream()
    .process(PostgresSink::new);
This code:
- Takes in an input stream of words
- Groups the input by word
- Counts each word’s frequency within a tumbling window of 10 seconds
- Saves intermediate results in a local store
- Outputs the resulting word counts on each window boundary
The example illustrates the bulk of the logic you create for a typical Kafka Streams application. The rest of the application consists primarily of configuration. Kafka Streams simplifies development by decoupling your application’s logic from the underlying infrastructure, where the library transparently distributes workload, handles failures, and performs other low-level tasks.
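To make the windowing step concrete, the following is a minimal plain-Java sketch of how events fall into 10-second tumbling windows. It is a toy model, not Kafka Streams code: the TumblingWordCount class and its methods are hypothetical names introduced for illustration, and the sketch assumes non-negative timestamps in milliseconds.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of tumbling-window counting; not part of the Kafka Streams API.
class TumblingWordCount {
    static final long WINDOW_SIZE_MS = 10_000L;

    // Counts keyed by "windowStart|word", for example "10000|kafka".
    private final Map<String, Long> counts = new TreeMap<>();

    // Tumbling windows are aligned to the epoch, so an event's window starts
    // at the largest multiple of the window size that is <= its timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Adds one occurrence of the word to the window containing the timestamp.
    void record(String word, long timestampMs) {
        counts.merge(windowStart(timestampMs) + "|" + word, 1L, Long::sum);
    }

    // Returns the count for a word in the window starting at windowStartMs.
    long count(String word, long windowStartMs) {
        return counts.getOrDefault(windowStartMs + "|" + word, 0L);
    }

    public static void main(String[] args) {
        TumblingWordCount wc = new TumblingWordCount();
        wc.record("kafka", 1_000L);
        wc.record("kafka", 9_999L);
        wc.record("kafka", 10_000L); // first event of the next window
        System.out.println(wc.count("kafka", 0L));      // 2
        System.out.println(wc.count("kafka", 10_000L)); // 1
    }
}
```

Each window covers a half-open interval, so an event with timestamp 10000 belongs to the second window rather than the first.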
Organizing your application
Kafka Streams applications are normal Java services that you can run on Heroku with a variety of Java implementations. Heroku’s buildpacks for Maven and Gradle are both supported.
Using a multi-project setup with Gradle, you can create multiple Gradle sub-projects that each represent a different Kafka Streams service. These services can operate independently or be interconnected.
Each sub-project produces its own executable via Gradle plugins when the ./gradlew stage task is executed on it. These executables are created in your application’s build/libs/ directory, with naming specified as sub-project-name-all.jar. You can then run these executables on the Heroku Runtime by declaring worker process types in your Procfile:
aggregator_worker: java -jar build/libs/streams-aggregator-all.jar
More information on setting up multiple Kafka Streams services within a single application can be found in the kafka-streams-on-heroku repo.
Connecting your application
Connecting to Kafka brokers on Heroku requires SSL. This involves the following steps:
- Parse the URI stored in your app’s KAFKA_URL config var.
- Use env-keystore to read in the Kafka TRUSTED_CERT, CLIENT_CERT_KEY, and CLIENT_CERT config vars and create both a truststore and a keystore.
- Add related SSL configs for truststore and keystore.
private Properties buildHerokuKafkaConfigVars() throws URISyntaxException, CertificateException,
        NoSuchAlgorithmException, KeyStoreException, IOException {
    Properties properties = new Properties();
    List<String> bootstrapServerList = Lists.newArrayList();
    Iterable<String> kafkaUrl = Splitter.on(",")
        .split(Preconditions.checkNotNull(System.getenv(HEROKU_KAFKA_URL)));
    for (String url : kafkaUrl) {
        URI uri = new URI(url);
        bootstrapServerList.add(String.format("%s:%d", uri.getHost(), uri.getPort()));
        switch (uri.getScheme()) {
            case "kafka":
                properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
                break;
            case "kafka+ssl":
                properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
                EnvKeyStore envTrustStore = EnvKeyStore.createWithRandomPassword(
                    HEROKU_KAFKA_TRUSTED_CERT);
                EnvKeyStore envKeyStore = EnvKeyStore.createWithRandomPassword(
                    HEROKU_KAFKA_CLIENT_CERT_KEY, HEROKU_KAFKA_CLIENT_CERT);
                File trustStoreFile = envTrustStore.storeTemp();
                File keyStoreFile = envKeyStore.storeTemp();
                properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, envTrustStore.type());
                properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                    trustStoreFile.getAbsolutePath());
                properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, envTrustStore.password());
                properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, envKeyStore.type());
                properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getAbsolutePath());
                properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, envKeyStore.password());
                break;
            default:
                throw new URISyntaxException(uri.getScheme(), "Unknown URI scheme");
        }
    }
    bootstrapServers = Joiner.on(",").join(bootstrapServerList);
    return properties;
}
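The URL-parsing step can also be tried in isolation. The following is a minimal sketch that uses only the JDK, with no Guava or env-keystore: it turns a comma-separated list of broker URIs into a bootstrap server string and maps each URI scheme to a security protocol. The KafkaUrlParser class, its method names, and the example hostnames are hypothetical and not taken from the sample above.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; uses only the JDK.
class KafkaUrlParser {

    // Converts "kafka+ssl://host1:9096,kafka+ssl://host2:9096"
    // into "host1:9096,host2:9096".
    static String toBootstrapServers(String kafkaUrl) {
        List<String> servers = new ArrayList<>();
        for (String url : kafkaUrl.split(",")) {
            // URI.create throws IllegalArgumentException on malformed input.
            URI uri = URI.create(url.trim());
            servers.add(uri.getHost() + ":" + uri.getPort());
        }
        return String.join(",", servers);
    }

    // Maps a broker URI scheme to the matching security.protocol value.
    static String securityProtocol(String scheme) {
        switch (scheme) {
            case "kafka":
                return "PLAINTEXT";
            case "kafka+ssl":
                return "SSL";
            default:
                throw new IllegalArgumentException("Unknown URI scheme: " + scheme);
        }
    }

    public static void main(String[] args) {
        // Example value only; on Heroku the real value comes from the config var.
        String url = "kafka+ssl://broker1.example.com:9096,kafka+ssl://broker2.example.com:9096";
        System.out.println(toBootstrapServers(url));
        System.out.println(securityProtocol(URI.create(url.split(",")[0]).getScheme()));
    }
}
```

Keeping the parsing separate from the keystore setup makes it straightforward to unit test without any certificates present.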
Managing internal topics and consumer groups
Kafka Streams uses internal topics for fault tolerance and repartitioning. These topics are required for Kafka Streams applications to work properly.
The creation of Kafka Streams internal topics is unrelated to Kafka’s auto.create.topics.enable config. Rather, Kafka Streams communicates with clusters directly through an admin client.
Dedicated Kafka plans
Dedicated Kafka plans are isolated among users. Because of this, internal Kafka Streams topics on dedicated plans require no additional configuration.
More information on dedicated plans can be found on the dedicated plans and configurations page.
Basic Kafka plans
Basic Kafka plans co-host multiple Heroku users on the same set of underlying resources. User data and access privileges are isolated by Kafka Access Control Lists (ACLs). Additionally, topic and consumer group names are namespaced with an auto-generated prefix to prevent naming collisions.
Running Kafka Streams applications on basic plans requires two preliminary steps: properly setting up the application.id and pre-creating internal topics and consumer groups.
Setting up your application.id
Each Kafka Streams application has an important unique identifier called the application.id that identifies it and its associated topology. If you have a Kafka Basic plan, you must ensure that each application.id begins with your assigned prefix:
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, String.format("%saggregator-app", HEROKU_KAFKA_PREFIX));
Pre-creating internal topics and consumer groups
Because Kafka Basic plans on Heroku use ACLs, Kafka Streams applications cannot interact with topics and consumer groups without the proper ACLs. This is problematic because Kafka Streams uses an internal admin client to transparently create internal topics and consumer groups at runtime. This primarily affects processors in Kafka Streams.
Processors are classes that implement a process method. They receive input events from a stream, process those events, and optionally produce output events to downstream processors. Stateful processors are processors that use state produced by previous events when processing subsequent ones. Kafka Streams provides built-in functionality for storage of this state.
For each stateful processor in your application, create two internal topics: one for the changelog and one for repartition.
For example, the basic example shown earlier includes a single stateful processor that counts words from a stream:
words
    .groupBy((key, word) -> word)
    .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
    .count(Materialized.as("windowed-counts"))
    .toStream()
    .process(PostgresSink::new);
This application requires two internal topics for the count operator:
$ heroku kafka:topics:create aggregator-app-windowed-counts-changelog --app sushi
$ heroku kafka:topics:create aggregator-app-windowed-counts-repartition --app sushi
Additionally, you must create a single consumer group for your application that matches the application.id:
$ heroku kafka:consumer-groups:create mobile-1234.aggregator-app --app sushi
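The names used in these commands follow a pattern: Kafka Streams builds internal topic names from the application.id and the state store name, and uses the application.id as the consumer group name. The following is a minimal sketch of that pattern for listing what to pre-create. The InternalTopicNames class is a hypothetical helper, and it assumes the repartition topic is named after the store, as in the example above. The sketch uses whatever application.id string you pass in and doesn't cover how the plan prefix is applied by the CLI.

```java
// Hypothetical helper for illustration; not part of the Kafka Streams API.
class InternalTopicNames {

    // Changelog topic backing a state store: <application.id>-<store>-changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Repartition topic, assuming it is named after the store:
    // <application.id>-<store>-repartition
    static String repartitionTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-repartition";
    }

    // Kafka Streams uses the application.id as its consumer group name.
    static String consumerGroup(String applicationId) {
        return applicationId;
    }

    public static void main(String[] args) {
        String appId = "aggregator-app";
        String store = "windowed-counts";
        System.out.println(changelogTopic(appId, store));
        System.out.println(repartitionTopic(appId, store));
        System.out.println(consumerGroup(appId));
    }
}
```

Running a helper like this over every stateful operator in a topology gives a checklist of topics to create before the first deploy.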
More information on basic plans can be found on the basic plans and configurations page.
Scaling your application
Parallelism model
Partitions are a Kafka topic’s fundamental unit of parallelism. A Kafka Streams application can consist of multiple application instances. Because Kafka Streams applications are normal Java applications, these instances run in dynos on the Heroku Runtime.
Each instance of a Kafka Streams application contains a number of Stream Threads. These threads are responsible for running one or more Stream Tasks. In Kafka Streams, Stream Tasks are the fundamental unit of processing parallelism. Kafka Streams transparently ensures that input partitions are spread evenly across Stream Tasks so that all events can be consumed and processed.
Vertical scaling
By default, Kafka Streams creates one Stream Thread per application instance. Each Stream Thread runs one or more Stream Tasks. You can scale an application instance by scaling its number of Stream Threads. To do so, modify the num.stream.threads config value in your application. The application will transparently rebalance workload across threads within each application instance.
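As a rough illustration, the thread count can be set on the same Properties object that holds the rest of the Streams configuration. The sketch below uses the plain string key so that it compiles without the Kafka libraries. The StreamThreadsConfig class and its method are hypothetical names, and the value 4 is only an example.

```java
import java.util.Properties;

// Hypothetical helper for illustration; uses only the JDK.
class StreamThreadsConfig {

    // Returns a copy of the base config with num.stream.threads set.
    static Properties withStreamThreads(Properties base, int threads) {
        if (threads < 1) {
            throw new IllegalArgumentException("threads must be at least 1");
        }
        Properties props = new Properties();
        props.putAll(base);
        props.put("num.stream.threads", threads);
        return props;
    }

    public static void main(String[] args) {
        Properties props = withStreamThreads(new Properties(), 4);
        System.out.println(props.get("num.stream.threads"));
    }
}
```

In an application that depends on Kafka Streams, the constant StreamsConfig.NUM_STREAM_THREADS_CONFIG can be used in place of the string key.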
Horizontal scaling
Kafka Streams rebalances workload and local state across instances as the number of application instances changes. This works transparently by distributing workload and local state across instances with the same application.id. You can scale Kafka Streams applications horizontally by scaling the number of dynos:
$ heroku ps:scale aggregator_worker=2 --app sushi
The number of input partitions is effectively the upper bound for parallelism. Kafka Streams doesn’t create more Stream Tasks than there are input partitions, so adding Stream Threads or dynos beyond that number is over-provisioning and results in idle application instances.
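The arithmetic behind this limit can be sketched in a few lines. The example below assumes the simplest case, in which the application reads a single input topic and runs one Stream Task per input partition. The ParallelismPlanner class and its methods are hypothetical names used only for illustration.

```java
// Hypothetical helper for illustration; assumes one Stream Task per input partition.
class ParallelismPlanner {

    // Number of Stream Threads, across all dynos, that would have no task to run.
    static int idleThreads(int inputPartitions, int dynos, int threadsPerDyno) {
        int totalThreads = dynos * threadsPerDyno;
        return Math.max(0, totalThreads - inputPartitions);
    }

    // Largest dyno count at which every dyno can still run at least one task
    // (ceiling of inputPartitions / threadsPerDyno).
    static int maxUsefulDynos(int inputPartitions, int threadsPerDyno) {
        return (inputPartitions + threadsPerDyno - 1) / threadsPerDyno;
    }

    public static void main(String[] args) {
        // 8 partitions, 5 dynos with 2 threads each: 10 threads for 8 tasks.
        System.out.println(idleThreads(8, 5, 2));  // 2
        System.out.println(maxUsefulDynos(8, 2));  // 4
    }
}
```

With 8 input partitions and 2 Stream Threads per dyno, scaling past 4 dynos adds threads that have nothing to process.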
Caveats
RocksDB persistence
Because dynos are backed by an ephemeral filesystem, it isn’t practical to rely on the underlying disk for durable storage. This presents a challenge for using RocksDB with Kafka Streams on Heroku. However, RocksDB isn’t a hard requirement. Kafka Streams treats RocksDB as a write-through cache, where the source of truth is actually the underlying changelog internal topic. If there’s no underlying RocksDB store, then state is replayed directly from changelog topics on startup.
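The relationship between the local store and the changelog can be pictured with a toy model. In the sketch below, an in-memory list stands in for the changelog topic and a map stands in for the local store. Every write goes to both, and a fresh instance rebuilds its map by replaying the list. This is a simplified illustration, not how Kafka Streams is implemented, and the ChangelogBackedStore class is a hypothetical name.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: a list plays the changelog topic, a map plays the local store.
class ChangelogBackedStore {
    private final List<Map.Entry<String, Long>> changelog;
    private final Map<String, Long> local = new HashMap<>();

    // On startup, rebuild local state by replaying the changelog in order.
    ChangelogBackedStore(List<Map.Entry<String, Long>> changelog) {
        this.changelog = changelog;
        for (Map.Entry<String, Long> update : changelog) {
            local.put(update.getKey(), update.getValue());
        }
    }

    // Write-through: append to the changelog, then update the local copy.
    void put(String key, long value) {
        changelog.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
        local.put(key, value);
    }

    // Reads are served from the local copy only; null if the key is absent.
    Long get(String key) {
        return local.get(key);
    }
}
```

Creating a second ChangelogBackedStore over the same list models a dyno restart: the local map starts empty and ends up with the latest value for each key, at the cost of replaying every update.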
By default, replaying state directly from changelog topics incurs additional latency when rebalancing your application instances or when dynos are restarted. To minimize latency, you can configure Kafka Streams to fail over Stream Tasks to their associated Standby Tasks.
Standby Tasks are replicas of Stream Tasks that maintain fully replicated copies of state. Dynos make use of Standby Tasks to resume work immediately instead of having to wait for state to be rebuilt from changelog topics.
You can modify the num.standby.replicas config in your application to change the number of Standby Tasks.
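A minimal sketch of that setting follows, using the plain string key so that it compiles without the Kafka libraries. The StandbyReplicasConfig class and its method are hypothetical names, and one replica is only an example value. Each standby replica keeps an additional copy of state on another instance, so it needs more than one running dyno to have any effect.

```java
import java.util.Properties;

// Hypothetical helper for illustration; uses only the JDK.
class StandbyReplicasConfig {

    // Sets num.standby.replicas on the given config and returns it.
    static Properties enableStandbys(Properties props, int replicas) {
        if (replicas < 0) {
            throw new IllegalArgumentException("replicas must not be negative");
        }
        props.put("num.standby.replicas", replicas);
        return props;
    }

    public static void main(String[] args) {
        Properties props = enableStandbys(new Properties(), 1);
        System.out.println(props.get("num.standby.replicas"));
    }
}
```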