CloudKarafka
Last updated April 08, 2021
CloudKarafka is an add-on that provides Apache Kafka as a service. Apache Kafka is a message bus optimized for high-ingress data streams and replay written in Scala and Java.
Installing the add-on
CloudKarafka can be installed to a Heroku application via the CLI:

$ heroku addons:create cloudkarafka
-----> Adding cloudkarafka to sharp-mountain-4005... done, v18 (free)

A list of all available plans can be found here.
Example for the “Developer Duck” plan:
$ heroku addons:create cloudkarafka:ducky
After you provision CloudKarafka, the CLOUDKARAFKA_BROKERS config var is available in your app’s configuration. It contains the comma-separated list of broker addresses used to access the Apache Kafka cluster. You can confirm this with the heroku config command:
$ heroku config | grep CLOUDKARAFKA_BROKERS
CLOUDKARAFKA_BROKERS => "dogsled-01.srvs.cloudkarafka.com:9094,dogsled-02.srvs.cloudkarafka.com:9094,dogsled-03.srvs.cloudkarafka.com:9094"
Also available in the Heroku config are the following variables:
Variable name | Description
---|---
CLOUDKARAFKA_USERNAME | Your username for authenticating with SASL to the broker
CLOUDKARAFKA_PASSWORD | Your password for authenticating with SASL to the broker
CLOUDKARAFKA_TOPIC_PREFIX | A unique prefix that distinguishes topics on shared instances
After you provision CloudKarafka, your application should be configured to fully integrate with the add-on.
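As a minimal illustration (not part of the official client examples below), a Python app could read these config vars from its environment like this:

import os

# Connection details that the CloudKarafka add-on sets on the app
brokers = os.environ['CLOUDKARAFKA_BROKERS'].split(',')    # list of host:port pairs
username = os.environ['CLOUDKARAFKA_USERNAME']             # SASL/SCRAM username
password = os.environ['CLOUDKARAFKA_PASSWORD']             # SASL/SCRAM password
topic_prefix = os.environ['CLOUDKARAFKA_TOPIC_PREFIX']     # prepend this to your topic names

print("Connecting to %d brokers as %s" % (len(brokers), username))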
About CloudKarafka
CloudKarafka provides managed Apache Kafka servers in the cloud. Kafka is a distributed publish-subscribe messaging system that is designed to be fast, scalable, and durable. It’s an open-source message broker written in Scala that can support a large number of consumers and retain large amounts of data with very little overhead.
CloudKarafka automates every part of setup, running, and scaling of Apache Kafka. We have support staff available 24/7 to help in any event or with any questions. CloudKarafka lets developers focus on the core part of their applications, instead of managing and maintaining servers.
We offer five different plans for different needs. You can try CloudKarafka for free with the Developer Duck plan.
All plans are billed by the second, meaning you can try out even the largest instance types without spending much. As soon as you delete your instance, you won’t be charged for it anymore. Billing occurs at the end of each month, and you are only charged for the time an instance was active.
Instance details and console
Your Kafka instance is immediately created after provisioning the add-on. You can view your instance’s details from the Heroku CLI like so:
$ heroku addons:open cloudkarafka
The dashboard contains useful information about your instance, along with configuration options for Kafka topics, and more. The following sections describe features of the CloudKarafka dashboard.
Add and delete topics
When you create your instance, a default Kafka topic is created for you. You can add up to 5 topics to your free instance. Add new topics to your instance from the Topics tab of your CloudKarafka dashboard.
Log
The Apache Kafka log stream is available only for dedicated instances.
This dashboard tab shows the live log from Kafka.
Metrics
Server metrics are available only for dedicated instances.
The Metrics tab helps you to measure performance metrics from your server. CloudKarafka shows monitoring for CPU Usage, Memory Usage, and Disk Usage.
CloudKarafka management
CloudKarafka provides a management tool for Apache Kafka, which is enabled by default for new instances.
Ruby
For Ruby, the recommended library is rdkafka. It is a wrapper around the excellent librdkafka and supports all the features in the latest Kafka release. The main feature that we need from a library is support for authentication with SASL/SCRAM, since this is what CloudKarafka uses.
Add rdkafka as a dependency to your Gemfile and execute bundle install.
First, connect to the Kafka server. You can get all the connection variables you need from Heroku when you create your CloudKarafka instance.
The following code snippet demonstrates how to publish and subscribe:
# Producer.rb
require 'rdkafka'

config = {
  :"bootstrap.servers" => ENV['CLOUDKARAFKA_BROKERS'],
  :"group.id" => "cloudkarafka-example",
  :"sasl.username" => ENV['CLOUDKARAFKA_USERNAME'],
  :"sasl.password" => ENV['CLOUDKARAFKA_PASSWORD'],
  :"security.protocol" => "SASL_SSL",
  :"sasl.mechanisms" => "SCRAM-SHA-256"
}
topic = "#{ENV['CLOUDKARAFKA_TOPIC_PREFIX']}test"

rdkafka = Rdkafka::Config.new(config)
producer = rdkafka.producer

100.times do |i|
  puts "Producing message #{i}"
  producer.produce(
    topic: topic,
    payload: "Payload #{i}",
    key: "Key #{i}"
  ).wait
end
# Consumer.rb
require 'rdkafka'

config = {
  :"bootstrap.servers" => ENV['CLOUDKARAFKA_BROKERS'],
  :"group.id" => "cloudkarafka-example",
  :"sasl.username" => ENV['CLOUDKARAFKA_USERNAME'],
  :"sasl.password" => ENV['CLOUDKARAFKA_PASSWORD'],
  :"security.protocol" => "SASL_SSL",
  :"sasl.mechanisms" => "SCRAM-SHA-256"
}
topic = "#{ENV['CLOUDKARAFKA_TOPIC_PREFIX']}test"

rdkafka = Rdkafka::Config.new(config)
consumer = rdkafka.consumer
consumer.subscribe(topic)

begin
  consumer.each do |message|
    puts "Message received: #{message}"
  end
rescue Rdkafka::RdkafkaError => e
  retry if e.is_partition_eof?
  raise
end
See the full example here: https://github.com/CloudKarafka/ruby-kafka-example
Python
The Confluent Python client can be installed as follows:
$ pip install confluent-kafka
First, connect to the Kafka server. You can get all the connection variables you need from Heroku when you create your CloudKarafka instance. The following code snippet demonstrates how to publish and consume messages:
import sys
import os

from confluent_kafka import Producer

if __name__ == '__main__':
    topic = "%s.test" % os.environ['CLOUDKARAFKA_TOPIC_PREFIX']
    conf = {
        'bootstrap.servers': os.environ['CLOUDKARAFKA_BROKERS'],
        'session.timeout.ms': 6000,
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        'sasl.username': os.environ['CLOUDKARAFKA_USERNAME'],
        'sasl.password': os.environ['CLOUDKARAFKA_PASSWORD']
    }

    p = Producer(**conf)

    for line in sys.stdin:
        try:
            p.produce(topic, line.rstrip())
        except BufferError as e:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                             len(p))
        p.poll(0)

    p.flush()
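The snippet above only produces messages; the linked repository also contains a consumer. A rough sketch of the consuming side, using the same connection settings and environment variables (see the full example below for the complete version):

import sys
import os

from confluent_kafka import Consumer, KafkaError

if __name__ == '__main__':
    topic = "%s.test" % os.environ['CLOUDKARAFKA_TOPIC_PREFIX']
    conf = {
        'bootstrap.servers': os.environ['CLOUDKARAFKA_BROKERS'],
        'group.id': 'cloudkarafka-example',
        'session.timeout.ms': 6000,
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        'sasl.username': os.environ['CLOUDKARAFKA_USERNAME'],
        'sasl.password': os.environ['CLOUDKARAFKA_PASSWORD']
    }

    c = Consumer(**conf)
    c.subscribe([topic])
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                # Reaching the end of a partition is not an error here
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                sys.stderr.write('%% Error: %s\n' % msg.error())
                break
            print('Received message: %s' % msg.value().decode('utf-8'))
    except KeyboardInterrupt:
        pass
    finally:
        c.close()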
See the full example here: https://github.com/CloudKarafka/python-kafka-example
Go
There are two very good libraries for Go: Sarama, which is a pure Go library, and confluent-kafka-go, which is a wrapper around the excellent librdkafka.
For this example we are going to use confluent-kafka-go, since it supports authentication with SASL/SCRAM, which is what CloudKarafka uses.
package main

import (
    "fmt"
    "os"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    config := &kafka.ConfigMap{
        "metadata.broker.list": os.Getenv("CLOUDKARAFKA_BROKERS"),
        "security.protocol":    "SASL_SSL",
        "sasl.mechanisms":      "SCRAM-SHA-256",
        "sasl.username":        os.Getenv("CLOUDKARAFKA_USERNAME"),
        "sasl.password":        os.Getenv("CLOUDKARAFKA_PASSWORD"),
        "group.id":             os.Getenv("CLOUDKARAFKA_GROUPID"),
        "default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"},
    }
    topic := os.Getenv("CLOUDKARAFKA_TOPIC_PREFIX") + ".test"

    p, err := kafka.NewProducer(config)
    if err != nil {
        fmt.Printf("Failed to create producer: %s\n", err)
        os.Exit(1)
    }
    fmt.Printf("Created Producer %v\n", p)

    deliveryChan := make(chan kafka.Event)
    for i := 0; i < 10; i++ {
        value := fmt.Sprintf("[%d] Hello Go!", i+1)
        err = p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(value),
        }, deliveryChan)
        e := <-deliveryChan
        m := e.(*kafka.Message)
        if m.TopicPartition.Error != nil {
            fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
        } else {
            fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
                *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
        }
    }
    close(deliveryChan)
}
package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    config := &kafka.ConfigMap{
        "metadata.broker.list":            os.Getenv("CLOUDKARAFKA_BROKERS"),
        "security.protocol":               "SASL_SSL",
        "sasl.mechanisms":                 "SCRAM-SHA-256",
        "sasl.username":                   os.Getenv("CLOUDKARAFKA_USERNAME"),
        "sasl.password":                   os.Getenv("CLOUDKARAFKA_PASSWORD"),
        "group.id":                        "cloudkarafka-example",
        "go.events.channel.enable":        true,
        "go.application.rebalance.enable": true,
        "default.topic.config":            kafka.ConfigMap{"auto.offset.reset": "earliest"},
    }
    topic := os.Getenv("CLOUDKARAFKA_TOPIC_PREFIX") + "test"

    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    c, err := kafka.NewConsumer(config)
    if err != nil {
        fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
        os.Exit(1)
    }
    fmt.Printf("Created Consumer %v\n", c)

    err = c.Subscribe(topic, nil)

    run := true
    counter := 0
    commitAfter := 1000

    for run == true {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        case ev := <-c.Events():
            switch e := ev.(type) {
            case kafka.AssignedPartitions:
                c.Assign(e.Partitions)
            case kafka.RevokedPartitions:
                c.Unassign()
            case *kafka.Message:
                fmt.Printf("%% Message on %s: %s\n", e.TopicPartition, string(e.Value))
                counter++
                if counter > commitAfter {
                    c.Commit()
                    counter = 0
                }
            case kafka.PartitionEOF:
                fmt.Printf("%% Reached %v\n", e)
            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
                run = false
            }
        }
    }

    fmt.Printf("Closing consumer\n")
    c.Close()
}
See the full example here: https://github.com/CloudKarafka/go-kafka-example
Java
To get started with Apache Kafka, add the kafka-clients dependency to your project’s pom.xml file:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Date;
import java.util.Properties;

public class KafkaExample {
    private final String topic;
    private final Properties props;

    public KafkaExample(String brokers, String topic, String username, String password) {
        this.topic = topic;
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, username, password);
        String serializer = StringSerializer.class.getName();
        String deserializer = StringDeserializer.class.getName();
        props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", "newer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", deserializer);
        props.put("value.deserializer", deserializer);
        props.put("key.serializer", serializer);
        props.put("value.serializer", serializer);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", jaasCfg);
    }

    public void consume() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
        }
    }

    public void produce() {
        Thread one = new Thread() {
            public void run() {
                try {
                    Producer<String, String> producer = new KafkaProducer<>(props);
                    int i = 0;
                    while (true) {
                        Date d = new Date();
                        producer.send(new ProducerRecord<>(topic, Integer.toString(i), d.toString()));
                        Thread.sleep(1000);
                        i++;
                    }
                } catch (InterruptedException v) {
                    System.out.println(v);
                }
            }
        };
        one.start();
    }

    public static void main(String[] args) {
        String topic = System.getenv("CLOUDKARAFKA_TOPIC_PREFIX") + ".test";
        KafkaExample c = new KafkaExample(
            System.getenv("CLOUDKARAFKA_BROKERS"), topic,
            System.getenv("CLOUDKARAFKA_USERNAME"), System.getenv("CLOUDKARAFKA_PASSWORD"));
        c.produce();
        c.consume();
    }
}
See the full example here: https://github.com/CloudKarafka/java-kafka-example
Node.js
The best Kafka library for Node.js right now is Blizzard’s node-rdkafka.
You can install the node-rdkafka module using npm:

$ npm install node-rdkafka
Consumer
var Kafka = require("node-rdkafka");

var kafkaConf = {
  "group.id": "cloudkarafka-example",
  "metadata.broker.list": process.env.CLOUDKARAFKA_BROKERS.split(","),
  "socket.keepalive.enable": true,
  "security.protocol": "SASL_SSL",
  "sasl.mechanisms": "SCRAM-SHA-256",
  "sasl.username": process.env.CLOUDKARAFKA_USERNAME,
  "sasl.password": process.env.CLOUDKARAFKA_PASSWORD
};

const prefix = process.env.CLOUDKARAFKA_TOPIC_PREFIX;
const topics = [`${prefix}.test`];

const consumer = new Kafka.KafkaConsumer(kafkaConf, {
  "auto.offset.reset": "beginning"
});

const numMessages = 5;
let counter = 0;

consumer.on("error", function(err) {
  console.error(err);
});

consumer.on("ready", function(arg) {
  console.log(`Consumer ${arg.name} ready`);
  consumer.subscribe(topics);
  consumer.consume();
});

consumer.on("data", function(m) {
  counter++;
  if (counter % numMessages === 0) {
    console.log("calling commit");
    consumer.commit(m);
  }
  console.log(m.value.toString());
});

consumer.on("disconnected", function(arg) {
  process.exit();
});

consumer.connect();

setTimeout(function() {
  consumer.disconnect();
}, 300000);
Producer
const Kafka = require("node-rdkafka");

const kafkaConf = {
  "group.id": "cloudkarafka-example",
  "metadata.broker.list": process.env.CLOUDKARAFKA_BROKERS.split(","),
  "socket.keepalive.enable": true,
  "security.protocol": "SASL_SSL",
  "sasl.mechanisms": "SCRAM-SHA-256",
  "sasl.username": process.env.CLOUDKARAFKA_USERNAME,
  "sasl.password": process.env.CLOUDKARAFKA_PASSWORD
};

const prefix = process.env.CLOUDKARAFKA_TOPIC_PREFIX;
const topic = `${prefix}.test`;
const producer = new Kafka.Producer(kafkaConf);
const maxMessages = 20;

const genMessage = i => Buffer.from(`Kafka example, message number ${i}`);

producer.on("ready", function(arg) {
  console.log(`producer ${arg.name} ready.`);
  for (var i = 0; i < maxMessages; i++) {
    producer.produce(topic, -1, genMessage(i), i);
  }
  setTimeout(() => producer.disconnect(), 0);
});

producer.on("disconnected", function(arg) {
  process.exit();
});

producer.connect();
See the full example here: https://github.com/CloudKarafka/nodejs-kafka-example
Alternative client libraries include franz-kafka.
Removing the add-on
You can remove the add-on directly from the Heroku CLI:
$ heroku addons:destroy cloudkarafka --confirm <name of app>
Private Spaces
It is possible to install CloudKarafka in a Heroku Private Space, but VPC-peering is currently not available for non-Heroku add-ons.
GDPR
Information about GDPR can be found here: https://www.cloudkarafka.com/gdpr.html.
For all our customers who collect personal data from individuals in the EU, we offer a DPA. Our DPA offers terms that meet GDPR requirements and reflect our data privacy and security commitments to our customers and their data.
You can view the DPA here: https://www.cloudkarafka.com/terms_of_service.html#dpa. Send us an email if you need a signed version.
Support
All CloudKarafka support and runtime issues should be logged with Heroku Support at support.heroku.com. Non-support-related issues and product feedback are welcome at support@cloudkarafka.com. For the Bouncing Bat plan and larger, we provide 24/7 critical support with a maximum initial response time of 30 minutes. For the Fast Fox plan and larger, we also provide direct phone numbers.