我想创建一个实用程序方法,为一个Action创建IObservable:该Action仅在订阅时才被调用,并且(AND)遵循SubscribeOn(...)指令。这是我的实现,它基于我可以从http://www.introtorx.com和其他资源中提取的内容,但是在一种特定情况下失败:

    /// <summary>
    /// Wraps an action in an observable; the action is not invoked until an observer subscribes.
    /// </summary>
    /// <param name="action">The work to perform for each subscription.</param>
    /// <returns>
    /// A sequence that, per subscription, schedules the action on
    /// <c>CurrentThreadScheduler.Instance</c>, then signals <c>Unit.Default</c> followed by
    /// completion, or signals an error if an exception is thrown while doing so.
    /// </returns>
    public static IObservable<Unit> MakeObservable_2(Action action)
    {
        return Observable.Create<Unit>(subscriber =>
        {
            // The unit of work handed to the scheduler: run the action, then notify the subscriber.
            Action invokeAndNotify = () =>
            {
                try
                {
                    action();
                    subscriber.OnNext(Unit.Default);
                    subscriber.OnCompleted();
                }
                catch (Exception error)
                {
                    subscriber.OnError(error);
                }
            };

            var currentThread = System.Reactive.Concurrency.CurrentThreadScheduler.Instance;

            // The scheduler's disposable doubles as the subscription handle.
            return currentThread.Schedule(invokeAndNotify);
        });
    }


