眠いしお腹すいたし(´・ω・`)

C#関連を主に書きます。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

ASP.NET Coreで作成したWebSocketサーバをAzure Event Hubsを使ってスケールアウトに対応してみたけど・・・

注意

2017年1月17日に大幅訂正を行いました。

今回実装してみた結果、WebSocketのスケールアウトにEvent Hubsを用いることは不適切です。

後半に何故EventHubsでスケールアウトを実装することが不適切であったかを書きました。

後日、より適切な実装を行ってみたいと思っています。

はじめに

前回の記事

tamafuyou.hatenablog.com

で、ASP.NET Coreを使ってWebSocketチャットサーバを作成してみました。

ただ、前回の実装ではスケールアウトすることができませんでした。

1プロセスで1チャットサーバとなるためマルチプロセスなサーバで動かした時にはダメな感じになってしまいますしロードバランスで負荷分散した時などもダメダメです。

今回はAzure Event Hubsを使用してスケールアウトに対応したWebSocketサーバを作成してみました。

ソースコードはこちらになります。

github.com

解説

使用した主なライブラリ

NuGet Gallery | Microsoft.Azure.EventHubs 0.0.4-preview

.NET Standard対応しているEventHubs用のライブラリです。

現時点でpreview版です。

NuGet Gallery | Microsoft.Azure.EventHubs.Processor 0.0.4-preview

.NET Standard対応しているEventHubsからメッセージを受信する為のライブラリです。

現時点でpreview版です。

前回からの変更点

Comparing master...Backplane · yuka1984/aspnetcore_webspcket_sample · GitHub

前回サンプルからの差分になります。

Azure側での操作

Event Hubsの作成

まずはMarketPlaceからEvent Hubsを選択して作成を押下します。

f:id:tamafuyou:20170112223947p:plain

Nameを入力した後にPrincing TierをBasicにします。

Basicのほうが安いです。

作成を押下します。

f:id:tamafuyou:20170112224116p:plain

Event Hubsが作成できたらEventHubEntityの追加を行います。

f:id:tamafuyou:20170112225354p:plain

EvnetHubの名前を入力して作成します。

f:id:tamafuyou:20170112225613p:plain

Azure Blob Strageの作成

リソースグループにストレージアカウントを追加します。

f:id:tamafuyou:20170112231315p:plain

作成したストレージアカウントでBlobコンテナーを追加します。

f:id:tamafuyou:20170112231915p:plain

以上でAzure上での作成は終了です。

送信クラス

まずはAzure Event Hubsへ送信を行うクラスを作成します。

gist.github.com

Azure Event Hubsライブラリを使用して送信します。

EHConnectionStringはEvent Hubs内のShared access policiesにて確認できるConnection string primary keyを設定します。

f:id:tamafuyou:20170112234558p:plain

EHEntityPathには先ほど作成したEventHubEntityの名前を指定します。

前回、WebSocketの接続をObserver/Observableパターンで作成しましたので引き続き送信クラスもObsreverな形で実装しました。

受信クラス

Event Hubsへの接続は特殊な用途でない場合にはEventProcessorHostクラスを用いて受信します。

このクラスで受信を行うとパーティションへの接続の排他制御、オフセットの管理をAzure Blob Storageを用いて行ってくれます。

C# での Event Hubs の使用 | Microsoft Docs

こちらのページが参考になるでしょう。

しかし、スケールアウトでEvent hubsを用いるためには、EventProcessorHostを用いるとうまくいきません。

そこでDirectRecieveを使用します。

DirectReceiveはイベントの受信を低レベルで行うことが可能です。

今回はDirectEventReceiveManagerというクラスを作成して受信処理を行います。

gist.github.com

Event Hubsに対して送信したメッセージはどのパーティションに送信されるかは基本的に不明です。

なんでスケールアウトを達成するためにすべてのパーティションを監視します。

ますはEventhubClient.GetRuntimeInformationAsyncにてEventHubsの情報を取得します。

PartitionIdsプロパティにてパーティションIdを取得することができます。

パーティション毎に監視を行う際にGetPartitionRuntimeInformationAsyncにてパーティションの情報を取得します。

LastEnqueuedOffsetプロパティにてこのパーティションにエンキューされた最後のオフセット情報を取得できます。

取得開始時にこのオフセットを用いることで起動以前のメッセージを受信しないようにします。

それ以降はCreateReceiverでレシーバーを作成し受信メソッド行い、受信が成功した場合にはオフセットを更新します。

DirectEventReceiveManagerはIObserverを持っているのでSubscribeしているObserverに対してメッセージを流すように実装します。

チャットへの組み込み

ChatServerクラスを変更してEvent Hubsを用いてチャットメッセージのスケールアウトに対応します。

前回からの変更点として

SendChatMessageToEventHubsObserverクラス DirectEventReceiveManagerクラスのインスタンスをもって

CreateEventProcessorの戻り値にChatMessageProcessorのインスタンスを常に返すように実装。

また前回は 送信/受信をObserver/Obesrvableで実装したWebSocket管理クラスを相互にSubScribeし合うことでチャットを実現していましたが、管理クラスのSubscribeにEventHubsObserverを繋げることで、チャットメッセージを受信した場合にEvent Hubsに送信されるように変更し、DirectEventReceiveManagerのSubscribeに管理クラスを繋げることでEvent Hubsからメッセージを受信した場合にWebSocketで送信が行われるように変更しました。

Observer/Observableパターンで実装すると繋ぎ方の変更で処理を変更できたりします。

このチェインが実装していると楽しいです(^◇^)

最後にstartupクラスにてChatServer.EventRecieveEventAsyncを実行してDirectEventReceiveManagerクラスのRecieveAsyncを実行してすれば完了です。

この実装の問題点

今回の実装には大きな問題点があります。

実は一つのPartitionへの接続は最大5クライアントまでしか行うことができません。

要するに今回の実装ではWebSocketサーバ5プロセス分までしかスケールアウトすることができないのです。

標準的な受信を実現してるEventProcessorHostでは、Partitionへの接続の排他制御をAzure Blob StorageとEpoch受信という仕組みを用いて行っています。

Epoch受信はpartitionへの接続を行う際にlong値であるEpochを指定して接続を行います。

Epoch接続を行った場合、Epoch値が大きい接続が優先して接続されます。

例えばEpoch10のクライアントがパーティション1に接続していて他のクライアントがEpoch20で接続を行ってきた場合、Epoch10は切断されEpoch20が接続されます。

またEpoch20が接続されている状態でEpoch10のクライアントが接続を行った場合、Epoch10クライアントは接続を行うことができません。

DirectRecieveでもエポックに対応した接続を行うことが可能ですが、別途クライアント間でパーティション毎のエポック管理を行う必要があります。

このようなことから、Event Hubsはスケールアウトのバックグラウンドとしては適切でありませんでした。

最後に

次回は頑張ります(´・ω・`)

今回は以上になります。

ではでは