こんにちは、Algomatic ネオセールスカンパニーCTOのきくち (@_pochi)です。
Azure Functions便利ですよね。そして、Azure Functionsをゴリゴリ使っていると、「長時間実行したい」というニーズが出てくるのではないでしょうか。簡易的には、Functionのタイムアウト設定を伸ばすことである程度対応できるものの、APIサーバーとしてはタイムアウトは短く保ちたいし、そもそも「タイムアウト伸ばしてどうこうする」のは本質的な解決にならないので、避けたいですよね。ではどうするべきでしょうか。
「長い処理」を任せる選択肢
まず、Azure環境で「何らかの処理を長時間実行する」ための選択肢をいくつか挙げてみました。このように選択肢は複数ありますが、今回はDurable Functionsを取り上げます。
他の手段では、Azure Functions以外の異なるAzureリソースの構築が必要で複雑になりがちですが、Durable FunctionsはAzure Functionsのコードを追加実装するだけで長時間実行が実現可能なのです。*1
サービス | 特徴 | 典型ユース |
---|---|---|
Durable Functions | Functions 拡張。状態管理とチェックポイントを自動化 | チェーン/並列/待ち合わせ/人手承認 |
Container App Jobs | コンテナを“ジョブ”として 1 回実行し、終わったら終了 | データ加工・ML バッチ |
Azure Batch | VM プールを自動スケールし HPC/大量並列ジョブを実行 | CFD・メディアレンダリング |
Logic Apps | ノーコードのワークフロー。最長 90 日間実行 | SaaS 連携・ETL |
Durable Functions とは何か
Durable Functions は Azure Functions の拡張機能であり、ステートフル ワークフローを実装できる機能です。まず単なる「タイムアウト無制限のFunction」ではない点に注意が必要です。
Durable Functionsでは、有限時間実行可能な Activity と、そんなActivityを呼び出すOrchestrator を組み合わせることで長時間のタスク実行が可能になります。
実現例
よくあるユースケースとして「HTTPトリガーされたFunctionから、長時間かかる処理を実行する」例を考えると、以下のような概念図になります。*2
- HTTP Trigger Function: 起点となるトリガー。Orchestratorを呼び出し*3、すぐに終了する
- Orchestrator: Activityの呼び出しシーケンスを定義する
- Activity: 実際に行う処理を実装する(タイムアウトあり)
Activity自体には時間制限はあるものの、OrchestratorからのActivity呼び出しに回数や時間の制限はありません。したがって、「長時間実行したい処理を、タイムアウト時間内に完了するよう分割した上でOrchestratorから実行する」ことで、実質的に長時間タスクを実行できます。
ここから具体例を用いて説明します。(コード例はNode.js環境です)
Durable Functionsの動かし方
すでにAzure Functionsは構築済である前提で解説します。「1. コードを実装」して、「2. Storage Accountを設定する」だけでDurable Functionは利用可能です。
1. コードを実装
既存のFunctionsに、以下の3ファイルを追加します。
- simple-long-task.ts: Orchestratorを呼び出すだけのHTTPトリガのFunction
- orchestrator.ts: 「どのようにActivityを呼び出すか」を定義するOrchestratorの実装と登録処理
- activity.ts: 実際に行いたい処理を記述するActivityの実装と登録処理
simple-long-task.ts
import { type HttpRequest, type HttpResponseInit, type InvocationContext, app, } from "@azure/functions"; import * as df from "durable-functions"; import { orchestratorName } from "./orchestrator.js"; export async function simpleLongTask( request: HttpRequest, context: InvocationContext, ): Promise<HttpResponseInit> { const durableClient = df.getClient(context); // 引数は型安全ではないので注意 await durableClient.startNew(orchestratorName, { input: { count: 5, }, }); return { status: 200, body: "OK", }; } app.http("simple-long-task", { methods: ["POST"], authLevel: "anonymous", extraInputs: [df.input.durableClient()], handler: simpleLongTask, });
HTTPトリガーで呼び出されるFunctionであり、後述のOrchestratorを呼び出すだけです。1点 だけ、app.http
の呼び出しにおいて extraInputs
の指定が必須である点に注意してください。
orchestrator.ts
import * as df from "durable-functions"; import type { OrchestrationContext, OrchestrationHandler, } from "durable-functions"; import { activityName } from "./activity.js"; const orchestrator: OrchestrationHandler = function* ( context: OrchestrationContext, ) { context.log("Orchestrator started"); const args = context.df.getInput<OrchestratorArgs>(); for (let i = 0; i < args.count; i++) { context.log(`Calling activity with times: ${i}`); yield context.df.callActivity(activityName, { times: i, }); context.log(`Activity called with times: ${i}`); } context.log("Orchestrator completed"); }; // Orchestratorの引数の型 export type OrchestratorArgs = { count: number; }; // オーケストレーター名 export const orchestratorName = "long-task-orchestrator"; // Orchestratorを登録 df.app.orchestration(orchestratorName, orchestrator);
Orchestratorを定義するファイルです。Orchestratorの実処理を定義する OrchestrationHandler
の実装と、 df.app.orchestration
の呼び出しによるOrchestratorの登録を行なっています。Orchestratorの登録を行うことで、上述の simple-long-task.ts
内の startNew
によってこのOrchestratorを実行することができるようになります。
activity.ts
import type { ActivityHandler } from "durable-functions"; import type { InvocationContext } from "@azure/functions"; import * as df from "durable-functions"; const longTaskActivity: ActivityHandler = async ( args: ActivityArgs, context: InvocationContext, ) => { // 長いタスクをここに実装する context.log(`Running long task (${args.times})`); await new Promise((resolve) => setTimeout(resolve, 1000 * args.times)); context.log(`Completed long task (${args.times})`); }; // Activity名 export const activityName = "long-task-activity"; // Activityを登録 df.app.activity(activityName, { handler: longTaskActivity }); // Activityの引数の型 export type ActivityArgs = { times: number; };
Activityを定義するファイルです。Activityの実処理を定義する ActivityHandler
の実装と、 df.app.activity
の呼び出しによるActivityの登録を行なっています。
2. Storage Accountを接続
Durable Functionsを利用するには、Orchestratorがステートを保存するための、Storage Accountへの接続が必要です。環境変数 AzureWebJobsStorage
に、任意のStorage Accountの接続文字列を登録しましょう。
以上でDurable Functionsの実装は完了です。事前にStorageAccountの設定だけしてしまえば、あとは Functions にOrchestrator
と Activity
を足すだけです。追加のAzure リソースを構築したりパイプラインを定義する必要はありません。便利ですよね!
ローカルで試しに動かす場合は、 azurite
を利用してください。
実装における注意点
Activityにはタイムアウトが存在する
Activityに実処理を実装するわけですが、Activityにもタイムアウトが存在します。(Functions のタイムアウト時間準拠) したがってActivityには、タイムアウト時間を考慮し、適切なサイズに処理を分割する必要がある点に注意が必要です。
Orchestratorが「決定論的な処理」である必要がある
Durable Functions最大の注意点として、Orchestratorが「決定論的な処理」である必要があります。同一インプットを与えたら、必ず同一の値を返す処理でなければなりません。I/Oや乱数利用は行えません。これは、Orchestratorの仕組みを理解すれば自然な制約ですので、以下に解説します。
const orchestrator: OrchestrationHandler = function* ( context: OrchestrationContext, ) { context.log("Orchestrator started"); const args = context.df.getInput<OrchestratorArgs>(); for (let i = 0; i < args.count; i++) { context.log(`Calling activity with times: ${i}`); yield context.df.callActivity(activityName, { times: i, }); context.log(`Activity called with times: ${i}`); } context.log("Orchestrator completed"); };
上記コードは、先に紹介した実装例のOrchestratorです。httpTriggerによってこのOrchestratorが起動されると、
Orchestrator started
のログを出力Calling activity with times 0
を出力callActivity
が呼ばれ、その内部でActivityを起動- ここでOrchestratorの処理は中断
1回目のOrchestratorの実行ログは以下のようになります。途中で中断していることがわかりますね。
# 1回目のOrchestratorの実行ログ Orchestrator started Calling activity with times: 0
その後、先に呼び出したActivityの完了イベントの通知を受けて再度Orchestratorが起動し、はじめから処理が再実行されます。このとき実行される処理は
Orchestrator started
のログを出力Calling activity with times 0
のログを出力callActivity
が呼ばれるが、呼び出し対象のActiityは既に完了しているため処理は中断せず先に進むActivity called with times: 0
のログを出力- ループ始点に戻る
Activity called with times: 1
のログ出力callActivity
が呼ばれ、その内部でActivityを起動- ここで再びOrchestratorの処理が中断
となります。2回目の実行ログは以下のようになります。
# 2回目のOrchestratorの実行ログ Orchestrator started Calling activity with times: 0 Activity called with times: 0 Calling activity with times: 1
もうおわかりでしょうか。以降はこれまでと同様に、Orchestratorの引数で渡された回数分、Activityの呼び出し→中断→Orchestratorがはじめから実行→…と繰り返します。
5回目位のActivity完了後のOrchestratorは以下のログを出力し、終了します。
# 5回目のOrchestratorの実行ログ Orchestrator started Calling activity with times: 0 Activity called with times: 0 Calling activity with times: 1 Activity called with times: 1 Calling activity with times: 2 Activity called with times: 2 Calling activity with times: 3 Calling activity with times: 4 Activity called with times: 4 Orchestrator completed
このように、Orchestrator関数は何度も何度も繰り返し実行されるため、決定論的な処理である必要があるのです。(なお、Orchestrator内部での利用を想定したHTTP呼び出し機構などは存在しますが、注意点をよく理解して利用する必要があります。)
Durable Functionsのエラーハンドリング
Activity 失敗
Activityの呼び出し時、callActivity
ではなく callActivityWithRetry
を利用することで失敗時のリトライが可能です。リトライ回数やバックオフなども指定可能です。
Orchestrator 失敗
失敗したOrchestratorを、外部から再実行する手段として、 rewind
があります。プレビュー状態ではありますが、エラーを観測して rewind
する仕組みを構築することで、より堅牢な仕組みを構築できます。(なお、これはまだ使ったことがないので使用感などはわかりません)
Durable Functionsのコスト
Durable Functions 自体に追加料金はありません。実行・リソース課金モデルは Azure Functions と同じです。 メッセージ永続化に使う Storage トランザクション料 は別途発生しますが、そこまで気にする量ではなさそうです。
まとめ
「ちょっと時間がかかるAPI」をAzure Functionsでサクッと作りたい…そんなときは、まず Durable Functions を選択肢に入れてみてください。タイムアウトの壁を越えつつ、開発フローもコスト感もそのままにスケールできます。どうですか、だんだんAzureが好きになってきませんか。
積極採用中です!
ここまで読んでいただきありがとうございました。「面白そうだな〜」と思っていただけたエンジニアの方、「こいつは何もわかっちゃいない、俺がもっといい感じにしてやる」と感じたアーキテクトの方などなど、ぜひお話ししましょう!一緒にインフラを支えてくれる方も大募集中です!
*1:Storage Accountの設定だけは必要です
*2:厳密には、Orchestrator が Activity を「直接呼ぶ」わけではない点に注意が必要です。 Orchestrator は Task Hub(Azure Storage 上のキュー・テーブル・Blob で構成)に「Activity を実行してほしい」というメッセージを置き、Activity Function 側がそのメッセージをポーリングして処理します。結果も Task Hub に書き戻され、Orchestrator はそれを受け取って再開します。Microsoft Learn,Microsoft Learn
*3:HTTP Trigger FunctionsからのOrchestrator起動についても「直接呼ぶ」わけではなく、Task Hub経由になります。