眠いしお腹すいたし(´・ω・`)

C#関連を主に書きます。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

Azure Functionsでサーキットブレーカー付きなService Bus トリガーを作ってみる

あいさつ

もうすぐ年末ですがAdventカレンダーの進捗はいかがでしょうか?

私は全くです(ノ゚Д゚)八(゚Д゚ )ノイエーイ

そしてここでお知らせがございます。

クリスマス中止のお知らせ

2017年12月24,25日に開催予定のクリスマスは、隣国の情勢の悪化に伴い中止となりました。

本決定により、クリスマスイブも中止になります。

中止、ならびに本告知が遅れたことにつきまして、楽しみにしておられた方々、及び関係者各位には謹んでお詫び申し上げます。

はじめに

みなさん、Azure Functionsは使ってますか?

ビバ!サーバーレス!! 素晴らしきかなサーバーレス!! 

サーバレスで作成すマイクロサービスアーキテクチャでAIを利用すればOK(∩´∀`)∩

なんてことは全く思っているわけではないんですがFunctions自体は好んで使ったりしてます。

Functionsではキュートリガーであったりブロブトリガーなどはよく使うかと思います。

キュートリガー系の設計を設計して行った時に以下のような課題にぶつかったりする事があります。

  • トリガーで動作するFunctionの中で外部サービスへアクセスを行なっている。
  • 外部サービスが障害により停止した時にもFunctionは実行されてしまう。
  • FunctionはExceptionが発生するのでキューはいずれポインズンキュー or 配信不能キューに移動する
  • 失敗する事が確定しているのに無駄にリソースが消費されてしまう

この様なケースに対応するクラウドなサービスパターンはサーキットブレーカーパターンでしょう。

github.com

www.slideshare.net

Azure Functionsでサーキットブレークを行おうとすると、おそらく二つの方法が思いつきます。

  1. Application Insights等でFunctionの実行状況を監視してエラーが閾値を超えた場合にFunction App自体を停止する。
  2. そういう風な仕組みを持ったトリガーを自作する。

今回は前回の記事

tamafuyou.hatenablog.com

を応用して、サーキットブレークするトリガーの作成してみたいと思います。

なお、今回サーキットブレークを厳密に実装しているわけではなくなんちゃってサーキットブレーカーである事をはじめに宣言しておきます。

本題

今回の成果物

github.com

SErviceBus・・・・・

解説

実装検討

まず最初に検討した実装方法は「SIngletonAttribute的な感覚でCircutiBreakerパターンを行うことはできないだろうか」ということだったのですが、SingletonAttribute自体WebJobsのコアに食い込んで実装されているため同様な実装を行うためにはかなりのコストがかかってメンドくさいのでボツ。

じゃあトリガー系でよく使われるのはAzure Storage QueueのトリガーとServiceBusのキュートリガーなので「継承していい感じにできないかな」と考えたんですがほとんどの実装がinternalとなっているため無理(・ω・ノ)ノ!

どちらの方が再実装が楽かなぁとコードを見てみた結果、ServiceBusの方が実装シンプルだったので、今回はServiceBusのキュートリガーを作成してみることにしました。

ServiceBusTrigger

azure-webjobs-sdk/src/Microsoft.Azure.WebJobs.ServiceBus at dev · Azure/azure-webjobs-sdk · GitHub

この辺がMicrosoftで作られているServiceBusTriggerのコードです。

現状のdevブランチはVersion3.0系の開発が行われています。 3.0系は.net standardに対応するためにServiceBusTriggerで使われているServiceBusClientのパッケージが

www.nuget.org

となっています。

version2.0系は

www.nuget.org

こちらのパッケージを使用しています。

で、今回どっちを使おうかなぁって考えたのですが、Microsoft.Azure.ServiceBusの方は今まで使った事がなかったので、今回はこちらで実装してみることにしました。

解説

ServiceBusTriggerの実装を見てみるとやっていることは実にシンプルでMessageReceiverでメッセージ受信を行なってメッセージを受信したらFunctionを実行して終わったらComplete or Abandonする。という事をしています。

ちなみにStorageQueueトリガーの場合にはループ処理にてキューの取得を行なってキューが存在していたらFunctionを実行する様な処理をしています。

Triggerの作り方とかは前回の記事で解説していますので今回は実装のキモであるIListenerの実装部分のみ解説します。

SErviceBusTriggerWithCircuitBreaker/ServiceBusQueueListener.cs at master · yuka1984/SErviceBusTriggerWithCircuitBreaker · GitHub

