本文介绍了在冷 IObservable 上暂停和恢复订阅的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用 Rx,我需要以下代码中的暂停和恢复功能:

如何实现 Pause() 和 Resume() ?

 静态 IDisposable _subscription;静态无效主(字符串 [] args){订阅();线程睡眠(500);//两秒后不应显示第二个值:暂停();线程.睡眠(5000);//继续并显示第二个值及以后的值:恢复();}静态无效订阅(){var list = new List{ 1, 2, 3, 4, 5 };var obs = list.ToObservable();_subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>{Console.WriteLine(p.ToString());线程睡眠(2000);},错误 =>Console.WriteLine("错误"),() =>Console.WriteLine("序列完成"));}静态无效暂停(){//伪代码://_subscription.Pause();}静态无效恢复(){//伪代码://_subscription.Resume();}

接收解决方案?

  • 我相信我可以让它与某种布尔字段门控结合使用线程锁定(Monitor.WaitMonitor.Pulse)

  • 但是是否有 Rx 操作符或其他一些反应式速记来实现相同的目标?

解决方案

它只是有效:

 类 SimpleWaitPulse{静态只读对象_locker = new object();静态布尔_go;静态无效主(){//新线程会阻塞新线程(工作).开始();//因为 _go==false.Console.ReadLine();//等待用户回车lock (_locker)//现在让我们通过以下方式唤醒线程{//设置 _go=true 和脉冲._go = 真;Monitor.Pulse (_locker);}}静态无效工作(){锁 (_locker)而 (!_go)Monitor.Wait (_locker);//在我们等待时释放锁Console.WriteLine("醒了!!!");}}

请参阅如何使用等待和脉冲了解更多详情>

Using Rx, I desire pause and resume functionality in the following code:

How to implement Pause() and Resume() ?

    static IDisposable _subscription;

    static void Main(string[] args)
    {
        Subscribe();
        Thread.Sleep(500);
        // Second value should not be shown after two seconds:
        Pause();
        Thread.Sleep(5000);
        // Continue and show second value and beyond now:
        Resume();
    }

    static void Subscribe()
    {
        var list = new List<int> { 1, 2, 3, 4, 5 };
        var obs = list.ToObservable();
        _subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
        {
            Console.WriteLine(p.ToString());
            Thread.Sleep(2000);
        },
        err => Console.WriteLine("Error"),
        () => Console.WriteLine("Sequence Completed")
        );
    }

    static void Pause()
    {
        // Pseudocode:
        //_subscription.Pause();
    }

    static void Resume()
    {
        // Pseudocode:
        //_subscription.Resume();
    }

Rx Solution?

  • I believe I could make it work with some kind of Boolean field gating combined with thread locking (Monitor.Wait and Monitor.Pulse)

  • But is there an Rx operator or some other reactive shorthand to achieve the same aim?

解决方案

It just works:

    class SimpleWaitPulse
    {
      static readonly object _locker = new object();
      static bool _go;

      static void Main()
      {                                // The new thread will block
        new Thread (Work).Start();     // because _go==false.

        Console.ReadLine();            // Wait for user to hit Enter

        lock (_locker)                 // Let's now wake up the thread by
        {                              // setting _go=true and pulsing.
          _go = true;
          Monitor.Pulse (_locker);
        }
      }

      static void Work()
      {
        lock (_locker)
          while (!_go)
            Monitor.Wait (_locker);    // Lock is released while we’re waiting

        Console.WriteLine ("Woken!!!");
      }
    }

Please, see How to Use Wait and Pulse for more details

这篇关于在冷 IObservable 上暂停和恢复订阅的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-30 23:43