Apache Kafka on Heroku の CA 証明書ローテーション
最終更新日 2021年01月25日(月)
This article is a work in progress, or documents a feature that is not yet released to all users. This article is unlisted. Only those with the link can access it.
Table of Contents
背景
Apache Kafka on Heroku でのブローカーへの接続と認証は、複数のレベルの SSL 検証に依存しています。それをサポートするために、いくつかの環境設定がアプリに提供されています。
KAFKA_URL
: クラスター内の Kafka ブローカーの場所を指定する URLKAFKA_CLIENT_CERT
: クラスターに対してクライアントを認証するために使用するクライアント証明書KAFKA_CLIENT_CERT_KEY
: そのクライアント証明書のキーKAFKA_TRUSTED_CERT
: 各ブローカーのサーバー証明書の CA。これは、正しいノードと通信していることを確認するために使用されます。
クラスターを安全かつ最新の状態に保つために、Heroku では時々、この認証に関係する CA 証明書をローテーションする必要があります。その処理を行うとき、KAFKA_TRUSTED_CERT
環境変数の内容は、PEM でエンコードされた 2 つの証明書になります。次に例を示します。
-----BEGIN CERTIFICATE-----
MIIDfzCCAmegAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1l
MjNkNmMxMi02MDdmLTQ5ZTAtYjNkOC01OThjNTNjNTg0YTYwHhcNMTcwNzExMTY1
MDI4WhcNMjcwNzExMTY1MDI4WjAyMTAwLgYDVQQDDCdjYS1lMjNkNmMxMi02MDdm
LTQ5ZTAtYjNkOC01OThjNTNjNTg0YTYwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw
ggEKAoIBAQDYigb/ktZ8yFWgScvJQrwaZAWDv2Wrk7F2ofKO9mqHVKln7SKDvfu1
s7dtEaRHU4akwwQF3T8BbTyQb7YRyXoyyZnPUAfbuoescx3P6bR9W63jam12Dcyj
JR1dgdJkLIJpERw5mP9dyydZbwrmZIfW8bh61YYe7LG1zgtr9LK0kKO2uBXrsEro
WIR+w3sOnkyCHNDtaN2k1FIWaqhOljnJP+RsrROQ5kro6xqqIzD9xPM23ywRpLeg
WNVK8Muv0MYJp9Y414BbkasoP9sZ0fJzfh8fZwhknD31LnvNpNCRh+hSbL7GRYTP
5GfRdV57OtBNvmfqI2QkSCcqhiyrhnd1AgMBAAGjgZ8wgZwwHQYDVR0OBBYEFP/W
7Zgwd+ramQcbjpCVjgHSfAXoMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQD
AgG2MFoGA1UdIwRTMFGAFP/W7Zgwd+ramQcbjpCVjgHSfAXooTakNDAyMTAwLgYD
VQQDDCdjYS1lMjNkNmMxMi02MDdmLTQ5ZTAtYjNkOC01OThjNTNjNTg0YTaCAQAw
DQYJKoZIhvcNAQELBQADggEBABUGwntMwuy3DMHLKHPv+c1BjhTK1LDUfZ9CIzb4
RHrlSHEhWlwxMMowhDM/euKu5cSPoHm8091luvicnqcGbmvRXKK0X0CjGfRlKIrX
+Qh5WZQqe3DSQ8piwTXoTSIDm/mq8L+bhnye9ZoM3yrLvJ5xxsNPePas0uIZwN+u
nK42B5w7yfhWs2esqqkf1zevQ1xipZSONiTxQRTIv92DnFyNvb/xzlWB7FGt5vc4
7Rg8GOB7I7KB8IdZQIy8IZ73v5Jd0IFkr6Oj/uPzgqsra9XafQnDkIx98v4HH2ZX
Lu00XgWU2FqlcJEAiuyb9hse9ZeMCpe3SC6FYkH6ripxsfc=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIDdjCCAl6gAwIBAgIBADANBgkqhkiG9w0BAQsFADAvMS0wKwYDVQQDDCRlMjNk
NmMxMi02MDdmLTQ5ZTAtYjNkOC01OThjNTNjNTg0YTYwHhcNMTcwNzExMTY1MDI4
WhcNMTgwNzExMTY1MDI4WjAvMS0wKwYDVQQDDCRlMjNkNmMxMi02MDdmLTQ5ZTAt
YjNkOC01OThjNTNjNTg0YTYwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB
AQDg21SfqLR0RRBf9qMGCdu1xHU7KnQo6/2dizdZ5X5mohkG8baF4RnM1QhSxkff
weD6tIzMpwOaecwRQw868qcn/YaLAVr+tX+x8nOQi8Gy8m+RQhPqTLesaJE7YI9e
oSWagtp8JGh3NNlHD0guHqfHIT9lmJK6RtP+CsqgJjzoRQhOXLb+LFRioO4Ca7ww
XiwYdFSD6/gs/torgglf87Lbj/h2Nuda+1wdZw8GYTCBUaO3GkqFd8dxU4PMvrsQ
EwCmPnmT74P7TlxD7M64vh/vMiHvxqPzaxYUsM6qaF/15ObOSZb0jK2ZRrpR5PuP
Z3//kuV39a60ov4tRocUTWY7AgMBAAGjgZwwgZkwHQYDVR0OBBYEFK3hj4THIL0d
lS9iS/FtMgi502cpMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgG2MFcG
A1UdIwRQME6AFK3hj4THIL0dlS9iS/FtMgi502cpoTOkMTAvMS0wKwYDVQQDDCRl
MjNkNmMxMi02MDdmLTQ5ZTAtYjNkOC01OThjNTNjNTg0YTaCAQAwDQYJKoZIhvcN
AQELBQADggEBAFu1wXP2hsh0IdQoy63QzsFMLTkTgAs9SZurJT3N/ECYyvbHv/cT
LnZjMiXQnXur1odFHau5VuOs2o77N2tWcVHcGcPE/rL+Bpt9C9eXk3c/6okNPMDT
06XcUA5k9QzlyfXVvu07tcGdjqy+X+PxdMyBdv1cH+BrRAWCek/4rAMpk1fD2nJs
qweQDmQkT0nt3rGwU911X+oUDsPl8NlWYjYt0F5Vjv/4KwBonCpdhK4wQEBS1RmT
Dq2NXDKat5G8eRXyt+WAAyAngdExxjrwg7s6ORr4dg7OsZaAair27XExys0QqT3N
UluZHcxuJmnN0gHniedyi2h/lLE+4n5j2Jc=
-----END CERTIFICATE-----
PEM でエンコードされた 2 つの証明書がこの環境変数でサポートされることにより、これらの CA 証明書のローテーションの間も、アプリケーションは特に目立った影響を受けることなく動作を続けることができます。
ただし、これらの証明書のローテーションに伴い、ブローカーの再起動が繰り返し発生することに注意してください。アプリケーションをそれらに対して堅牢にするためのベストプラクティスについては、「Apache Kafka on Heroku の堅牢な使用法」を参照してください。
sarama
golang クライアントである sarama から複数の CA 証明書を使用するのは比較的簡単です。サーバー証明書が信頼できることを確認するときは、次のようなコードが必要です。
func verifyServerCert(tc *tls.Config, caCert string, url string) (bool, error) {
// Create connection to server
conn, err := tls.Dial("tcp", url, tc)
if err != nil {
return false, err
}
// Pull servers cert
serverCert := conn.ConnectionState().PeerCertificates[0]
roots := x509.NewCertPool()
ok := roots.AppendCertsFromPEM([]byte(caCert))
if !ok {
return false, errors.New("Unable to parse Trusted Cert")
}
// Verify Server Cert
opts := x509.VerifyOptions{Roots: roots}
if _, err := serverCert.Verify(opts); err != nil {
log.Println("Unable to verify Server Cert")
return false, err
}
return true, nil
}
上記の caCert
は KAFKA_TRUSTED_CERT
環境設定から取得され、文字列として渡されます。
より詳細な例については、デモアプリを参照してください。
ruby-kafka
ruby-kafka ライブラリは複数の CA 証明書をサポートしますが、環境設定で渡された PEM を正しく解析しません。ライブラリのベータバージョンで、この件に関してのサポートが最近導入されたため、このバージョンの ruby-kafka を使用する必要があります。
このバージョンを使用すると、以下のことができます。
require 'tempfile'
tmp_ca_file = Tempfile.new('ca_certs')
tmp_ca_file.write(ENV.fetch("KAFKA_TRUSTED_CERT"))
kafka = Kafka.new(
seed_brokers: ENV.fetch("KAFKA_URL"),
ssl_ca_cert_file_path: tmp_ca_file.path,
ssl_client_cert: ENV.fetch("KAFKA_CLIENT_CERT"),
ssl_client_cert_key: ENV.fetch("KAFKA_CLIENT_CERT_KEY"),
)
# close the tempfile so it can be garbage collected
tmp_ca_file.close
kafka-python
この Python クライアントは、ファイルシステムベースの証明書の読み込みのみをサポートします。これは、ファイルシステムへの証明書の保存と、基礎となる証明書チェーンにそれらの証明書を読み込むことに関連した実装に大部分が依存します。
def get_kafka_ssl_context():
# NOTE: This will bomb if the necessary config vars aren't present; it's assumed their
# existence will be checked for elsewhere !!!
#
# 1. Write the PEM certificates necessary for connecting to the Kafka brokers to physical
# files. The broker connection SSL certs are passed in environment/config variables and
# the python and ssl libraries require them in physical files. The public keys are written
# to short lived NamedTemporaryFile files; the client key is encrypted before writing to
# the short lived NamedTemporaryFile
#
# 2. Create and return an SSLContext for connecting to the Kafka brokers referencing the
# PEM certificates written above
#
# stash the kafka certs in named temporary files for loading into SSLContext. Initialize the
# SSLContext inside the with so when it goes out of scope the files are removed which has them
# existing for the shortest amount of time. As extra caution password
# protect/encrypt the client key
with NamedTemporaryFile(suffix='.crt') as cert_file, \
NamedTemporaryFile(suffix='.key') as key_file, \
NamedTemporaryFile(suffix='.crt') as trust_file:
cert_file.write(os.environ['KAFKA_CLIENT_CERT'])
cert_file.flush()
# setup cryptography to password encrypt/protect the client key so it's not in the clear on
# the filesystem. Use the generated password in the call to load_cert_chain
passwd = os.urandom(33).encode('base64')
private_key = serialization.load_pem_private_key(
os.environ['KAFKA_CLIENT_CERT_KEY'],
password=None,
backend=default_backend()
)
pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(passwd)
)
key_file.write(pem)
key_file.flush()
trust_file.write(os.environ['KAFKA_TRUSTED_CERT'])
trust_file.flush()
# create an SSLContext for passing into the kafka provider using the create_default_context
# function which creates an SSLContext with protocol set to PROTOCOL_SSLv23, OP_NO_SSLv2,
# and OP_NO_SSLv3 when purpose=SERVER_AUTH.
ssl_context = ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH, cafile=trust_file.name)
ssl_context.load_cert_chain(cert_file.name, keyfile=key_file.name, password=passwd)
# Purposely disabling hostname checking. Rely on the ca trust_cert.
ssl_context.check_hostname = False
return ssl_context
Java
Java クライアントでは、トラストストアとキーストアを手動で作成し、基礎となるファイルシステムに書き込む必要があります。これを行うには、env-keystore を使用することをお勧めします。
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
try {
EnvKeyStore envTrustStore = EnvKeyStore.createWithRandomPassword("KAFKA_TRUSTED_CERT");
EnvKeyStore envKeyStore = EnvKeyStore.createWithRandomPassword("KAFKA_CLIENT_CERT_KEY", "KAFKA_CLIENT_CERT");
File trustStore = envTrustStore.storeTemp();
File keyStore = envKeyStore.storeTemp();
properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, envTrustStore.type());
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStore.getAbsolutePath());
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, envTrustStore.password());
properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, envKeyStore.type());
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStore.getAbsolutePath());
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, envKeyStore.password());
} catch (Exception e) {
throw new RuntimeException("There was a problem creating the Kafka key stores", e);
}
ファイルシステムにアクセスしたら、トラストストアとキーストアを Properties オブジェクトに追加する必要があります。その後、このオブジェクトがプロデューサーとコンシューマーに渡されます。
複数の CA 証明書を処理するには、env-keystore のバージョン 0.1.3 を使用する必要があることに注意してください。
no-kafka
no-kafka ライブラリは現在、CA 証明書を検証しないため、KAFKA_TRUSTED_CERTTRUSTED_CERT
環境変数の変更による影響を受けません。