眠いしお腹すいたし(´・ω・`)

C#関連を主に書きます。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

Azureでリクエストを1件ずつ処理してみる

ごあいさつ

風邪ひきました( д)、;‘.・

はじめに

最近、お仕事でバックエンドの設計とか実装とかしていてAzureを使っているわけですが

こういう時どうしよう、ああいう事したい時にはどうしたら?

みたいな事が沢山でてきてます。

ほとんどは同僚に聞けば解決なのですが

ちょっとは自分で考えましょう、ということで勉強のために実験してみた事を記事にしてみました。

どんなことしてみたの?

パラレルに発生するリクエストに対して順序通りに1件ずつ処理して同期的に返答するにはどうすれば良いか?

という事を考えてみました。

そもそも「そんな設計自体が間違っている」とか「アンチパターンだ」とか思われる方もいらっしゃると思います

もちろん・・・正解です。 

成果物

github.com

設計

こんなイメージ

f:id:tamafuyou:20170819005114j:plain

リクエスト・レスポンスメッセージングパターンに対して不特定な送信者に対応するためにレスポンスメッセージをトッピクによるPub/Subにした感じ。

解説

使用した材料

  • Function App (従量課金プラン) ・・・ 1個 (必須ではない)
  • Function App (AppServiceプランB1)・・・1個   
  • Service Bus (Standard) ・・・ 1個
  • Azureストレージ ・・・1個 (Function Appを作るのに必要)

下準備

ServiceBusにてキューを1個とトピックを1個作っておきます。

HttpTrigger Function

Httpリクエストを受け付けるためにHttp TriggerのFunctionsを使用します。

        [FunctionName("GetRequestTrigger")]
        public static async Task<HttpResponseMessage> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestMessage req
            , ILogger log)
        {
            var request = new MessageModel();
            var namevalue = req.RequestUri.ParseQueryString();
            foreach (var key in namevalue.AllKeys) {
                if (key.Equals("a", StringComparison.CurrentCultureIgnoreCase))
                    if (int.TryParse(namevalue[key], out int a))
                        request.A = a;
                if (key.Equals("b", StringComparison.CurrentCultureIgnoreCase))
                    if (int.TryParse(namevalue[key], out int b))
                        request.B = b;
            }

            var namespaceManager =
                NamespaceManager.CreateFromConnectionString(
                    ConfigurationManager.AppSettings["servicebusInQueueConnections"]);
            var subscriptionName = Guid.NewGuid().ToString("N");

            await namespaceManager.CreateSubscriptionAsync(new SubscriptionDescription("out", subscriptionName));

            var sbclient =
                SubscriptionClient.CreateFromConnectionString(
                    ConfigurationManager.AppSettings["servicebusInQueueConnections"], "out", subscriptionName);

            var queueclient =
                QueueClient.CreateFromConnectionString(ConfigurationManager.AppSettings["servicebusInQueueConnections"],
                    "in");

            var resultJson = JsonConvert.SerializeObject(request);
            var binary = Encoding.UTF8.GetBytes(resultJson);


            var outMessage = new BrokeredMessage(new MemoryStream(binary));
            var id = Guid.NewGuid().ToString();
            outMessage.To = id;

            await queueclient.SendAsync(outMessage);

            while (true) {
                var message = await sbclient.ReceiveAsync();
                await message.CompleteAsync();
                if (message.To == id) {
                    var stream = message.GetBody<Stream>();
                    var json = new StreamReader(stream).ReadToEnd();

                    await namespaceManager.DeleteSubscriptionAsync("out", subscriptionName);
                    return req.CreateResponse(HttpStatusCode.OK, json);
                }
            }
        }

リクエストを受け付けたら、まずNamespaceManagerを使用してトピックにサブスクリプションを作成します。

そして、作成したサブスクリプションへ接続するクライアントを準備します。

次にキューへの接続クライントを作成します。

BrokeredMessageを作成して送信メッセージを作成してToプロパティにGUIDを設定し、キューへ送信します。

その後、トピックからの受信を待ちます。

トピックからメッセージを受信しToプロパティが送信時にせっていたGUID出会った場合には処理が戻ってきたという事でHttpRequestMessageを戻して終了です。

このファンクションは従量課金プランのFunction Appへデプロイします。

Service Bus Queue Trigger Function

[FunctionName("SumSBTrigger")]
        public static void SumSBTrigger(
            [ServiceBusTrigger("in", AccessRights.Listen, Connection = "servicebusInQueueConnections")] BrokeredMessage
                myQueueItem
            ,
            [ServiceBus("out", Connection = "servicebusInQueueConnections", EntityType = EntityType.Topic)] out
                BrokeredMessage outMessage, ILogger log)
        {
            myQueueItem.RenewLock();
            var stream = myQueueItem.GetBody<Stream>();
            var json = new StreamReader(stream).ReadToEnd();
            var request = JsonConvert.DeserializeObject<MessageModel>(json);


            Thread.Sleep(5000);
            request.Result = request.A + request.B;
            request.ExecuteDateTime = DateTime.Now;

            var resultJson = JsonConvert.SerializeObject(request);
            var binary = Encoding.UTF8.GetBytes(resultJson);

            outMessage = new BrokeredMessage(new MemoryStream(binary));
            outMessage.To = myQueueItem.To;
            myQueueItem.Complete();
        }

特に特別な事はなく

受信したら五秒待機してトピックにアウトプットしています。

トピックにアウトプットするBrokerdMessage.Toに受信したBrokerdMessage.Toを設定しておきます。

このファンクションがもっとも大事な点はAppServiceプランのFunction Appへデプロイするという点とhost.jsonにて以下の設定を行います。

{
  "serviceBus": {
    "maxConcurrentCalls": 1
  }
}

この設定を行う事で1つのインスタンス場でのFunctionの同時実行数が1つとなりシングルスレッド的に処理を行う事ができます。

なぜAppServiceプランのFunction App?

Function Appはスケールコントローラによって自動的にスケールアウト・スケールダウンが行われます。

従量課金プランの場合やAppServiceプランの自動スケールアウトを有効にしていた場合にはインスタンス数の固定化が行えないため負荷によってインスタンス数が増えてしまいます。

AppServiceプランで手動スケールアウトにして1インスタンスに設定し、maxConcurrentCalls = 1に設定する事で完全に1つのプロセスでキューの処理を行う事ができます。

おわりに

こんな事しなくて良い設計を行う事が大事だと思います。

(ノ゚Д゚)八(゚Д゚ )ノイエーイ