眠いしお腹すいたし(´・ω・`)

C#関連を主に書きます。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

Durable Functionsで学ぶクラウドデザインパターン -Idempotency Key-

ごきげんよう、みなさま。

いかがお過ごしでしょうか?

2019年になりました。

Azure FunctionsはすでにV2が基本となり、Durable Functionsのバージョンは1.7.1へ、以前に比べてパフォーマンスもだいぶ良くなり、ますます使うしかないDurable Functions。

みなさま、張り切ってでゅらぶって参りましょう!!

という事で、今回は私自身に学びがあった内容の記録としてDurable Functionsで学ぶシリーズをやっていこうと思います。

とは言いましても今回はタイトルにはクラウドデザインパターンとありますがクラウドデザインパターンとは少し違います。

SOAとかそっちよりなんですかね、実際。あんまりよくわからないですが、でも必要な箇所は多いパターンだと思います。

じゃあなんでクラウドデザインパターンってタイトルにしているんだって?

それは、統一性というやつですよ(^_^;)?

あとSEO的なね。

さて、本シリーズ第4弾となります今回は「Idempotency Key」というパターンについて学んでいきたいと思います。

まえおき

みなさん、APIにPOSTやらPUSHやらPUTなんかの登録・更新リクエストとかって投げまくってますよね?

その時に1回しかリクエストしてはいけない。みたいなケースってないですか?

まあ世の中の登録や更新リクエストは1回しか投げちゃいけないことが一般的だと思います。

でも実際に本当に必要なリクエストを1回しか投げないというのは難しかったりします。

そんなの簡単でしょ?って。

じゃあローカルでDurable Functionsのデバックを行なってActivity内でAPIをコールした後にブレークポイントを仕掛けて止まったらそのままプロセスを終了してみてください。

そしてもう1度デバッグを開始して放置するのです・・・・・・2回目、投げられちゃいますよね?

無理やりじゃなくても起こり得ます。

例えばAPIリクエスト中に突然のネットワーク切断、なんて起こったらAPI側は処理を実行してくれているのに結果を受け取れません。

それが処理されたのか、処理される前にネットワーク切断が起きたのか? 

そうなったらもうわかりません。闇の中です。

絶対に2回要求を投げないという設計は結構無理があるのです。

では、それに対応するためのデザインパターンは?ってなった時にIdempotency Key パターンが出てくるのです。

ただしこのパターン。API側のデザインパターンであって要求を出す側のデザインパターンではありません。

なので、世の中のAPIがIdempotency keyを採用してくれるといいなぁということで今回はAPI側の実装を行なってみました。

実際Durable FunctionsのOrchestrator起動にAPIを公開するようなケースでは用意しておいた方が良いと思います。

Idempotency Key

Idempotencyは日本語で冪等性になります。

冪等性、Durable Functionsを扱う上では大変重要な言葉です。

Wikipediaによれば

冪等性 = "大雑把に言って、ある操作を1回行っても複数回行っても結果が同じであることをいう概念である"

らしいです。

このIdempotency Key パターンを大雑把に言ってしまえば

要求の中にKeyを指定すると、同じキーのリクエストの処理は1度しか実行せず返答結果は常に同じになる。

というものです。

要求を出す側で要求を出す前にリクエスト用のキーを生成して保存しておいてね。

もし要求したか分からなくなっちゃても、同じキーをつけてリクエストしてくれればこっちで返すべき返答をちゃんとコントロールしておくよ。

ってな感じです。

このデザインパターンが採用されている有名どころというとStripeという決済サービスのAPIが有名です。

私もStripeの実装を調べている中で、このパターンを学習しました。

日本の決済系のサービスも是非採用していただきたい😤パターンです。

サンプル

github.com

OrchestratorとかActivityは特に中身のないものをになっています。

Orchestratorを起動するAPIに工夫が色々盛り込まれている感じです。

解説

では早速解説に参りましょう。

まずはクライアントが送信するリクエスト内容

public class RequestModel
        {
            public string UserId { get; set; }

            public string IdempotencyKey { get; set; }

            public string ProductId { get; set; }

            public long Amount { get; set; }

            public bool GetValidate()
            {
                return
                    !string.IsNullOrEmpty(UserId)
                    &&
                    !string.IsNullOrEmpty(IdempotencyKey)
                    &&
                    !string.IsNullOrEmpty(ProductId)
                    &&
                    Amount > 0;
            }
        }

まぁなんかそれっぽくしてあります。

APIの最初はリクエスト内容の処理です。

そしてforループに入ります。

このループはIdempotency keyの一意性を確保するためのリトライループです。

ループ内ではまずStorage tableからUserId, IdempotencyKeyによってTableEntityを取得しています。Line49

このTableEntityは以下のような定義です。

        public class IdempotencyKeyTableEntity : TableEntity
        {
            public string TaskHubName { get; set; }

            public string InstanceId { get; set; }
        }

PartitionKeyにはUserId、RowKeyにはIdempotencyKeyを指定しいます。

ここでわかるように今回の実装ではUserId + IdempotencyKeyによって冪等性が保証される設計になっています。

場合よってはIdempotencyKeyのみで保証するパターンもあり得ますしリクエスト内容全てをハッシュにしてキーとして使用することでリクエスト内容 + IdempotencyKeyで冪等性を保証する設計にするパターンもあるかと思います。

そしてこのテーブルにはTaskHubNameとInstanceIdのフィールドがあります。

InstanceIdはわかりますがなぜTaskHubNameを保存するのでしょう?

それはAPIの更新によってTaskHubNameが変更された場合にも冪等性を確保するためです。

では過去のTaskhubのDurable Functionsの結果をどのように呼び出すのでしょうか?

それにはBinderを使用します。

Binderは関数の引数に設定することで取得でき、 Binderを使用すると関数内にて入出力バインドを得ることができます。

今回は保存されているTaskHubNameで作成されたDurableOrchestrationClientBaseを取得しています。Line58

有効なIdempotencyKeyが見つかった場合にはTaskHubNameとInstanceIdを元にCreateCheckStatusResponseにて結果を返答しています。

そして見つからなかった場合、まずIdempotencyKeyTableEntityをInsertしに行きます。

ここで大切なことはInsertの結果、409(コンフリクト)レスポンスが起き得るということです。

これをキャッチした場合は並列に同じリクエストを受け付けたようなケースが想定されます。

409をキャッチしたらcontinueしてループな最初から実行し直して行きます。

その後はOrchestratorを起動し作成されたInstanceIdをIdempotencyKeyの保存情報に更新して返答をします。

今回はInstanceIdの作成をDurable Functionsに任せていますが自分で作成してしまえば、最後のInsertMergeは必要なくなります。

終わりに

いかがでしたでしょうか。

割と簡単にIdempotency key パターンが提供できたかと思います。

もちろん超大規模な環境だったりした場合にはもっと速度を意識した実装にする必要があると思います。

世の中のAPIたちが色々といい感じになる世の中が早く来ると良いですね。2019年ですがcsv連携とかやりたくないものですね。

それではまた次回をお楽しみに、サヨナラ、サヨナラ、サヨナラ!

Xamarin.AndroidのArchitecture Componentsを試してみた。

すいません。アドベントカレンダー用にやっていた内容、落としました。

記事が内容薄くなってしまい大変申し訳なく思っております。

やろうとしていたことは、「Architecture ComponentsのWorkManagerをXamarin Androidで試してみる」という内容だったのですが、苦手な「Native Binding」にどハマりしまして、どうしてもうまく解決ができなく時間が無くなってしまいました。

既に40時間以上くらいつぎ込んでしまっているので、何とかして年内には完成させたいと思います。

ということで、代理的な内容になるのですが

現在Nugetに公開されているArchitecture Componentsのパッケージを試してみようと思います。

現在Nugetに公開されているパッケージは

NuGet Gallery | Packages matching Xamarin.Android.Arch

現在存在するArchitecuture Componentsのなかの

  • Lifecycle
  • VIewModel
  • LiveData

になります。

2017年に発表されていた中では

  • Room

は、公開されていません。

Roomは代用になるパッケージや実装があるので公開されていないのかなぁと思われます。

ただ、今年発表された

  • Navigation
  • Paging
  • WorkManager

にはRoomが依存してます。

今回試したかったWorkManagerはバックグラウンドタスクの管理コントロールを行うためのパッケージで、色々と使いどころが多そうなのでRoomの公開をして頂ければなぁと思ったりしています。

では、現在公開されている3つのパッケージを見ていきましょう。

LiveData

LiveDataはわたし的に説明するならLifeCycleを意識できるReactivePropertyというところです。

ApplicationのLifeCycyleの状態によって値を流す流さないを制御できるObservableです。

Java/KotlinではLiveDataとして使用できます。

ただXamarinのLiveDataパッケージにはGenericタイプがありません。

よってobjectで値をやり取りする感じになります。この辺は使いづらい感じです。

LiveDataはIObservable型というわけでは当然ないのですがTransformというReactive的な操作を行うクラスがあります。

Map関数はいわゆるSelelctです

Transformations.Map(LiveData data, IFunction func);

ただ、第2引数がIFunctionという使いにくい型なので

public class AnonymousFunction : Java.Lang.Object, IFunction
    {
        private readonly Func<Java.Lang.Object, Java.Lang.Object> _func;
        public AnonymousFunction(Func<Java.Lang.Object, Java.Lang.Object> func)
        {
            _func = func;
        }
        
        public Java.Lang.Object Apply(Java.Lang.Object p0) => _func(p0);
    }

こんなクラスを作っておくと良いかもしれません。

SwitchMap関数はネットワーク処理などを介して値を変換するような処理に向いているかと思います。

単純に流すだけでなく第2引数の関数が完了するまで値を通さなくする性質があるみたいです。

ViewModel

ViewModelはActivety Fragmentの生存期間を乗り越えて生きてくれるようです。

基本的にはAndroid.Arch.Lifecycle.ViewModelクラスを継承してViewModelを作成します。

ViewModelではApplicationContextやViewのインスタンスは受け取りません。リークになります。

Applicationを受け取りたい場合にはAndroidViewModelクラスを継承すると良いようです。

FragmentActivityのOnCreateにてViewModelProvidersからViewModelを作成すれば、OKです。

        protected override void OnCreate(Bundle savedInstanceState)
        {
            base.OnCreate(savedInstanceState);

            // Set our view from the "main" layout resource
            SetContentView(Resource.Layout.activity_main);

            var vm1 = ViewModelProviders.Of(this).Get(Class.FromType(typeof(ProductViewModel)));

            var vm2 = ViewModelProviders
                .Of(this, new ProductViewModel.ProductViewModelFactory(this.Application, 5))
                .Get(Class.FromType(typeof(ProductViewModel)));
        }

LifeCycle

ライフサイクルは上の二つのベースとなる概念とライブラリです。

イベントとステートという2種類の機能によって、LiveDataのLifeCycle管理であったりViewModelの生存期間の管理などを行います。

ILifecycleObserverというインターフェースを実装するのですが、このILifecycleObserver、何も定義されてません。

public class MuListener : Java.Lang.Object, ILifecycleObserver
    {
        [Lifecycle.Event.OnStop]
        [Export]
        public void Stopped()
        {
            
        }

        [Lifecycle.Event.OnStart]
        [Export]
        public void Started()
        {
            
        }
    }

このような感じでAttributeを用いるようです。

このようにして様々なLifeCycleの管理させたいクラスに関数を持たせることができるようです。

そしてLifecycleOwnerというLifeCycle管理の親クラスにAddObserverすることによってLifeCycleの変化によって関数が呼び出されるようになります。

var listener = new MyListener();
 this.Lifecycle.AddObserver(listener);

基本的には2017年の最初のコンセプトは変化なく、これらの土台の上に2018年に発表されたWorkManagerとうの新しいコンポーネントが追加されたようです。

これらの手順に沿うことで、Xamarin Androidでもバグが起こりにくいアプリケーションの作成を行えるかと思われます。

ただ、LiveDataを始めとしてC#では少し扱いにくい面が見られます。

ReactiveExtensions / ReactivePropertyなどと組み合わせを行い便利にできる可能性があると思われます。

以上

本題を落としてしまい、あまり良い内容で書けませんでしたこと、申し訳なく思います。

<m(__)m>

Durable Functionsで学ぶクラウドデザインパターン -Scheduler Agent Supervisor Pattern-

長らくお待たせ致しました?

Durable Functionsで学ぶクラウドデザインパターンシリーズ

前回の記事を書いてから、Durable FunctionsはGAされて(2018.06.03現在、V1のみ V2はPreview)現在のバーションは1.4.1。

その間にDurable Functionsの生みの親、Chris Gillum (@cgillum) | Twitterさんがde:codeで日本語で登壇したりしてます。

もう使うしかない機運漂うDurable Functions、ガンガン触っていきましょう。

そして要望が出たり問題を見つけたらイシューからのプルリクだo(・`д・´。)

