我正在寻找一个非常基本的例子来说明如何使用无功扩展(RxPY)和Twisted。这里有一个最小的hello应用程序,它使用Twisted来传输消息。
def hello():
print 'Hello from the reactor loop!'
print 'Lately I feel like I\'m stuck in a rut.'
from twisted.internet import reactor
reactor.callWhenRunning(hello)
print 'Starting the reactor.'
reactor.run()
我想使用RxPY库钩住这些流(如果这样做更容易的话,它们不必打印到屏幕上),并执行规范的操作,如映射、筛选等。。。
我可以找到的所有RxPY示例都可以生成自己的流,例如从iterable生成的流,下面的代码流整数为0-9:
xs = Observable.from_(range(10))
xs.map(
lambda x: x * 2
).subscribe(print)
或者包含在更复杂的示例中(如子类化WebSocket处理程序)。知道我怎么截取打印信息吗?例如,从扭曲的反应堆中产生一个可观察的流?
最佳答案
我一直在想同样的事情。这是我的发现。
启动RX数据流的最简单方法是创建Subject()
,它将Observer
和Observable
结合在一起。然后您可以使用on_next
方法将数据输入到其中。
Rx部分:
from rx.subjects import Subject
subject=Subject()
subject.filter(...).map(...).subscribe(my_observer)
从twisted回调中,您只需执行以下操作:
subject.on_next(data_item)
...
subject.on_completed()
或者您可以发出错误信号:
subject.on_error(my_error)
在学习RxPy的时候,我已经写了simple web server using both RxPy and Twisted。它是一个python文件,可以作为示例使用。