Algomatic Tech Blog

Algomaticの開発チームによる Tech Blog です

Azure Functionsで長時間かかるタスクを実行したいあなたへ 〜Durable Functions入門〜

こんにちは、Algomatic ネオセールスカンパニーCTOのきくち (@_pochi)です。

Azure Functions便利ですよね。そして、Azure Functionsをゴリゴリ使っていると、「長時間実行したい」というニーズが出てくるのではないでしょうか。簡易的には、Functionのタイムアウト設定を伸ばすことである程度対応できるものの、APIサーバーとしてはタイムアウトは短く保ちたいし、そもそも「タイムアウト伸ばしてどうこうする」のは本質的な解決にならないので、避けたいですよね。ではどうするべきでしょうか。

「長い処理」を任せる選択肢

まず、Azure環境で「何らかの処理を長時間実行する」ための選択肢をいくつか挙げてみました。このように選択肢は複数ありますが、今回はDurable Functionsを取り上げます。

他の手段では、Azure Functions以外の異なるAzureリソースの構築が必要で複雑になりがちですが、Durable FunctionsはAzure Functionsのコードを追加実装するだけで長時間実行が実現可能なのです。*1

サービス 特徴 典型ユース
Durable Functions Functions 拡張。状態管理とチェックポイントを自動化 チェーン/並列/待ち合わせ/人手承認
Container App Jobs コンテナを“ジョブ”として 1 回実行し、終わったら終了 データ加工・ML バッチ
Azure Batch VM プールを自動スケールし HPC/大量並列ジョブを実行 CFD・メディアレンダリング
Logic Apps ノーコードのワークフロー。最長 90 日間実行 SaaS 連携・ETL

Durable Functions とは何か

Durable Functions は Azure Functions の拡張機能であり、ステートフル ワークフローを実装できる機能です。まず単なる「タイムアウト無制限のFunction」ではない点に注意が必要です。

Durable Functionsでは、有限時間実行可能な Activity と、そんなActivityを呼び出すOrchestrator を組み合わせることで長時間のタスク実行が可能になります。

実現例

よくあるユースケースとして「HTTPトリガーされたFunctionから、長時間かかる処理を実行する」例を考えると、以下のような概念図になります。*2

構成例

  • HTTP Trigger Function: 起点となるトリガー。Orchestratorを呼び出し*3、すぐに終了する
  • Orchestrator: Activityの呼び出しシーケンスを定義する
  • Activity: 実際に行う処理を実装する(タイムアウトあり)

Activity自体には時間制限はあるものの、OrchestratorからのActivity呼び出しに回数や時間の制限はありません。したがって、「長時間実行したい処理を、タイムアウト時間内に完了するよう分割した上でOrchestratorから実行する」ことで、実質的に長時間タスクを実行できます。

ここから具体例を用いて説明します。(コード例はNode.js環境です)

Durable Functionsの動かし方

すでにAzure Functionsは構築済である前提で解説します。「1. コードを実装」して、「2. Storage Accountを設定する」だけでDurable Functionは利用可能です。

1. コードを実装

既存のFunctionsに、以下の3ファイルを追加します。

  1. simple-long-task.ts: Orchestratorを呼び出すだけのHTTPトリガのFunction
  2. orchestrator.ts: 「どのようにActivityを呼び出すか」を定義するOrchestratorの実装と登録処理
  3. activity.ts: 実際に行いたい処理を記述するActivityの実装と登録処理

simple-long-task.ts

import {
  type HttpRequest,
  type HttpResponseInit,
  type InvocationContext,
  app,
} from "@azure/functions";
import * as df from "durable-functions";
import { orchestratorName } from "./orchestrator.js";

export async function simpleLongTask(
  request: HttpRequest,
  context: InvocationContext,
): Promise<HttpResponseInit> {
  const durableClient = df.getClient(context);
  // 引数は型安全ではないので注意
  await durableClient.startNew(orchestratorName, {
    input: {
      count: 5,
    },
  });
  return {
    status: 200,
    body: "OK",
  };
}

app.http("simple-long-task", {
  methods: ["POST"],
  authLevel: "anonymous",
  extraInputs: [df.input.durableClient()],
  handler: simpleLongTask,
});

