眠いしお腹すいたし(´・ω・`)

C#関連を主に書きます。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

Durable Functionsで学ぶクラウドデザインパターン -Scheduler Agent Supervisor Pattern-

長らくお待たせ致しました?

Durable Functionsで学ぶクラウドデザインパターンシリーズ

前回の記事を書いてから、Durable FunctionsはGAされて(2018.06.03現在、V1のみ V2はPreview)現在のバーションは1.4.1。

その間にDurable Functionsの生みの親、Chris Gillum (@cgillum) | Twitterさんがde:codeで日本語で登壇したりしてます。

もう使うしかない機運漂うDurable Functions、ガンガン触っていきましょう。

そして要望が出たり問題を見つけたらイシューからのプルリクだo(・`д・´。)

さてさて今回は第3弾として「Scheduler Agent Supervisor パターン」を学んでみましょう。

Scheduler Agent Supervisor

docs.microsoft.com

さて、このデザインパターン、これまで紹介したCompensating Transaction PatternPipes and Filters Patternに比べると、解決したい問題などが回復性に寄っていて複雑です。

そうです、今流行りのワード?「回復性」です。

システムの回復性に関しては

回復性に優れた Azure 用アプリケーションの設計 | Microsoft Docs

NoOpsで高可用性・ハイスケールシステムを自律運用させよう! 実現に必要な3つのポイント【デブサミ2018】 (1/2):CodeZine(コードジン)

システム運用は自動化が常識に、NoOpsまでの歴史 | 日経 xTECH(クロステック)

色々な記事が見つかるかと思います。

個人的にシステムは作成する上で非常に重要な概念であり、これはオンプレミスだろうとクラウドだろうとサーバレスであろうと障害との付き合いは普遍的であり、その中でもっとも目指すべき方向の一つであると考えています。

もちろん「運用は人の手を使ってやれば良いのだ。そんなアーキテクチャを考えて実装するより下請けを安く買い叩けば良いのだ」なんていう考えであれば話は別ですが、現実の中で人の手をできる限り少なく運用できる事はエンジニアのQOLにも直結する大切な問題であると思うのです。

少し脱線しましたが、具体的にScheduler Agent Supervisor Patternを見てみましょう。

分散された一連のアクションを 1 つの操作として調整します。 いずれかのアクションが失敗した場合は、全体の操作が全体として成功または失敗するように、その失敗を透過的に処理しようとするか、または実行された作業を元に戻します。 これにより、一時的な例外、長期間続く障害、プロセスのエラーなどのために失敗したアクションを復旧して再試行することが可能になるため、分散システムに回復性が追加される場合があります。

ワケ⊂(´-` ) ワカ( ´-`)つ ラン♪⊂(´ヘ`)つ

もっと私みたいな日本語が不自由な人でもわかる日本語で書いてもらいたいですね( -`ω-)

仕方がないので詳細な説明の方を見て見ましょう。

アプリケーションは、その一部でリモート サービスが呼び出されたり、リモート リソースにアクセスしたりする可能性のあるいくつかの手順を含むタスクを実行します。 それぞれのステップは互いに独立しているかもしれませんが、それらを指揮するのは、タスクを実装するアプリケーションのロジックです。

要するに、アプリケーション(サービス)は処理の中でサービスのリポジトリにだけアクセスするのではなくて外部のAPIを呼び出したりする事もありますよね? それらをコントロールするのはアプリケーションのロジックになりますよね?

っていう事でしょう。そりゃそうでしょう。

可能な場合は常に、アプリケーションはタスクが完了まで実行されるようにし、リモート サービスまたはリソースへのアクセス時に発生する可能性のあるすべての障害を解決する必要があります。 障害は、さまざまな原因で発生することがあります。 たとえば、ネットワークが停止したり、通信が中断されたり、リモート サービスが無応答または不安定な状態になったり、おそらくリソースの制約のためにリモート リソースが一時的にアクセスできなくなったりすることがあります。 多くの場合、障害は一時的であり、再試行パターンを使用して処理できます。

アプリケーションは外部APIへのアクセスやリポジトリへのアクセスで発生する可能性がある障害を、なんとかして処理が完了するように実装する必要がありますよね?

これらの原因には通信が中断されたり外部APIが不安定であったりスロットリングに引っかかったりすることがあるよね?

多くの場合は、こういう障害は一時的に発生するものだから再試行パターンで処理可能だよね?

アプリケーションは、容易に復旧できないより永続的な障害を検出した場合、システムを整合性のある状態に復元し、操作全体の整合性を保証できる必要があります。

でも、障害が長引くような感じならシステムを問題ない状態までロールバックしたり一時停止したりしてロジック全体で辻褄が合うように戻してあげる必要があるよね?

みたいな感じでしょうか。

まぁ要約するとロジックのなかで処理毎にリトライ処理をするのは当然として、短期間リトライでなんとかならない場合には、長時間待機し障害普及ごにリトライしたり補償トランザクションで整合性を保つようにロールバックするなどの工夫ができるとCOOL!!って感じでしょうか。

これに対する解決策として

https://docs.microsoft.com/ja-jp/azure/architecture/patterns/scheduler-agent-supervisor#solution

を書いています。

はい、ここでこの文を読んで「あれ??」って思った方、それです m9⊃゜∀゜)!!

ここで言っているSchedulerとAgentをOrchestratorとActivityに置き換えて見ましょう。

あら不思議!! Durable Functionsを使うだけでSchedulerとAgentになってしますのです。

