Heroku での Celery の使用
最終更新日 2024年04月26日(金)
Heroku での Celery の使用
Celery は、アプリケーションで非同期タスクを実行するためのフレームワークです。Celery は Python で記述されており、これを利用すると、Web アプリの同期リクエストのライフサイクルからタスクワーカーのプールに処理をオフロードして非同期でジョブを実行することが容易になります。
Celery は Heroku で完全にサポートされており、Heroku のアドオンプロバイダーのいずれかを使用するだけでメッセージブローカーと結果ストアを実装できます。
アーキテクチャ
Celery をデプロイするには、1 つ以上の ワーカープロセスを実行します。これらのプロセスはメッセージブローカーに接続して、ジョブリクエストをリッスンします。メッセージブローカーは、リッスンしているすべてのワーカーにジョブリクエストを無作為に振り分けます。ジョブリクエストを開始するには、Celery ライブラリのコードを呼び出します。ジョブへの引数のマーシャリングや、ブローカーへのジョブリクエストの発行はこのコードによって処理されます。
ブローカーに接続するワーカープロセスの実行数を増やすだけで、ワーカープールを柔軟にスケーリングできます。各ワーカーは、他のすべてのワーカーと並列してジョブを実行できます。
ブローカーの選択
Heroku では、パートナー企業が提供するアドオンを通じて、優れた Celery ブローカーが 数多くサポートされています。一般的には、Celery 内で最もサポートされているブローカーエンジンには、Redis と RabbitMQ があります。Amazon SQS、IronMQ、MongoDB、CouchDB など、他のブローカーエンジンもサポートされていますが、これらのブローカーを使用すると一部の機能が欠落する場合があります。Celery によってブローカー実装の詳細が抽象化されるため、ブローカーの変更は比較的容易です。そのため、後になって、別のブローカーの方がニーズに適していると判断した場合は、自由に切り替えてください。
ブローカーを選択したら、Heroku アプリを作成してアドオンをアプリにアタッチします。以下の例では Heroku Data for Redis を Redis プロバイダーとして使用しますが、他にも多くの Redis プロバイダーが Heroku Elements Marketplace に用意されています。
$ heroku apps:create
$ heroku addons:create heroku-redis -a sushi
RabbitMQ と通信するためのライブラリは Celery に付属しますが、他のブローカーを使用する場合はそのブローカー用のライブラリをインストールする必要があります。たとえば、Redis を使用する場合は次のようにします。
$ pip install redis
結果ストアの選択
Celery でタスクの結果を保存可能にすることが必要な場合、結果ストアを選択する必要があります。不要な場合は、次のセクションに進んでください。優れたメッセージブローカーの特性は、優れた結果ストアの特性と必ずしも一致しません。たとえば、RabbitMQ はサポートされている最良のメッセージブローカーですが、一度求められた後は結果を破棄するため、結果ストアとしては決して使用しないでください。Redis と Memcache はどちらも結果ストアの有力な候補です。
メッセージブローカーと同じ結果ストアを選択する場合、2 つのアドオンをアタッチする必要はありません。そうでない場合は、結果ストアのアドオンがアタッチされていることを確認してください。
Celery アプリの作成
まず、Celery 自体がインストールされていることを確認します。
$ pip install celery
次に、Celery アプリ用の Python モジュールを作成します。小規模な Celery アプリの場合、次のようにして、tasks.py
という名前のモジュールにすべてをまとめるのが一般的です。
import celery
app = celery.Celery('example')
タスクの定義
Celery アプリを用意したら、アプリで実行できることをアプリに指示する必要があります。Celery では、コードの基本単位はタスクです。これは単なる Python 関数であり、非同期で呼び出せるように Celery に登録します。Celery には、タスクを定義するさまざまなデコレーターと、それらのタスクを呼び出すためのいくつかのメソッド呼び出しがあります。単純なタスクを tasks.py
モジュールに追加します。
@app.task
def add(x, y):
return x + y
最初の Celery タスクが作成されました。ただし、このタスクをワーカーで実行する前に、少しの設定を行う必要があります。
Celery アプリの設定
Celery には多くの設定オプションがありますが、正常稼働のために必要なものは数個だけです。
BROKER_URL
: メッセージブローカーに接続する方法を Celery に伝える URL。 (これは通常、ブローカーとして選択されたアドオンによって提供されます。)CELERY_RESULT_BACKEND
: BROKER_URL
と同じ形式の URL で、 結果ストアに接続する方法を Celery に伝えます。(結果を保存しない場合は、この設定を無視してください。)
Celery アプリに渡すことができる環境設定が、Heroku アドオンによってアプリケーションに提供されます。次に例を示します。
import os
app.conf.update(BROKER_URL=os.environ['REDIS_URL'],
CELERY_RESULT_BACKEND=os.environ['REDIS_URL'])
Celery アプリで定義するすべてのタスクに対して、選択したブローカーと結果ストアを Celery アプリが使用するようになりました。
ローカルでの実行
Celery ワーカーをローカルで実行する前に、メッセージブローカーと結果ストア用に選択したアプリケーションをインストールする必要があります。インストールしたら、どちらも正常に稼働していることを確認します。次に、Heroku Local がワーカープロセスを起動するために使用できる Procfile を作成します。Procfile は次のようになります。
worker: celery worker --app=tasks.app
次に、.env
という名前のファイルを追加して、設定する環境変数を Heroku Local に指示します。
たとえば、Redis をそのデフォルト設定でメッセージブローカーと結果ストアの両方に使用する場合は、アドオンによって提供されるものと同じ名前で環境変数を追加するだけです。
REDIS_URL=redis://
ワーカープロセスを開始するには、次のようにして Heroku Local を実行します。
$ heroku local
次に、Python シェルで次を実行します。
import tasks
tasks.add.delay(1, 2)
delay
は、現在のプロセスではなく非同期でタスクを実行することを
Celery に指示します。
タスクを受け取って実行したという内容のワーカープロセスログが表示されます。
Heroku へのデプロイ
前述の Procfile
が作成済みで、メッセージブローカーと結果ストアの適切なアドオンがアタッチ済みの場合、残りの作業は、アプリをプッシュしてスケーリングすることだけです。
$ git push heroku master
$ heroku ps:scale worker=1
もちろん、いつでも任意の数の Worker dyno にスケーリングできます。ここで、ローカルで実行したときと同じようにタスクを実行します。
$ heroku run python
>>> import tasks
>>> tasks.add.delay(1, 2)
実行中のタスクがアプリケーションのログに出力されます。
$ heroku logs -t -d worker
Celery と Django
Celery のドキュメントにある概要に従って、
Celery を Django と統合します。概念は同じですが、Django の再利用可能なアプリのアーキテクチャは、Django アプリごとのタスクのモジュールに適しています。INSTALLED_APPS
設定は、Celery のタスク自動検出機能と組み合わせて利用できます。
from django.conf import settings
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
その後、Celery は各 Django アプリの tasks.py
ファイルを調べて、見つかったすべてのタスクを Celery のレジストリに追加します。
Celery のベストプラクティス
非同期処理の管理
Celery タスクは非同期で実行されます。具体的には、タスクの実行を求めるメッセージリクエストがブローカーに送信された直後に、呼び出し側プロセスの Celery 関数呼び出しが戻ります。タスクから結果を再取得する方法は 2 つあります。1 つは、タスクの結果をデータベースなどの持続的ストレージに書き込む方法です。
Celery タスクから結果を再取得するもう 1 つの方法は、結果ストアの使用です。 Celery 結果ストアは任意です。多くの場合、タスクを発行して非同期で実行し、その結果をデータベースに保存するだけで十分です。その場合は、Celery によって保存された結果を無視しても安全です。
シリアライザーの選択
Celery タスクに渡されるすべての情報と、結果として発生するすべての情報をシリアル化および逆シリアル化する必要があります。Celery タスクを設計するときは、シリアル化に伴う一連の問題を考慮する必要があります。
デフォルトでは、Celery は pickle を使用してメッセージをシリアル化します。pickle には、初期設定のままでも “それなりに機能する” という利点がありますが、将来的には多くの問題を引き起こす可能性があります。コードに変更があると、処理中で、古い形式のままのシリアル化オブジェクトの逆シリアル化時に問題が発生する可能性があります。
JSON を使用するように Celery を変更すると、開発者によるタスク引数の取り扱いが必然的に適切になるだけでなく、pickle の無作為なコード実行がもたらす可能性があるセキュリティリスクも低減されます。JSON をデフォルトのタスクシリアライザーとして使用するには、次のように環境変数を設定します。
CELERY_TASK_SERIALIZER=json
小規模で短期間のタスク
Celery ワーカープールでは、複数のワーカーが不定の数のタスクを並列処理します。このため、マルチスレッドアプリケーションに似たタスク設計を検討することは理にかなっています。可能な限り効率的に処理を分散できるよう、各タスクではできるだけ少量で有用な処理を行うようにします。
タスクをシリアル化してネットワーク経由でメッセージブローカーに送信し、そのタスクをネットワーク経由でワーカーに送り返して逆シリアル化する一連の処理に伴うオーバーヘッドは、スレッド間のプロセス内キューでメッセージを送信する場合よりもはるかに大きいため、当然ながら “有用” の基準も上がります。単一行をデータベースに書き込む以外に何もしないタスクを発行することは、おそらくリソースの最適な使い方ではありません。ただし、複数ではなく 1 つの API 呼び出しを同じタスクで実行すれば、大きな差が生まれる可能性があります。
短期間のタスクはデプロイと再起動を容易にします。dyno を停止すると、Heroku はプロセスに停止を求めた後、短い時間でプロセスを強制終了します。これは、再起動、スケールダウン、またはデプロイの結果として発生する可能性があります。その結果、すぐに終了するタスクは、長く実行されるタスクほど頻繁には失敗しません。
タスクのタイムアウトの設定
何百回、何千回とタスクを実行するうちに、ネットワークアクティビティが原因でタスクがスタックし、キューが後続のタスクを処理できなくなることがあります。これは、すべてのタスクにハードタイムアウトとソフトタイムアウトを設定することで容易に回避できます。
タイムアウトについての詳細は、Celery のドキュメントを参照してください。
べき等のタスク
さまざまな理由により、Celery のタスクが失敗するか中断される可能性があります。タスクを設計するときは、分散システムで問題になる可能性があるすべての要因を予測しようとするのではなく、べき等性を追求する必要があります。タスクの開始時にシステムが現在の状態であるという想定は禁物です。変更する外部状態をできるだけ少なくしてください。
これは科学というよりは技巧ですが、悪影響を伴わずに再実行できるタスクが多いほど、分散システムはより簡単に自己修復できます。たとえば、予期しないエラーが発生したとき、べき等のタスクであれば Celery にタスクの再試行を指示するだけで解決できます。エラーが一時的だった場合、べき等性により、システムはユーザーの介入なしに自己回復できます。
Using REMAP_SIGTERM
Heroku で実行中のプロセスが終了されると、プロセスがシャットダウンされる 10 秒前に SIGTERM
シグナルがプロセスに送信されます。バージョン 4 以降の Celery には Heroku 専用の特別な機能が組み込まれており、初期設定の状態でこの機能をサポートします。
$ REMAP_SIGTERM=SIGQUIT celery -A proj worker -l info
acks_late の使用
Celery ワーカーは、メッセージブローカーからタスクを受け取ると、受信確認を送り返します。ブローカーは通常、タスクをキューから削除することで受信確認に応答します。ただし、ワーカーがタスクの途中で異常終了し、すでにタスクの受信を確認していた場合、タスクが再び実行されない可能性があります。Celery はソフトシャットダウン時に未完了のタスクをブローカーに送り返すことでこのリスクを軽減しようとしますが、ネットワーク障害、ハードウェア全体の障害、またはその他のシナリオが原因でそれが不可能な場合があります。
タスクが完了 (成功または失敗) するまでタスクの受信確認を待つように Celery を設定できます。この機能は、不定期のタスクを失うことが許容できない場合に大いに役立ちます。ただし、タスクがべき等 (以前の試行が途中まで進んだ可能性がある) かつ短期間 (ブローカーは通常、タスクを再びキューに入れる前に一定時間タスクを “予約” する) であることが条件となります。acks_late はタスク単位または Celery アプリ全体で有効化できます。
カスタムタスククラス
タスクは関数のように見えるかもしれませんが、Celery のタスクデコレーターは実際には、__call__
を実装するクラスを返しています(これが、タスクは self
にバインド可能で、delay
や apply_async
などの他のメソッドを使用できる理由です)。デコレーターは優れたショートカットですが、タスクのグループが、純粋に機能的に表現することが簡単ではない共通の懸念事項を共有する場合があります。
celery.Task
の抽象サブクラスを作成すると、他のタスクに必要な一連のツールや動作を継承を使って構築することが可能になります。タスクサブクラスの一般的な用途は、レート制限と再試行動作、セットアップまたはティアダウン処理、あるいは、アプリの一部のタスクのみに共有される環境設定の共通グループです。
キャンバスプリミティブ
Celery のドキュメントには、より大きなワークフローにタスクを結合するために使用できる一連のプリミティブの概要があります。
これらのプリミティブをよく理解してください。これらは、前述した設計原則を犠牲にすることなく、重要で複雑な処理を行う方法を提供します。特に役立つのは chord
です。このプリミティブでは、タスクのグループが並列実行され、完了するとその結果が別のタスクに渡されます。
これらには結果ストアが必要です。