さてさて今回は第3弾として「Scheduler Agent Supervisor パターン」を学んでみましょう。

Scheduler Agent Supervisor

docs.microsoft.com

さて、このデザインパターン、これまで紹介したCompensating Transaction PatternPipes and Filters Patternに比べると、解決したい問題などが回復性に寄っていて複雑です。

そうです、今流行りのワード?「回復性」です。

システムの回復性に関しては

回復性に優れた Azure 用アプリケーションの設計 | Microsoft Docs

NoOpsで高可用性・ハイスケールシステムを自律運用させよう! 実現に必要な3つのポイント【デブサミ2018】 (1/2):CodeZine(コードジン)

システム運用は自動化が常識に、NoOpsまでの歴史 | 日経 xTECH(クロステック)

色々な記事が見つかるかと思います。

個人的にシステムは作成する上で非常に重要な概念であり、これはオンプレミスだろうとクラウドだろうとサーバレスであろうと障害との付き合いは普遍的であり、その中でもっとも目指すべき方向の一つであると考えています。

もちろん「運用は人の手を使ってやれば良いのだ。そんなアーキテクチャを考えて実装するより下請けを安く買い叩けば良いのだ」なんていう考えであれば話は別ですが、現実の中で人の手をできる限り少なく運用できる事はエンジニアのQOLにも直結する大切な問題であると思うのです。