とは言ったものの、解説するほど大したことはしていなくて

Functionの実行結果が成功でなかった場合、ErrorCountをインクリメントして閾値をオーバーした場合に一旦受信を停止します。

停止した際にオープン状態時間経過後に再度受信を開始します。

エラーが一定時間経過間に発生しなくなった場合にはエラー状態をリセットして正常に復帰します。

サーキットブレーカーと書いていますがハーフオープンなどもなく違う感じなのですがブレーカーが落ちるっていうところでサーキットブレークとさせてください(´・ω・`)

動作画面は非常に地味なため載せません。(´・ω・)(´-ω-)) ペコリなさい

こんな感じで自作したトリガーを用いてデザインパターンを使って課題を解決するっていう事ができます。

いやぁAzure Functionsって本当に良いものですね、それではヾ(´д`)ノシ

Azure Functionsのカスタムトリガーの作り方を調べてみた

ちょっと前にカスタムの出力バインディングの作り方

tamafuyou.hatenablog.com

を調べたのですが、今回は独自なカスタムのトリガーの作成方法を調べて見ました。

今回の成果物はこちら

github.com

今回参考にしたサイトはこちら

GitHub - Azure/azure-webjobs-sdk: Azure WebJobs SDK

それでは早速

今回はSlackのReal Time Messaging API

api.slack.com

を用いてSlackでメッセージを書き込んだらファンクションが実行されるトリガーの作成を行いました。

Slackでこういう事をする場合にはOutgoing WebHooksを使うのが普通ですが今回はトリガーの学習なのでRTMを使ってみました。

RTMはWebSocketを使用したリアルタイムAPIなのですが、これまで実装するとなるとちょっとめんどくさいので今回はSlackConnectorというライブラリを使用しました。

www.nuget.org

この辺を見てもらえればわかるかと思います。

