我想在第一个客户端连接时向我的服务器发送一个启动脉冲,在最后一个客户端断开连接时向我发送一个结束脉冲。
public class MyAdapter : IObservable<MyType> {
IObservable<MyType> MyObservable = BuildMyObservable()
.Initially(Start) // <- this method doesn't exist
.Finally(Stop).Publish().RefCount().Subscribe(observer);
public IDisposable Subscribe(IObserver<MyType> observer);
return MyObservable.Subscribe(observer)
}
async Task Start() { /* start UDP stream */ }
async Task Stop() { /* stop UDP stream */ }
IObservable<MyType> BuildMyObservable() { /* wire up stuff */ }
}
在上述方法中,我是否正在寻找一个不存在的函数
Initially
,还是只是忽略了它? 最佳答案
我猜您正在从RxJava寻找.aa的.Net等效项。它不存在。
您可以做的是将MyObservable
包装在Observable.Defer
函数中,并在Defer
中调用服务器。您可以使用下面的代码来了解我的意思:
class Program
{
static void Main(string[] args)
{
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var published = Observable.Defer(() =>
{
Console.WriteLine("Start"); // Here, you post "Start" to server
return source;
})
.Finally(() => Console.WriteLine("End")) // Here, you post "End"
.Publish()
.RefCount();
Console.ReadLine();
var disposable = published.Subscribe(x => Console.WriteLine("First " + x));
Console.ReadLine();
var disposable2 = published.Subscribe(x => Console.WriteLine("Second " + x));
Console.ReadLine();
disposable.Dispose();
Console.ReadLine();
disposable2.Dispose();
Console.ReadLine();
published.Subscribe(x => Console.WriteLine("Third " + x));
Console.ReadLine();
}
}
有关延迟的更多说明,请参见doOnSubscribe。
关于c# - 将任务添加到IObservable <T>(最终对称),我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/37909419/