眠いしお腹すいたし(´・ω・`)

C#関連を主に書きます。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

Durable Functionsで学ぶクラウドデザインパターン -Compensating Transaction Pattern-

先日、牛尾さん、Kanioさん主催の

www.meetup.com

に参加してきました。

Durable functionsの紹介だけではなくモブプログラミングもあり私が英語が駄目すぎることを除いてすごく楽しいセッションでした。

元々、非常に興味があるフレームワークだったのですが触ってはいなかったので、しばらくDurable functionsで遊んでみようという気になっている次第です。

今回はDurable Functionsを使ってマイクロサービスアーキテクチャなどでよく使われるCompensation Transaction Pattern(補正トランザクションパターン)を実装することで、補正トランザクションパターンを勉強してみようと思います。

## 今回の成果物

github.com

環境

Visual studio 2017 - 15.5.2

Azure Functions Cli - 1.0.7

Azure Storage Emulator 5.2

Nuget Package

https://www.nuget.org/packages/Microsoft.Azure.WebJobs.Extensions.DurableTask/1.1.0-beta2

Compensation Transaction Pattern

この資料が一番わかりやすいと思います。

www.slideshare.net

著しく雑に表現すると

全然別々のサービス間でもトランザクション処理を行わなければならないケースはあるのだけど全てのサービスを通して高レベルなロールバック処理は難しいから、どこかで失敗したら各サービスで元に戻す上書きを行おう。

という感じです。

解説

OrchestratorとActivity

Durable Funcitonsでは処理の単位としてOrchestratorとActivityが存在します。

OrchestratorはOrchestration、つまりアーキテクチャを実装します。

Activityは処理を実装します。

なのでOrchestraorでI/Oを伴う処理をすることはアンチパターンです。

例えばRestAPIをコールして結果を取得する処理を行おうとした時には

Activity関数で実際に通信する部分を実装してOrchestratorでActivityを呼び出して結果を取得する処理を実装するようなイメージです。

ここを理解した上でコードの解説行います。

StartNew

[FunctionName("HttpStartSingle")]
        public static async Task<HttpResponseMessage> RunSingle(
            [HttpTrigger(AuthorizationLevel.Function, methods: "post",
                Route = "orchestrators/HttpStartSingle/{requestId}/{point}")] HttpRequestMessage req,
            [OrchestrationClient] DurableOrchestrationClient starter,
            string requestId,
            int point,
            TraceWriter log)
        {
            var instanceId = await starter.StartNewAsync("CompensatingTransactOrchestrator",
                new PointRequest() {RequestId = requestId, Point = point});

            return starter.CreateCheckStatusResponse(req, instanceId);
        }

この関数ではHttpRequestを受けた場合に「CompensatingTransactOrchestrator」というOrchestratorを起動させます。

そしてリクエストの結果として、そのオーケストレータの「インスタンスID」に伴う情報を返答しています。

Durable Functionsではオーケストレーションを開始すると、そのオーケストレーションのIDとしてInstanceIdが発行されます。

そのInstanceIdを使用してオーケストレーションの結果を追跡することが可能となります。

このリクエストの結果はこのようになります。

{
    "id": "f0d9f24a2cc94a3e8394e6ae00c30a59",
    "statusQueryGetUri": "http://localhost:7071/admin/extensions/DurableTaskExtension/instances/f0d9f24a2cc94a3e8394e6ae00c30a59?taskHub=DurableFunctionsHub&connection=Storage&code=e5aGhnpascgIhDdfTvQozANdKlML3FlayXzx1Pr6NqRyb7gSs2aIiQ==",
    "sendEventPostUri": "http://localhost:7071/admin/extensions/DurableTaskExtension/instances/f0d9f24a2cc94a3e8394e6ae00c30a59/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=e5aGhnpascgIhDdfTvQozANdKlML3FlayXzx1Pr6NqRyb7gSs2aIiQ==",
    "terminatePostUri": "http://localhost:7071/admin/extensions/DurableTaskExtension/instances/f0d9f24a2cc94a3e8394e6ae00c30a59/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=e5aGhnpascgIhDdfTvQozANdKlML3FlayXzx1Pr6NqRyb7gSs2aIiQ=="
}

このJsonに書かれているUrlにアクセスするとオーケストレーションの結果を確認できたりオーケストレーションを途中で止めたりすることができます。

ステータス取得の結果はこんな感じになります。

{
    "runtimeStatus": "Completed",
    "input": {
        "$type": "CompensatingTransactionOrchestration.Functions+PointRequest, CompensatingTransactionOrchestration",
        "RequestId": "h",
        "Point": 100
    },
    "output": "repository1 Failure",
    "createdTime": "2018-01-21T04:06:13Z",
    "lastUpdatedTime": "2018-01-21T04:06:22Z"
}

Orchestrator and Activity

Compensating Transaction PatternなOrchestratorを見てみます。

        [FunctionName("CompensatingTransactOrchestrator")]
        public static async Task<string> CompensatingTransactOrchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context
        )
        {
            var pointRequest = context.GetInput<PointRequest>();
            var compensatingStack = new Stack<ValueTuple<string, RepositoryResult>>();

            try
            {


                var result1 = await context.CallActivityAsync<RepositoryResult>(nameof(Repository1Activity), pointRequest);
                if (result1.Succeed)
                {
                    compensatingStack.Push((nameof(Repository1CompensatingActivity), result1));
                }
                else
                {
                    return "repository1 Failure";
                }

                var result2 = await context.CallActivityAsync<RepositoryResult>(nameof(Repository2Activity), pointRequest);
                if (result2.Succeed)
                {
                    compensatingStack.Push((nameof(Repository2CompensatingActivity), result2));
                }
                else
                {
                    return "repository2 Failure";
                }

                var result3 = await context.CallActivityAsync<RepositoryResult>(nameof(Repository3Activity), pointRequest);
                if (result3.Succeed)
                {
                    compensatingStack.Push((nameof(Repository3CompensatingActivity), result3));
                }
                else
                {
                    return "repository3 Failure";
                }

                compensatingStack.Clear();
            }
            finally
            {
                while (compensatingStack.Any())
                {
                    var item = compensatingStack.Pop();
                    await context.CallActivityWithRetryAsync(item.Item1, GetRetryOprion(), item.Item2);
                }
            }

            return "Success";
        }

このFunctionはOrchestrationTriggerを使用しているのでStartNewされた時、もしくはCallSubOrchestrationされた時に実行されます。

読んでわかる通りCompensation Transactin Patternそのままです。

Repository1~3に対する処理を行うために各ActivityをCallしていきます。

しかし途中で失敗した場合には、実行が完了してしまっているRepositoryの補正用ActivityをCallして補正を行なっています。

各アクティビティの処理はこんな感じです。

        [FunctionName("Repository1Activity")]
        public static async Task<RepositoryResult> Repository1Activity(
            [ActivityTrigger] PointRequest context
            , [Table("table1")] CloudTable table
            )
        {
            var query =
                new TableQuery<PointTable>().Where(TableQuery.GenerateFilterCondition(nameof(PointTable.PartitionKey),
                    QueryComparisons.Equal, context.RequestId));

            if (!table.ExecuteQuery(query).Any())
            {
                var processId = Guid.NewGuid().ToString();
                var insertOperation =
                    TableOperation.Insert(new PointTable()
                    {
                        PartitionKey = context.RequestId,
                        RowKey = processId,
                        Point = context.Point
                    });

                var result = await table.ExecuteAsync(insertOperation);
                return new RepositoryResult()
                {
                    ProcessId = processId,
                    Request = context,
                    Succeed = true,
                };
            }

            return new RepositoryResult()
            {
                Request = context,
                Succeed = false
            };
        }

        [FunctionName("Repository1CompensatingActivity")]
        public static async Task Repository1CompensatingActivity(
            [ActivityTrigger] RepositoryResult repositoryResult
            , [Table("table1")] CloudTable table
            )
        {
            var query =
                    new TableQuery<PointTable>()
                        .Where(TableQuery.GenerateFilterCondition(nameof(PointTable.PartitionKey),
                            QueryComparisons.Equal,
                            repositoryResult.Request.RequestId))
                        .Where(TableQuery.GenerateFilterCondition(nameof(PointTable.RowKey), QueryComparisons.Equal,
                            repositoryResult.ProcessId))
                ;

            var targets = table.ExecuteQuery(query).ToArray();

            if (targets.Any())
            {
                foreach (var target in targets)
                {
                    var deleteOperation = TableOperation.Delete(target);
                    await table.ExecuteAsync(deleteOperation);
                }
            }
        }

ストレージテーブルに処理を書いて補正では書いた処理を消している感じです。

大事なこと

このOrchestratorと各Activityにブレークポイントをしかけて実行すると、とても面白い動作をしていることに気がつきます。

OrchestrationTriggerによって実行されているFunctionが何回も呼び出されるのです。

1回しかリクエストを行なっていないのに、です。

しかし各Activityは1度しか実行されません。

これがDurable FunctionsがDurableであるとされる部分です。

Orchestratorが何らかの理由で止まってしまったとしても再実行され再実行時にはすでに処理を行なっているCallActivityに関してはActivityの実際の呼び出しは行われずに結果だけ最初の実行と同じものが返されます。

常にOrchestratorFunctionは再実行されることが前提の設計となっているのです。

ここにOrchestratorでI/O処理を行なってはいけない理由にあります。

Orchestratorを実装する時にはI/OをOrchestration/Activityに限定しなければ、外部の情報の変化により結果が変化してしまうためハチャメチャな処理になってしまうのです。

終わりに

いかがでしたでしょうか?

Durable Functions 大変面白いです。

もちろん「Durable Funcitonsを使用することによるデメリットは無い」、なんてことは言えないのですが、それにしても大変面白いフレームワークであると言えると思います。

ヾ(´・∀・[.:゚+またね.: :]

おまけ

今回はHttpリクエストに対して非同期にオーケストレーションが実行されてますが

        [FunctionName("HttpStartWait")]
        public static async Task<HttpResponseMessage> RunWait(
            [HttpTrigger(AuthorizationLevel.Function, methods: "post",
                Route = "orchestrators/HttpStartWait/{requestId}/{point}")] HttpRequestMessage req,
            [OrchestrationClient] DurableOrchestrationClient starter,
            string requestId,
            int point,
            TraceWriter log)
        {
            var instanceId = await starter.StartNewAsync("CompensatingTransactOrchestrator",
                new PointRequest() { RequestId = requestId, Point = point });

            while (true)
            {
                var status = await starter.GetStatusAsync(instanceId);
                if (status?.RuntimeStatus > OrchestrationRuntimeStatus.Running)
                {
                    return new HttpResponseMessage(HttpStatusCode.OK)
                    {
                        Content = new StringContent(JsonConvert.SerializeObject(status))
                    };
                }
                await Task.Delay(TimeSpan.FromSeconds(2));
            }
        }

こんな感じにすれば一応リクエストに対して同期的に返答することも可能かです。