それでは解説ヾ(`・ω・´)ノ

作るべきクラス

作るべきクラスは以下

  • IExtensionConfigProviderの実装クラス
  • ITriggerBindingProviderの実装クラス
  • ITriggerBindingの実装クラス
  • IListenerの実装クラス
  • パラメータ用のArributeクラス

になります。

Attributeクラス

実装はSlackMessageTriggerAttributeクラスです。

SampleProjects/SlackMessageTriggerAttribute.cs at master · yuka1984/SampleProjects · GitHub

本当にシンプルで十分です。パラメータに付けるためのAttributeでプロパティにAccessTokenを持たせています。このAccessTokenをキーに環境変数から値を引っ張って来てRTMへの接続に使用します。

IExtensionConfigProviderの実装クラス

実装はSlackMessageExtentionConfigクラスです。

SampleProjects/SlackMessageExtentionConfig.cs at master · yuka1984/SampleProjects · GitHub

このインターフェースの実装クラスのInitialize関数はFuncgionsのホストがスタートした時に呼び出されます。この関数内でITriggerBindingProviderを登録してあげる事でホストがファクションのパラメータバインディングを行う時に登録したProviderのTryCreateAsyncが呼び出される様になります。

ITriggerBindingProviderの実装クラス

実装クラスはSlackMessageTriggerAttributeBindingProviderです。

SampleProjects/SlackMessageTriggerAttributeBindingProvider.cs at master · yuka1984/SampleProjects · GitHub

先ほど説明した様にホストが関数を捜査してパラメータをバインディングする際にTryCreateAsync関数が呼び出されます。

gist.github.com

ですのでパラメータのAttributeが正しければITriggerBindingのインタンスを返してあげて関係ないAttributeであればnullを返してあげればOKです。

ITriggerBindingの実装クラス

実装はSlackMessageTriggerBindingクラスです。

SampleProjects/SlackMessageTriggerBinding.cs at master · yuka1984/SampleProjects · GitHub

このクラスの主な仕事はリスナークラスのインタンス生成とリスナーによってファンクションの実行が指示された際にリスナーからのデータを パラメータにバインドする事です。

gist.github.com

CreateListnerAsync関数は単純です。IListnerの実装を返してあげれば良いです。

gist.github.com

IListenerの実装クラス

実装クラスはSlackMessageListenerです。

SampleProjects/SlackMessageListener.cs at master · yuka1984/SampleProjects · GitHub

IListnerはStartAsync, StopAsync, Cancel, Disposeを実装する必要があります。

今回は単純に何も考えずStartAsyncでRTMへのコネクションを行いStopAsyncで切断を行なっています。

そして引数で受け取ったITriggeredFunctionExecuterのTryExecuteAsync関数をSlackメッセージを受信した際に実行しています。

TryExexuteAsync関数を実行するとTriggerBindingのBindAsync関数を経由してFunctionが実行される流れになります。

例えばServiceBusTriggerなどはTryExecuteAsync後に実行結果に応じて関数の実行が失敗していたらAbandonAsync、成功していたらCompleteAsyncを実行しています。なので関数実行後にキューが完了していたりするわけです。

実際にサンプルを実行すると

f:id:tamafuyou:20171029153028p:plain

f:id:tamafuyou:20171029153047p:plain

こんな感じで実行されたりします。

なお、今回の実装はあくまでもサンプルですので色々と抜けています。

インスタンスがスケールすると1つのメッセージで複数回ファンクションが実行されてしまいます。

ですのでこの感じの実装を使用する際にはSIngletonAttributeをリスナーモードで使用するか、それと同等の実装を行う必要があります。

Azure Functionsでトリガーが作れると夢が広がりますね

(ノ゚Д゚)八(゚Д゚ )ノイエーイ

ではでは

Azure Functions の SingletonAttributeとModeプロパティ

今日職場でSingletonAttributeのModeプロパティに関する話が出てたので検証してみました。

ソースコードはこちら

github.com

gist.github.com

TriggerTest1はSingleton(Mode=Function)

TriggerTest2はSingleton(Mode=Listner)

TriggerTest3はSingletonなし

動作環境はAppServiceプラン(S1 インスタンス数を10に固定)です。

まずはTest3のSingletonなしの結果から

f:id:tamafuyou:20171029151501p:plain

同じ時間に並列実行されています。またPartitionKeyもバラバラです。PartitionKeyにはstatic Lazyで作成されたGuidを設定していますのでGuidの違いは実行インスタンスの違いを表しています。要するにSingletonを付けないキュートリガー関数は複数のインスタンスでそれぞれ並列処理が行われる感じになります。今回の場合は10インスタンスなので10インスタンス * デフォルトサイズ16 なので、最大同時実行数は160になります。

続いてTest1の結果はこんな感じ

f:id:tamafuyou:20171029151547p:plain

Timeを見てみるとおおよそ秒毎に処理が実行されていますので、まさにシングル動作です。またPartitionKeyをみると結構バラバラです。

SingletonをFunctionモードで使用すると複数インスタンスであっても同時実行数1が保証される感じになります。

次にTest2の結果

f:id:tamafuyou:20171029151646p:plain

ingletonを使っているのに同じ時間帯にFunctionが実行されています。しかしPartitionKeyに注目してください。全て一緒です。

SingletonをListnerモードで使用すると単一のインスタンスでの実行が保証されます。今回の場合にはバッチサイズはデフォルトなので最大同時実行数は16になります。

ではListnerモードはどのように有効活用すると良いか? という事ですが、私自身、これだ!!っていう決定的な活用法は思いついていないのですが

qiita.com

こちらで書いたような同時実行の細かな制御を行うのに良いのかなぁとか思っています。

Azure Functionsのカスタム出力バインディングの作り方を調べた

Azure Functionsのバインディングの自作方法を調べてみました。

結論から言うと

github.com

このページに全て書いてあります。

ですので、ここからは私の勉強メモということで

github.com

こちらに出力バインディングツイッターでツイートするサンプルを作ってみました。

出力バインディングを指定するパラメータAttributeの作り方

Microsoft.Azure.WebJobs.Description.BindingAttribute が付いたAttributeを作成すればOK。

作成するカスタムAttributeのプロパティにAppSettingAttributeを付けるとプロパティの文字列でConfigurationManager.AppSettingsのキー名で検索を行って値を設定してくれる。接続文字列、ユーザ・パスワードなどのプロパティに設定しておくとよいです。

出力バインディングするためのクラスの作り方

出力バインディングするためのクラスはIAsyncCollectorを実装したクラスを作成するかStreamを実装したクラスを作成します。

今回は簡単なIAsyncCollectorを実装しました。

SampleProjects/TweetAsyncCollector.cs at master · yuka1984/SampleProjects · GitHub

IAsyncCollectorはAddAsyncとFlushAsyncを宣言していますので、この二つの関数を実装する必要があります。

FunctionsにてICollector.Add /IAsyncCollector.AddAsyncが呼ばれた際にAddAsyncが呼び出されます。FunctionsにてICollectorを引数としている場合にはSyncAsyncCollectorAdapterというICollector実装クラスにIAsyncCollector実装がラップされて実行されます。Addが呼び出された時にGetAwaiter.GetResultしてる感じです。

out string みたいな引数のケースでは後程説明しますが関数終了直後にAddAsyncが呼び出されます。 そこまで処理がすべて完了したらFlushAsyncが呼び出される感じです。

この仕組みで重要な点はIAsyncCollectorの実装では必ずしもAddAsync時に出力が行われているわけではなくFlushAsyncが終了した段階でAddAsyncされたデータが出力されていると考える必要があるということです。

ようするにAddAsyncで出力が行われるかは出力バインディングの実装次第だよ、っていうことです。

バインドを行う為の設定クラスの作り方

Attributeを作成したら次はバインド設定を行う為のクラスを作ります。

Microsoft.Azure.WebJobs.Host.Config.IExtensionConfigProviderの実装クラスを作成してFunctionsプロジェクトで読み込むことでIExtentionConfigProvider.initializeメソッドが呼び出されるようになります。

このメソッドでコンバータの登録とバインディングルールの登録を行います。 バインディングルールは出力バインディングの場合には、このAttributeが設定されている時には、このIAsyncCollector実装を呼び出すよ という設定を行います。

context.AddBindingRule<TweetAttribute>().BindToCollector<TweetMessage>(attr => new TweetAsyncCollector(attr));

Converterは様々な引数タイプに対応するために、IAsyncCollector実装のgenericタイプに変換するための処理を登録しておくイメージになります。

context.AddConverter<string, TweetMessage>(input => new TweetMessage() {Message = input});

以上な感じで実装してあげることでオリジナルな出力バインディングを作成することができます。

新しい出力バインディングを作っておいて使いまわしたい。

既存の出力バインディングの挙動が気に入らないので作りたい、なんて時に良いかもしれません。

Azureでリクエストを1件ずつ処理してみる

ごあいさつ

風邪ひきました( д)、;‘.・

はじめに

最近、お仕事でバックエンドの設計とか実装とかしていてAzureを使っているわけですが

こういう時どうしよう、ああいう事したい時にはどうしたら?

みたいな事が沢山でてきてます。

ほとんどは同僚に聞けば解決なのですが

ちょっとは自分で考えましょう、ということで勉強のために実験してみた事を記事にしてみました。

どんなことしてみたの?

パラレルに発生するリクエストに対して順序通りに1件ずつ処理して同期的に返答するにはどうすれば良いか?

という事を考えてみました。

そもそも「そんな設計自体が間違っている」とか「アンチパターンだ」とか思われる方もいらっしゃると思います

もちろん・・・正解です。 

成果物

github.com

設計

こんなイメージ

f:id:tamafuyou:20170819005114j:plain

リクエスト・レスポンスメッセージングパターンに対して不特定な送信者に対応するためにレスポンスメッセージをトッピクによるPub/Subにした感じ。

解説

使用した材料

  • Function App (従量課金プラン) ・・・ 1個 (必須ではない)
  • Function App (AppServiceプランB1)・・・1個   
  • Service Bus (Standard) ・・・ 1個
  • Azureストレージ ・・・1個 (Function Appを作るのに必要)

下準備

ServiceBusにてキューを1個とトピックを1個作っておきます。

HttpTrigger Function

Httpリクエストを受け付けるためにHttp TriggerのFunctionsを使用します。

        [FunctionName("GetRequestTrigger")]
        public static async Task<HttpResponseMessage> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestMessage req
            , ILogger log)
        {
            var request = new MessageModel();
            var namevalue = req.RequestUri.ParseQueryString();
            foreach (var key in namevalue.AllKeys) {
                if (key.Equals("a", StringComparison.CurrentCultureIgnoreCase))
                    if (int.TryParse(namevalue[key], out int a))
                        request.A = a;
                if (key.Equals("b", StringComparison.CurrentCultureIgnoreCase))
                    if (int.TryParse(namevalue[key], out int b))
                        request.B = b;
            }

            var namespaceManager =
                NamespaceManager.CreateFromConnectionString(
                    ConfigurationManager.AppSettings["servicebusInQueueConnections"]);
            var subscriptionName = Guid.NewGuid().ToString("N");

            await namespaceManager.CreateSubscriptionAsync(new SubscriptionDescription("out", subscriptionName));

            var sbclient =
                SubscriptionClient.CreateFromConnectionString(
                    ConfigurationManager.AppSettings["servicebusInQueueConnections"], "out", subscriptionName);

            var queueclient =
                QueueClient.CreateFromConnectionString(ConfigurationManager.AppSettings["servicebusInQueueConnections"],
                    "in");

            var resultJson = JsonConvert.SerializeObject(request);
            var binary = Encoding.UTF8.GetBytes(resultJson);


            var outMessage = new BrokeredMessage(new MemoryStream(binary));
            var id = Guid.NewGuid().ToString();
            outMessage.To = id;

            await queueclient.SendAsync(outMessage);

            while (true) {
                var message = await sbclient.ReceiveAsync();
                await message.CompleteAsync();
                if (message.To == id) {
                    var stream = message.GetBody<Stream>();
                    var json = new StreamReader(stream).ReadToEnd();

                    await namespaceManager.DeleteSubscriptionAsync("out", subscriptionName);
                    return req.CreateResponse(HttpStatusCode.OK, json);
                }
            }
        }

リクエストを受け付けたら、まずNamespaceManagerを使用してトピックにサブスクリプションを作成します。

そして、作成したサブスクリプションへ接続するクライアントを準備します。

次にキューへの接続クライントを作成します。

BrokeredMessageを作成して送信メッセージを作成してToプロパティにGUIDを設定し、キューへ送信します。

その後、トピックからの受信を待ちます。

トピックからメッセージを受信しToプロパティが送信時にせっていたGUID出会った場合には処理が戻ってきたという事でHttpRequestMessageを戻して終了です。

このファンクションは従量課金プランのFunction Appへデプロイします。

Service Bus Queue Trigger Function

[FunctionName("SumSBTrigger")]
        public static void SumSBTrigger(
            [ServiceBusTrigger("in", AccessRights.Listen, Connection = "servicebusInQueueConnections")] BrokeredMessage
                myQueueItem
            ,
            [ServiceBus("out", Connection = "servicebusInQueueConnections", EntityType = EntityType.Topic)] out
                BrokeredMessage outMessage, ILogger log)
        {
            myQueueItem.RenewLock();
            var stream = myQueueItem.GetBody<Stream>();
            var json = new StreamReader(stream).ReadToEnd();
            var request = JsonConvert.DeserializeObject<MessageModel>(json);


            Thread.Sleep(5000);
            request.Result = request.A + request.B;
            request.ExecuteDateTime = DateTime.Now;

            var resultJson = JsonConvert.SerializeObject(request);
            var binary = Encoding.UTF8.GetBytes(resultJson);

            outMessage = new BrokeredMessage(new MemoryStream(binary));
            outMessage.To = myQueueItem.To;
            myQueueItem.Complete();
        }

特に特別な事はなく

受信したら五秒待機してトピックにアウトプットしています。

トピックにアウトプットするBrokerdMessage.Toに受信したBrokerdMessage.Toを設定しておきます。

このファンクションがもっとも大事な点はAppServiceプランのFunction Appへデプロイするという点とhost.jsonにて以下の設定を行います。

{
  "serviceBus": {
    "maxConcurrentCalls": 1
  }
}

この設定を行う事で1つのインスタンス場でのFunctionの同時実行数が1つとなりシングルスレッド的に処理を行う事ができます。

なぜAppServiceプランのFunction App?

Function Appはスケールコントローラによって自動的にスケールアウト・スケールダウンが行われます。

従量課金プランの場合やAppServiceプランの自動スケールアウトを有効にしていた場合にはインスタンス数の固定化が行えないため負荷によってインスタンス数が増えてしまいます。

AppServiceプランで手動スケールアウトにして1インスタンスに設定し、maxConcurrentCalls = 1に設定する事で完全に1つのプロセスでキューの処理を行う事ができます。

おわりに

こんな事しなくて良い設計を行う事が大事だと思います。

(ノ゚Д゚)八(゚Д゚ )ノイエーイ

NugetPackage License Downloader

プロジェクトで使用しているnugetパッケージのライセンスをテキストファイルに変換してくれるサービスを作ってみました。

Downloader - NugetPackage License Downloader

1つのnugetパッケージに対して1つのテキストファイルが作成されて、zipファイルでまとまめてダウンロードすることができます。

使ってみて不具合等、要望等ありましたらIssue投げてください。

github.com

I tried to create a service that converts license of nuget package used in project into text file.

Downloader - NugetPackage License Downloader

One text file is created for one nuget package and can be downloaded in zip file at once.

If you have problems such as using, if there are demands etc. Please Issue.

TwoWayViewをnugetへ公開しました

github.com

の Xamarin Androidクローンをnugetへ公開しました。

www.nuget.org

プロジェクトサイトはこちらになります。

github.com

サンプルもありますので機会がありましたら使ってみてください。