CA Certificate Rotation for Apache Kafka on Heroku
Last updated January 25, 2021
This article is a work in progress, or documents a feature that is not yet released to all users. This article is unlisted. Only those with the link can access it.
Table of Contents
Background
Connecting and authenticating to brokers on Apache Kafka on Heroku relies on several levels of SSL verification. In order to support that, we provide several config variables to your app:
KAFKA_URL
the urls specifying the location of the Kafka brokers in the clusterKAFKA_CLIENT_CERT
a client cert to use to authenticate your client against the clusterKAFKA_CLIENT_CERT_KEY
the key of that client certKAFKA_TRUSTED_CERT
the CA of the server certs on each broker. This is used to verify that you are talking to the right nodes.
In order to keep clusters secure and up to date, from time to time Heroku needs to rotate the CA certificate involved in this authentication. When doing so, the KAFKA_TRUSTED_CERT
environment variable will contain two PEM-encoded certs, for example:
-----BEGIN CERTIFICATE-----
MIIDfzCCAmegAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1l
MjNkNmMxMi02MDdmLTQ5ZTAtYjNkOC01OThjNTNjNTg0YTYwHhcNMTcwNzExMTY1
MDI4WhcNMjcwNzExMTY1MDI4WjAyMTAwLgYDVQQDDCdjYS1lMjNkNmMxMi02MDdm
LTQ5ZTAtYjNkOC01OThjNTNjNTg0YTYwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw
ggEKAoIBAQDYigb/ktZ8yFWgScvJQrwaZAWDv2Wrk7F2ofKO9mqHVKln7SKDvfu1
s7dtEaRHU4akwwQF3T8BbTyQb7YRyXoyyZnPUAfbuoescx3P6bR9W63jam12Dcyj
JR1dgdJkLIJpERw5mP9dyydZbwrmZIfW8bh61YYe7LG1zgtr9LK0kKO2uBXrsEro
WIR+w3sOnkyCHNDtaN2k1FIWaqhOljnJP+RsrROQ5kro6xqqIzD9xPM23ywRpLeg
WNVK8Muv0MYJp9Y414BbkasoP9sZ0fJzfh8fZwhknD31LnvNpNCRh+hSbL7GRYTP
5GfRdV57OtBNvmfqI2QkSCcqhiyrhnd1AgMBAAGjgZ8wgZwwHQYDVR0OBBYEFP/W
7Zgwd+ramQcbjpCVjgHSfAXoMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQD
AgG2MFoGA1UdIwRTMFGAFP/W7Zgwd+ramQcbjpCVjgHSfAXooTakNDAyMTAwLgYD
VQQDDCdjYS1lMjNkNmMxMi02MDdmLTQ5ZTAtYjNkOC01OThjNTNjNTg0YTaCAQAw
DQYJKoZIhvcNAQELBQADggEBABUGwntMwuy3DMHLKHPv+c1BjhTK1LDUfZ9CIzb4
RHrlSHEhWlwxMMowhDM/euKu5cSPoHm8091luvicnqcGbmvRXKK0X0CjGfRlKIrX
+Qh5WZQqe3DSQ8piwTXoTSIDm/mq8L+bhnye9ZoM3yrLvJ5xxsNPePas0uIZwN+u
nK42B5w7yfhWs2esqqkf1zevQ1xipZSONiTxQRTIv92DnFyNvb/xzlWB7FGt5vc4
7Rg8GOB7I7KB8IdZQIy8IZ73v5Jd0IFkr6Oj/uPzgqsra9XafQnDkIx98v4HH2ZX
Lu00XgWU2FqlcJEAiuyb9hse9ZeMCpe3SC6FYkH6ripxsfc=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIDdjCCAl6gAwIBAgIBADANBgkqhkiG9w0BAQsFADAvMS0wKwYDVQQDDCRlMjNk
NmMxMi02MDdmLTQ5ZTAtYjNkOC01OThjNTNjNTg0YTYwHhcNMTcwNzExMTY1MDI4
WhcNMTgwNzExMTY1MDI4WjAvMS0wKwYDVQQDDCRlMjNkNmMxMi02MDdmLTQ5ZTAt
YjNkOC01OThjNTNjNTg0YTYwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB
AQDg21SfqLR0RRBf9qMGCdu1xHU7KnQo6/2dizdZ5X5mohkG8baF4RnM1QhSxkff
weD6tIzMpwOaecwRQw868qcn/YaLAVr+tX+x8nOQi8Gy8m+RQhPqTLesaJE7YI9e
oSWagtp8JGh3NNlHD0guHqfHIT9lmJK6RtP+CsqgJjzoRQhOXLb+LFRioO4Ca7ww
XiwYdFSD6/gs/torgglf87Lbj/h2Nuda+1wdZw8GYTCBUaO3GkqFd8dxU4PMvrsQ
EwCmPnmT74P7TlxD7M64vh/vMiHvxqPzaxYUsM6qaF/15ObOSZb0jK2ZRrpR5PuP
Z3//kuV39a60ov4tRocUTWY7AgMBAAGjgZwwgZkwHQYDVR0OBBYEFK3hj4THIL0d
lS9iS/FtMgi502cpMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgG2MFcG
A1UdIwRQME6AFK3hj4THIL0dlS9iS/FtMgi502cpoTOkMTAvMS0wKwYDVQQDDCRl
MjNkNmMxMi02MDdmLTQ5ZTAtYjNkOC01OThjNTNjNTg0YTaCAQAwDQYJKoZIhvcN
AQELBQADggEBAFu1wXP2hsh0IdQoy63QzsFMLTkTgAs9SZurJT3N/ECYyvbHv/cT
LnZjMiXQnXur1odFHau5VuOs2o77N2tWcVHcGcPE/rL+Bpt9C9eXk3c/6okNPMDT
06XcUA5k9QzlyfXVvu07tcGdjqy+X+PxdMyBdv1cH+BrRAWCek/4rAMpk1fD2nJs
qweQDmQkT0nt3rGwU911X+oUDsPl8NlWYjYt0F5Vjv/4KwBonCpdhK4wQEBS1RmT
Dq2NXDKat5G8eRXyt+WAAyAngdExxjrwg7s6ORr4dg7OsZaAair27XExys0QqT3N
UluZHcxuJmnN0gHniedyi2h/lLE+4n5j2Jc=
-----END CERTIFICATE-----
Supporting two PEM-encoded certificates in this environment variable will allow your application to survive these CA certificate rotations without any notable impact.
Do note however, that these certificate rotations will involve rolling broker restarts. For more best practices on making your application robust to those, see our article about Robust Usage of Apache Kafka on Heroku.
sarama
Using multiple CA certificates from sarama, the golang client, is relatively easy. When verifying that server certificates can be trusted, you will need some code like this:
func verifyServerCert(tc *tls.Config, caCert string, url string) (bool, error) {
// Create connection to server
conn, err := tls.Dial("tcp", url, tc)
if err != nil {
return false, err
}
// Pull servers cert
serverCert := conn.ConnectionState().PeerCertificates[0]
roots := x509.NewCertPool()
ok := roots.AppendCertsFromPEM([]byte(caCert))
if !ok {
return false, errors.New("Unable to parse Trusted Cert")
}
// Verify Server Cert
opts := x509.VerifyOptions{Roots: roots}
if _, err := serverCert.Verify(opts); err != nil {
log.Println("Unable to verify Server Cert")
return false, err
}
return true, nil
}
The caCert
above is retrieved from the KAFKA_TRUSTED_CERT
config variable and passed in as a string.
See our demo app for a more worked example.
ruby-kafka
Whilst the ruby-kafka library does support multiple CA certificates, it does not correctly parse the PEM passed in the config vars. Support for this has recently been introduced in a beta version of the library, so you’ll need to use this version of ruby-kafka.
When using that version, you can do the following:
require 'tempfile'
tmp_ca_file = Tempfile.new('ca_certs')
tmp_ca_file.write(ENV.fetch("KAFKA_TRUSTED_CERT"))
kafka = Kafka.new(
seed_brokers: ENV.fetch("KAFKA_URL"),
ssl_ca_cert_file_path: tmp_ca_file.path,
ssl_client_cert: ENV.fetch("KAFKA_CLIENT_CERT"),
ssl_client_cert_key: ENV.fetch("KAFKA_CLIENT_CERT_KEY"),
)
# close the tempfile so it can be garbage collected
tmp_ca_file.close
kafka-python
The python client only supports filesystem-based loading of certificates. This largely depends on the implementation around storing certificates in the filesystem and loading them into the underlying certificate chain:
def get_kafka_ssl_context():
# NOTE: This will bomb if the necessary config vars aren't present; it's assumed their
# existence will be checked for elsewhere !!!
#
# 1. Write the PEM certificates necessary for connecting to the Kafka brokers to physical
# files. The broker connection SSL certs are passed in environment/config variables and
# the python and ssl libraries require them in physical files. The public keys are written
# to short lived NamedTemporaryFile files; the client key is encrypted before writing to
# the short lived NamedTemporaryFile
#
# 2. Create and return an SSLContext for connecting to the Kafka brokers referencing the
# PEM certificates written above
#
# stash the kafka certs in named temporary files for loading into SSLContext. Initialize the
# SSLContext inside the with so when it goes out of scope the files are removed which has them
# existing for the shortest amount of time. As extra caution password
# protect/encrypt the client key
with NamedTemporaryFile(suffix='.crt') as cert_file, \
NamedTemporaryFile(suffix='.key') as key_file, \
NamedTemporaryFile(suffix='.crt') as trust_file:
cert_file.write(os.environ['KAFKA_CLIENT_CERT'])
cert_file.flush()
# setup cryptography to password encrypt/protect the client key so it's not in the clear on
# the filesystem. Use the generated password in the call to load_cert_chain
passwd = os.urandom(33).encode('base64')
private_key = serialization.load_pem_private_key(
os.environ['KAFKA_CLIENT_CERT_KEY'],
password=None,
backend=default_backend()
)
pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(passwd)
)
key_file.write(pem)
key_file.flush()
trust_file.write(os.environ['KAFKA_TRUSTED_CERT'])
trust_file.flush()
# create an SSLContext for passing into the kafka provider using the create_default_context
# function which creates an SSLContext with protocol set to PROTOCOL_SSLv23, OP_NO_SSLv2,
# and OP_NO_SSLv3 when purpose=SERVER_AUTH.
ssl_context = ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH, cafile=trust_file.name)
ssl_context.load_cert_chain(cert_file.name, keyfile=key_file.name, password=passwd)
# Purposely disabling hostname checking. Rely on the ca trust_cert.
ssl_context.check_hostname = False
return ssl_context
Java
The Java client requires creating truststores and keystores manually and writing them to the underlying filesystem. We recommend using env-keystore to do this:
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
try {
EnvKeyStore envTrustStore = EnvKeyStore.createWithRandomPassword("KAFKA_TRUSTED_CERT");
EnvKeyStore envKeyStore = EnvKeyStore.createWithRandomPassword("KAFKA_CLIENT_CERT_KEY", "KAFKA_CLIENT_CERT");
File trustStore = envTrustStore.storeTemp();
File keyStore = envKeyStore.storeTemp();
properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, envTrustStore.type());
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStore.getAbsolutePath());
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, envTrustStore.password());
properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, envKeyStore.type());
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStore.getAbsolutePath());
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, envKeyStore.password());
} catch (Exception e) {
throw new RuntimeException("There was a problem creating the Kafka key stores", e);
}
Once in the filesystem, truststores and keystores must be added into a Properties object, which is then passed into Producers and Consumers.
Note that to handle multiple CA certificates, you have to be on version 0.1.3 of env-keystore.
no-kafka
The no-kafka library does not currently verify CA certificates, and as a result, it is not affected by the changes to the KAFKA_TRUSTED_CERTTRUSTED_CERT
environment variable.