我希望使用CurrentThreadScheduler可以导致使用SubscribeOn()中给定的Scheduler。此实现适用于.SubscribeOn(TaskPoolScheduler.Default),但不适用于.SubscribeOn(Dispatcher.CurrentDispatcher)。您能否更改上面的实现,以使下面的所有单元测试都通过?

    /// <summary>
    /// Subscribes to <c>MakeObservable_2</c> through <c>SubscribeOn(Dispatcher.CurrentDispatcher)</c>
    /// and expects the action to run on the same thread that set up the subscription.
    /// </summary>
    [Test]
    public void RxActionUtilities_MakeObservableFromAction_WorksAsExpected()
    {
        // Signalled by the subscription's error/completed callbacks, or by the watchdog below.
        ManualResetEvent evt = new ManualResetEvent(false);

        // Watchdog: releases evt after 5 seconds so that WaitOne() below cannot block forever.
        // The assertions at the end then decide whether the test passes.
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(5000);
            Console.WriteLine("Test timed out!");
            evt.Set();
        });

        // Sentinel values; the assertions at the end check that both were overwritten.
        int threadIdOfAction = -42;
        int threadIdOfSubscriptionContect = -43;
        bool subscriptionWasCalled = false;

        // Records the thread on which the action actually runs.
        Action action = () =>
            {
                threadIdOfAction = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("This is an action on thread " + threadIdOfAction);
            };

        var observable = RxActionUtilities.MakeObservable_2(action);

        // Records the thread that creates the subscription (the test thread).
        threadIdOfSubscriptionContect = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("Before subscription on thread " + threadIdOfSubscriptionContect);

        // The next line is the one I want to have working, but the subscription is never executed
        observable.SubscribeOn(Dispatcher.CurrentDispatcher).Subscribe(
            // Alternative without SubscribeOn that would pass: observable.Subscribe(
            (unit) =>
            {
                Console.WriteLine("Subscription: OnNext " + threadIdOfAction + ", " + threadIdOfSubscriptionContect);
                subscriptionWasCalled = true;
            },
            (ex) => evt.Set(), () => evt.Set());

        Console.WriteLine("After subscription");

        // Blocks the test thread until completion, error, or the watchdog fires.
        evt.WaitOne();

        // Both sentinels must have been replaced by real thread ids.
        Assert.AreNotEqual(-42, threadIdOfAction);
        Assert.AreNotEqual(-43, threadIdOfSubscriptionContect);

        // The action is expected to have run on the subscribing thread.
        Assert.AreEqual(threadIdOfAction, threadIdOfSubscriptionContect);
        Assert.That(subscriptionWasCalled);
    }

    /// <summary>
    /// Subscribes to <c>MakeObservable_2</c> through <c>SubscribeOn(TaskPoolScheduler.Default)</c>
    /// and expects the action to run on a thread other than the one that set up the subscription.
    /// </summary>
    [Test]
    // This test passes with the current implementation
    public void RxActionUtilities_MakeObservableFromActionSubscribeOnDifferentThread_WorksAsExpected()
    {
        // Signalled by the subscription's error/completed callbacks, or by the watchdog below.
        ManualResetEvent evt = new ManualResetEvent(false);

        // Watchdog: releases evt after 5 seconds so that WaitOne() below cannot block forever.
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(5000);
            Console.WriteLine("Test timed out!");
            evt.Set();
        });

        // Sentinel values. They must match the values the assertions below compare against;
        // otherwise the "was it overwritten?" checks can never fail.
        int threadIdOfAction = -42;
        int threadIdOfSubscriptionContect = -43;
        bool subscriptionWasCalled = false;

        // Records the thread on which the action actually runs.
        Action action = () =>
        {
            threadIdOfAction = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine("This is an action on thread " + threadIdOfAction);
        };

        var observable = RxActionUtilities.MakeObservable_2(action);

        // Records the thread that creates the subscription (the test thread).
        threadIdOfSubscriptionContect = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("Before subscription on thread " + threadIdOfSubscriptionContect);

        // Subscribing on the task pool: the action is expected to run on a pool thread.
        observable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(
            (unit) =>
            {
                Console.WriteLine("Subscription: OnNext " + threadIdOfAction + ", " + threadIdOfSubscriptionContect);
                subscriptionWasCalled = true;
            },
            (ex) => evt.Set(), () => evt.Set());

        // Blocks the test thread until completion, error, or the watchdog fires.
        evt.WaitOne();

        Console.WriteLine("After subscription");

        // Both sentinels must have been replaced by real thread ids.
        Assert.AreNotEqual(-42, threadIdOfAction);
        Assert.AreNotEqual(-43, threadIdOfSubscriptionContect);

        // The action must have run on a different thread than the subscribing one.
        Assert.AreNotEqual(threadIdOfAction, threadIdOfSubscriptionContect);
        Assert.That(subscriptionWasCalled);
    }


    /// <summary>
    /// Starts a long-running action through <c>MakeObservable_2</c>, disposes the subscription
    /// while the action should still be running, and expects the action not to run to its end.
    /// </summary>
    [Test]
    public void RxActionUtilities_MakeObservableFromAction_IsCancellable()
    {
        // Signalled by the action when it finishes, by the subscription's error/completed
        // callbacks, or by the watchdog below.
        ManualResetEvent evt = new ManualResetEvent(false);

        // Watchdog: releases evt after 5 seconds so that WaitOne() below cannot block forever.
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(5000);
            Console.WriteLine("Test timed out!");
            evt.Set();
        });

        // Sentinel values; the assertions at the end check that both were overwritten.
        int threadIdOfAction = -42;
        int threadIdOfSubscriptionContect = -43;
        bool subscriptionWasCalled = false;
        bool actionTerminated = false;

        // Ten iterations of 200 ms each (about two seconds in total). Reaching the end sets
        // actionTerminated before releasing evt.
        Action action = () =>
        {
            threadIdOfAction = Thread.CurrentThread.ManagedThreadId;

            for (int i = 0; i < 10; ++i)
            {
                Console.WriteLine("Some action #" + i);

                Thread.Sleep(200);
            }

            actionTerminated = true;
            evt.Set();
        };

        var observable = RxActionUtilities.MakeObservable_2(action);

        // Records the thread that creates the subscription (the test thread).
        threadIdOfSubscriptionContect = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("Before subscription on thread " + threadIdOfSubscriptionContect);

        var subscription =
            observable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(
                (unit) =>
                {
                    Console.WriteLine("Subscription: OnNext " + threadIdOfAction + ", " + threadIdOfSubscriptionContect);
                    subscriptionWasCalled = true;
                },
                (ex) => evt.Set(), () => evt.Set());

        Console.WriteLine("After subscription");

        // Dispose roughly half-way through the action's two-second run.
        Thread.Sleep(1000);
        Console.WriteLine("Killing subscription ...");
        subscription.Dispose();
        Console.WriteLine("... done.");

        evt.WaitOne();

        // Expectation under test: disposing the subscription stopped the action early.
        // NOTE(review): the action takes no cancellation signal, so nothing visible here can
        // interrupt its loop - confirm whether this expectation can hold at all.
        Assert.IsFalse(actionTerminated);

        Assert.AreNotEqual(-42, threadIdOfAction);
        Assert.AreNotEqual(-43, threadIdOfSubscriptionContect);

        // NOTE(review): the subscription is made on TaskPoolScheduler.Default, yet this asserts
        // that the action ran on the subscribing thread; the sibling "different thread" test
        // asserts the opposite for the same scheduler - confirm which is intended.
        Assert.AreEqual(threadIdOfAction, threadIdOfSubscriptionContect);
        // NOTE(review): OnNext is only signalled after the action returns, which conflicts with
        // the IsFalse(actionTerminated) expectation above - confirm.
        Assert.That(subscriptionWasCalled);
    }


