眠いしお腹すいたし(´・ω・`)

C#関連を主に書きます。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

Durable Functionsで学ぶクラウドデザインパターン -Pipes and Filters Pattern-

前回

tamafuyou.hatenablog.com

に引き続き、今回はPipes and Filters (パイプとフィルターのパターン)をDurable Functionsで実装してみます。

Durable Functionsのご紹介は

Durable Functions - Google 検索

この辺でお願いします。

Pipes and Filters

docs.microsoft.com

クラウドデザインパターンとして紹介されているのですが昔からある古典デザインパターンの一つですね。

著しく雑に表現すると、

モノシリックな大きいロジックは途中にボトルネックなロジックが含まれていても大きいロジックの単位でしか実行されないからリソースの無駄になりやすいよね。だからタスク単位に分割しよう。そうすれば再利用性も向上するし小さい単位での並列実行が可能になるからリソースの無駄を防げるかもね。しかも要件によってタスクの順序が入れ替わるようなケースでも対応できちゃうよね。しかもこのパターンにするとタスク毎に別々のコンピュータリソースで実行が可能だから障害に対する回復性も向上するよね。

って感じです。

C#のプログラム上でTask等の並列処理を使えば一つのコンピュータ上での並列実行は可能なんですけどクラウドで実行するんだったらもっとスケールを大きく行こう!! って事なんでしょうか。

今回の成果物

github.com

解説

とは言うものの、全然難しいことはしていません。 まずはActivityを1つのタスクと考えて実装します。

[FunctionName(nameof(TaskAActivity))]
        public static async Task<List<string>> TaskAActivity(
            [ActivityTrigger] List<string> results,
            TraceWriter logger
        )
        {
            logger.Info("Task A start");
            results.Add("TaskA Complete");
            return results;
        }

このようなアクティビティをA~Fまで作成しました。

次にタスクを繋ぎ合わて作成するロジックをOrchestratorと考えて実装します。

        [FunctionName(nameof(DataSource1Orchestrator))]
        public static async Task<List<string>> DataSource1Orchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context
            )
        {
            var input = context.GetInput<string>();
            var a = await context.CallActivityAsync<List<string>>(nameof(TaskAActivity), new List<string>() {input});
            var b = await context.CallActivityAsync<List<string>>(nameof(TaskBActivity), a);
            var c = await context.CallActivityAsync<List<string>>(nameof(TaskCActivity), b);
            var d = await context.CallActivityAsync<List<string>>(nameof(TaskDActivity), c);
            return d;
        }

        [FunctionName(nameof(DataSource2Orchestrator))]
        public static async Task<List<string>> DataSource2Orchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context
            )
        {
            var input = context.GetInput<string>();
            var a = await context.CallActivityAsync<List<string>>(nameof(TaskAActivity), new List<string>() { input });
            var b = await context.CallActivityAsync<List<string>>(nameof(TaskBActivity), a);
            var e = await context.CallActivityAsync<List<string>>(nameof(TaskEActivity), b);
            var f = await context.CallActivityAsync<List<string>>(nameof(TaskFActivity), e);
            return f;
        }

あとはHttpTriggerによる呼び出しを作成すれば終わりです。

呼び出し結果として

DataSource1であれば

{
    "runtimeStatus": "Completed",
    "input": "test",
    "output": [
        "test",
        "TaskA Complete",
        "TaskB Complete",
        "TaskC Complete",
        "TaskD Complete"
    ],
    "createdTime": "2018-01-22T14:54:31Z",
    "lastUpdatedTime": "2018-01-22T14:54:39Z"
}

DataSource2であれば

{
    "runtimeStatus": "Completed",
    "input": "kokoro",
    "output": [
        "kokoro",
        "TaskA Complete",
        "TaskB Complete",
        "TaskE Complete",
        "TaskF Complete"
    ],
    "createdTime": "2018-01-22T14:16:27Z",
    "lastUpdatedTime": "2018-01-22T14:16:31Z"
}

になります。

Pipes and Filtersのパターンって、そのままDurableFunctionsなんですよね。

特に難しいことはないです。

Orchestratorのコードって一見すると、そのコンピュータ上で非同期処理をしている様に見えるのですが実際にどのコンピュータで実行されているかは分からないんです。

例えばFunctionsをAppServiceプランのインスタンス x 10みたいな環境で実行したとすれば、その10台の中のどこかでActivityが実行される感じです。

コードの見た目以上にスケールの変動に容易に対応しちゃってるんですよね。

あと、例えば

Orchestratorのコードを

        [FunctionName(nameof(DataSource2Orchestrator))]
        public static async Task<List<string>> DataSource2Orchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context
            )
        {
            var input = context.GetInput<string>();
            var a = await context.CallActivityWithRetryAsync<List<string>>(nameof(TaskAActivity),new RetryOptions(TimeSpan.FromSeconds(30), 10),  new List<string>() { input });
            var b = await context.CallActivityWithRetryAsync<List<string>>(nameof(TaskBActivity), new RetryOptions(TimeSpan.FromSeconds(30), 10), a);
            var e = await context.CallActivityWithRetryAsync<List<string>>(nameof(TaskEActivity), new RetryOptions(TimeSpan.FromSeconds(30), 10), b);
            var f = await context.CallActivityWithRetryAsync<List<string>>(nameof(TaskFActivity), new RetryOptions(TimeSpan.FromSeconds(30), 10), e);
            return f;
        }

こう言う風にCallActivityWithRetryAsyncに置き換えるだけで簡単にAcitvity単位のRetryを設定することができます。

このリトライの容易さがDurable Functionsの良い点でもあり、どのように活かすかの腕の見せどころな様に思います。

要するにActivityの処理にてどの様な時にRetryを行ってどの様な時にRetryしないかを設計して実装すれば、あとはOrchestratorで如何様にもできますぜって感じなのです。

いやぁ、Durable Functionsって本当にいいものですねぇ

それではまた次回をお楽しみに、サヨナラ、サヨナラ、サヨナラ!