あぁ Durable Functions まじCOOL(●´∀`●)

残るはSuperVisorを用意するだけで良いのです。

サンプル

github.com

今回のサンプルはドキュメント内で例としてあげられてる

https://docs.microsoft.com/ja-jp/azure/architecture/patterns/scheduler-agent-supervisor#example

物をDurable Functionsで作成して見ました。

ECで注文を受けた場合データベースに受注情報を書き込みリモートサービスをコールする。

リモートサービスをコールする箇所でのエラーが発生した場合にリトライ処理を行う。

しかしリトライが閾値を超えた場合には一時停止しアラートを上げる。

ユーザは問題の解決後にリトライを行うことができる。

みたいな感じです。

今回は複雑なので少し図を描いてみました。

まずは正常時の処理ですが非常に単純です。

f:id:tamafuyou:20180603205720p:plain

OrderRequest関数でリクエストを受け付けて(ECウェブアプリケーションからのコールを想定)Orchestratorを起動しStorageTableに受注情報を保存し外部APIをコールして終了します。

次に外部APIをコール時にAPIをコールしリトライしても解決しなかった場合のアーキテクチャの図です。

f:id:tamafuyou:20180603210123p:plain

CallRemoteServiceが正常に終了しなかった場合、失敗レポートとしてFailReportテーブルにInstanceIdなどの情報を記録します。

その後OrchestrotorはWaitForExternalEventをして外部からRiseEventされるまで待機状態となります。

FailReportWatcher関数はTimerTriggerで動いていて定周期でFailReportテーブルを監視しています。

FailReportテーブルにデータを見つけるとSlackにアラートの通知を行います。

その際にRetry用のリンクも一緒にメッセージとして送信します。

ユーザは外部APIの障害が解決後にそのリンクをクリックします。

リンクはFailReportRetry関数のURLであり、関数内でFialReportに記述されているInstanceIdに対してRiseEventを行います。

RiseEventが行われるとOrchestratorが再び動き出しCallRemoveServiceから再開されます。

コード解説

ではOrchestratorを見てみましょう。

gist.github.com

上記のアーキテクチャを読むとなんとなくOrchestraorもわかると思います。

ポイント1

CallRemoteService Activityの呼び出し際にCallActivityWithRetryを使用しています。 これを使用する事でExponential backoffなトライ処理を行う事ができます。

ポイント2

CallRemoteServiceの呼び出しでリトライをしたけれど成功しなかった場合にWaitForExternalEventで待機状態に移るのですが、その際にタイムアウトを5日として待機しタイムアウトした場合には補償トランザクション処理を行っています。

Activityに関してなのですが各々はとてもTableにデータを書き込んだりAPIにアクセスしたりしているだけで単純ですので解説は省きます。

終わりに

どうでしょう?SchedulerAgentSupervisorパターンの理解は進みましたでしょうか?

ここまで読まれて頂けたのであれば一見するとかなり複雑そうなアーキテクチャもDurable Functionsを使用するとかなり簡潔に実装する事ができる事、Durable Functionsのパワーを実感して頂けたのではと思っております。

今回のサンプルは実戦ではもっと考慮するべき事があるのですが省いています。

実戦で使用する時にはどこまで考慮するべきか・・・検討してみるのも1つの勉強になるのではないかと思います。

いやぁ、Durable Functionsって本当にいいものですねぇ

それではまた次回をお楽しみに、サヨナラ、サヨナラ、サヨナラ!

おまけ

Durable Functionsの公式Wikiに記事のリンクを記載いただけました。

Home · Azure/azure-functions-durable-extension Wiki · GitHub

v(。・・。)イエィッ♪

リツイート抽選サービス「ついせん」をはじめました。

ツイッターリツイートで抽選を行うためのサービス「ついせん」を作りました。

twisen.azurewebsites.net

無料で使えてリツイート抽選できます。

ツイッターリツイートで抽選を行うサービスでは国内では「あたれら」くらいだったのですが、「あたれら」さんは、抽選処理を同期的に行う仕様上、TwitterAPIの7日以前の検索が行えない制約から最大7日間までの範囲での抽選企画を対象としています。

しかし、最近Twitter社さんは有料APIに力を入れていて去年の11月ごろから無制限に古いデータを検索できるようにしていています。

Twitterがより自由にデータにアクセス可能となる開発者向けプレミアムAPIをリリース - GIGAZINE

ですので有料APIを使用すれば同期的処理であっても7日縛りを突破できます。

「あたれら」さんのプログラムで有料APIを使用すればOKな感じです。

でも無料サービスで月額17,000円の有料APIを使うのは辛いし無料APIの範囲で何とかしてみたい・・・ということを考えまして「ついせん」を作成しました。

あと、Durable Functionsを使ってサービスを作ってみたかった・・・という理由もあります。

「ついせん」ではリツイートしてもらいたい対象のツイートを登録するときに、締切日時を設定します。 この締切日時は1ヶ月後でも1年後でもOKです。 「ついせん」は締切日時までリツイートしてくれたユーザを収集していきます。 そして締切日時を過ぎたら収集したユーザの中から抽選を行います。 これによって無料で7日の縛りを越えるようにしています。

またリツイート数の制限も特に設けていません。 ツイッターのSearchAPIはRateLimitが結構厳しいので15分間に180になります。 この辺も非同期に処理する仕組みなので乗り越えています。 試験では40000RTまで確認しました。

動作環境としてはAzureで動作させていてWeb Apps(F1) + SQLDatabase(Basic) + Azure Functions(従量課金) + Azure Storageっていう感じです。

現状は運用費用は月額600円くらいです。「あたれら」さんがあるのであまり使われないと思いますのでSQLDatabaseの部分を別のストレージに置き換えれば無料で運営できるかなぁとか思います。

無料で使えますのでお気軽にご利用ください。

Durable Functionsを使用したサンプル -リクエスト結果をWebHookで返答するサービスへの対応-

今回はDurable Functionsを使用した少しだけ実践的なパターンを想定したサンプル実装を行ってみたいと思います。

前置き

前置き書いていたら長くなったので良い飛ばしてください。

みなさん、サービス間の連携にはどんな仕組みをよく使いますか?

一般的にはWeb APIが多いでしょうか?

今時的だとgPRCを使用したりする事もあるでしょう。

もしかしたら専用イーサネットワーク越しDATABASE LINKなどで RDBを使用して連携しているかも知れません。もしかしたらFTPサーバにcsvファイルをアップロードして連携するなんて事も2018年の今でも現在でも存在しているかも知れません。もしかしたらcsvファイルがshift-jisで書かれていたりeuc-jpで書かれていたり、どこかの企業が拡張した文字コードで書かれていたりするかも知れません。もしかしたらH○LFTなどの連携サービスを使用しているかも知れません。

Web APIでのサービス間連携は同期的であればとてもシンプルです。

1セッション内でリクエストとレスポンスが行われます。

しかし1処理に時間がかかる場合にはWeb APIによる連携は最善とは言えないかも知れません。

リクエストを受ける側は同期的に処理を実行するという制約がかけられる事によって実装難易度が上がってしまったりスケールアウトが難しくなってしまったりするかも知れません。

ロングランニングな処理に対する解決方法としては非同期なAPIを設計する場合があります。

リクエストを受け付けた時に処理IDを発行して返答し、リクエストを行なった側は処理IDを使用して結果取得APIをコールする事で結果を取得する形式です。

Durable FunctionsのOrchestratorは正にその振る舞いをします。

しかしこの形式はリクエストを送る側が結果取得APIをポーリングしなければならないという問題点があります。

そこで最終的に辿り着くパターンとしては処理が終わった時にリクエストを送信した側に何らかの方法で結果を通知する方法です。

良くある方式としてはリクエストを送信する側も結果通知リクエストを受け付けるエンドポイントを用意しておく所謂WebHook的な連携です。

今回は1リクエスト1フックなAPIへのリクエストをDurable Functionsの外部イベントを使用して処理するサンプルを作成してみたいと思います。

成果物

github.com

解説

docs.microsoft.com

人による操作または他の外部トリガーを処理するときに便利です

と書かれている通り外部トリガーというのが今回でいうWebHookであったりします。

まぁWebHookじゃなくてもFTPサーバに結果CSVをおいて(ry

WebHookOrchestrator

それではリクエストを行う側のOrchestrationを見てみましょう。

gist.github.com

まずはHookApiRequestActivityをコールしてAPIリクエストを行なって処理IDの取得をおこなっています。

gist.github.com

処理は単純です。

次にSaveRequestKeyActivityをコールして取得した処理IDを自身のInstanceIdとセットにしてStorageTableの保存しています。

gist.github.com

次にWaitForExternalEventを使用して外部から結果を発火されるのを待ちます。

最後にOrchestrateの結果として外部からの結果を返答します。

HookRecieve

次にサービスからWebHookを受け付けるFunctionを見てみます。

gist.github.com

WebHookによってリクエストされた情報の中からInstanceIdを取得してStorageTableに問い合わせを行い、リクエストを行なったOrchestratorのInstanceIdを取得。

そのInstanceIdでRaiseEventAsyncをコールします。

RaiseEventAsyncをコールするとWaitForExternalEventで待機していたOrchestratorが動き出してリクエストを行なったOrchestratorの処理が終了します。

終わりに

今回のサンプルであればリクエストを行う部分とWebHookを受ける部分を別々に処理したとしても問題がありません。

しかしAPIリクエストが処理の一部に過ぎず、その結果を使用してさらに複雑に処理を行なっていくようなパターンでは今回のようにDurable FunctionsのOrchestrationで実装していく方が楽になるかとおもいます。

また今回のパターンではWaitForExternalEventで待機した状態でWebHookが行われなかった場合、処理が中途半端になってしまします。

その場合のデザインパターンとしてScheduler Agent Supervisorパターンを用いるのも良いかも知れません。

その辺のサンプルはまた次回に。

それでは👋

Durable Functionsで学ぶクラウドデザインパターン -Pipes and Filters Pattern-

前回

tamafuyou.hatenablog.com

に引き続き、今回はPipes and Filters (パイプとフィルターのパターン)をDurable Functionsで実装してみます。

Durable Functionsのご紹介は

Durable Functions - Google 検索

この辺でお願いします。

Pipes and Filters

docs.microsoft.com

クラウドデザインパターンとして紹介されているのですが昔からある古典デザインパターンの一つですね。

著しく雑に表現すると、

モノシリックな大きいロジックは途中にボトルネックなロジックが含まれていても大きいロジックの単位でしか実行されないからリソースの無駄になりやすいよね。だからタスク単位に分割しよう。そうすれば再利用性も向上するし小さい単位での並列実行が可能になるからリソースの無駄を防げるかもね。しかも要件によってタスクの順序が入れ替わるようなケースでも対応できちゃうよね。しかもこのパターンにするとタスク毎に別々のコンピュータリソースで実行が可能だから障害に対する回復性も向上するよね。

って感じです。

C#のプログラム上でTask等の並列処理を使えば一つのコンピュータ上での並列実行は可能なんですけどクラウドで実行するんだったらもっとスケールを大きく行こう!! って事なんでしょうか。

今回の成果物

github.com

解説

とは言うものの、全然難しいことはしていません。 まずはActivityを1つのタスクと考えて実装します。

[FunctionName(nameof(TaskAActivity))]
        public static async Task<List<string>> TaskAActivity(
            [ActivityTrigger] List<string> results,
            TraceWriter logger
        )
        {
            logger.Info("Task A start");
            results.Add("TaskA Complete");
            return results;
        }

このようなアクティビティをA~Fまで作成しました。

次にタスクを繋ぎ合わて作成するロジックをOrchestratorと考えて実装します。

        [FunctionName(nameof(DataSource1Orchestrator))]
        public static async Task<List<string>> DataSource1Orchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context
            )
        {
            var input = context.GetInput<string>();
            var a = await context.CallActivityAsync<List<string>>(nameof(TaskAActivity), new List<string>() {input});
            var b = await context.CallActivityAsync<List<string>>(nameof(TaskBActivity), a);
            var c = await context.CallActivityAsync<List<string>>(nameof(TaskCActivity), b);
            var d = await context.CallActivityAsync<List<string>>(nameof(TaskDActivity), c);
            return d;
        }

        [FunctionName(nameof(DataSource2Orchestrator))]
        public static async Task<List<string>> DataSource2Orchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context
            )
        {
            var input = context.GetInput<string>();
            var a = await context.CallActivityAsync<List<string>>(nameof(TaskAActivity), new List<string>() { input });
            var b = await context.CallActivityAsync<List<string>>(nameof(TaskBActivity), a);
            var e = await context.CallActivityAsync<List<string>>(nameof(TaskEActivity), b);
            var f = await context.CallActivityAsync<List<string>>(nameof(TaskFActivity), e);
            return f;
        }

あとはHttpTriggerによる呼び出しを作成すれば終わりです。

呼び出し結果として

DataSource1であれば

{
    "runtimeStatus": "Completed",
    "input": "test",
    "output": [
        "test",
        "TaskA Complete",
        "TaskB Complete",
        "TaskC Complete",
        "TaskD Complete"
    ],
    "createdTime": "2018-01-22T14:54:31Z",
    "lastUpdatedTime": "2018-01-22T14:54:39Z"
}

DataSource2であれば

{
    "runtimeStatus": "Completed",
    "input": "kokoro",
    "output": [
        "kokoro",
        "TaskA Complete",
        "TaskB Complete",
        "TaskE Complete",
        "TaskF Complete"
    ],
    "createdTime": "2018-01-22T14:16:27Z",
    "lastUpdatedTime": "2018-01-22T14:16:31Z"
}

になります。

Pipes and Filtersのパターンって、そのままDurableFunctionsなんですよね。

特に難しいことはないです。

Orchestratorのコードって一見すると、そのコンピュータ上で非同期処理をしている様に見えるのですが実際にどのコンピュータで実行されているかは分からないんです。

例えばFunctionsをAppServiceプランのインスタンス x 10みたいな環境で実行したとすれば、その10台の中のどこかでActivityが実行される感じです。

コードの見た目以上にスケールの変動に容易に対応しちゃってるんですよね。

あと、例えば

Orchestratorのコードを

        [FunctionName(nameof(DataSource2Orchestrator))]
        public static async Task<List<string>> DataSource2Orchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context
            )
        {
            var input = context.GetInput<string>();
            var a = await context.CallActivityWithRetryAsync<List<string>>(nameof(TaskAActivity),new RetryOptions(TimeSpan.FromSeconds(30), 10),  new List<string>() { input });
            var b = await context.CallActivityWithRetryAsync<List<string>>(nameof(TaskBActivity), new RetryOptions(TimeSpan.FromSeconds(30), 10), a);
            var e = await context.CallActivityWithRetryAsync<List<string>>(nameof(TaskEActivity), new RetryOptions(TimeSpan.FromSeconds(30), 10), b);
            var f = await context.CallActivityWithRetryAsync<List<string>>(nameof(TaskFActivity), new RetryOptions(TimeSpan.FromSeconds(30), 10), e);
            return f;
        }

こう言う風にCallActivityWithRetryAsyncに置き換えるだけで簡単にAcitvity単位のRetryを設定することができます。

このリトライの容易さがDurable Functionsの良い点でもあり、どのように活かすかの腕の見せどころな様に思います。

要するにActivityの処理にてどの様な時にRetryを行ってどの様な時にRetryしないかを設計して実装すれば、あとはOrchestratorで如何様にもできますぜって感じなのです。

いやぁ、Durable Functionsって本当にいいものですねぇ

それではまた次回をお楽しみに、サヨナラ、サヨナラ、サヨナラ!

Durable Functionsで学ぶクラウドデザインパターン -Compensating Transaction Pattern-

先日、牛尾さん、Kanioさん主催の

www.meetup.com

に参加してきました。

Durable functionsの紹介だけではなくモブプログラミングもあり私が英語が駄目すぎることを除いてすごく楽しいセッションでした。

元々、非常に興味があるフレームワークだったのですが触ってはいなかったので、しばらくDurable functionsで遊んでみようという気になっている次第です。

今回はDurable Functionsを使ってマイクロサービスアーキテクチャなどでよく使われるCompensation Transaction Pattern(補正トランザクションパターン)を実装することで、補正トランザクションパターンを勉強してみようと思います。

## 今回の成果物

github.com

環境

Visual studio 2017 - 15.5.2

Azure Functions Cli - 1.0.7

Azure Storage Emulator 5.2

Nuget Package

https://www.nuget.org/packages/Microsoft.Azure.WebJobs.Extensions.DurableTask/1.1.0-beta2

Compensation Transaction Pattern

この資料が一番わかりやすいと思います。

www.slideshare.net

著しく雑に表現すると

全然別々のサービス間でもトランザクション処理を行わなければならないケースはあるのだけど全てのサービスを通して高レベルなロールバック処理は難しいから、どこかで失敗したら各サービスで元に戻す上書きを行おう。

という感じです。

解説

OrchestratorとActivity

Durable Funcitonsでは処理の単位としてOrchestratorとActivityが存在します。

OrchestratorはOrchestration、つまりアーキテクチャを実装します。

Activityは処理を実装します。

なのでOrchestraorでI/Oを伴う処理をすることはアンチパターンです。

例えばRestAPIをコールして結果を取得する処理を行おうとした時には

Activity関数で実際に通信する部分を実装してOrchestratorでActivityを呼び出して結果を取得する処理を実装するようなイメージです。

ここを理解した上でコードの解説行います。

StartNew

[FunctionName("HttpStartSingle")]
        public static async Task<HttpResponseMessage> RunSingle(
            [HttpTrigger(AuthorizationLevel.Function, methods: "post",
                Route = "orchestrators/HttpStartSingle/{requestId}/{point}")] HttpRequestMessage req,
            [OrchestrationClient] DurableOrchestrationClient starter,
            string requestId,
            int point,
            TraceWriter log)
        {
            var instanceId = await starter.StartNewAsync("CompensatingTransactOrchestrator",
                new PointRequest() {RequestId = requestId, Point = point});

            return starter.CreateCheckStatusResponse(req, instanceId);
        }

この関数ではHttpRequestを受けた場合に「CompensatingTransactOrchestrator」というOrchestratorを起動させます。

そしてリクエストの結果として、そのオーケストレータの「インスタンスID」に伴う情報を返答しています。

Durable Functionsではオーケストレーションを開始すると、そのオーケストレーションのIDとしてInstanceIdが発行されます。

そのInstanceIdを使用してオーケストレーションの結果を追跡することが可能となります。

このリクエストの結果はこのようになります。

{
    "id": "f0d9f24a2cc94a3e8394e6ae00c30a59",
    "statusQueryGetUri": "http://localhost:7071/admin/extensions/DurableTaskExtension/instances/f0d9f24a2cc94a3e8394e6ae00c30a59?taskHub=DurableFunctionsHub&connection=Storage&code=e5aGhnpascgIhDdfTvQozANdKlML3FlayXzx1Pr6NqRyb7gSs2aIiQ==",
    "sendEventPostUri": "http://localhost:7071/admin/extensions/DurableTaskExtension/instances/f0d9f24a2cc94a3e8394e6ae00c30a59/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=e5aGhnpascgIhDdfTvQozANdKlML3FlayXzx1Pr6NqRyb7gSs2aIiQ==",
    "terminatePostUri": "http://localhost:7071/admin/extensions/DurableTaskExtension/instances/f0d9f24a2cc94a3e8394e6ae00c30a59/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=e5aGhnpascgIhDdfTvQozANdKlML3FlayXzx1Pr6NqRyb7gSs2aIiQ=="
}

このJsonに書かれているUrlにアクセスするとオーケストレーションの結果を確認できたりオーケストレーションを途中で止めたりすることができます。

ステータス取得の結果はこんな感じになります。

{
    "runtimeStatus": "Completed",
    "input": {
        "$type": "CompensatingTransactionOrchestration.Functions+PointRequest, CompensatingTransactionOrchestration",
        "RequestId": "h",
        "Point": 100
    },
    "output": "repository1 Failure",
    "createdTime": "2018-01-21T04:06:13Z",
    "lastUpdatedTime": "2018-01-21T04:06:22Z"
}

Orchestrator and Activity

Compensating Transaction PatternなOrchestratorを見てみます。

        [FunctionName("CompensatingTransactOrchestrator")]
        public static async Task<string> CompensatingTransactOrchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context
        )
        {
            var pointRequest = context.GetInput<PointRequest>();
            var compensatingStack = new Stack<ValueTuple<string, RepositoryResult>>();

            try
            {


                var result1 = await context.CallActivityAsync<RepositoryResult>(nameof(Repository1Activity), pointRequest);
                if (result1.Succeed)
                {
                    compensatingStack.Push((nameof(Repository1CompensatingActivity), result1));
                }
                else
                {
                    return "repository1 Failure";
                }

                var result2 = await context.CallActivityAsync<RepositoryResult>(nameof(Repository2Activity), pointRequest);
                if (result2.Succeed)
                {
                    compensatingStack.Push((nameof(Repository2CompensatingActivity), result2));
                }
                else
                {
                    return "repository2 Failure";
                }

                var result3 = await context.CallActivityAsync<RepositoryResult>(nameof(Repository3Activity), pointRequest);
                if (result3.Succeed)
                {
                    compensatingStack.Push((nameof(Repository3CompensatingActivity), result3));
                }
                else
                {
                    return "repository3 Failure";
                }

                compensatingStack.Clear();
            }
            finally
            {
                while (compensatingStack.Any())
                {
                    var item = compensatingStack.Pop();
                    await context.CallActivityWithRetryAsync(item.Item1, GetRetryOprion(), item.Item2);
                }
            }

            return "Success";
        }

このFunctionはOrchestrationTriggerを使用しているのでStartNewされた時、もしくはCallSubOrchestrationされた時に実行されます。

読んでわかる通りCompensation Transactin Patternそのままです。

Repository1~3に対する処理を行うために各ActivityをCallしていきます。

しかし途中で失敗した場合には、実行が完了してしまっているRepositoryの補正用ActivityをCallして補正を行なっています。

各アクティビティの処理はこんな感じです。

        [FunctionName("Repository1Activity")]
        public static async Task<RepositoryResult> Repository1Activity(
            [ActivityTrigger] PointRequest context
            , [Table("table1")] CloudTable table
            )
        {
            var query =
                new TableQuery<PointTable>().Where(TableQuery.GenerateFilterCondition(nameof(PointTable.PartitionKey),
                    QueryComparisons.Equal, context.RequestId));

            if (!table.ExecuteQuery(query).Any())
            {
                var processId = Guid.NewGuid().ToString();
                var insertOperation =
                    TableOperation.Insert(new PointTable()
                    {
                        PartitionKey = context.RequestId,
                        RowKey = processId,
                        Point = context.Point
                    });

                var result = await table.ExecuteAsync(insertOperation);
                return new RepositoryResult()
                {
                    ProcessId = processId,
                    Request = context,
                    Succeed = true,
                };
            }

            return new RepositoryResult()
            {
                Request = context,
                Succeed = false
            };
        }

        [FunctionName("Repository1CompensatingActivity")]
        public static async Task Repository1CompensatingActivity(
            [ActivityTrigger] RepositoryResult repositoryResult
            , [Table("table1")] CloudTable table
            )
        {
            var query =
                    new TableQuery<PointTable>()
                        .Where(TableQuery.GenerateFilterCondition(nameof(PointTable.PartitionKey),
                            QueryComparisons.Equal,
                            repositoryResult.Request.RequestId))
                        .Where(TableQuery.GenerateFilterCondition(nameof(PointTable.RowKey), QueryComparisons.Equal,
                            repositoryResult.ProcessId))
                ;

            var targets = table.ExecuteQuery(query).ToArray();

            if (targets.Any())
            {
                foreach (var target in targets)
                {
                    var deleteOperation = TableOperation.Delete(target);
                    await table.ExecuteAsync(deleteOperation);
                }
            }
        }

ストレージテーブルに処理を書いて補正では書いた処理を消している感じです。

大事なこと

このOrchestratorと各Activityにブレークポイントをしかけて実行すると、とても面白い動作をしていることに気がつきます。

OrchestrationTriggerによって実行されているFunctionが何回も呼び出されるのです。

1回しかリクエストを行なっていないのに、です。

しかし各Activityは1度しか実行されません。

これがDurable FunctionsがDurableであるとされる部分です。

Orchestratorが何らかの理由で止まってしまったとしても再実行され再実行時にはすでに処理を行なっているCallActivityに関してはActivityの実際の呼び出しは行われずに結果だけ最初の実行と同じものが返されます。

常にOrchestratorFunctionは再実行されることが前提の設計となっているのです。

ここにOrchestratorでI/O処理を行なってはいけない理由にあります。

Orchestratorを実装する時にはI/OをOrchestration/Activityに限定しなければ、外部の情報の変化により結果が変化してしまうためハチャメチャな処理になってしまうのです。

終わりに

いかがでしたでしょうか?

Durable Functions 大変面白いです。

もちろん「Durable Funcitonsを使用することによるデメリットは無い」、なんてことは言えないのですが、それにしても大変面白いフレームワークであると言えると思います。

ヾ(´・∀・[.:゚+またね.: :]

おまけ

今回はHttpリクエストに対して非同期にオーケストレーションが実行されてますが

        [FunctionName("HttpStartWait")]
        public static async Task<HttpResponseMessage> RunWait(
            [HttpTrigger(AuthorizationLevel.Function, methods: "post",
                Route = "orchestrators/HttpStartWait/{requestId}/{point}")] HttpRequestMessage req,
            [OrchestrationClient] DurableOrchestrationClient starter,
            string requestId,
            int point,
            TraceWriter log)
        {
            var instanceId = await starter.StartNewAsync("CompensatingTransactOrchestrator",
                new PointRequest() { RequestId = requestId, Point = point });

            while (true)
            {
                var status = await starter.GetStatusAsync(instanceId);
                if (status?.RuntimeStatus > OrchestrationRuntimeStatus.Running)
                {
                    return new HttpResponseMessage(HttpStatusCode.OK)
                    {
                        Content = new StringContent(JsonConvert.SerializeObject(status))
                    };
                }
                await Task.Delay(TimeSpan.FromSeconds(2));
            }
        }

こんな感じにすれば一応リクエストに対して同期的に返答することも可能かです。

DroidKaigi2017アプリ for Xamarin.AndroidをXamarin.iOSに少し移植してみた。

この記事は Xamarin その1 Advent Calendar 2017 - Qiita の20日目の記事です。

はじめに

2017年は色々ありました。

転職してちょっとだけ都会に引っ越してみたりAzureを本格的に使い始めてみたり・・・それくらいかな?

今年も平常運転でクリスマスは1人なのでCivilization6でもやってるとお思います。

前提

今回の記事は

tamafuyou.hatenablog.com

この記事の続きになっているのですが

実は私の中ではこの記事のさらに前提があります。

Xamarin.Forms とコンソールアプリでモデル層を共通化してみた // Speaker Deck

今年3月に生まれて始めて登壇というものをさせていただいたのですが

この内容の実証実験と言いますか

この登壇資料を作っていた時に考えていた事を実装で組み込んでいます。

やってみた事

少し前にDroidKaigi2017アプリをXamarin.Androidに移植してみる、という挑戦を行いました。

これは単純にその当時で割と新し目の設計であるDroidKaigiアプリを学ぶ事でAndroidネイティブの知識を得ると同時にXamarin.Androidをやってみる事でXamarin Nativeというのはどういう特性があって何が良くて何が厳しいのか、自分の視点で考えてみようという目的がありました。

その実装を行うなかでどうせXamrinするなら所謂MVなんちゃらのM(Model)を3月の登壇内容を前提として設計を行ってみてXamarin.iOS版を作ってみるという方向に動きました。

そして今回、Xamarin.Androidとモデル層を共通化してXamarin.iOSを実装して見た形になります。

今回の取り組みに関する知識の前提

  • Androidネイティブ少しわかる
  • iOSネイティブはXamarin.Formsで使う範囲内くらいわかる
  • Swiftはちょっとだけわかる
  • Objective-Cは見たくない
  • C#/.NET Frameworkは少々わかる
  • Xamarin.Forms(Android/iOS)はそこそこ知ってる方だと思ってた時期がありました

今回の成果物

github.com

https://sleepyandhungry1984.tumblr.com/post/168746685717/droidkaigi2017アプリfor-xamarin-ios
sleepyandhungry1984.tumblr.com

謝罪

まだ余裕あるかなぁとかグダグダやってたら全然時間なくなってしまって執筆の時点で実装が不十分です。

年内はリファクタなどを行うかと思います。

解説

共通部分

DroidKaigi2017forXamarin/src/DroidKaigi2017.Service at master · yuka1984/DroidKaigi2017forXamarin · GitHub

モデル層の共有部分はSharedプロジェクトにて共有化を行なっています。

私はSharedプロジェクトをよく用います。

XamarinによるAndroid/iOS共通開発では..NET StandardLibrary(PCL)での共有プロジェクトにする意味はないだろうと考えています。 逆にStdLibとXamarin.Android/iOS間でのnugetパッケージでの不都合などが起きやすいと思います。

Serviceクラスとリポジトリクラスに分かれていて所謂クリーンアーキテクチャのような構成になっているかと思います。

Serviceクラスの特徴として流れの1方向性を意識した作りにあります。

DroidKaigi2017forXamarin/SessionServices.cs at master · yuka1984/DroidKaigi2017forXamarin · GitHub

公開プロパティはIObservableでありかつステートを持つことが可能であるReadOnlyReactivePropertyとReadOnlyReactiveCollectionを使用します。

そしてサービスに対するリクエストは戻り値のないasync/awaitでの実装でありデータの流れの1方向性となっています。

なぜ1方向性にするか、これは双方向性の繋がりの場合だと仕様変更・複数画面間での共有にて悲惨な問題が起きやすいという経験への対策になります。

またこのパターンの場合、View間の同期問題への対策を行うこともできます。

今回作成した動画の後半を見てもらえるとわかるのですが異なるページであってもデータの変更が伝搬するためView側で意識せずに状態の動機を行うことができます。

ただ、この実装にも問題はあり一つのサービスを多くの画面が共有するような状況で、かつ流れるデータが複雑な時にモデル層の変更のリスクが高くなってります。

今後はこの課題に対する解決策を考えていけたらと思っています。

Repositoryはただのデータの保存庫であり実装としてはCRUD的な作りとなっています。

Azure Mobile Appsのデータテーブルを使用しています。

これらのクラス類は前回のXamarin.Android編にて作成したままで変更はほぼありませんでした。(バグを直しただけ)

iOS部分

私はiOSネイティブ開発への理解度や知識が高くないため、今回作成したものはかなりメチャクチャではないかと思っています。 次こそはちゃんとiOS開発を学びたいと思います。

今回作成するにあたってViewModelを作った方が良いのか?とか考えていたのですが、単純に時間がなかったためViewControllerに直接サービスクラスを持って、そのままコードで操作するような作りとなりました。

storyboardを使いこなせる知識もなかったため、コードでゴリゴリ書いています。

感覚的にはXAMLです( ̄ー+ ̄) ニヤリ…

DroidKaigi2017forXamarin/FirstViewController.cs at master · yuka1984/DroidKaigi2017forXamarin · GitHub

こちらはセッション一覧画面にのコードになるのですが、今回はUICollectionViewを使い縦横スクロールのLayoutは

xyk.hatenablog.com

こちらを参考(C#移植)に作成しました。

DroidKaigi2017アプリのように13タイルや21タイルを組み込もうとするともう少し手を入れなければならないのですが、時間と技量と才能が足りなかったため今回は全て1*1タイルでの実装となりました。

この辺は特に難しいことはなくISessionServiceのSessionsプロパティを用いてCollectionを展開しSessionsの変更がおこなわれた場合UICollectionViewをReloadDataしている形になります。

またIMySessionServiceにてMySession情報が更新された時にもReloadDataをしています。

セルがロングタップされた場合にはIMySessionServiceにMySession情報の変更を依頼しています。

全ては1方向になっているので基本的には画面からのイベントはServiceへの依頼へと射影していきます。

画面は異なりますがMySession画面もほぼ同様の作りとなります。

MySession画面はUITableViewをベースにしています。

詳細画面に関しては

DroidKaigi2017forXamarin/DetailViewController.cs at master · yuka1984/DroidKaigi2017forXamarin · GitHub

これも特に難しくなくレイアウトしてセッション情報が流れてきたら更新する、という作りになっています。

おわりに

Xamarin.iOSを触ってみてやっぱりC#いいなぁという気持ちになりました。

もちろんSwiftやKotlinのような新しい言語の方が機能的に優れているのは間違い無いのですが

慣れた言語でバックエンドもフロントエンドも全部作れるって楽なんですよねぇ(´д`)〜з