少し脱線しましたが、具体的にScheduler Agent Supervisor Patternを見てみましょう。

分散された一連のアクションを 1 つの操作として調整します。 いずれかのアクションが失敗した場合は、全体の操作が全体として成功または失敗するように、その失敗を透過的に処理しようとするか、または実行された作業を元に戻します。 これにより、一時的な例外、長期間続く障害、プロセスのエラーなどのために失敗したアクションを復旧して再試行することが可能になるため、分散システムに回復性が追加される場合があります。

ワケ⊂(´-` ) ワカ( ´-`)つ ラン♪⊂(´ヘ`)つ

もっと私みたいな日本語が不自由な人でもわかる日本語で書いてもらいたいですね( -`ω-)

仕方がないので詳細な説明の方を見て見ましょう。

アプリケーションは、その一部でリモート サービスが呼び出されたり、リモート リソースにアクセスしたりする可能性のあるいくつかの手順を含むタスクを実行します。 それぞれのステップは互いに独立しているかもしれませんが、それらを指揮するのは、タスクを実装するアプリケーションのロジックです。

要するに、アプリケーション(サービス)は処理の中でサービスのリポジトリにだけアクセスするのではなくて外部のAPIを呼び出したりする事もありますよね? それらをコントロールするのはアプリケーションのロジックになりますよね?

っていう事でしょう。そりゃそうでしょう。

可能な場合は常に、アプリケーションはタスクが完了まで実行されるようにし、リモート サービスまたはリソースへのアクセス時に発生する可能性のあるすべての障害を解決する必要があります。 障害は、さまざまな原因で発生することがあります。 たとえば、ネットワークが停止したり、通信が中断されたり、リモート サービスが無応答または不安定な状態になったり、おそらくリソースの制約のためにリモート リソースが一時的にアクセスできなくなったりすることがあります。 多くの場合、障害は一時的であり、再試行パターンを使用して処理できます。

アプリケーションは外部APIへのアクセスやリポジトリへのアクセスで発生する可能性がある障害を、なんとかして処理が完了するように実装する必要がありますよね?

これらの原因には通信が中断されたり外部APIが不安定であったりスロットリングに引っかかったりすることがあるよね?

多くの場合は、こういう障害は一時的に発生するものだから再試行パターンで処理可能だよね?