更新资料

针对Lee的详尽回答,我再次尝试并重新提出我的问题。 IIUC我们可以总结一下


您无法停止已经开始的操作
我完全误解了Dispatcher.CurrentDispatcher及其工作原理:AFAICS永远不要将其用作SubscribeOn()的参数,而应仅用作ObserveOn的参数。
我误会了CurrentThreadScheduler


为了创建可以取消的内容,我们需要知道取消的操作,例如通过使用Action<CancellationToken>。这是我的下一个尝试。请告诉我您是否认为此实现非常适合Rx框架,或者我们是否可以再次对其进行改进:

/// <summary>
/// Makes a cancellable observable out of an action. The action is scheduled on
/// <paramref name="scheduler"/> only when an observer subscribes, and it receives a
/// <see cref="CancellationToken"/> that is cancelled when the subscription is disposed.
/// </summary>
/// <param name="action">
/// The work to perform. It should poll the supplied token and return (or throw) once
/// cancellation is requested; a running action cannot be interrupted from the outside.
/// </param>
/// <param name="scheduler">The scheduler on which the action is executed.</param>
/// <returns>
/// A sequence that, per subscription, runs the action and then signals <c>Unit.Default</c>
/// followed by completion, or signals an error if an exception is thrown while doing so.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="action"/> or <paramref name="scheduler"/> is <c>null</c>.
/// </exception>
public static IObservable<Unit>
    MakeObservable(Action<CancellationToken> action, IScheduler scheduler)
{
    // Fail at the call site instead of surfacing a NullReferenceException at subscription time.
    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    return Observable.Create<Unit>(
        observer
        =>
        {
            // internally creates a new CancellationTokenSource
            var cancel = new CancellationDisposable();

            var scheduledAction = scheduler.Schedule(() =>
            {
                try
                {
                    action(cancel.Token);

                    // Signal a value before completing, like the Action-based
                    // MakeObservable_2 / MakeObservable_3 helpers, so OnNext handlers fire.
                    observer.OnNext(Unit.Default);
                    observer.OnCompleted();
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }
            });

            // Cancellation before execution of action is performed
            // by disposing scheduledAction
            // Cancellation during execution of action is performed
            // by disposing cancel
            return new CompositeDisposable(cancel, scheduledAction);
        });
}


如果您愿意的话:我不知道如何使用TestScheduler进行测试:

/// <summary>
/// Runs a cancellable action on a real thread and verifies that disposing the subscription
/// makes the action observe the cancellation request and return early.
/// </summary>
[Test]
public void MakeObservableFromCancelableAction_CancellationTakesPlaceWithTrueThread()
{
    var scheduler = NewThreadScheduler.Default;

    // The action runs on a scheduler thread. Assertions made there are thrown inside the action
    // and never reach the test runner, so the action only records what happened and every
    // assertion is made on the test thread below. The events also publish the recorded state
    // to the test thread. They are intentionally not disposed: a late-running action could
    // otherwise call Set() on a disposed handle.
    var actionStarted = new ManualResetEvent(false);
    var actionExited = new ManualResetEvent(false);
    bool cancellationObserved = false;

    Action<CancellationToken> action =
        (cancellationToken) =>
        {
            actionStarted.Set();

            try
            {
                // Without cancellation this loop takes roughly 200 ms (10 x 20 ms).
                for (int i = 0; i < 10; ++i)
                {
                    Console.WriteLine("Some action #" + i);

                    if (cancellationToken.IsCancellationRequested)
                    {
                        cancellationObserved = true;
                        break;
                    }

                    Thread.Sleep(20);
                }
            }
            finally
            {
                actionExited.Set();
            }
        };

    var observable = RxActionUtilities.MakeObservable(action, scheduler);

    var subscription = observable.Subscribe((unit) => { });

    // Make sure the action is really running before cancelling it; disposing earlier would
    // merely cancel the scheduled work item and test nothing.
    Assert.IsTrue(actionStarted.WaitOne(2000), "The action was never started.");

    Thread.Sleep(60);

    subscription.Dispose();

    Assert.IsTrue(actionExited.WaitOne(2000), "The action did not return after the subscription was disposed.");
    Assert.IsTrue(cancellationObserved, "The action ran to completion without observing the cancellation request.");
}

