眠いしお腹すいたし(´・ω・`)

C#関連を主に書きます。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

ASP.NET Coreで作成したWebSocketサーバをAzure Service Busを使ってスケールアウトに対応してみた

はじめに

tamafuyou.hatenablog.com

前回の記事で失敗しましたので、今回はAzure Service Busを使ってスケールアウトを実装してみました。

実装

コード

今回のコードはここに保存されています。

github.com

NET CoreでのService busの利用

良い感じのパッケージがなかったので今回はメッセージの送受信は

www.nuget.org

AMQP.Net.Liteを使いました。

Service Busの操作はRest APIを使用して実装しました。

Service Bus

AzureでService Busの設定を行います。

まずはリソースグループにServiceBusを作成します。

f:id:tamafuyou:20170120010938p:plain

価格レベルは、スケールアウトにTopicを使うためStandard以上にしてください。

次にTopicの作成を行います。

f:id:tamafuyou:20170120011414p:plain

これで、今回のAzure側での操作は完了です。

では、プログラムの解説を行っていきましす。

Service Busへのメッセージの送信

aspnetcore_webspcket_sample/TopicsSender.cs at ServiceBusTopics · yuka1984/aspnetcore_webspcket_sample · GitHub

送信は特に難しい事は無くて

AmqpLiteを使えば簡単に送信できます。

        public void OnNext(ChatMessage value)
        {
            if (senderLink == null || senderLink.IsClosed)
            {
                senderLink = new SenderLink(GetSession(GetConnection()), SenderSubscriptionId, Topic);
            }
            
            var message = new Amqp.Message(JsonConvert.SerializeObject(value));
            message.Properties = new Properties
            {
                MessageId = Guid.NewGuid().ToString()
            };
            message.ApplicationProperties = new ApplicationProperties();
            message.ApplicationProperties["Message.Type.FullName"] = typeof(string).FullName;
            senderLink.Send(message);
        }

こんなイメージです。

ちなみにSenderLinkクラスのコンストラクタ引数ですが

    public abstract class  TopicsClientBase
    {
        public string NameSpaceUrl { get; set; } = "";
        public string BaseUrl => $"https://{NameSpaceUrl}/";
        public string PolicyName { get; set; } = "";
        public string Key { get; set; } = "";
        public string ConnectionString => $"amqps://{WebUtility.UrlEncode(PolicyName)}:{WebUtility.UrlEncode(Key)}@{NameSpaceUrl}/";
        public string Topic { get; set; } = "";
        protected Address GetAddress() => new Address(ConnectionString);
        protected Connection GetConnection() => new Connection(new Address(ConnectionString));
        protected Session GetSession(Connection connection) => new Session(connection);        

    }

ベースクラスがこんな感じになっていて

NameSpaceUrlにはxxxxxxxxxx.servicebus.windows.netみたいな感じのUrlを設定します。xxxxxxxの部分は作成したServiceBusの名前です。

PolicyNameとKeyは

f:id:tamafuyou:20170120020307p:plain

この辺の値を設定します。

Topicは先ほど作成したTopicの名前です。

Service Busからの受信

Service Bus Topicからの受信を行うクラスです。

aspnetcore_webspcket_sample/TopicsReciever.cs at ServiceBusTopics · yuka1984/aspnetcore_webspcket_sample · GitHub

受信開始時にTopicに送信されたメッセージを購読するためにSubscriptionと追加します。

Subscriptionの名前はGUIDを作成して使用しています。

今回はRest APIで実装していましてこんな感じです。

        private static async Task<HttpResponseMessage> CreateSubscriptionAsync(string baseAddress, string topicName,
            string subscriptionName, string token)
        {
            var subscriptionAddress = baseAddress + topicName + "/Subscriptions/" + subscriptionName;
            var client = new HttpClient();
            client.DefaultRequestHeaders.Authorization = AuthenticationHeaderValue.Parse(token);
            var putData = @"<entry xmlns=""http://www.w3.org/2005/Atom"">
                                  <title type=""text"">" + subscriptionName + @"</title>
                                  <content type=""application/xml"">
                                    <SubscriptionDescription xmlns:i=""http://www.w3.org/2001/XMLSchema-instance"" xmlns=""http://schemas.microsoft.com/netservices/2010/10/servicebus/connect"" />
                                  </content>
                                </entry>";
            return
                await client.PutAsync(subscriptionAddress, new ByteArrayContent(Encoding.UTF8.GetBytes(putData)))
                    .ConfigureAwait(false);
        }

ほとんどマイクロソフトのサンプル通りな感じです。

そして購読を追加したらReceiverLinkクラスを使用して受信を行います。

受信の方法はいくつかあってReceiverLink.ReceiveAsyncメソッドによって受信処理を行うこともできるしReceiverLink.Startメソッドでコールバックを登録する形式でも受信できます。

今回はStartメソッドを使用しました。

そしてTopicReceiveクラスにIDisposableを設定してDispose時に、先ほど作成したSubscriptionを削除するようにしています。

削除関数はこんな感じ

        private static async Task<HttpResponseMessage> DeleteSubscriptionAsync(string baseAddress, string topicName,
            string subscriptionName, string token)
        {
            var subscriptionAddress = baseAddress + topicName + "/Subscriptions/" + subscriptionName;
            var client = new HttpClient();
            client.DefaultRequestHeaders.Authorization = AuthenticationHeaderValue.Parse(token);

            return await client.DeleteAsync(subscriptionAddress).ConfigureAwait(false);
        }

削除しないとSubscriptionが残ったままになっちゃいます。

今回も送信/受信はObserver/Observableパターンで実装しています。

なのでChatServerクラスをこんな感じにします。

aspnetcore_webspcket_sample/ChatServer.cs at ServiceBusTopics · yuka1984/aspnetcore_webspcket_sample · GitHub

今回の一番の変更点はTopicReceiverクラスがIDisposableインターフェースを持っているため、TopicReceiverをメンバに持つChatServerクラスもIDisposableを実装するようにしました。

ASP.NET MVC Coreでアプリケーションシャットダウン時に処理する方法ですが、StartUpクラスのConfigureメソッドの引数にIApplicationLifetimeインターフェースの引数を追加し、

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public async void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IApplicationLifetime applicationLifetim)
        {
            loggerFactory.AddConsole();

            if (env.IsDevelopment())
                app.UseDeveloperExceptionPage();

            applicationLifetim.ApplicationStopping.Register(Stopping);
        }
        private void Stopping()
        {
            _chatServer.Dispose();
        }

こんな感じで処理を登録してあげることで終了開始時、終了時などに処理を実行することができます。

おわりに

SigalRを使わずにWebSocketサーバを作成することはそんなに難しくなく行うことができます。

もちろん、これまでのサンプル実装はエラー処理なども入れていないので、実践的に使用するためにはもっときちんとした実装が必要です、

ただSignalRでは大きすぎる、もっと細かくゴリゴリしたい、みたいな時にはこんな感じで実装していけばよいよっていう一端をご紹介できたかと思います。

それではまた(^^)/