Quartz および RabbitMQ を使用した Java でのカスタムクロックプロセスによる定期的なジョブ
最終更新日 2019年12月16日(月)
This article is a work in progress, or documents a feature that is not yet released to all users. This article is unlisted. Only those with the link can access it.
Java アプリケーションでバックグラウンドジョブをスケジュールするためのさまざまな方法があります。 この記事では、Quartz ライブラリと RabbitMQ を使用して、Heroku でバックグラウンドジョブをスケジュールするスケーラブルかつ信頼できる方法を作成する Java アプリケーションを設定する方法について説明します。
Java でのバックグラウンド処理のための一般的な方法では、Web 層と同じアプリケーション内でバックグラウンドジョブを実行することを提唱しています。 このアプローチでは、スケーラビリティと信頼性が制約されます。
もっと良いアプローチは、バックグラウンドジョブを別のプロセスに移動することによって、Web 層をバックグラウンド処理層から明確に分離することです。 これにより、Web 層を Web 要求の処理専用にすることができます。 また、ジョブをキューに入れるジョブのスケジュール機能も独自の層にします。 その後、ワーカー処理層を残りのアプリケーションから独立してスケーリングすることができます。
この記事のリファレンスアプリケーションのソースは GitHub から入手できます。
前提条件
- Heroku CLI がインストールされていること。
- Maven 3.0.4 がインストールされていること。
- Heroku ユーザーアカウント。 無料ですぐにサインアップできます。
サンプルプロジェクトをローカルコンピューターにクローンするには、次を実行します。
$ git clone https://github.com/heroku/devcenter-java-quartz-rabbitmq.git
Cloning into devcenter-java-quartz-rabbitmq...
$ cd devcenter-java-quartz-rabbitmq/
Quartz を使用したジョブのスケジューリング
ジョブを作成してキューに追加するために、カスタムクロックプロセスを使用します。 カスタムクロックプロセスを設定するために、Quartz ライブラリを使用します。 Maven では、依存関係を次のようにして宣言します。
Maven の完全なビルド定義については、リファレンスアプリの pom.xml を参照してください。
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.1.5</version>
</dependency>
これで、Java アプリケーションを使用してジョブをスケジュールできます。 次に例を示します。
package com.heroku.devcenter;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.repeatSecondlyForever;
import static org.quartz.TriggerBuilder.newTrigger;
public class SchedulerMain {
final static Logger logger = LoggerFactory.getLogger(SchedulerMain.class);
public static void main(String[] args) throws Exception {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
scheduler.start();
JobDetail jobDetail = newJob(HelloJob.class).build();
Trigger trigger = newTrigger()
.startNow()
.withSchedule(repeatSecondlyForever(2))
.build();
scheduler.scheduleJob(jobDetail, trigger);
}
public static class HelloJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
logger.info("HelloJob executed");
}
}
}
この単純な例では、メッセージをログに記録するだけの HelloJob
が 2 秒おきに作成されます。 Quartz には Trigger
スケジュールを作成するための非常に多彩な API があります。
このアプリケーションをローカルでテストするために、Maven ビルドを実行してから SchedulerMain
Java クラスを実行することができます。
$ mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building devcenter-java-quartz-rabbitmq 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
$ java -cp target/classes:target/dependency/* com.heroku.devcenter.SchedulerMain
...
66 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzSchedule initialized from default resource file in Quartz package: 'quartz.properties'
66 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.1.5
66 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
104 [DefaultQuartzScheduler_Worker-1] INFO com.heroku.devcenter.SchedulerMain - HelloJob executed
2084 [DefaultQuartzScheduler_Worker-2] INFO com.heroku.devcenter.SchedulerMain - HelloJob executed
Ctrl-C
を押してアプリを終了します。
HelloJob
が実際に機能する場合、ランタイムボトルネックが発生します。これは、重複するジョブがスケジュールされるのを防ぐ一方で、スケジューラーをスケーリングできないためです。 Quartz には、データベースを使用してジョブが重複するのを防ぐ JDBC モジュールがありますが、もっと簡単なアプローチは、スケジューラーのインスタンスを 1 つだけ実行し、ジョブワーカープロセスによって並列に処理できるメッセージキューに、スケジュールされたジョブを追加することです。
RabbitMQ によるジョブのキューイング
RabbitMQ はメッセージキューとして使用できるため、スケジューラープロセスはジョブをキューに追加するためだけに使用でき、ワーカープロセスはキューからジョブを取得してそれらを処理するために使用できます。 Maven で RabbitMQ クライアントライブラリを依存関係として追加するには、pom.xml
ファイルの依存関係ブロックに以下を指定します。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>2.8.2</version>
</dependency>
これをローカルでテストする場合は、RabbitMQ をインストールし、RabbitMQ サーバーへの接続情報をアプリケーションに提供する環境変数を設定します。
Windows の場合:
$ set CLOUDAMQP_URL="amqp://guest:guest@localhost:5672/%2f"
Mac/Linux の場合:
$ export CLOUDAMQP_URL="amqp://guest:guest@localhost:5672/%2f"
CLOUDAMQP_URL
環境変数は、共有メッセージキューに接続するためにスケジューラープロセスおよびワーカープロセスによって使用されます。 この例ではその環境変数を使用していますが、これは CloudAMQP Heroku アドオンがアプリケーションに接続情報を提供する方法がこのようなものであるためです。
SchedulerMain
クラスを更新して、HelloJob
が実行されるたび新しいメッセージをキューに追加するようにする必要があります。 サンプルプロジェクトの SchedulerMain.java ファイル の新しい SchedulerMain
クラスおよび HelloJob
クラスは以下のようになります。
package com.heroku.devcenter;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.repeatSecondlyForever;
import static org.quartz.TriggerBuilder.newTrigger;
public class SchedulerMain {
final static Logger logger = LoggerFactory.getLogger(SchedulerMain.class);
final static ConnectionFactory factory = new ConnectionFactory();
public static void main(String[] args) throws Exception {
factory.setUri(System.getenv("CLOUDAMQP_URL"));
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
scheduler.start();
JobDetail jobDetail = newJob(HelloJob.class).build();
Trigger trigger = newTrigger()
.startNow()
.withSchedule(repeatSecondlyForever(5))
.build();
scheduler.scheduleJob(jobDetail, trigger);
}
public static class HelloJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = "work-queue-1";
Map<String, Object> params = new HashMap<String, Object>();
params.put("x-ha-policy", "all");
channel.queueDeclare(queueName, true, false, false, params);
String msg = "Sent at:" + System.currentTimeMillis();
byte[] body = msg.getBytes("UTF-8");
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, body);
logger.info("Message Sent: " + msg);
connection.close();
}
catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
}
この例では、HelloJob
が実行されるたび、RabbitMQ メッセージキューにメッセージを追加します。メッセージには、String が作成された時刻を含む String が含まれています。 更新された SchedulerMain
を実行すると、5 秒ごとに新しいメッセージがキューに追加されます。
ジョブの処理
次に、キューからメッセージを取り込んで処理する Java アプリケーションを作成します。 このアプリケーションでも RabbitFactoryUtil
を使用して RabbitMQ への接続を CLOUDAMQP_URL
環境変数から取得します。 プロジェクト例の WorkerMain.java ファイル の WorkerMain
クラスは以下のようになります。
package com.heroku.devcenter;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class WorkerMain {
final static Logger logger = LoggerFactory.getLogger(WorkerMain.class);
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(System.getenv("CLOUDAMQP_URL"));
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = "work-queue-1";
Map<String, Object> params = new HashMap<String, Object>();
params.put("x-ha-policy", "all");
channel.queueDeclare(queueName, true, false, false, params);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery != null) {
String msg = new String(delivery.getBody(), "UTF-8");
logger.info("Message Received: " + msg);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
}
このクラスは単純に、メッセージキューで新しいメッセージを待機し、受信したときにログに記録します。 この例は、ビルドを行って WorkerMain
クラスを実行することによってローカルで実行できます。
$ mvn package
$ java -cp target/classes:target/dependency/* com.heroku.devcenter.WorkerMain
また、この例の複数のインスタンスをローカルで実行して、ジョブの処理を水平方向に分散させる方法を確認することもできます。
Heroku 上での実行
すべてがローカルで動作するようになったら、これを Heroku 上で実行できます。 最初に、以下を含む Procfile
という名前の新しいファイルでプロセスモデルを宣言します。
scheduler: java $JAVA_OPTS -cp target/classes:target/dependency/* com.heroku.devcenter.SchedulerMain
worker: java $JAVA_OPTS -cp target/classes:target/dependency/* com.heroku.devcenter.WorkerMain
これにより、Heroku 上で実行できる 2 つのプロセスタイプが定義されます。1 つは SchedulerMain
アプリのための scheduler
という名前のもので、もう 1 つは WorkerMain
アプリのための worker
という名前のものです。
Heroku 上で実行するには、Maven ビルド記述子、ソースコード、および Procfile
を含む Git リポジトリを Heroku にプッシュする必要があります。 プロジェクト例を複製した場合、Git リポジトリはすでに存在します。 これらのファイルを含む新しい Git リポジトリを作成する必要がある場合は、次を実行します。
$ git init
$ git add src pom.xml Procfile
$ git commit -m init
プロジェクトのルートディレクトリから Heroku 上の新しいアプリケーションを作成します。
$ heroku create
Creating furious-cloud-2945... done, stack is cedar-14
http://furious-cloud-2945.herokuapp.com/ | git@heroku.com:furious-cloud-2945.git
Git remote heroku added
次に、CloudAMQP アドオンをアプリケーションに追加します。
$ heroku addons:create cloudamqp
Adding cloudamqp to furious-cloud-2945... done, v2 (free)
cloudamqp documentation available at: https://devcenter.heroku.com/articles/cloudamqp
ここで、Git リポジトリを Heroku にプッシュします。
$ git push heroku master
Counting objects: 165, done.
Delta compression using up to 2 threads.
...
-----> Heroku receiving push
-----> Java app detected
...
-----> Discovering process types
Procfile declares types -> scheduler, worker
-----> Compiled slug size is 1.4MB
-----> Launching... done, v5
http://furious-cloud-2945.herokuapp.com deployed to Heroku
これによってプロジェクトの Maven ビルドが Heroku 上で実行され、アプリケーションの実行可能アセットを含む slug ファイルが作成されます。 アプリケーションを実行するには、各プロセスタイプを実行するように dyno を割り当てる必要があります。 ジョブスケジュールの重複を防ぐために、scheduler
プロセスタイプを実行するために割り当てる dyno は 1 つだけとする必要があります。 worker
プロセスタイプには必要な数だけ dyno を割り当てることができます。これは、このプロセスタイプがイベント駆動型であり、RabbitMQ メッセージキューを通じて並列化可能であるためです。
scheduler
プロセスタイプに 1 つの dyno を割り当てるには、次を実行します。
$ heroku ps:scale scheduler=1
Scaling scheduler processes... done, now running 1
これにより、キューへの 5 秒ごとのメッセージの追加が開始されます。worker
プロセスタイプに 2 つの dyno を割り当てるには、次を実行します。
$ heroku ps:scale worker=2
Scaling worker processes... done, now running 2
これによって 2 つの dyno がプロビジョニングされ、それぞれ WorkerMain
アプリを実行して、処理するメッセージをキューから取得します。アプリケーションの Heroku ログを監視して、これが発生していることを確認できます。 ログのフィードを開くには、次を実行します。
$ heroku logs --tail
2012-06-26T22:26:47+00:00 app[scheduler.1]: 100223 [DefaultQuartzScheduler_Worker-1] INFO com.heroku.devcenter.SchedulerMain - Message Sent: Sent at:1340749607126
2012-06-26T22:26:47+00:00 app[worker.2]: 104798 [main] INFO com.heroku.devcenter.WorkerMain - Message Received: Sent at:1340749607126
2012-06-26T22:26:52+00:00 app[scheduler.1]: 105252 [DefaultQuartzScheduler_Worker-2] INFO com.heroku.devcenter.SchedulerMain - Message Sent: Sent at:1340749612155
2012-06-26T22:26:52+00:00 app[worker.1]: 109738 [main] INFO com.heroku.devcenter.WorkerMain - Message Received: Sent at:1340749612155
この実行例では、2 つの異なる worker dyno (worker.1
および worker.2
) によって処理される 2 つのメッセージがスケジューラーによって作成されます。 作業がスケジュールされて正しく分散されていることが示されています。
より深い学習
このアプリケーション例では、バックグラウンドジョブをスケジュールして処理するためのスケーラブルで信頼性の高いシステムを構築するための基本事項を示したに過ぎません。 詳細については、次を参照してください。