我正在寻找一个非常基本的例子来说明如何使用无功扩展(RxPY)和Twisted。这里有一个最小的hello应用程序,它使用Twisted来传输消息。

def hello():
    print 'Hello from the reactor loop!'
    print 'Lately I feel like I\'m stuck in a rut.'

from twisted.internet import reactor

reactor.callWhenRunning(hello)

print 'Starting the reactor.'
reactor.run()

我想使用RxPY库钩住这些流(如果这样做更容易的话,它们不必打印到屏幕上),并执行规范的操作,如映射、筛选等。。。
我可以找到的所有RxPY示例都可以生成自己的流,例如从iterable生成的流,下面的代码流整数为0-9:
xs = Observable.from_(range(10))
xs.map(
    lambda x: x * 2
      ).subscribe(print)

或者包含在更复杂的示例中(如子类化WebSocket处理程序)。知道我怎么截取打印信息吗?例如,从扭曲的反应堆中产生一个可观察的流?

最佳答案

我一直在想同样的事情。这是我的发现。
启动RX数据流的最简单方法是创建Subject(),它将ObserverObservable结合在一起。然后您可以使用on_next方法将数据输入到其中。
Rx部分:

from rx.subjects import Subject
subject=Subject()
subject.filter(...).map(...).subscribe(my_observer)

从twisted回调中,您只需执行以下操作:
subject.on_next(data_item)
...
subject.on_completed()

或者您可以发出错误信号:
subject.on_error(my_error)

在学习RxPy的时候,我已经写了simple web server using both RxPy and Twisted。它是一个python文件,可以作为示例使用。

10-05 18:07