我想在第一个客户端连接时向我的服务器发送一个启动脉冲,在最后一个客户端断开连接时向我发送一个结束脉冲。

public class MyAdapter : IObservable<MyType> {

    IObservable<MyType> MyObservable = BuildMyObservable()
        .Initially(Start) // <- this method doesn't exist
        .Finally(Stop).Publish().RefCount().Subscribe(observer);

    public IDisposable Subscribe(IObserver<MyType> observer);
        return MyObservable.Subscribe(observer)
    }

    async Task Start() { /* start UDP stream */ }
    async Task Stop() { /* stop UDP stream */ }
    IObservable<MyType> BuildMyObservable() { /* wire up stuff */ }

}


在上述方法中,我是否正在寻找一个不存在的函数Initially,还是只是忽略了它?

最佳答案

我猜您正在从RxJava寻找.aa的.Net等效项。它不存在。

您可以做的是将MyObservable包装在Observable.Defer函数中,并在Defer中调用服务器。您可以使用下面的代码来了解我的意思:

class Program
{
    static void Main(string[] args)
    {
        var source = Observable.Interval(TimeSpan.FromSeconds(1));

        var published = Observable.Defer(() =>
        {
            Console.WriteLine("Start"); // Here, you post "Start" to server
            return source;
        })
        .Finally(() => Console.WriteLine("End")) // Here, you post "End"
        .Publish()
        .RefCount();

        Console.ReadLine();
        var disposable = published.Subscribe(x => Console.WriteLine("First " + x));
        Console.ReadLine();
        var disposable2 = published.Subscribe(x => Console.WriteLine("Second " + x));
        Console.ReadLine();
        disposable.Dispose();
        Console.ReadLine();
        disposable2.Dispose();
        Console.ReadLine();
        published.Subscribe(x => Console.WriteLine("Third " + x));
        Console.ReadLine();
    }
}


有关延迟的更多说明,请参见doOnSubscribe

关于c# - 将任务添加到IObservable <T>(最终对称),我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/37909419/

10-12 23:02