我想创建一个实用程序方法,该方法为仅在订阅时AND调用的Action创建IObservable。遵循SubscribeOn(...)指令。这是我的实现,它基于我可以从http://www.introtorx.com和其他资源中提取的内容,但是在一种特定情况下失败:
/// <summary>
/// Makes an observable out of an action. Only at subscription the task will be executed.
/// </summary>
/// <param name="action">The action.</param>
/// <returns></returns>
public static IObservable<Unit> MakeObservable_2(Action action)
{
return Observable.Create<Unit>(
observer =>
{
return System.Reactive.Concurrency.CurrentThreadScheduler.Instance.Schedule(
() =>
{
try
{
action();
observer.OnNext(Unit.Default);
observer.OnCompleted();
}
catch (Exception ex)
{
observer.OnError(ex);
}
});
});
}
我希望使用CurrrentThreadScheduler可以导致使用SubscribeOn()中给定的Scheduler。此实现适用于.SubscribeOn(TaskPoolScheduler.Default),但不适用于.SubscribeOn(Dispatcher.CurrentDispatcher)。您能否更改上面的实现,以使下面的所有单元测试都通过?
[Test]
public void RxActionUtilities_MakeObservableFromAction_WorksAsExpected()
{
ManualResetEvent evt = new ManualResetEvent(false);
// Timeout of this test if sth. goes wrong below
Task.Factory.StartNew(() =>
{
Thread.Sleep(5000);
Console.WriteLine("Test timed out!");
evt.Set();
});
int threadIdOfAction = -42;
int threadIdOfSubscriptionContect = -43;
bool subscriptionWasCalled = false;
Action action = () =>
{
threadIdOfAction = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("This is an action on thread " + threadIdOfAction);
};
var observable = RxActionUtilities.MakeObservable_2(action);
threadIdOfSubscriptionContect = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("Before subscription on thread " + threadIdOfSubscriptionContect);
// The next line is the one I want to have working, but the subscription is never executed
observable.SubscribeOn(Dispatcher.CurrentDispatcher).Subscribe(
//observable.Subscribe( // would pass
(unit) =>
{
Console.WriteLine("Subscription: OnNext " + threadIdOfAction + ", " + threadIdOfSubscriptionContect);
subscriptionWasCalled = true;
},
(ex) => evt.Set(), () => evt.Set());
Console.WriteLine("After subscription");
evt.WaitOne();
Assert.AreNotEqual(-42, threadIdOfAction);
Assert.AreNotEqual(-43, threadIdOfSubscriptionContect);
Assert.AreEqual(threadIdOfAction, threadIdOfSubscriptionContect);
Assert.That(subscriptionWasCalled);
}
[Test]
// This test passes with the current implementation
public void RxActionUtilities_MakeObservableFromActionSubscribeOnDifferentThread_WorksAsExpected()
{
ManualResetEvent evt = new ManualResetEvent(false);
// Timeout of this test if sth. goes wrong below
Task.Factory.StartNew(() =>
{
Thread.Sleep(5000);
Console.WriteLine("Test timed out!");
evt.Set();
});
int threadIdOfAction = 42;
int threadIdOfSubscriptionContect = 43;
bool subscriptionWasCalled = false;
Action action = () =>
{
threadIdOfAction = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("This is an action on thread " + threadIdOfAction);
};
var observable = RxActionUtilities.MakeObservable_2(action);
threadIdOfSubscriptionContect = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("Before subscription on thread " + threadIdOfSubscriptionContect);
// The next line is the one I want to have working, but the subscription is never executed
observable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(
(unit) =>
{
Console.WriteLine("Subscription: OnNext " + threadIdOfAction + ", " + threadIdOfSubscriptionContect);
subscriptionWasCalled = true;
},
(ex) => evt.Set(), () => evt.Set());
evt.WaitOne();
Console.WriteLine("After subscription");
Assert.AreNotEqual(-42, threadIdOfAction);
Assert.AreNotEqual(-43, threadIdOfSubscriptionContect);
Assert.AreNotEqual(threadIdOfAction, threadIdOfSubscriptionContect);
Assert.That(subscriptionWasCalled);
}
[Test]
public void RxActionUtilities_MakeObservableFromAction_IsCancellable()
{
ManualResetEvent evt = new ManualResetEvent(false);
// Timeout of this test if sth. goes wrong below
Task.Factory.StartNew(() =>
{
Thread.Sleep(5000);
Console.WriteLine("Test timed out!");
evt.Set();
});
int threadIdOfAction = -42;
int threadIdOfSubscriptionContect = -43;
bool subscriptionWasCalled = false;
bool actionTerminated = false;
Action action = () =>
{
threadIdOfAction = Thread.CurrentThread.ManagedThreadId;
for (int i = 0; i < 10; ++i)
{
Console.WriteLine("Some action #" + i);
Thread.Sleep(200);
}
actionTerminated = true;
evt.Set();
};
var observable = RxActionUtilities.MakeObservable_2(action);
threadIdOfSubscriptionContect = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("Before subscription on thread " + threadIdOfSubscriptionContect);
var subscription =
observable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(
(unit) =>
{
Console.WriteLine("Subscription: OnNext " + threadIdOfAction + ", " + threadIdOfSubscriptionContect);
subscriptionWasCalled = true;
},
(ex) => evt.Set(), () => evt.Set());
Console.WriteLine("After subscription");
Thread.Sleep(1000);
Console.WriteLine("Killing subscription ...");
subscription.Dispose();
Console.WriteLine("... done.");
evt.WaitOne();
Assert.IsFalse(actionTerminated);
Assert.AreNotEqual(-42, threadIdOfAction);
Assert.AreNotEqual(-43, threadIdOfSubscriptionContect);
Assert.AreEqual(threadIdOfAction, threadIdOfSubscriptionContect);
Assert.That(subscriptionWasCalled);
}
更新资料
针对Lee的详尽回答,我再次尝试并重新提出我的问题。 IIUC我们可以总结一下
您无法停止已经开始的操作
我完全误解了Dispatcher.CurrentDispatcher及其工作原理:AFAICS永远不要将其用作SubscribeOn()的参数,而应仅用作ObserveOn的参数。
我误会了CurrentThreadScheduler
为了创建可以取消的内容,我们需要知道取消的操作,例如通过使用
Action<CancellationToken>
。这是我的下一个尝试。请告诉我您是否认为此实现非常适合Rx框架,或者我们是否可以再次对其进行改进:public static IObservable<Unit>
MakeObservable(Action<CancellationToken> action, IScheduler scheduler)
{
return Observable.Create<Unit>(
observer
=>
{
// internally creates a new CancellationTokenSource
var cancel = new CancellationDisposable();
var scheduledAction = scheduler.Schedule(() =>
{
try
{
action(cancel.Token);
observer.OnCompleted();
}
catch (Exception ex)
{
observer.OnError(ex);
}
});
// Cancellation before execution of action is performed
// by disposing scheduledAction
// Cancellation during execution of action is performed
// by disposing cancel
return new CompositeDisposable(cancel, scheduledAction);
});
}
如果您愿意的话:我不知道如何使用
TestScheduler
进行测试:[Test]
public void MakeObservableFromCancelableAction_CancellationTakesPlaceWithTrueThread()
{
var scheduler = NewThreadScheduler.Default;
Action<CancellationToken> action =
(cancellationToken) =>
{
for (int i = 0; i < 10; ++i)
{
Console.WriteLine("Some action #" + i);
if (cancellationToken.IsCancellationRequested)
{
break;
}
Thread.Sleep(20);
// Hoping that the disposal of the subscription stops
// the loop before we reach i == 4.
Assert.Less(i, 4);
}
};
var observable = RxActionUtilities.MakeObservable(action, scheduler);
var subscription = observable.Subscribe((unit) => { });
Thread.Sleep(60);
subscription.Dispose();
}
最佳答案
我认为您可以使您的代码简单得多,也可以使测试简单得多。 Rx的优点在于您应该可以消除所有Task / Thread / ManualResetEvent。我还假设您也可以只使用NUnit的[Timeout]属性而不是自定义代码。
无论如何...
@Per是正确的,Observable.Start是您想要的。您为它传递了一个Action和一个IScheduler,这似乎正是您想要的。
[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObStart()
{
var scheduler = new TestScheduler();
var flag = false;
Action action = () => { flag = true; };
var subscription = Observable.Start(action, scheduler)
.Subscribe();
Assert.IsFalse(flag);
scheduler.AdvanceBy(1);
Assert.IsTrue(flag);
subscription.Dispose(); //Not required as the sequence will have completed and then auto-detached.
}
但是,您可能会注意到它确实有一些奇怪的行为(至少在我在这台PC上的V1中)。具体来说,Observable.Start将立即运行Action,而实际上不等待可观察序列被订阅。同样由于这个原因,调用订阅,然后在应该执行该操作之前处置订阅无效。嗯
[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObStart_dispose()
{
var scheduler = new TestScheduler();
var flag = false;
Action action = () => { flag = true; };
var subscription = Observable.Start(action, scheduler).Subscribe();
Assert.IsFalse(flag);
subscription.Dispose();
scheduler.AdvanceBy(1);
Assert.IsFalse(flag); //FAILS. Oh no! this is true!
}
[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObStart_no_subscribe()
{
var scheduler = new TestScheduler();
var flag = false;
Action action = () => { flag = true; };
Observable.Start(action, scheduler);
//Note the lack of subscribe?!
Assert.IsFalse(flag);
scheduler.AdvanceBy(1);
Assert.IsFalse(flag);//FAILS. Oh no! this is true!
}
但是,我们可以按照您的方式使用Observable.Create。您是如此亲密,但是,您不需要在Create委托中进行任何计划。只需相信Rx即可为您执行此操作。
[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObCreate()
{
var scheduler = new TestScheduler();
var flag = false;
Action action = () => { flag = true; };
var subscription = Observable.Create<Unit>(observer =>
{
try
{
action();
observer.OnNext(Unit.Default);
observer.OnCompleted();
}
catch (Exception ex)
{
observer.OnError(ex);
}
return Disposable.Empty;
})
.SubscribeOn(scheduler)
.Subscribe(); //Without subscribe, the action wont run.
Assert.IsFalse(flag);
scheduler.AdvanceBy(1);
Assert.IsTrue(flag);
subscription.Dispose(); //Not required as the sequence will have completed and then auto-detached.
}
[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObCreate_dispose()
{
var scheduler = new TestScheduler();
var flag = false;
Action action = () => { flag = true; };
var subscription = Observable.Create<Unit>(observer =>
{
try
{
action();
observer.OnNext(Unit.Default);
observer.OnCompleted();
}
catch (Exception ex)
{
observer.OnError(ex);
}
return Disposable.Empty;
})
.SubscribeOn(scheduler)
.Subscribe(); //Without subscribe, the action wont run.
Assert.IsFalse(flag);
subscription.Dispose();
scheduler.AdvanceBy(1);
Assert.IsFalse(flag); //Subscription was disposed before the scheduler was able to run, so the action did not run.
}
如果您希望能够在正在处理的动作中途取消实际动作,那么您将需要做一些比这更高级的事情。
最终的实现很简单:
public static class RxActionUtilities
{
/// <summary>
/// Makes an observable out of an action. Only at subscription the task will be executed.
/// </summary>
/// <param name="action">The action.</param>
/// <returns></returns>
/// <example>
/// <code>
/// <![CDATA[
/// RxActionUtilities.MakeObservable_3(myAction)
/// .SubscribeOn(_schedulerProvider.TaskPoolScheduler)
/// .Subscribe(....);
///
/// ]]>
/// </code>
/// </example>
public static IObservable<Unit> MakeObservable_3(Action action)
{
return Observable.Create<Unit>(observer =>
{
try
{
action();
observer.OnNext(Unit.Default);
observer.OnCompleted();
}
catch (Exception ex)
{
observer.OnError(ex);
}
return Disposable.Empty;
});
}
}
希望对您有所帮助。
编辑:
不建议您在单元测试中使用Dispatcher。我认为您首先应该尝试理解其工作原理,然后再应用另一层(Rx)来增加混乱。在WPF中进行编码时,Rx给我带来的主要好处之一就是通过调度程序对Dispatcher的抽象。它使我可以轻松地在WPF中测试并发性。例如,此简单测试在此处失败:
[Test, Timeout(2000)]
public void DispatcherFail()
{
var wasRun = false;
Action MyAction = () =>
{
Console.WriteLine("Running...");
wasRun = true;
Console.WriteLine("Run.");
};
Dispatcher.CurrentDispatcher.BeginInvoke(MyAction);
Assert.IsTrue(wasRun);
}
如果运行此命令,您会注意到什至没有任何内容打印到控制台上,因此我们没有争用条件,该操作永远不会运行。原因是调度程序尚未启动它的消息循环。要更正此测试,我们必须用凌乱的基础结构代码填充它。
[Test, Timeout(2000)]
public void Testing_with_Dispatcher_BeginInvoke()
{
var frame = new DispatcherFrame(); //1 - The Message loop
var wasRun = false;
Action MyAction = () =>
{
Console.WriteLine("Running...");
wasRun = true;
Console.WriteLine("Run.");
frame.Continue = false; //2 - Stop the message loop, else we hang forever
};
Dispatcher.CurrentDispatcher.BeginInvoke(MyAction);
Dispatcher.PushFrame(frame); //3 - Start the message loop
Assert.IsTrue(wasRun);
}
因此,我们显然不希望在WPF中需要并发的所有测试中执行此操作。尝试将frame.Continue = false注入我们不控制的动作将是一场噩梦。幸运的是,IScheudler通过其Schedule方法公开了我们需要的所有内容。
下一个CurrentThreadScheduler应该被认为是一个蹦床,而不是一个SynchronizationContext(这就是我想的那样)。
关于c# - 从 Action 创建可取消的IObservable,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/13877555/