本文介绍了在冷 IObservable 上暂停和恢复订阅的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
使用 Rx,我需要以下代码中的暂停和恢复功能:
如何实现 Pause() 和 Resume() ?
静态 IDisposable _subscription;静态无效主(字符串 [] args){订阅();线程睡眠(500);//两秒后不应显示第二个值:暂停();线程.睡眠(5000);//继续并显示第二个值及以后的值:恢复();}静态无效订阅(){var list = new List{ 1, 2, 3, 4, 5 };var obs = list.ToObservable();_subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>{Console.WriteLine(p.ToString());线程睡眠(2000);},错误 =>Console.WriteLine("错误"),() =>Console.WriteLine("序列完成"));}静态无效暂停(){//伪代码://_subscription.Pause();}静态无效恢复(){//伪代码://_subscription.Resume();}
接收解决方案?
我相信我可以让它与某种布尔字段门控结合使用线程锁定(
Monitor.Wait
和Monitor.Pulse
)但是是否有 Rx 操作符或其他一些反应式速记来实现相同的目标?
解决方案
它只是有效:
类 SimpleWaitPulse{静态只读对象_locker = new object();静态布尔_go;静态无效主(){//新线程会阻塞新线程(工作).开始();//因为 _go==false.Console.ReadLine();//等待用户回车lock (_locker)//现在让我们通过以下方式唤醒线程{//设置 _go=true 和脉冲._go = 真;Monitor.Pulse (_locker);}}静态无效工作(){锁 (_locker)而 (!_go)Monitor.Wait (_locker);//在我们等待时释放锁Console.WriteLine("醒了!!!");}}
请参阅如何使用等待和脉冲了解更多详情>
Using Rx, I desire pause and resume functionality in the following code:
How to implement Pause() and Resume() ?
static IDisposable _subscription;
static void Main(string[] args)
{
Subscribe();
Thread.Sleep(500);
// Second value should not be shown after two seconds:
Pause();
Thread.Sleep(5000);
// Continue and show second value and beyond now:
Resume();
}
static void Subscribe()
{
var list = new List<int> { 1, 2, 3, 4, 5 };
var obs = list.ToObservable();
_subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
{
Console.WriteLine(p.ToString());
Thread.Sleep(2000);
},
err => Console.WriteLine("Error"),
() => Console.WriteLine("Sequence Completed")
);
}
static void Pause()
{
// Pseudocode:
//_subscription.Pause();
}
static void Resume()
{
// Pseudocode:
//_subscription.Resume();
}
Rx Solution?
I believe I could make it work with some kind of Boolean field gating combined with thread locking (
Monitor.Wait
andMonitor.Pulse
)But is there an Rx operator or some other reactive shorthand to achieve the same aim?
解决方案
It just works:
class SimpleWaitPulse
{
static readonly object _locker = new object();
static bool _go;
static void Main()
{ // The new thread will block
new Thread (Work).Start(); // because _go==false.
Console.ReadLine(); // Wait for user to hit Enter
lock (_locker) // Let's now wake up the thread by
{ // setting _go=true and pulsing.
_go = true;
Monitor.Pulse (_locker);
}
}
static void Work()
{
lock (_locker)
while (!_go)
Monitor.Wait (_locker); // Lock is released while we’re waiting
Console.WriteLine ("Woken!!!");
}
}
Please, see How to Use Wait and Pulse for more details
这篇关于在冷 IObservable 上暂停和恢复订阅的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!