ASP.NET Coreで作成したWebSocketサーバをAzure Event Hubsを使ってスケールアウトに対応してみたけど・・・
注意
2017年1月17日に大幅訂正を行いました。
今回実装してみた結果、WebSocketのスケールアウトにEvent Hubsを用いることは不適切です。
後半に何故EventHubsでスケールアウトを実装することが不適切であったかを書きました。
後日、より適切な実装を行ってみたいと思っています。
はじめに
前回の記事
で、ASP.NET Coreを使ってWebSocketチャットサーバを作成してみました。
ただ、前回の実装ではスケールアウトすることができませんでした。
1プロセスで1チャットサーバとなるためマルチプロセスなサーバで動かした時にはダメな感じになってしまいますしロードバランスで負荷分散した時などもダメダメです。
今回はAzure Event Hubsを使用してスケールアウトに対応したWebSocketサーバを作成してみました。
ソースコードはこちらになります。
解説
使用した主なライブラリ
NuGet Gallery | Microsoft.Azure.EventHubs 0.0.4-preview
.NET Standard対応しているEventHubs用のライブラリです。
現時点でpreview版です。
NuGet Gallery | Microsoft.Azure.EventHubs.Processor 0.0.4-preview
.NET Standard対応しているEventHubsからメッセージを受信する為のライブラリです。
現時点でpreview版です。
前回からの変更点
Comparing master...Backplane · yuka1984/aspnetcore_webspcket_sample · GitHub
前回サンプルからの差分になります。
Azure側での操作
Event Hubsの作成
まずはMarketPlaceからEvent Hubsを選択して作成を押下します。
Nameを入力した後にPrincing TierをBasicにします。
Basicのほうが安いです。
作成を押下します。
Event Hubsが作成できたらEventHubEntityの追加を行います。
EvnetHubの名前を入力して作成します。
Azure Blob Strageの作成
リソースグループにストレージアカウントを追加します。
作成したストレージアカウントでBlobコンテナーを追加します。
以上でAzure上での作成は終了です。
送信クラス
まずはAzure Event Hubsへ送信を行うクラスを作成します。
Azure Event Hubsライブラリを使用して送信します。
EHConnectionStringはEvent Hubs内のShared access policiesにて確認できるConnection string primary keyを設定します。
EHEntityPathには先ほど作成したEventHubEntityの名前を指定します。
前回、WebSocketの接続をObserver/Observableパターンで作成しましたので引き続き送信クラスもObsreverな形で実装しました。
受信クラス
Event Hubsへの接続は特殊な用途でない場合にはEventProcessorHostクラスを用いて受信します。
このクラスで受信を行うとパーティションへの接続の排他制御、オフセットの管理をAzure Blob Storageを用いて行ってくれます。
C# での Event Hubs の使用 | Microsoft Docs
こちらのページが参考になるでしょう。
しかし、スケールアウトでEvent hubsを用いるためには、EventProcessorHostを用いるとうまくいきません。
そこでDirectRecieveを使用します。
DirectReceiveはイベントの受信を低レベルで行うことが可能です。
今回はDirectEventReceiveManagerというクラスを作成して受信処理を行います。
Event Hubsに対して送信したメッセージはどのパーティションに送信されるかは基本的に不明です。
なんでスケールアウトを達成するためにすべてのパーティションを監視します。
ますはEventhubClient.GetRuntimeInformationAsyncにてEventHubsの情報を取得します。
PartitionIdsプロパティにてパーティションIdを取得することができます。
パーティション毎に監視を行う際にGetPartitionRuntimeInformationAsyncにてパーティションの情報を取得します。
LastEnqueuedOffsetプロパティにてこのパーティションにエンキューされた最後のオフセット情報を取得できます。
取得開始時にこのオフセットを用いることで起動以前のメッセージを受信しないようにします。
それ以降はCreateReceiverでレシーバーを作成し受信メソッド行い、受信が成功した場合にはオフセットを更新します。
DirectEventReceiveManagerはIObserver
チャットへの組み込み
ChatServerクラスを変更してEvent Hubsを用いてチャットメッセージのスケールアウトに対応します。
前回からの変更点として
SendChatMessageToEventHubsObserverクラス DirectEventReceiveManagerクラスのインスタンスをもって
CreateEventProcessorの戻り値にChatMessageProcessorのインスタンスを常に返すように実装。
また前回は 送信/受信をObserver/Obesrvableで実装したWebSocket管理クラスを相互にSubScribeし合うことでチャットを実現していましたが、管理クラスのSubscribeにEventHubsObserverを繋げることで、チャットメッセージを受信した場合にEvent Hubsに送信されるように変更し、DirectEventReceiveManagerのSubscribeに管理クラスを繋げることでEvent Hubsからメッセージを受信した場合にWebSocketで送信が行われるように変更しました。
Observer/Observableパターンで実装すると繋ぎ方の変更で処理を変更できたりします。
このチェインが実装していると楽しいです(^◇^)
最後にstartupクラスにてChatServer.EventRecieveEventAsyncを実行してDirectEventReceiveManagerクラスのRecieveAsyncを実行してすれば完了です。
この実装の問題点
今回の実装には大きな問題点があります。
実は一つのPartitionへの接続は最大5クライアントまでしか行うことができません。
要するに今回の実装ではWebSocketサーバ5プロセス分までしかスケールアウトすることができないのです。
標準的な受信を実現してるEventProcessorHostでは、Partitionへの接続の排他制御をAzure Blob StorageとEpoch受信という仕組みを用いて行っています。
Epoch受信はpartitionへの接続を行う際にlong値であるEpochを指定して接続を行います。
Epoch接続を行った場合、Epoch値が大きい接続が優先して接続されます。
例えばEpoch10のクライアントがパーティション1に接続していて他のクライアントがEpoch20で接続を行ってきた場合、Epoch10は切断されEpoch20が接続されます。
またEpoch20が接続されている状態でEpoch10のクライアントが接続を行った場合、Epoch10クライアントは接続を行うことができません。
DirectRecieveでもエポックに対応した接続を行うことが可能ですが、別途クライアント間でパーティション毎のエポック管理を行う必要があります。
このようなことから、Event Hubsはスケールアウトのバックグラウンドとしては適切でありませんでした。
最後に
次回は頑張ります(´・ω・`)
今回は以上になります。
ではでは