CloudKarafka

This add-on is operated by 84codes AB

Message streaming as a service powered by Apache Kafka

Last updated April 08, 2021

Table of Contents

  • Installing the add-on
  • About CloudKarafka
  • Ruby
  • Python
  • Go
  • Java
  • Node.js
  • Removing the add-on
  • Private Spaces
  • GDPR
  • Support

CloudKarafka is an add-on that provides Apache Kafka as a service. Apache Kafka is a message bus, written in Scala and Java, that is optimized for high-ingress data streams and replay.

Installing the add-on

CloudKarafka can be installed to a Heroku application via the CLI:

A list of all available plans can be found on the CloudKarafka add-on page in the Heroku Elements marketplace.

$ heroku addons:create cloudkarafka
-----> Adding cloudkarafka to sharp-mountain-4005... done, v18 (free)

Example for the “Developer Duck” plan:

$ heroku addons:create cloudkarafka:ducky

After you provision CloudKarafka, the CLOUDKARAFKA_BROKERS config var is available in your app’s configuration. It contains the canonical URL used to access the Apache Kafka cluster. You can confirm this with the heroku config command:

$ heroku config | grep CLOUDKARAFKA_BROKERS
CLOUDKARAFKA_BROKERS    => "dogsled-01.srvs.cloudkarafka.com:9094,dogsled-02.srvs.cloudkarafka.com:9094,dogsled-03.srvs.cloudkarafka.com:9094"

Also available in the Heroku config are the following variables:

Variable name               Description
CLOUDKARAFKA_USERNAME       Your username for SASL authentication with the broker
CLOUDKARAFKA_PASSWORD       Your password for SASL authentication with the broker
CLOUDKARAFKA_TOPIC_PREFIX   A unique prefix that distinguishes topics on shared instances
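
You can inspect any of these values individually with the heroku config:get command:

$ heroku config:get CLOUDKARAFKA_TOPIC_PREFIX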

After you provision CloudKarafka, your application should be configured to fully integrate with the add-on.

About CloudKarafka

CloudKarafka provides managed Apache Kafka servers in the cloud. Kafka is a distributed publish-subscribe messaging system that is designed to be fast, scalable, and durable. It’s an open-source message broker written in Scala that can support a large number of consumers and retain large amounts of data with very little overhead.

CloudKarafka automates every part of setting up, running, and scaling Apache Kafka. Our support staff are available 24/7 to help with any issues or questions. CloudKarafka lets developers focus on the core parts of their applications instead of managing and maintaining servers.

We offer five different plans for different needs. You can try CloudKarafka for free with the Developer Duck plan.

All plans are billed by the second, meaning you can try out even the largest instance types without spending much. As soon as you delete your instance, you won’t be charged for it anymore. Billing occurs at the end of each month, and you are only charged for the time an instance was active.

Instance details and console

Your Kafka instance is immediately created after provisioning the add-on. You can view your instance’s details from the Heroku CLI like so:

$ heroku addons:open cloudkarafka

The CloudKarafka Dashboard as seen from Heroku

The dashboard contains useful information about your instance, along with configuration options for Kafka topics, and more. The following sections describe features of the CloudKarafka dashboard.

Add and delete topics

When you create your instance, a default Kafka topic is created for you. You can add up to 5 topics to your free instance. Add new topics to your instance from the Topics tab of your CloudKarafka dashboard.

Log

The Apache Kafka log stream is available only for dedicated instances.

This dashboard tab shows the live log from Kafka.

Metrics

Server metrics are available only for dedicated instances.

The Metrics tab helps you to measure performance metrics from your server. CloudKarafka shows monitoring for CPU Usage, Memory Usage, and Disk Usage.

CloudKarafka management

CloudKarafka includes a management tool for Apache Kafka. It is enabled by default for new instances.

Ruby

For Ruby, the recommended library is rdkafka. It is a wrapper around the excellent librdkafka and supports all the features in the latest Kafka release. The main feature we need from a library is support for SASL/SCRAM authentication, since this is what CloudKarafka uses.

Add rdkafka as a dependency to your Gemfile and execute bundle install.
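
For example, a minimal Gemfile entry looks like this:

# Gemfile
source 'https://rubygems.org'

gem 'rdkafka'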

The first thing you have to do is connect to the Kafka server. You can get all the connection variables you need from Heroku when you create the CloudKarafka instance.

The following code snippet demonstrates how to publish and subscribe:

# producer.rb
require 'rdkafka'
config = {
          :"bootstrap.servers" => ENV['CLOUDKARAFKA_BROKERS'],
          :"group.id"          => "cloudkarafka-example",
          :"sasl.username"     => ENV['CLOUDKARAFKA_USERNAME'],
          :"sasl.password"     => ENV['CLOUDKARAFKA_PASSWORD'],
          :"security.protocol" => "SASL_SSL",
          :"sasl.mechanisms"   => "SCRAM-SHA-256"
}
topic = "#{ENV['CLOUDKARAFKA_TOPIC_PREFIX']}test"
rdkafka = Rdkafka::Config.new(config)
producer = rdkafka.producer
100.times do |i|
  puts "Producing message #{i}"
  producer.produce(
      topic:   topic,
      payload: "Payload #{i}",
      key:     "Key #{i}"
  ).wait
end

# consumer.rb
require 'rdkafka'
config = {
          :"bootstrap.servers" => ENV['CLOUDKARAFKA_BROKERS'],
          :"group.id"          => "cloudkarafka-example",
          :"sasl.username"     => ENV['CLOUDKARAFKA_USERNAME'],
          :"sasl.password"     => ENV['CLOUDKARAFKA_PASSWORD'],
          :"security.protocol" => "SASL_SSL",
          :"sasl.mechanisms"   => "SCRAM-SHA-256"
}
topic = "#{ENV['CLOUDKARAFKA_TOPIC_PREFIX']}test"
rdkafka = Rdkafka::Config.new(config)
consumer = rdkafka.consumer
consumer.subscribe(topic)
begin
  consumer.each do |message|
    puts "Message received: #{message}"
  end
rescue Rdkafka::RdkafkaError => e
  retry if e.is_partition_eof?
  raise
end

See the full example here: https://github.com/CloudKarafka/ruby-kafka-example
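
To run the examples on Heroku, where the CLOUDKARAFKA_* config vars are already set, you can use one-off dynos (assuming the snippets are saved as producer.rb and consumer.rb):

$ heroku run ruby producer.rb
$ heroku run ruby consumer.rb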

Python

The Confluent Python client can be installed as follows:

$ pip install confluent-kafka

First, connect to the Kafka server. You can get all the connection variables you need from Heroku when you create your CloudKarafka instance. The following code snippet demonstrates how to publish and consume messages:

import sys
import os

from confluent_kafka import Producer

if __name__ == '__main__':
    topic = "%s.test" % os.environ['CLOUDKARAFKA_TOPIC_PREFIX']
    conf = {
        'bootstrap.servers': os.environ['CLOUDKARAFKA_BROKERS'],
        'session.timeout.ms': 6000,
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        'sasl.username': os.environ['CLOUDKARAFKA_USERNAME'],
        'sasl.password': os.environ['CLOUDKARAFKA_PASSWORD']
    }
    p = Producer(**conf)
    for line in sys.stdin:
        try:
            p.produce(topic, line.rstrip())
        except BufferError as e:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                             len(p))
        p.poll(0)
    p.flush()
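
The snippet above covers the producer side. A minimal consumer sketch, reusing the same connection settings (the group.id value here is an arbitrary example), could look like this:

import os

from confluent_kafka import Consumer, KafkaError

if __name__ == '__main__':
    topic = "%s.test" % os.environ['CLOUDKARAFKA_TOPIC_PREFIX']
    conf = {
        'bootstrap.servers': os.environ['CLOUDKARAFKA_BROKERS'],
        'group.id': 'cloudkarafka-example',
        'session.timeout.ms': 6000,
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        'sasl.username': os.environ['CLOUDKARAFKA_USERNAME'],
        'sasl.password': os.environ['CLOUDKARAFKA_PASSWORD']
    }
    c = Consumer(**conf)
    c.subscribe([topic])
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                # _PARTITION_EOF only means we caught up with the end of a partition
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                raise Exception(msg.error())
            print('Received message: %s' % msg.value().decode('utf-8'))
    except KeyboardInterrupt:
        pass
    finally:
        c.close()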

See the full example here: https://github.com/CloudKarafka/python-kafka-example

Go

There are two very good libraries for Go: Sarama, which is a pure Go implementation, and confluent-kafka-go, which is a wrapper around the excellent librdkafka.

For this example we are going to use confluent-kafka-go, since it supports authentication with SASL/SCRAM, which is what CloudKarafka uses.
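
Depending on your Go environment, the client can typically be installed with go get, using the same import path as the example below:

$ go get github.com/confluentinc/confluent-kafka-go/kafka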

package main

import (
    "fmt"
    "os"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    config := &kafka.ConfigMap{
        "metadata.broker.list": os.Getenv("CLOUDKARAFKA_BROKERS"),
        "security.protocol":    "SASL_SSL",
        "sasl.mechanisms":      "SCRAM-SHA-256",
        "sasl.username":        os.Getenv("CLOUDKARAFKA_USERNAME"),
        "sasl.password":        os.Getenv("CLOUDKARAFKA_PASSWORD"),
        "group.id":             os.Getenv("CLOUDKARAFKA_GROUPID"),
        "default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"},
    }
    topic := os.Getenv("CLOUDKARAFKA_TOPIC_PREFIX") + ".test"
    p, err := kafka.NewProducer(config)
    if err != nil {
        fmt.Printf("Failed to create producer: %s\n", err)
        os.Exit(1)
    }
    fmt.Printf("Created Producer %v\n", p)
    deliveryChan := make(chan kafka.Event)

    for i := 0; i < 10; i++ {
        value := fmt.Sprintf("[%d] Hello Go!", i+1)
        err = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value)}, deliveryChan)
        e := <-deliveryChan
        m := e.(*kafka.Message)
        if m.TopicPartition.Error != nil {
            fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
        } else {
            fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
                *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
        }
    }
    close(deliveryChan)
}

The consumer program subscribes to the same topic:

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    config := &kafka.ConfigMap{
        "metadata.broker.list":            os.Getenv("CLOUDKARAFKA_BROKERS"),
        "security.protocol":               "SASL_SSL",
        "sasl.mechanisms":                 "SCRAM-SHA-256",
        "sasl.username":                   os.Getenv("CLOUDKARAFKA_USERNAME"),
        "sasl.password":                   os.Getenv("CLOUDKARAFKA_PASSWORD"),
        "group.id":                        "cloudkarafka-example",
        "go.events.channel.enable":        true,
        "go.application.rebalance.enable": true,
        "default.topic.config":            kafka.ConfigMap{"auto.offset.reset": "earliest"},
    }
    topic := os.Getenv("CLOUDKARAFKA_TOPIC_PREFIX") + ".test"

    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
    c, err := kafka.NewConsumer(config)
    if err != nil {
        fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
        os.Exit(1)
    }
    fmt.Printf("Created Consumer %v\n", c)
    err = c.Subscribe(topic, nil)
    run := true
    counter := 0
    commitAfter := 1000
    for run == true {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        case ev := <-c.Events():
            switch e := ev.(type) {
            case kafka.AssignedPartitions:
                c.Assign(e.Partitions)
            case kafka.RevokedPartitions:
                c.Unassign()
            case *kafka.Message:
                fmt.Printf("%% Message on %s: %s\n", e.TopicPartition, string(e.Value))
                counter++
                if counter > commitAfter {
                    c.Commit()
                    counter = 0
                }

            case kafka.PartitionEOF:
                fmt.Printf("%% Reached %v\n", e)
            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
                run = false
            }
        }
    }
    fmt.Printf("Closing consumer\n")
    c.Close()
}

See the full example here: https://github.com/CloudKarafka/go-kafka-example

Java

To get started with Apache Kafka, add the kafka-clients dependency to your project’s pom.xml file:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>1.0.0</version>
</dependency>

The following example contains both a producer and a consumer:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Date;
import java.util.Properties;

public class KafkaExample {
    private final String topic;
    private final Properties props;

    public KafkaExample(String brokers, String topic, String username, String password) {
        this.topic = topic;

        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, username, password);

        String serializer = StringSerializer.class.getName();
        String deserializer = StringDeserializer.class.getName();
        props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", "newer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", deserializer);
        props.put("value.deserializer", deserializer);
        props.put("key.serializer", serializer);
        props.put("value.serializer", serializer);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", jaasCfg);
    }

    public void consume() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
        }
    }

    public void produce() {
        Thread one = new Thread() {
            public void run() {
                try {
                    Producer<String, String> producer = new KafkaProducer<>(props);
                    int i = 0;
                    while(true) {
                        Date d = new Date();
                        producer.send(new ProducerRecord<>(topic, Integer.toString(i), d.toString()));
                        Thread.sleep(1000);
                        i++;
                    }
                    //producer.close();
                } catch (InterruptedException v) {
                    System.out.println(v);
                }
            }
        };
        one.start();
    }

    public static void main(String[] args) {
        String topic = System.getenv("CLOUDKARAFKA_TOPIC_PREFIX") + ".test";
        KafkaExample c = new KafkaExample(
                System.getenv("CLOUDKARAFKA_BROKERS"), topic,
                System.getenv("CLOUDKARAFKA_USERNAME"), System.getenv("CLOUDKARAFKA_PASSWORD"));
        c.produce();
        c.consume();
    }
}

See the full example here: https://github.com/CloudKarafka/java-kafka-example

Node.js

The recommended Kafka library for Node.js is Blizzard’s node-rdkafka. You can install the node-rdkafka module with npm:

$ npm install node-rdkafka

Consumer

var Kafka = require("node-rdkafka");
var kafkaConf = {
  "group.id": "cloudkarafka-example",
  "metadata.broker.list": process.env.CLOUDKARAFKA_BROKERS.split(","),
  "socket.keepalive.enable": true,
  "security.protocol": "SASL_SSL",
  "sasl.mechanisms": "SCRAM-SHA-256",
  "sasl.username": process.env.CLOUDKARAFKA_USERNAME,
  "sasl.password": process.env.CLOUDKARAFKA_PASSWORD
};
const prefix = process.env.CLOUDKARAFKA_TOPIC_PREFIX;
const topics = [`${prefix}.test`];
const consumer = new Kafka.KafkaConsumer(kafkaConf, {
  "auto.offset.reset": "beginning"
});
const numMessages = 5;
let counter = 0;
consumer.on("error", function(err) {
  console.error(err);
});
consumer.on("ready", function(arg) {
  console.log(`Consumer ${arg.name} ready`);
  consumer.subscribe(topics);
  consumer.consume();
});
consumer.on("data", function(m) {
  counter++;
  if (counter % numMessages === 0) {
    console.log("calling commit");
    consumer.commit(m);
  }
  console.log(m.value.toString());
});
consumer.on("disconnected", function(arg) {
  process.exit();
});
consumer.connect();
setTimeout(function() {
  consumer.disconnect();
}, 300000);

Producer

const Kafka = require("node-rdkafka");

const kafkaConf = {
  "group.id": "cloudkarafka-example",
  "metadata.broker.list": process.env.CLOUDKARAFKA_BROKERS.split(","),
  "socket.keepalive.enable": true,
  "security.protocol": "SASL_SSL",
  "sasl.mechanisms": "SCRAM-SHA-256",
  "sasl.username": process.env.CLOUDKARAFKA_USERNAME,
  "sasl.password": process.env.CLOUDKARAFKA_PASSWORD
};
const prefix = process.env.CLOUDKARAFKA_TOPIC_PREFIX;
const topic = `${prefix}.test`;
const producer = new Kafka.Producer(kafkaConf);
const maxMessages = 20;
const genMessage = i => Buffer.from(`Kafka example, message number ${i}`);
producer.on("ready", function(arg) {
  console.log(`producer ${arg.name} ready.`);
  for (var i = 0; i < maxMessages; i++) {
    producer.produce(topic, -1, genMessage(i), i);
  }
  setTimeout(() => producer.disconnect(), 0);
});
producer.on("disconnected", function(arg) {
  process.exit();
});
producer.connect();

See the full example here: https://github.com/CloudKarafka/nodejs-kafka-example

Alternative client libraries include franz-kafka.

Removing the add-on

You can remove the add-on directly from the Heroku CLI:

$ heroku addons:destroy cloudkarafka --confirm <name of app>

Private Spaces

It is possible to install CloudKarafka in a Heroku Private Space, but VPC peering is currently not available for non-Heroku add-ons.

GDPR

Information about GDPR can be found here: https://www.cloudkarafka.com/gdpr.html.

For all our customers who collect personal data from individuals in the EU, we offer a DPA. Our DPA offers terms that meet GDPR requirements and reflect our data privacy and security commitments to our customers and their data.

You can view the DPA here: https://www.cloudkarafka.com/terms_of_service.html#dpa. Send us an email if you need a signed version.

Support

All CloudKarafka support and runtime issues should be logged with Heroku Support at support.heroku.com. Any non-support-related issues or product feedback are welcome at support@cloudkarafka.com. For the Bouncing Bat plan and larger, we provide 24/7 critical support with a maximum initial response time of 30 minutes. For the Fast Fox plan and larger, we also provide direct phone numbers.