それではまた〜ヾ(´д`)ノシ

Azure FunctionsでCircuit Breaker付きなService Bus トリガーを作ってみる

あいさつ

もうすぐ年末ですがAdventカレンダーの進捗はいかがでしょうか?

私は全くです(ノ゚Д゚)八(゚Д゚ )ノイエーイ

そしてここでお知らせがございます。

クリスマス中止のお知らせ

2017年12月24,25日に開催予定のクリスマスは、隣国の情勢の悪化に伴い中止となりました。

本決定により、クリスマスイブも中止になります。

中止、ならびに本告知が遅れたことにつきまして、楽しみにしておられた方々、及び関係者各位には謹んでお詫び申し上げます。

はじめに

みなさん、Azure Functionsは使ってますか?

ビバ!サーバーレス!! 素晴らしきかなサーバーレス!! 

サーバレスで作成すマイクロサービスアーキテクチャでAIを利用すればOK(∩´∀`)∩

なんてことは全く思っているわけではないんですがFunctions自体は好んで使ったりしてます。

Functionsではキュートリガーであったりブロブトリガーなどはよく使うかと思います。

キュートリガー系の設計を設計して行った時に以下のような課題にぶつかったりする事があります。

  • トリガーで動作するFunctionの中で外部サービスへアクセスを行なっている。
  • 外部サービスが障害により停止した時にもFunctionは実行されてしまう。
  • FunctionはExceptionが発生するのでキューはいずれポインズンキュー or 配信不能キューに移動する
  • 失敗する事が確定しているのに無駄にリソースが消費されてしまう

