Heroku での Kafka Streams
この記事の英語版に更新があります。ご覧の翻訳には含まれていない変更点があるかもしれません。
最終更新日 2022年11月28日(月)
Table of Contents
Kafka Streams は、Apache Kafka の基礎となるコンポーネントを使用してストリーミングデータを処理する Java クライアントライブラリです。Kafka Streams を使用すると、軽量で、拡張性の高い、フォールトトレラントなストリーム処理アプリを簡単に開発できます。
Kafka Streams は、Heroku では、専用 Kafka プランと基本 Kafka プランの両方でサポートされています (基本プランにはいくつかの追加の設定が必要)。
Kafka Streams を使用して構築されたアプリケーションは、無限で、再生可能な、順序付けられた、フォールトトレラントなイベントのシーケンスであるストリームのデータを生成および消費します。ストリームは、Kafka トピックとして表されるか (KStream
)、または圧縮されたトピックとして具体化されます (KTable
)。デフォルトでは、このライブラリでは、アプリケーションがストリームイベントを 1 つずつ処理しながら、到着が遅いイベントや順序が正しくないイベントを処理する機能も提供することが保証されます。
基本的な例
Kafka Streams API を使用すると、数行のコードだけでアプリケーションを開発できます。次のサンプルは、単語数を保持する従来のユースケースを示しています。
words
.groupBy((key, word) -> word)
.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
.count(Materialized.as("windowed-counts"))
.toStream()
.process(PostgresSink::new);
このコードは、次の処理を行います。
- 単語の入力ストリームを取得する
- その入力を単語でグループ化する
- 10 秒のタンブリングウィンドウ内の各単語の頻度をカウントする
- 断続的な結果をローカルストアに保存する
- 各ウィンドウ境界で結果の単語数を出力する
この例は、標準的な Kafka Streams アプリケーションで作成するロジックの大部分を示しています。アプリケーションの残りの部分は、主に設定で構成されています。Kafka Streams では、アプリケーションのロジックを基礎となるインフラストラクチャから切り離すことによって開発が簡略化されます。ここで、このライブラリは、透過的にワークロードを分散し、エラーを処理し、その他の低レベルのタスクを実行します。
アプリケーションの構成
Kafka Streams アプリケーションは、Java のさまざまな実装を使用して Heroku 上で実行できる通常の Java サービスです。Maven と Gradle のための Heroku の buildpack はどちらもサポートされています。
Gradle で複数プロジェクトの設定を使用すると、それぞれが異なる Kafka Streams サービスを表す複数の Gradle サブプロジェクトを作成できます。これらのサービスは独立して動作することも、相互接続することもできます。
各サブプロジェクトは、その上で ./gradlew stage
タスクが実行されると、Gradle プラグインを使用して独自の実行可能ファイルを生成します。これらの実行可能ファイルは、アプリケーションの build/libs/
ディレクトリに sub-project-name-all.jar
として指定された名前で作成されます。その後、Procfile
でワーカープロセスタイプを宣言することによって、Heroku ランタイムでこれらの実行可能ファイルを実行できます。
aggregator_worker: java -jar build/libs/streams-aggregator-all.jar
1 つのアプリケーション内での複数の Kafka Streams サービスの設定に関する詳細情報は、kafka-streams-on-heroku リポジトリで見つけることができます。
アプリケーションの接続
Heroku で Kafka ブローカーに接続するには SSL が必要です。これを行うには、次の手順が必要になります。
- アプリの
KAFKA_URL
環境設定に保存されている URI を解析します。 - env-keystore を使用して Kafka の
TRUSTED_CERT
、CLIENT_CERT_KEY
、CLIENT_CERT
環境設定を読み取り、トラストストアとキーストアの両方を作成します。 - トラストストアとキーストアの関連する SSL 設定を追加します。
private Properties buildHerokuKafkaConfigVars() throws URISyntaxException, CertificateException,
NoSuchAlgorithmException, KeyStoreException, IOException {
Properties properties = new Properties();
List<String> bootstrapServerList = Lists.newArrayList();
Iterable<String> kafkaUrl = Splitter.on(",")
.split(Preconditions.checkNotNull(System.getenv(HEROKU_KAFKA_URL)));
for (String url : kafkaUrl) {
URI uri = new URI(url);
bootstrapServerList.add(String.format("%s:%d", uri.getHost(), uri.getPort()));
switch (uri.getScheme()) {
case "kafka":
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
break;
case "kafka+ssl":
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
EnvKeyStore envTrustStore = EnvKeyStore.createWithRandomPassword(
HEROKU_KAFKA_TRUSTED_CERT);
EnvKeyStore envKeyStore = EnvKeyStore.createWithRandomPassword(
HEROKU_KAFKA_CLIENT_CERT_KEY, HEROKU_KAFKA_CLIENT_CERT);
File trustStoreFile = envTrustStore.storeTemp();
File keyStoreFile = envKeyStore.storeTemp();
properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, envTrustStore.type());
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
trustStoreFile.getAbsolutePath());
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, envTrustStore.password());
properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, envKeyStore.type());
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getAbsolutePath());
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, envKeyStore.password());
break;
default:
throw new URISyntaxException(uri.getScheme(), "Unknown URI scheme");
}
}
bootstrapServers = Joiner.on(",").join(bootstrapServerList);
return properties;
}
内部トピックとコンシューマーグループの管理
Kafka Streams は、フォールトトレランスと再パーティション分割のために内部トピックを使用します。これらのトピックは、Kafka Streams アプリケーションが正しく動作するために必要です。
Kafka Streams の内部トピックの作成は、Kafka の auto.create.topics.enable
設定とは関連がありません。代わりに、Kafka Streams は、管理者クライアント経由でクラスターと直接通信します。
専用 Kafka プラン
専用 Kafka プランは、ユーザー間で分離されています。このため、専用プランでの内部の Kafka Streams トピックに追加の設定は必要ありません。
専用プランに関する詳細情報は、専用プランと設定のページで見つけることができます。
基本 Kafka プラン
基本 Kafka プランは、複数の Heroku ユーザーを同じ一連の基礎となるリソースで共同ホストします。ユーザーデータとアクセス権限は、Kafka アクセス制御リスト (ACL) によって分離されます。さらに、名前の衝突を防ぐために、トピックとコンシューマーグループの名前は自動生成されたプレフィックスで名前空間処理されます。
基本プランで Kafka Streams アプリケーションを実行するには、application.id
の適切な設定と、内部トピックとコンシューマーグループの事前作成という 2 つの事前の手順が必要です。
application.id
の設定
各 Kafka Streams アプリケーションには、そのアプリケーションとそれに関連付けられたトポロジーを識別する application.id
という名前の重要な一意識別子があります。Kafka Basic プランを使用している場合は、各 application.id
が割り当てられたプレフィックスで始まっていることを確認する必要があります。
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, String.format("%saggregator-app", HEROKU_KAFKA_PREFIX));
内部トピックとコンシューマーグループの事前作成
Heroku での Kafka Basic プランは ACL を使用するため、Kafka Streams アプリケーションは、適切な ACL のないトピックとコンシューマーグループとは対話できません。Kafka Streams は、内部の管理者クライアントを使用して内部トピックとコンシューマーグループを実行時に透過的に作成するため、これは問題になります。これは主に、Kafka Streams でのプロセッサーに影響を与えます。
プロセッサーは、process
メソッドを実装するクラスです。これはストリームから入力イベントを受信し、それらのイベントを処理した後、オプションでダウンストリームプロセッサーへの出力イベントを生成します。ステートフルプロセッサーは、以降のイベントを処理するときに、前のイベントによって生成された状態を使用するプロセッサーです。Kafka Streams には、この状態を保存するための組み込みの機能が用意されています。
アプリケーション内のステートフルプロセッサーごとに、changelog
用に 1 つと repartition
用に 1 つの 2 つの内部トピックを作成します。
たとえば、前に示した基本的な例には、ストリームからの単語をカウントする 1 つのステートフルプロセッサーが含まれています。
words
.groupBy((key, word) -> word)
.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
.count(Materialized.as("windowed-counts"))
.toStream()
.process(PostgresSink::new);
このアプリケーションでは、count
演算子に対して 2 つの内部トピックが必要です。
$ heroku kafka:topics:create aggregator-app-windowed-counts-changelog —app sushi
$ heroku kafka:topics:create aggregator-app-windowed-counts-repartition —app sushi
さらに、application.id
に一致するアプリケーションに対して 1 つのコンシューマーグループを作成する必要があります。
$ heroku kafka:consumer-groups:create mobile-1234.aggregator-app —app sushi
基本プランに関する詳細情報は、基本プランと設定のページで見つけることができます。
アプリケーションのスケーリング
並列処理モデル
パーティションは、Kafka トピックの並列処理の基本単位です。Kafka Streams アプリケーションには、多数のアプリケーションインスタンスが存在します。Kafka Streams アプリケーションは通常の Java アプリケーションであるため、Heroku ランタイムの dyno 内で実行されます。
Kafka Streams アプリケーションの各インスタンスには、いくつかのストリームスレッドが含まれています。これらのスレッドは、1 つ以上のストリームタスクを実行する役割を担っています。Kafka Streams では、ストリームタスクは並列処理の基本単位です。Kafka Streams では、すべてのイベントを消費および処理できるように、入力パーティションがストリームタスク間で均等に分散されることが透過的に保証されます。
垂直方向のスケーリング
デフォルトでは、Kafka Streams は、アプリケーションインスタンスあたり 1 つのストリームスレッドを作成します。各ストリームスレッドは 1 つ以上のストリームタスクを実行します。アプリケーションインスタンスは、そのストリームスレッドの数をスケーリングすることによってスケーリングできます。それを行うには、アプリケーションで num.stream.threads
設定値を変更します。アプリケーションは、各アプリケーションインスタンス内のスレッド間でワークロードを透過的に再分散します。
水平方向のスケーリング
Kafka Streams は、アプリケーションインスタンスの数が変更されると、それらのインスタンス間でワークロードとローカル状態を再分散します。これは、同じ application.id
を持つインスタンス間でワークロードとローカル状態を分散することによって透過的に機能します。Kafka Streams アプリケーションは、dyno の数をスケーリングすることによって水平方向にスケーリングできます。
$ heroku ps:scale aggregator_worker=2 —app sushi
入力パーティションの数は、実質的に並列処理の上限です。ストリームタスクの数が入力パーティションの数を超えないという点に注意することが重要です。そうしないと、このオーバープロビジョニングによりアプリケーションインスタンスがアイドル状態になります。
注意
RocksDB の永続性
dyno は一時的なファイルシステムによってバックアップされるため、耐久性のある保存を基礎となるディスクに依存することは現実的でありません。これは、Heroku で Kafka Streams と共に RocksDB を使用するための課題を提示しています。ただし、RocksDB はハード要件ではありません。Kafka Streams は RocksDB をライトスルーキャッシュとして扱います。ここで、信頼できるソースは実際に、基礎となる Changelog 内部トピックです。基礎となる RocksDB ストアが存在しない場合、状態は、起動時に Changelog トピックから直接再生されます。
デフォルトでは、状態を Changelog トピックから直接再生すると、アプリケーションインスタンスを再分散するか、または dyno が再起動される場合に追加のレイテンシーが発生します。レイテンシーを最小限に抑えるには、ストリームタスクをそれに関連付けられているスタンバイタスクにフェールオーバーするように Kafka Streams を設定できます。
スタンバイタスクは、状態の完全に複製されたコピーを保持する、ストリームタスクのレプリカです。dyno は、状態が Changelog トピックから再構築されるのを待たなくても、スタンバイタスクを使用してただちに作業を再開できます。
スタンバイタスクの数を変更するには、アプリケーションで num.standby.replicas
設定を変更できます。