最佳答案

我认为您可以使您的代码简单得多,也可以使测试简单得多。 Rx的优点在于您应该可以消除所有Task / Thread / ManualResetEvent。我还假设您也可以只使用NUnit的[Timeout]属性而不是自定义代码。

无论如何...
@Per是正确的,Observable.Start是您想要的。您为它传递了一个Action和一个IScheduler,这似乎正是您想要的。

/// <summary>
/// Observable.Start with a TestScheduler: the action has not run before the scheduler is
/// advanced and has run afterwards.
/// </summary>
[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObStart()
{
    var testScheduler = new TestScheduler();
    bool actionRan = false;
    Action markAsRun = () => actionRan = true;

    var started = Observable.Start(markAsRun, testScheduler);
    var subscription = started.Subscribe();

    Assert.IsFalse(actionRan);
    testScheduler.AdvanceBy(1);
    Assert.IsTrue(actionRan);

    // Optional: by now the sequence will have completed and auto-detached.
    subscription.Dispose();
}


但是,您可能会注意到它确实有一些奇怪的行为(至少在我这台PC上的V1版本中)。具体来说,Observable.Start会立即调度并运行Action,而不会等待可观察序列被订阅。也正因为如此,先订阅、再在该操作本应执行之前处置订阅,并不能阻止操作运行。嗯……

/// <summary>
/// Demonstrates that disposing the subscription before the scheduler advances does not stop
/// an action started with Observable.Start; the final assertion is expected to fail.
/// </summary>
[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObStart_dispose()
{
    var testScheduler = new TestScheduler();
    bool actionRan = false;
    Action markAsRun = () => actionRan = true;

    var started = Observable.Start(markAsRun, testScheduler);
    var subscription = started.Subscribe();

    Assert.IsFalse(actionRan);
    subscription.Dispose();
    testScheduler.AdvanceBy(1);

    // FAILS: the action ran even though the subscription had already been disposed.
    Assert.IsFalse(actionRan);
}
/// <summary>
/// Demonstrates that Observable.Start runs the action even when nobody subscribes; the final
/// assertion is expected to fail.
/// </summary>
[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObStart_no_subscribe()
{
    var testScheduler = new TestScheduler();
    bool actionRan = false;
    Action markAsRun = () => actionRan = true;

    // Deliberately no Subscribe() call on the returned sequence.
    Observable.Start(markAsRun, testScheduler);

    Assert.IsFalse(actionRan);
    testScheduler.AdvanceBy(1);

    // FAILS: the action ran although the sequence was never subscribed to.
    Assert.IsFalse(actionRan);
}


但是,我们可以按照您的方式使用Observable.Create。您已经非常接近了,不过您不需要在Create委托中进行任何调度。只需相信Rx会为您完成这件事。

/// <summary>
/// Observable.Create combined with SubscribeOn(TestScheduler): the action has not run before
/// the scheduler is advanced and has run afterwards.
/// </summary>
[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObCreate()
{
    var testScheduler = new TestScheduler();
    bool actionRan = false;
    Action markAsRun = () => actionRan = true;

    // No scheduling inside the Create delegate; SubscribeOn decides where it runs.
    IObservable<Unit> source = Observable.Create<Unit>(sink =>
    {
        try
        {
            markAsRun();
            sink.OnNext(Unit.Default);
            sink.OnCompleted();
        }
        catch (Exception error)
        {
            sink.OnError(error);
        }

        return Disposable.Empty;
    });

    // Without Subscribe() the action would not run.
    var subscription = source.SubscribeOn(testScheduler).Subscribe();

    Assert.IsFalse(actionRan);
    testScheduler.AdvanceBy(1);
    Assert.IsTrue(actionRan);

    // Optional: by now the sequence will have completed and auto-detached.
    subscription.Dispose();
}

/// <summary>
/// Observable.Create combined with SubscribeOn(TestScheduler): disposing the subscription
/// before the scheduler advances prevents the action from running.
/// </summary>
[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObCreate_dispose()
{
    var testScheduler = new TestScheduler();
    bool actionRan = false;
    Action markAsRun = () => actionRan = true;

    // No scheduling inside the Create delegate; SubscribeOn decides where it runs.
    IObservable<Unit> source = Observable.Create<Unit>(sink =>
    {
        try
        {
            markAsRun();
            sink.OnNext(Unit.Default);
            sink.OnCompleted();
        }
        catch (Exception error)
        {
            sink.OnError(error);
        }

        return Disposable.Empty;
    });

    // Without Subscribe() the action would not run.
    var subscription = source.SubscribeOn(testScheduler).Subscribe();

    Assert.IsFalse(actionRan);
    subscription.Dispose();
    testScheduler.AdvanceBy(1);

    // The subscription was disposed before the scheduler got to run, so the action never ran.
    Assert.IsFalse(actionRan);
}


如果您希望能够在正在处理的动作中途取消实际动作,那么您将需要做一些比这更高级的事情。

最终的实现很简单:

/// <summary>
/// Helpers for exposing plain delegates as Rx observable sequences.
/// </summary>
public static class RxActionUtilities
{
    /// <summary>
    /// Makes an observable out of an action. The action is executed only when an observer
    /// subscribes, once per subscription. No scheduling is done here; combine with
    /// <c>SubscribeOn</c> to choose where the action runs.
    /// </summary>
    /// <param name="action">The action.</param>
    /// <returns>
    /// A sequence that runs the action on subscription and then signals <c>Unit.Default</c>
    /// followed by completion, or signals an error if an exception is thrown while doing so.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
    /// <example>
    /// <code>
    /// <![CDATA[
    /// RxActionUtilities.MakeObservable_3(myAction)
    ///                  .SubscribeOn(_schedulerProvider.TaskPoolScheduler)
    ///                  .Subscribe(....);
    ///
    /// ]]>
    /// </code>
    /// </example>
    public static IObservable<Unit> MakeObservable_3(Action action)
    {
        // Fail at the call site instead of reporting a NullReferenceException through OnError
        // at subscription time.
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        return Observable.Create<Unit>(observer =>
            {
                try
                {
                    action();
                    observer.OnNext(Unit.Default);
                    observer.OnCompleted();
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }

                // The action has already finished by the time this is returned, so there is
                // nothing left to cancel.
                return Disposable.Empty;
            });
    }
}


希望对您有所帮助。

编辑:
不建议您在单元测试中使用Dispatcher。我认为您首先应该尝试理解其工作原理,然后再应用另一层(Rx)来增加混乱。在WPF中进行编码时,Rx给我带来的主要好处之一就是通过调度程序对Dispatcher的抽象。它使我可以轻松地在WPF中测试并发性。例如,此简单测试在此处失败:

/// <summary>
/// Queues an action on the current Dispatcher with BeginInvoke and immediately asserts that
/// it has run. Shown as an example of a test that fails.
/// </summary>
[Test, Timeout(2000)]
public void DispatcherFail()
{
    bool executed = false;
    Action work = () =>
    {
        Console.WriteLine("Running...");
        executed = true;
        Console.WriteLine("Run.");
    };

    Dispatcher.CurrentDispatcher.BeginInvoke(work);

    Assert.IsTrue(executed);
}


如果运行此测试,您会注意到甚至没有任何内容打印到控制台上,因此这里并不存在争用条件,而是该操作根本没有运行。原因是调度程序(Dispatcher)尚未启动它的消息循环。要更正此测试,我们必须用凌乱的基础结构代码填充它。

/// <summary>
/// Queues an action on the current Dispatcher and pumps a DispatcherFrame so that the queued
/// action gets executed before the assertion.
/// </summary>
[Test, Timeout(2000)]
public void Testing_with_Dispatcher_BeginInvoke()
{
    // Step 1: the frame that serves as the message loop.
    var messageLoop = new DispatcherFrame();
    bool executed = false;
    Action work = () =>
    {
        Console.WriteLine("Running...");
        executed = true;
        Console.WriteLine("Run.");

        // Step 2: end the message loop, otherwise PushFrame below would hang forever.
        messageLoop.Continue = false;
    };

    Dispatcher.CurrentDispatcher.BeginInvoke(work);

    // Step 3: start the message loop.
    Dispatcher.PushFrame(messageLoop);

    Assert.IsTrue(executed);
}


因此,我们显然不希望在WPF中需要并发的所有测试中执行此操作。尝试将frame.Continue = false注入我们不控制的动作将是一场噩梦。幸运的是,IScheduler通过其Schedule方法公开了我们需要的所有内容。

其次,CurrentThreadScheduler应该被看作一个蹦床(trampoline),而不是一个SynchronizationContext(我猜您之前是把它当成了后者)。

关于c# - 从 Action 创建可取消的IObservable,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/13877555/

10-13 04:58