この様なケースに対応するクラウドなサービスパターンはサーキットブレーカーパターンでしょう。

github.com

www.slideshare.net

Azure Functionsでサーキットブレークを行おうとすると、おそらく二つの方法が思いつきます。

  1. Application Insights等でFunctionの実行状況を監視してエラーが閾値を超えた場合にFunction App自体を停止する。
  2. そういう風な仕組みを持ったトリガーを自作する。

今回は前回の記事

tamafuyou.hatenablog.com

を応用して、サーキットブレークするトリガーの作成してみたいと思います。

なお、今回サーキットブレークを厳密に実装しているわけではなくなんちゃってサーキットブレーカーである事をはじめに宣言しておきます。

本題

今回の成果物

github.com

SErviceBus・・・・・

解説

実装検討

まず最初に検討した実装方法は「SIngletonAttribute的な感覚でCircutiBreakerパターンを行うことはできないだろうか」ということだったのですが、SingletonAttribute自体WebJobsのコアに食い込んで実装されているため同様な実装を行うためにはかなりのコストがかかってメンドくさいのでボツ。

じゃあトリガー系でよく使われるのはAzure Storage QueueのトリガーとServiceBusのキュートリガーなので「継承していい感じにできないかな」と考えたんですがほとんどの実装がinternalとなっているため無理(・ω・ノ)ノ!

