CA Certificate Rotation for Apache Kafka on Heroku

Last updated January 25, 2021

This article is a work in progress, or documents a feature that is not yet released to all users. This article is unlisted. Only those with the link can access it.

Table of Contents

  • Background
  • sarama
  • ruby-kafka
  • kafka-python
  • Java
  • no-kafka

Background

Connecting to and authenticating with brokers on Apache Kafka on Heroku relies on several levels of SSL verification. To support this, we provide several config variables to your app:

  • KAFKA_URL: the URLs specifying the location of the Kafka brokers in the cluster
  • KAFKA_CLIENT_CERT: a client cert used to authenticate your client against the cluster
  • KAFKA_CLIENT_CERT_KEY: the key for that client cert
  • KAFKA_TRUSTED_CERT: the CA of the server certs on each broker. This is used to verify that you are talking to the right nodes.
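
As an illustration of how the KAFKA_URL variable might be consumed, here is a minimal Python sketch that turns its value into a list of host:port strings. It assumes the value is a comma-separated list of kafka+ssl:// URLs; the function name parse_kafka_url and the example broker hostnames are hypothetical and not part of any Heroku library.

```python
from urllib.parse import urlparse


def parse_kafka_url(kafka_url):
    """Turn a comma-separated KAFKA_URL value into 'host:port' strings.

    Assumption: each entry looks like kafka+ssl://host:port.
    """
    servers = []
    for raw in kafka_url.split(","):
        raw = raw.strip()
        if not raw:
            # Skip empty entries, e.g. from a trailing comma.
            continue
        parsed = urlparse(raw)
        servers.append("{}:{}".format(parsed.hostname, parsed.port))
    return servers


# Hypothetical value, for illustration only.
example = "kafka+ssl://broker-1.example.com:9096,kafka+ssl://broker-2.example.com:9096"
print(parse_kafka_url(example))
```

In a real app the argument would come from os.environ["KAFKA_URL"] rather than a hard-coded string.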

To keep clusters secure and up to date, Heroku occasionally needs to rotate the CA certificate involved in this authentication. During a rotation, the KAFKA_TRUSTED_CERT environment variable contains two PEM-encoded certs, for example:

-----BEGIN CERTIFICATE-----
MIIDfzCCAmegAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1l
MjNkNmMxMi02MDdmLTQ5ZTAtYjNkOC01OThjNTNjNTg0YTYwHhcNMTcwNzExMTY1
MDI4WhcNMjcwNzExMTY1MDI4WjAyMTAwLgYDVQQDDCdjYS1lMjNkNmMxMi02MDdm
LTQ5ZTAtYjNkOC01OThjNTNjNTg0YTYwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw
ggEKAoIBAQDYigb/ktZ8yFWgScvJQrwaZAWDv2Wrk7F2ofKO9mqHVKln7SKDvfu1
s7dtEaRHU4akwwQF3T8BbTyQb7YRyXoyyZnPUAfbuoescx3P6bR9W63jam12Dcyj
JR1dgdJkLIJpERw5mP9dyydZbwrmZIfW8bh61YYe7LG1zgtr9LK0kKO2uBXrsEro
WIR+w3sOnkyCHNDtaN2k1FIWaqhOljnJP+RsrROQ5kro6xqqIzD9xPM23ywRpLeg
WNVK8Muv0MYJp9Y414BbkasoP9sZ0fJzfh8fZwhknD31LnvNpNCRh+hSbL7GRYTP
5GfRdV57OtBNvmfqI2QkSCcqhiyrhnd1AgMBAAGjgZ8wgZwwHQYDVR0OBBYEFP/W
7Zgwd+ramQcbjpCVjgHSfAXoMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQD
AgG2MFoGA1UdIwRTMFGAFP/W7Zgwd+ramQcbjpCVjgHSfAXooTakNDAyMTAwLgYD
VQQDDCdjYS1lMjNkNmMxMi02MDdmLTQ5ZTAtYjNkOC01OThjNTNjNTg0YTaCAQAw
DQYJKoZIhvcNAQELBQADggEBABUGwntMwuy3DMHLKHPv+c1BjhTK1LDUfZ9CIzb4
RHrlSHEhWlwxMMowhDM/euKu5cSPoHm8091luvicnqcGbmvRXKK0X0CjGfRlKIrX
+Qh5WZQqe3DSQ8piwTXoTSIDm/mq8L+bhnye9ZoM3yrLvJ5xxsNPePas0uIZwN+u
nK42B5w7yfhWs2esqqkf1zevQ1xipZSONiTxQRTIv92DnFyNvb/xzlWB7FGt5vc4
7Rg8GOB7I7KB8IdZQIy8IZ73v5Jd0IFkr6Oj/uPzgqsra9XafQnDkIx98v4HH2ZX
Lu00XgWU2FqlcJEAiuyb9hse9ZeMCpe3SC6FYkH6ripxsfc=
-----END CERTIFICATE-----

-----BEGIN CERTIFICATE-----
MIIDdjCCAl6gAwIBAgIBADANBgkqhkiG9w0BAQsFADAvMS0wKwYDVQQDDCRlMjNk
NmMxMi02MDdmLTQ5ZTAtYjNkOC01OThjNTNjNTg0YTYwHhcNMTcwNzExMTY1MDI4
WhcNMTgwNzExMTY1MDI4WjAvMS0wKwYDVQQDDCRlMjNkNmMxMi02MDdmLTQ5ZTAt
YjNkOC01OThjNTNjNTg0YTYwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB
AQDg21SfqLR0RRBf9qMGCdu1xHU7KnQo6/2dizdZ5X5mohkG8baF4RnM1QhSxkff
weD6tIzMpwOaecwRQw868qcn/YaLAVr+tX+x8nOQi8Gy8m+RQhPqTLesaJE7YI9e
oSWagtp8JGh3NNlHD0guHqfHIT9lmJK6RtP+CsqgJjzoRQhOXLb+LFRioO4Ca7ww
XiwYdFSD6/gs/torgglf87Lbj/h2Nuda+1wdZw8GYTCBUaO3GkqFd8dxU4PMvrsQ
EwCmPnmT74P7TlxD7M64vh/vMiHvxqPzaxYUsM6qaF/15ObOSZb0jK2ZRrpR5PuP
Z3//kuV39a60ov4tRocUTWY7AgMBAAGjgZwwgZkwHQYDVR0OBBYEFK3hj4THIL0d
lS9iS/FtMgi502cpMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgG2MFcG
A1UdIwRQME6AFK3hj4THIL0dlS9iS/FtMgi502cpoTOkMTAvMS0wKwYDVQQDDCRl
MjNkNmMxMi02MDdmLTQ5ZTAtYjNkOC01OThjNTNjNTg0YTaCAQAwDQYJKoZIhvcN
AQELBQADggEBAFu1wXP2hsh0IdQoy63QzsFMLTkTgAs9SZurJT3N/ECYyvbHv/cT
LnZjMiXQnXur1odFHau5VuOs2o77N2tWcVHcGcPE/rL+Bpt9C9eXk3c/6okNPMDT
06XcUA5k9QzlyfXVvu07tcGdjqy+X+PxdMyBdv1cH+BrRAWCek/4rAMpk1fD2nJs
qweQDmQkT0nt3rGwU911X+oUDsPl8NlWYjYt0F5Vjv/4KwBonCpdhK4wQEBS1RmT
Dq2NXDKat5G8eRXyt+WAAyAngdExxjrwg7s6ORr4dg7OsZaAair27XExys0QqT3N
UluZHcxuJmnN0gHniedyi2h/lLE+4n5j2Jc=
-----END CERTIFICATE-----

If your application supports two PEM-encoded certificates in this environment variable, it can survive these CA certificate rotations without notable impact.
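
To check how many certificates a given KAFKA_TRUSTED_CERT value holds, the bundle can be split on its PEM markers. The following is a minimal Python sketch rather than a definitive implementation: the helper name split_pem_bundle is hypothetical, and it only separates the PEM blocks without validating their contents.

```python
import os
import re

# Matches one PEM certificate block, including its BEGIN/END markers.
PEM_CERT_RE = re.compile(
    r"-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----",
    re.DOTALL,
)


def split_pem_bundle(bundle):
    """Return each PEM certificate block found in the bundle, in order."""
    return PEM_CERT_RE.findall(bundle)


# Falls back to an empty string when the config var is not set.
certs = split_pem_bundle(os.environ.get("KAFKA_TRUSTED_CERT", ""))
print("KAFKA_TRUSTED_CERT contains {} certificate(s)".format(len(certs)))
```

A client that needs one certificate per call can iterate over the returned list; clients that accept a whole bundle can use the variable as-is.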

Note, however, that these certificate rotations involve rolling broker restarts. For best practices on making your application robust to them, see our article about Robust Usage of Apache Kafka on Heroku.

sarama

Using multiple CA certificates from sarama, the Go client, is relatively easy. To verify that server certificates can be trusted, you need code like this:

import (
    "crypto/tls"
    "crypto/x509"
    "errors"
    "log"
)

// verifyServerCert connects to the broker at url and checks that its
// certificate chains to one of the CA certs in caCert. caCert may hold
// several concatenated PEM certificates.
func verifyServerCert(tc *tls.Config, caCert string, url string) (bool, error) {
    // Create connection to server
    conn, err := tls.Dial("tcp", url, tc)
    if err != nil {
        return false, err
    }
    defer conn.Close()

    // Pull the server's cert
    serverCert := conn.ConnectionState().PeerCertificates[0]

    // AppendCertsFromPEM adds every certificate found in the PEM data to the pool
    roots := x509.NewCertPool()
    ok := roots.AppendCertsFromPEM([]byte(caCert))
    if !ok {
        return false, errors.New("Unable to parse Trusted Cert")
    }

    // Verify Server Cert
    opts := x509.VerifyOptions{Roots: roots}
    if _, err := serverCert.Verify(opts); err != nil {
        log.Println("Unable to verify Server Cert")
        return false, err
    }

    return true, nil
}

The caCert above is retrieved from the KAFKA_TRUSTED_CERT config variable and passed in as a string.

See our demo app for a more complete example.

ruby-kafka

Whilst the ruby-kafka library does support multiple CA certificates, it does not correctly parse the PEM passed in the config vars. Support for this has recently been introduced in a beta version of the library, so you’ll need to use this version of ruby-kafka.

When using that version, you can do the following:

require 'kafka'
require 'tempfile'

tmp_ca_file = Tempfile.new('ca_certs')
tmp_ca_file.write(ENV.fetch("KAFKA_TRUSTED_CERT"))
# flush buffered data to disk so the client can read the certs from the file
tmp_ca_file.flush

kafka = Kafka.new(
  seed_brokers: ENV.fetch("KAFKA_URL"),
  ssl_ca_cert_file_path: tmp_ca_file.path,
  ssl_client_cert: ENV.fetch("KAFKA_CLIENT_CERT"),
  ssl_client_cert_key: ENV.fetch("KAFKA_CLIENT_CERT_KEY"),
)

# close the tempfile so it can be garbage collected
tmp_ca_file.close

kafka-python

The Python client only supports loading certificates from the filesystem, so you need to write the certificates to files and then load them into the underlying SSL context:

import base64
import os
import ssl
from tempfile import NamedTemporaryFile

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization


def get_kafka_ssl_context():
    # NOTE: This will bomb if the necessary config vars aren't present; it's assumed their
    #   existence will be checked for elsewhere !!!
    #
    # 1. Write the PEM certificates necessary for connecting to the Kafka brokers to physical
    # files.  The broker connection SSL certs are passed in environment/config variables and
    # the python and ssl libraries require them in physical files. The public certs are written
    # to short lived NamedTemporaryFile files; the client key is encrypted before writing to
    # the short lived NamedTemporaryFile
    #
    # 2. Create and return an SSLContext for connecting to the Kafka brokers referencing the
    # PEM certificates written above
    #

    # stash the kafka certs in named temporary files for loading into SSLContext.  Initialize the
    # SSLContext inside the with so when it goes out of scope the files are removed which has them
    # existing for the shortest amount of time.  As extra caution password
    # protect/encrypt the client key
    with NamedTemporaryFile(suffix='.crt') as cert_file, \
         NamedTemporaryFile(suffix='.key') as key_file, \
         NamedTemporaryFile(suffix='.crt') as trust_file:
        # the temporary files are opened in binary mode, so encode the config vars to bytes
        cert_file.write(os.environ['KAFKA_CLIENT_CERT'].encode('utf-8'))
        cert_file.flush()

        # setup cryptography to password encrypt/protect the client key so it's not in the clear on
        # the filesystem.  Use the generated password in the call to load_cert_chain
        passwd = base64.b64encode(os.urandom(33))
        private_key = serialization.load_pem_private_key(
            os.environ['KAFKA_CLIENT_CERT_KEY'].encode('utf-8'),
            password=None,
            backend=default_backend()
        )
        pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.BestAvailableEncryption(passwd)
        )
        key_file.write(pem)
        key_file.flush()

        # KAFKA_TRUSTED_CERT may contain more than one PEM-encoded CA cert; all are written out
        trust_file.write(os.environ['KAFKA_TRUSTED_CERT'].encode('utf-8'))
        trust_file.flush()

        # create an SSLContext for passing into the kafka provider using the create_default_context
        # function, which creates an SSLContext with secure default settings (SSLv2 and SSLv3
        # disabled) when purpose=SERVER_AUTH.
        ssl_context = ssl.create_default_context(
            purpose=ssl.Purpose.SERVER_AUTH, cafile=trust_file.name)
        ssl_context.load_cert_chain(cert_file.name, keyfile=key_file.name, password=passwd)

        # Purposely disabling hostname checking. Rely on the ca trust_cert.
        ssl_context.check_hostname = False

    return ssl_context

Java

The Java client requires creating truststores and keystores manually and writing them to the underlying filesystem. We recommend using env-keystore to do this:

properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

try {
    EnvKeyStore envTrustStore = EnvKeyStore.createWithRandomPassword("KAFKA_TRUSTED_CERT");
    EnvKeyStore envKeyStore = EnvKeyStore.createWithRandomPassword("KAFKA_CLIENT_CERT_KEY", "KAFKA_CLIENT_CERT");

    File trustStore = envTrustStore.storeTemp();
    File keyStore = envKeyStore.storeTemp();

    properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, envTrustStore.type());
    properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStore.getAbsolutePath());
    properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, envTrustStore.password());
    properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, envKeyStore.type());
    properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStore.getAbsolutePath());
    properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, envKeyStore.password());
} catch (Exception e) {
    throw new RuntimeException("There was a problem creating the Kafka key stores", e);
}

Once in the filesystem, truststores and keystores must be added into a Properties object, which is then passed into Producers and Consumers.

Note that to handle multiple CA certificates, you have to be on version 0.1.3 of env-keystore.

no-kafka

The no-kafka library does not currently verify CA certificates, and as a result, it is not affected by the changes to the KAFKA_TRUSTED_CERT environment variable.