HTTPトリガーで呼び出されるFunctionであり、後述のOrchestratorを呼び出すだけです。1点 だけ、app.http の呼び出しにおいて extraInputs の指定が必須である点に注意してください。

orchestrator.ts

import * as df from "durable-functions";
import type {
  OrchestrationContext,
  OrchestrationHandler,
} from "durable-functions";
import { activityName } from "./activity.js";

const orchestrator: OrchestrationHandler = function* (
  context: OrchestrationContext,
) {
  context.log("Orchestrator started");
  const args = context.df.getInput<OrchestratorArgs>();
  for (let i = 0; i < args.count; i++) {
    context.log(`Calling activity with times: ${i}`);
    yield context.df.callActivity(activityName, {
      times: i,
    });
    context.log(`Activity called with times: ${i}`);
  }
  context.log("Orchestrator completed");
};

// Orchestratorの引数の型
export type OrchestratorArgs = {
  count: number;
};
// オーケストレーター名
export const orchestratorName = "long-task-orchestrator";
// Orchestratorを登録
df.app.orchestration(orchestratorName, orchestrator);

Orchestratorを定義するファイルです。Orchestratorの実処理を定義する OrchestrationHandler の実装と、 df.app.orchestration の呼び出しによるOrchestratorの登録を行なっています。Orchestratorの登録を行うことで、上述の simple-long-task.ts 内の startNew によってこのOrchestratorを実行することができるようになります。

activity.ts

import type { ActivityHandler } from "durable-functions";
import type { InvocationContext } from "@azure/functions";
import * as df from "durable-functions";

const longTaskActivity: ActivityHandler = async (
  args: ActivityArgs,
  context: InvocationContext,
) => {
  // 長いタスクをここに実装する
  context.log(`Running long task (${args.times})`);
  await new Promise((resolve) => setTimeout(resolve, 1000 * args.times));
  context.log(`Completed long task (${args.times})`);
};

// Activity名
export const activityName = "long-task-activity";

// Activityを登録
df.app.activity(activityName, { handler: longTaskActivity });

// Activityの引数の型
export type ActivityArgs = {
  times: number;
};

Activityを定義するファイルです。Activityの実処理を定義する ActivityHandler の実装と、 df.app.activity の呼び出しによるActivityの登録を行なっています。

2. Storage Accountを接続

Durable Functionsを利用するには、Orchestratorがステートを保存するための、Storage Accountへの接続が必要です。環境変数 AzureWebJobsStorage に、任意のStorage Accountの接続文字列を登録しましょう。

以上でDurable Functionsの実装は完了です。事前にStorageAccountの設定だけしてしまえば、あとは Functions にOrchestratorActivity を足すだけです。追加のAzure リソースを構築したりパイプラインを定義する必要はありません。便利ですよね!

ローカルで試しに動かす場合は、 azurite を利用してください。

learn.microsoft.com

実装における注意点

Activityにはタイムアウトが存在する

Activityに実処理を実装するわけですが、Activityにもタイムアウトが存在します。(Functions のタイムアウト時間準拠) したがってActivityには、タイムアウト時間を考慮し、適切なサイズに処理を分割する必要がある点に注意が必要です。

Orchestratorが「決定論的な処理」である必要がある

Durable Functions最大の注意点として、Orchestratorが「決定論的な処理」である必要があります。同一インプットを与えたら、必ず同一の値を返す処理でなければなりません。I/Oや乱数利用は行えません。これは、Orchestratorの仕組みを理解すれば自然な制約ですので、以下に解説します。

const orchestrator: OrchestrationHandler = function* (
  context: OrchestrationContext,
) {
  context.log("Orchestrator started");
  const args = context.df.getInput<OrchestratorArgs>();
  for (let i = 0; i < args.count; i++) {
    context.log(`Calling activity with times: ${i}`);
    yield context.df.callActivity(activityName, {
      times: i,
    });
    context.log(`Activity called with times: ${i}`);
  }
  context.log("Orchestrator completed");
};