どちらの方が再実装が楽かなぁとコードを見てみた結果、ServiceBusの方が実装シンプルだったので、今回はServiceBusのキュートリガーを作成してみることにしました。

ServiceBusTrigger

azure-webjobs-sdk/src/Microsoft.Azure.WebJobs.ServiceBus at dev · Azure/azure-webjobs-sdk · GitHub

この辺がMicrosoftで作られているServiceBusTriggerのコードです。

現状のdevブランチはVersion3.0系の開発が行われています。 3.0系は.net standardに対応するためにServiceBusTriggerで使われているServiceBusClientのパッケージが

www.nuget.org

となっています。

version2.0系は

www.nuget.org

こちらのパッケージを使用しています。

で、今回どっちを使おうかなぁって考えたのですが、Microsoft.Azure.ServiceBusの方は今まで使った事がなかったので、今回はこちらで実装してみることにしました。

解説

ServiceBusTriggerの実装を見てみるとやっていることは実にシンプルでMessageReceiverでメッセージ受信を行なってメッセージを受信したらFunctionを実行して終わったらComplete or Abandonする。という事をしています。

ちなみにStorageQueueトリガーの場合にはループ処理にてキューの取得を行なってキューが存在していたらFunctionを実行する様な処理をしています。

Triggerの作り方とかは前回の記事で解説していますので今回は実装のキモであるIListenerの実装部分のみ解説します。

SErviceBusTriggerWithCircuitBreaker/ServiceBusQueueListener.cs at master · yuka1984/SErviceBusTriggerWithCircuitBreaker · GitHub

とは言ったものの、解説するほど大したことはしていなくて

Functionの実行結果が成功でなかった場合、ErrorCountをインクリメントして閾値をオーバーした場合に一旦受信を停止します。

停止した際にオープン状態時間経過後に再度受信を開始します。

エラーが一定時間経過間に発生しなくなった場合にはエラー状態をリセットして正常に復帰します。

サーキットブレーカーと書いていますがハーフオープンなどもなく違う感じなのですがブレーカーが落ちるっていうところでサーキットブレークとさせてください(´・ω・`)

動作画面は非常に地味なため載せません。(´・ω・)(´-ω-)) ペコリなさい

こんな感じで自作したトリガーを用いてデザインパターンを使って課題を解決するっていう事ができます。

いやぁAzure Functionsって本当に良いものですね、それではヾ(´д`)ノシ