二つのObservableでタイムアウトをチェック
Rxのお勉強。
Rxのタイムアウトって一つのシーケンスの間隔でタイムアウトを管理するのですが、やりたい事は二つのObservableを組み合わせるパターン。
一つ目のObservableがOnNextされてから二つ目のObservableがOnNextされるまでの間にタイムアウトが発生したらErrorを流す。
リクエストとレスポンスを同期させてタイムアウトも管理したいイメージです。
public static IObservable<TResult> SyncTimeout<TSource, TResult>(this IObservable<TSource> first, IObservable<TSource> second, TimeSpan dueTime, Func<TSource, TSource, TimeSpan, TResult> selectorFunc) { var firstStamp = first.Select(x => Tuple.Create(x, DateTime.Now.Ticks)); var secondStamp = second.Select(x => Tuple.Create(x, DateTime.Now.Ticks)) .Merge(firstStamp.Delay(dueTime).Select(x => Tuple.Create(x.Item1, (long) 0))); return firstStamp .Zip(secondStamp, (f, s) => new {First = f.Item1, Second = s.Item1, Span = s.Item2 - f.Item2}) .Take(1) .Repeat() .Select(x => { if (x.Span < 0) throw new TimeoutException(); return selectorFunc(x.First, x.Second, TimeSpan.FromMilliseconds(x.Span)); }); }
なんかもっとうまく書けないですかねぇ(´・ω・`)
アドバイスいただけたりするとうれしいです。
その後
アドバイスいただいてこんな感じにできました。