上記コードは、先に紹介した実装例のOrchestratorです。httpTriggerによってこのOrchestratorが起動されると、

  1. Orchestrator started のログを出力
  2. Calling activity with times 0 を出力
  3. callActivity が呼ばれ、その内部でActivityを起動
  4. ここでOrchestratorの処理は中断

1回目のOrchestratorの実行ログは以下のようになります。途中で中断していることがわかりますね。

# 1回目のOrchestratorの実行ログ
Orchestrator started
Calling activity with times: 0

その後、先に呼び出したActivityの完了イベントの通知を受けて再度Orchestratorが起動し、はじめから処理が再実行されます。このとき実行される処理は

  1. Orchestrator startedのログを出力
  2. Calling activity with times 0のログを出力
  3. callActivity が呼ばれるが、呼び出し対象のActiityは既に完了しているため処理は中断せず先に進む
  4. Activity called with times: 0 のログを出力
  5. ループ始点に戻る
  6. Activity called with times: 1のログ出力
  7. callActivity が呼ばれ、その内部でActivityを起動
  8. ここで再びOrchestratorの処理が中断

となります。2回目の実行ログは以下のようになります。

# 2回目のOrchestratorの実行ログ
Orchestrator started
Calling activity with times: 0
Activity called with times: 0
Calling activity with times: 1

もうおわかりでしょうか。以降はこれまでと同様に、Orchestratorの引数で渡された回数分、Activityの呼び出し→中断→Orchestratorがはじめから実行→…と繰り返します。

5回目位のActivity完了後のOrchestratorは以下のログを出力し、終了します。

# 5回目のOrchestratorの実行ログ
Orchestrator started
Calling activity with times: 0
Activity called with times: 0
Calling activity with times: 1
Activity called with times: 1
Calling activity with times: 2
Activity called with times: 2
Calling activity with times: 3
Calling activity with times: 4
Activity called with times: 4
Orchestrator completed

このように、Orchestrator関数は何度も何度も繰り返し実行されるため、決定論的な処理である必要があるのです。(なお、Orchestrator内部での利用を想定したHTTP呼び出し機構などは存在しますが、注意点をよく理解して利用する必要があります。)

Durable Functionsのエラーハンドリング

Activity 失敗

Activityの呼び出し時、callActivity ではなく callActivityWithRetry を利用することで失敗時のリトライが可能です。リトライ回数やバックオフなども指定可能です。

Orchestrator 失敗

失敗したOrchestratorを、外部から再実行する手段として、 rewind があります。プレビュー状態ではありますが、エラーを観測して rewind する仕組みを構築することで、より堅牢な仕組みを構築できます。(なお、これはまだ使ったことがないので使用感などはわかりません)

learn.microsoft.com

Durable Functionsのコスト

Durable Functions 自体に追加料金はありません。実行・リソース課金モデルは Azure Functions と同じです。 メッセージ永続化に使う Storage トランザクション料 は別途発生しますが、そこまで気にする量ではなさそうです。

まとめ

「ちょっと時間がかかるAPI」をAzure Functionsでサクッと作りたい…そんなときは、まず Durable Functions を選択肢に入れてみてください。タイムアウトの壁を越えつつ、開発フローもコスト感もそのままにスケールできます。どうですか、だんだんAzureが好きになってきませんか。

積極採用中です!

ここまで読んでいただきありがとうございました。「面白そうだな〜」と思っていただけたエンジニアの方、「こいつは何もわかっちゃいない、俺がもっといい感じにしてやる」と感じたアーキテクトの方などなど、ぜひお話ししましょう!一緒にインフラを支えてくれる方も大募集中です!

jobs.algomatic.jp

*1:Storage Accountの設定だけは必要です

*2:厳密には、Orchestrator が Activity を「直接呼ぶ」わけではない点に注意が必要です。 Orchestrator は Task Hub(Azure Storage 上のキュー・テーブル・Blob で構成)に「Activity を実行してほしい」というメッセージを置き、Activity Function 側がそのメッセージをポーリングして処理します。結果も Task Hub に書き戻され、Orchestrator はそれを受け取って再開します。Microsoft Learn,Microsoft Learn

*3:HTTP Trigger FunctionsからのOrchestrator起動についても「直接呼ぶ」わけではなく、Task Hub経由になります。