アプリケーションは、容易に復旧できないより永続的な障害を検出した場合、システムを整合性のある状態に復元し、操作全体の整合性を保証できる必要があります。

でも、障害が長引くような感じならシステムを問題ない状態までロールバックしたり一時停止したりしてロジック全体で辻褄が合うように戻してあげる必要があるよね?

みたいな感じでしょうか。

まぁ要約するとロジックのなかで処理毎にリトライ処理をするのは当然として、短期間リトライでなんとかならない場合には、長時間待機し障害普及ごにリトライしたり補償トランザクションで整合性を保つようにロールバックするなどの工夫ができるとCOOL!!って感じでしょうか。

これに対する解決策として

https://docs.microsoft.com/ja-jp/azure/architecture/patterns/scheduler-agent-supervisor#solution

を書いています。

はい、ここでこの文を読んで「あれ??」って思った方、それです m9⊃゜∀゜)!!

ここで言っているSchedulerとAgentをOrchestratorとActivityに置き換えて見ましょう。

あら不思議!! Durable Functionsを使うだけでSchedulerとAgentになってしますのです。

あぁ Durable Functions まじCOOL(●´∀`●)

残るはSuperVisorを用意するだけで良いのです。

サンプル

github.com

今回のサンプルはドキュメント内で例としてあげられてる

https://docs.microsoft.com/ja-jp/azure/architecture/patterns/scheduler-agent-supervisor#example

物をDurable Functionsで作成して見ました。

ECで注文を受けた場合データベースに受注情報を書き込みリモートサービスをコールする。

リモートサービスをコールする箇所でのエラーが発生した場合にリトライ処理を行う。

しかしリトライが閾値を超えた場合には一時停止しアラートを上げる。

ユーザは問題の解決後にリトライを行うことができる。

みたいな感じです。

今回は複雑なので少し図を描いてみました。

まずは正常時の処理ですが非常に単純です。

f:id:tamafuyou:20180603205720p:plain

OrderRequest関数でリクエストを受け付けて(ECウェブアプリケーションからのコールを想定)Orchestratorを起動しStorageTableに受注情報を保存し外部APIをコールして終了します。

次に外部APIをコール時にAPIをコールしリトライしても解決しなかった場合のアーキテクチャの図です。

f:id:tamafuyou:20180603210123p:plain

CallRemoteServiceが正常に終了しなかった場合、失敗レポートとしてFailReportテーブルにInstanceIdなどの情報を記録します。

その後OrchestrotorはWaitForExternalEventをして外部からRiseEventされるまで待機状態となります。

FailReportWatcher関数はTimerTriggerで動いていて定周期でFailReportテーブルを監視しています。

FailReportテーブルにデータを見つけるとSlackにアラートの通知を行います。

その際にRetry用のリンクも一緒にメッセージとして送信します。

ユーザは外部APIの障害が解決後にそのリンクをクリックします。

リンクはFailReportRetry関数のURLであり、関数内でFialReportに記述されているInstanceIdに対してRiseEventを行います。

RiseEventが行われるとOrchestratorが再び動き出しCallRemoveServiceから再開されます。

コード解説

ではOrchestratorを見てみましょう。

gist.github.com

上記のアーキテクチャを読むとなんとなくOrchestraorもわかると思います。

ポイント1

CallRemoteService Activityの呼び出し際にCallActivityWithRetryを使用しています。 これを使用する事でExponential backoffなトライ処理を行う事ができます。

ポイント2

CallRemoteServiceの呼び出しでリトライをしたけれど成功しなかった場合にWaitForExternalEventで待機状態に移るのですが、その際にタイムアウトを5日として待機しタイムアウトした場合には補償トランザクション処理を行っています。

Activityに関してなのですが各々はとてもTableにデータを書き込んだりAPIにアクセスしたりしているだけで単純ですので解説は省きます。

終わりに

どうでしょう?SchedulerAgentSupervisorパターンの理解は進みましたでしょうか?

ここまで読まれて頂けたのであれば一見するとかなり複雑そうなアーキテクチャもDurable Functionsを使用するとかなり簡潔に実装する事ができる事、Durable Functionsのパワーを実感して頂けたのではと思っております。

今回のサンプルは実戦ではもっと考慮するべき事があるのですが省いています。

実戦で使用する時にはどこまで考慮するべきか・・・検討してみるのも1つの勉強になるのではないかと思います。

いやぁ、Durable Functionsって本当にいいものですねぇ

それではまた次回をお楽しみに、サヨナラ、サヨナラ、サヨナラ!

おまけ

Durable Functionsの公式Wikiに記事のリンクを記載いただけました。

Home · Azure/azure-functions-durable-extension Wiki · GitHub

v(。・・。)イエィッ♪

リツイート抽選サービス「ついせん」をはじめました。

ツイッターリツイートで抽選を行うためのサービス「ついせん」を作りました。

twisen.azurewebsites.net

無料で使えてリツイート抽選できます。

ツイッターリツイートで抽選を行うサービスでは国内では「あたれら」くらいだったのですが、「あたれら」さんは、抽選処理を同期的に行う仕様上、TwitterAPIの7日以前の検索が行えない制約から最大7日間までの範囲での抽選企画を対象としています。

しかし、最近Twitter社さんは有料APIに力を入れていて去年の11月ごろから無制限に古いデータを検索できるようにしていています。

Twitterがより自由にデータにアクセス可能となる開発者向けプレミアムAPIをリリース - GIGAZINE

ですので有料APIを使用すれば同期的処理であっても7日縛りを突破できます。

「あたれら」さんのプログラムで有料APIを使用すればOKな感じです。

でも無料サービスで月額17,000円の有料APIを使うのは辛いし無料APIの範囲で何とかしてみたい・・・ということを考えまして「ついせん」を作成しました。

あと、Durable Functionsを使ってサービスを作ってみたかった・・・という理由もあります。

「ついせん」ではリツイートしてもらいたい対象のツイートを登録するときに、締切日時を設定します。 この締切日時は1ヶ月後でも1年後でもOKです。 「ついせん」は締切日時までリツイートしてくれたユーザを収集していきます。 そして締切日時を過ぎたら収集したユーザの中から抽選を行います。 これによって無料で7日の縛りを越えるようにしています。

またリツイート数の制限も特に設けていません。 ツイッターのSearchAPIはRateLimitが結構厳しいので15分間に180になります。 この辺も非同期に処理する仕組みなので乗り越えています。 試験では40000RTまで確認しました。

動作環境としてはAzureで動作させていてWeb Apps(F1) + SQLDatabase(Basic) + Azure Functions(従量課金) + Azure Storageっていう感じです。

現状は運用費用は月額600円くらいです。「あたれら」さんがあるのであまり使われないと思いますのでSQLDatabaseの部分を別のストレージに置き換えれば無料で運営できるかなぁとか思います。

無料で使えますのでお気軽にご利用ください。

Durable Functionsを使用したサンプル -リクエスト結果をWebHookで返答するサービスへの対応-

今回はDurable Functionsを使用した少しだけ実践的なパターンを想定したサンプル実装を行ってみたいと思います。

前置き

前置き書いていたら長くなったので良い飛ばしてください。

みなさん、サービス間の連携にはどんな仕組みをよく使いますか?

一般的にはWeb APIが多いでしょうか?

今時的だとgPRCを使用したりする事もあるでしょう。

もしかしたら専用イーサネットワーク越しDATABASE LINKなどで RDBを使用して連携しているかも知れません。もしかしたらFTPサーバにcsvファイルをアップロードして連携するなんて事も2018年の今でも現在でも存在しているかも知れません。もしかしたらcsvファイルがshift-jisで書かれていたりeuc-jpで書かれていたり、どこかの企業が拡張した文字コードで書かれていたりするかも知れません。もしかしたらH○LFTなどの連携サービスを使用しているかも知れません。

Web APIでのサービス間連携は同期的であればとてもシンプルです。

1セッション内でリクエストとレスポンスが行われます。

しかし1処理に時間がかかる場合にはWeb APIによる連携は最善とは言えないかも知れません。

リクエストを受ける側は同期的に処理を実行するという制約がかけられる事によって実装難易度が上がってしまったりスケールアウトが難しくなってしまったりするかも知れません。

ロングランニングな処理に対する解決方法としては非同期なAPIを設計する場合があります。

リクエストを受け付けた時に処理IDを発行して返答し、リクエストを行なった側は処理IDを使用して結果取得APIをコールする事で結果を取得する形式です。

Durable FunctionsのOrchestratorは正にその振る舞いをします。

しかしこの形式はリクエストを送る側が結果取得APIをポーリングしなければならないという問題点があります。

そこで最終的に辿り着くパターンとしては処理が終わった時にリクエストを送信した側に何らかの方法で結果を通知する方法です。

良くある方式としてはリクエストを送信する側も結果通知リクエストを受け付けるエンドポイントを用意しておく所謂WebHook的な連携です。

今回は1リクエスト1フックなAPIへのリクエストをDurable Functionsの外部イベントを使用して処理するサンプルを作成してみたいと思います。

成果物

github.com

解説

docs.microsoft.com

人による操作または他の外部トリガーを処理するときに便利です

と書かれている通り外部トリガーというのが今回でいうWebHookであったりします。

まぁWebHookじゃなくてもFTPサーバに結果CSVをおいて(ry

WebHookOrchestrator

それではリクエストを行う側のOrchestrationを見てみましょう。

gist.github.com

まずはHookApiRequestActivityをコールしてAPIリクエストを行なって処理IDの取得をおこなっています。

gist.github.com

処理は単純です。

次にSaveRequestKeyActivityをコールして取得した処理IDを自身のInstanceIdとセットにしてStorageTableの保存しています。

gist.github.com

次にWaitForExternalEventを使用して外部から結果を発火されるのを待ちます。

最後にOrchestrateの結果として外部からの結果を返答します。

HookRecieve

次にサービスからWebHookを受け付けるFunctionを見てみます。

gist.github.com

WebHookによってリクエストされた情報の中からInstanceIdを取得してStorageTableに問い合わせを行い、リクエストを行なったOrchestratorのInstanceIdを取得。

そのInstanceIdでRaiseEventAsyncをコールします。

RaiseEventAsyncをコールするとWaitForExternalEventで待機していたOrchestratorが動き出してリクエストを行なったOrchestratorの処理が終了します。

終わりに

今回のサンプルであればリクエストを行う部分とWebHookを受ける部分を別々に処理したとしても問題がありません。

しかしAPIリクエストが処理の一部に過ぎず、その結果を使用してさらに複雑に処理を行なっていくようなパターンでは今回のようにDurable FunctionsのOrchestrationで実装していく方が楽になるかとおもいます。

また今回のパターンではWaitForExternalEventで待機した状態でWebHookが行われなかった場合、処理が中途半端になってしまします。

その場合のデザインパターンとしてScheduler Agent Supervisorパターンを用いるのも良いかも知れません。

その辺のサンプルはまた次回に。

それでは👋

Durable Functionsで学ぶクラウドデザインパターン -Pipes and Filters Pattern-

前回

tamafuyou.hatenablog.com

に引き続き、今回はPipes and Filters (パイプとフィルターのパターン)をDurable Functionsで実装してみます。

Durable Functionsのご紹介は

Durable Functions - Google 検索

この辺でお願いします。

Pipes and Filters

docs.microsoft.com

クラウドデザインパターンとして紹介されているのですが昔からある古典デザインパターンの一つですね。

著しく雑に表現すると、

モノシリックな大きいロジックは途中にボトルネックなロジックが含まれていても大きいロジックの単位でしか実行されないからリソースの無駄になりやすいよね。だからタスク単位に分割しよう。そうすれば再利用性も向上するし小さい単位での並列実行が可能になるからリソースの無駄を防げるかもね。しかも要件によってタスクの順序が入れ替わるようなケースでも対応できちゃうよね。しかもこのパターンにするとタスク毎に別々のコンピュータリソースで実行が可能だから障害に対する回復性も向上するよね。

って感じです。

C#のプログラム上でTask等の並列処理を使えば一つのコンピュータ上での並列実行は可能なんですけどクラウドで実行するんだったらもっとスケールを大きく行こう!! って事なんでしょうか。

今回の成果物

github.com

解説

とは言うものの、全然難しいことはしていません。 まずはActivityを1つのタスクと考えて実装します。

[FunctionName(nameof(TaskAActivity))]
        public static async Task<List<string>> TaskAActivity(
            [ActivityTrigger] List<string> results,
            TraceWriter logger
        )
        {
            logger.Info("Task A start");
            results.Add("TaskA Complete");
            return results;
        }

このようなアクティビティをA~Fまで作成しました。

次にタスクを繋ぎ合わて作成するロジックをOrchestratorと考えて実装します。

        [FunctionName(nameof(DataSource1Orchestrator))]
        public static async Task<List<string>> DataSource1Orchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context
            )
        {
            var input = context.GetInput<string>();
            var a = await context.CallActivityAsync<List<string>>(nameof(TaskAActivity), new List<string>() {input});
            var b = await context.CallActivityAsync<List<string>>(nameof(TaskBActivity), a);
            var c = await context.CallActivityAsync<List<string>>(nameof(TaskCActivity), b);
            var d = await context.CallActivityAsync<List<string>>(nameof(TaskDActivity), c);
            return d;
        }

        [FunctionName(nameof(DataSource2Orchestrator))]
        public static async Task<List<string>> DataSource2Orchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context
            )
        {
            var input = context.GetInput<string>();
            var a = await context.CallActivityAsync<List<string>>(nameof(TaskAActivity), new List<string>() { input });
            var b = await context.CallActivityAsync<List<string>>(nameof(TaskBActivity), a);
            var e = await context.CallActivityAsync<List<string>>(nameof(TaskEActivity), b);
            var f = await context.CallActivityAsync<List<string>>(nameof(TaskFActivity), e);
            return f;
        }

あとはHttpTriggerによる呼び出しを作成すれば終わりです。

呼び出し結果として

DataSource1であれば

{
    "runtimeStatus": "Completed",
    "input": "test",
    "output": [
        "test",
        "TaskA Complete",
        "TaskB Complete",
        "TaskC Complete",
        "TaskD Complete"
    ],
    "createdTime": "2018-01-22T14:54:31Z",
    "lastUpdatedTime": "2018-01-22T14:54:39Z"
}

DataSource2であれば

{
    "runtimeStatus": "Completed",
    "input": "kokoro",
    "output": [
        "kokoro",
        "TaskA Complete",
        "TaskB Complete",
        "TaskE Complete",
        "TaskF Complete"
    ],
    "createdTime": "2018-01-22T14:16:27Z",
    "lastUpdatedTime": "2018-01-22T14:16:31Z"
}

になります。

Pipes and Filtersのパターンって、そのままDurableFunctionsなんですよね。

特に難しいことはないです。

Orchestratorのコードって一見すると、そのコンピュータ上で非同期処理をしている様に見えるのですが実際にどのコンピュータで実行されているかは分からないんです。

例えばFunctionsをAppServiceプランのインスタンス x 10みたいな環境で実行したとすれば、その10台の中のどこかでActivityが実行される感じです。

コードの見た目以上にスケールの変動に容易に対応しちゃってるんですよね。

あと、例えば

Orchestratorのコードを

        [FunctionName(nameof(DataSource2Orchestrator))]
        public static async Task<List<string>> DataSource2Orchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context
            )
        {
            var input = context.GetInput<string>();
            var a = await context.CallActivityWithRetryAsync<List<string>>(nameof(TaskAActivity),new RetryOptions(TimeSpan.FromSeconds(30), 10),  new List<string>() { input });
            var b = await context.CallActivityWithRetryAsync<List<string>>(nameof(TaskBActivity), new RetryOptions(TimeSpan.FromSeconds(30), 10), a);
            var e = await context.CallActivityWithRetryAsync<List<string>>(nameof(TaskEActivity), new RetryOptions(TimeSpan.FromSeconds(30), 10), b);
            var f = await context.CallActivityWithRetryAsync<List<string>>(nameof(TaskFActivity), new RetryOptions(TimeSpan.FromSeconds(30), 10), e);
            return f;
        }

こう言う風にCallActivityWithRetryAsyncに置き換えるだけで簡単にAcitvity単位のRetryを設定することができます。

このリトライの容易さがDurable Functionsの良い点でもあり、どのように活かすかの腕の見せどころな様に思います。

要するにActivityの処理にてどの様な時にRetryを行ってどの様な時にRetryしないかを設計して実装すれば、あとはOrchestratorで如何様にもできますぜって感じなのです。

いやぁ、Durable Functionsって本当にいいものですねぇ

それではまた次回をお楽しみに、サヨナラ、サヨナラ、サヨナラ!

Durable Functionsで学ぶクラウドデザインパターン -Compensating Transaction Pattern-

先日、牛尾さん、Kanioさん主催の

www.meetup.com

に参加してきました。

Durable functionsの紹介だけではなくモブプログラミングもあり私が英語が駄目すぎることを除いてすごく楽しいセッションでした。

元々、非常に興味があるフレームワークだったのですが触ってはいなかったので、しばらくDurable functionsで遊んでみようという気になっている次第です。

今回はDurable Functionsを使ってマイクロサービスアーキテクチャなどでよく使われるCompensation Transaction Pattern(補正トランザクションパターン)を実装することで、補正トランザクションパターンを勉強してみようと思います。

## 今回の成果物

github.com

環境

Visual studio 2017 - 15.5.2

Azure Functions Cli - 1.0.7

Azure Storage Emulator 5.2

Nuget Package

https://www.nuget.org/packages/Microsoft.Azure.WebJobs.Extensions.DurableTask/1.1.0-beta2

Compensation Transaction Pattern

この資料が一番わかりやすいと思います。

www.slideshare.net

著しく雑に表現すると

全然別々のサービス間でもトランザクション処理を行わなければならないケースはあるのだけど全てのサービスを通して高レベルなロールバック処理は難しいから、どこかで失敗したら各サービスで元に戻す上書きを行おう。

という感じです。

解説

OrchestratorとActivity

Durable Funcitonsでは処理の単位としてOrchestratorとActivityが存在します。

OrchestratorはOrchestration、つまりアーキテクチャを実装します。

Activityは処理を実装します。

なのでOrchestraorでI/Oを伴う処理をすることはアンチパターンです。

例えばRestAPIをコールして結果を取得する処理を行おうとした時には

Activity関数で実際に通信する部分を実装してOrchestratorでActivityを呼び出して結果を取得する処理を実装するようなイメージです。

ここを理解した上でコードの解説行います。

StartNew

[FunctionName("HttpStartSingle")]
        public static async Task<HttpResponseMessage> RunSingle(
            [HttpTrigger(AuthorizationLevel.Function, methods: "post",
                Route = "orchestrators/HttpStartSingle/{requestId}/{point}")] HttpRequestMessage req,
            [OrchestrationClient] DurableOrchestrationClient starter,
            string requestId,
            int point,
            TraceWriter log)
        {
            var instanceId = await starter.StartNewAsync("CompensatingTransactOrchestrator",
                new PointRequest() {RequestId = requestId, Point = point});

            return starter.CreateCheckStatusResponse(req, instanceId);
        }

この関数ではHttpRequestを受けた場合に「CompensatingTransactOrchestrator」というOrchestratorを起動させます。

そしてリクエストの結果として、そのオーケストレータの「インスタンスID」に伴う情報を返答しています。

Durable Functionsではオーケストレーションを開始すると、そのオーケストレーションのIDとしてInstanceIdが発行されます。

そのInstanceIdを使用してオーケストレーションの結果を追跡することが可能となります。

このリクエストの結果はこのようになります。

{
    "id": "f0d9f24a2cc94a3e8394e6ae00c30a59",
    "statusQueryGetUri": "http://localhost:7071/admin/extensions/DurableTaskExtension/instances/f0d9f24a2cc94a3e8394e6ae00c30a59?taskHub=DurableFunctionsHub&connection=Storage&code=e5aGhnpascgIhDdfTvQozANdKlML3FlayXzx1Pr6NqRyb7gSs2aIiQ==",
    "sendEventPostUri": "http://localhost:7071/admin/extensions/DurableTaskExtension/instances/f0d9f24a2cc94a3e8394e6ae00c30a59/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=e5aGhnpascgIhDdfTvQozANdKlML3FlayXzx1Pr6NqRyb7gSs2aIiQ==",
    "terminatePostUri": "http://localhost:7071/admin/extensions/DurableTaskExtension/instances/f0d9f24a2cc94a3e8394e6ae00c30a59/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=e5aGhnpascgIhDdfTvQozANdKlML3FlayXzx1Pr6NqRyb7gSs2aIiQ=="
}

このJsonに書かれているUrlにアクセスするとオーケストレーションの結果を確認できたりオーケストレーションを途中で止めたりすることができます。

ステータス取得の結果はこんな感じになります。

{
    "runtimeStatus": "Completed",
    "input": {
        "$type": "CompensatingTransactionOrchestration.Functions+PointRequest, CompensatingTransactionOrchestration",
        "RequestId": "h",
        "Point": 100
    },
    "output": "repository1 Failure",
    "createdTime": "2018-01-21T04:06:13Z",
    "lastUpdatedTime": "2018-01-21T04:06:22Z"
}

Orchestrator and Activity

Compensating Transaction PatternなOrchestratorを見てみます。

        [FunctionName("CompensatingTransactOrchestrator")]
        public static async Task<string> CompensatingTransactOrchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context
        )
        {
            var pointRequest = context.GetInput<PointRequest>();
            var compensatingStack = new Stack<ValueTuple<string, RepositoryResult>>();

            try
            {


                var result1 = await context.CallActivityAsync<RepositoryResult>(nameof(Repository1Activity), pointRequest);
                if (result1.Succeed)
                {
                    compensatingStack.Push((nameof(Repository1CompensatingActivity), result1));
                }
                else
                {
                    return "repository1 Failure";
                }

                var result2 = await context.CallActivityAsync<RepositoryResult>(nameof(Repository2Activity), pointRequest);
                if (result2.Succeed)
                {
                    compensatingStack.Push((nameof(Repository2CompensatingActivity), result2));
                }
                else
                {
                    return "repository2 Failure";
                }

                var result3 = await context.CallActivityAsync<RepositoryResult>(nameof(Repository3Activity), pointRequest);
                if (result3.Succeed)
                {
                    compensatingStack.Push((nameof(Repository3CompensatingActivity), result3));
                }
                else
                {
                    return "repository3 Failure";
                }

                compensatingStack.Clear();
            }
            finally
            {
                while (compensatingStack.Any())
                {
                    var item = compensatingStack.Pop();
                    await context.CallActivityWithRetryAsync(item.Item1, GetRetryOprion(), item.Item2);
                }
            }

            return "Success";
        }

このFunctionはOrchestrationTriggerを使用しているのでStartNewされた時、もしくはCallSubOrchestrationされた時に実行されます。

読んでわかる通りCompensation Transactin Patternそのままです。

Repository1~3に対する処理を行うために各ActivityをCallしていきます。

しかし途中で失敗した場合には、実行が完了してしまっているRepositoryの補正用ActivityをCallして補正を行なっています。

各アクティビティの処理はこんな感じです。

        [FunctionName("Repository1Activity")]
        public static async Task<RepositoryResult> Repository1Activity(
            [ActivityTrigger] PointRequest context
            , [Table("table1")] CloudTable table
            )
        {
            var query =
                new TableQuery<PointTable>().Where(TableQuery.GenerateFilterCondition(nameof(PointTable.PartitionKey),
                    QueryComparisons.Equal, context.RequestId));

            if (!table.ExecuteQuery(query).Any())
            {
                var processId = Guid.NewGuid().ToString();
                var insertOperation =
                    TableOperation.Insert(new PointTable()
                    {
                        PartitionKey = context.RequestId,
                        RowKey = processId,
                        Point = context.Point
                    });

                var result = await table.ExecuteAsync(insertOperation);
                return new RepositoryResult()
                {
                    ProcessId = processId,
                    Request = context,
                    Succeed = true,
                };
            }

            return new RepositoryResult()
            {
                Request = context,
                Succeed = false
            };
        }

        [FunctionName("Repository1CompensatingActivity")]
        public static async Task Repository1CompensatingActivity(
            [ActivityTrigger] RepositoryResult repositoryResult
            , [Table("table1")] CloudTable table
            )
        {
            var query =
                    new TableQuery<PointTable>()
                        .Where(TableQuery.GenerateFilterCondition(nameof(PointTable.PartitionKey),
                            QueryComparisons.Equal,
                            repositoryResult.Request.RequestId))
                        .Where(TableQuery.GenerateFilterCondition(nameof(PointTable.RowKey), QueryComparisons.Equal,
                            repositoryResult.ProcessId))
                ;

            var targets = table.ExecuteQuery(query).ToArray();

            if (targets.Any())
            {
                foreach (var target in targets)
                {
                    var deleteOperation = TableOperation.Delete(target);
                    await table.ExecuteAsync(deleteOperation);
                }
            }
        }

ストレージテーブルに処理を書いて補正では書いた処理を消している感じです。

大事なこと

このOrchestratorと各Activityにブレークポイントをしかけて実行すると、とても面白い動作をしていることに気がつきます。

OrchestrationTriggerによって実行されているFunctionが何回も呼び出されるのです。

1回しかリクエストを行なっていないのに、です。

しかし各Activityは1度しか実行されません。

これがDurable FunctionsがDurableであるとされる部分です。

Orchestratorが何らかの理由で止まってしまったとしても再実行され再実行時にはすでに処理を行なっているCallActivityに関してはActivityの実際の呼び出しは行われずに結果だけ最初の実行と同じものが返されます。

常にOrchestratorFunctionは再実行されることが前提の設計となっているのです。

ここにOrchestratorでI/O処理を行なってはいけない理由にあります。

Orchestratorを実装する時にはI/OをOrchestration/Activityに限定しなければ、外部の情報の変化により結果が変化してしまうためハチャメチャな処理になってしまうのです。

終わりに

いかがでしたでしょうか?

Durable Functions 大変面白いです。

もちろん「Durable Funcitonsを使用することによるデメリットは無い」、なんてことは言えないのですが、それにしても大変面白いフレームワークであると言えると思います。

ヾ(´・∀・[.:゚+またね.: :]

おまけ

今回はHttpリクエストに対して非同期にオーケストレーションが実行されてますが

        [FunctionName("HttpStartWait")]
        public static async Task<HttpResponseMessage> RunWait(
            [HttpTrigger(AuthorizationLevel.Function, methods: "post",
                Route = "orchestrators/HttpStartWait/{requestId}/{point}")] HttpRequestMessage req,
            [OrchestrationClient] DurableOrchestrationClient starter,
            string requestId,
            int point,
            TraceWriter log)
        {
            var instanceId = await starter.StartNewAsync("CompensatingTransactOrchestrator",
                new PointRequest() { RequestId = requestId, Point = point });

            while (true)
            {
                var status = await starter.GetStatusAsync(instanceId);
                if (status?.RuntimeStatus > OrchestrationRuntimeStatus.Running)
                {
                    return new HttpResponseMessage(HttpStatusCode.OK)
                    {
                        Content = new StringContent(JsonConvert.SerializeObject(status))
                    };
                }
                await Task.Delay(TimeSpan.FromSeconds(2));
            }
        }

こんな感じにすれば一応リクエストに対して同期的に返答することも可能かです。