我最近一直在阅读有关rx-java的文章。我想知道该框架是否适合线程之间的通信系统。我正在使用Java编写的REST服务器。每次PUT/POST某些资源时,我都想使用工作线程池进行一些计算。但是,我仍然希望能够监视请求,也许可以打印一些统计信息。本质上,我想要一个Observable,因此我可以使用多个Observer灵活地处理请求。

我的问题是,如何创建合适的Observable?我见过的大多数指南都涉及可观察对象的操作,例如映射等。可观察对象主要是由集合或整数范围创建的。在任何情况下,似乎都不可能将新值插入已创建的Observable。显然,保持这种灵活性的唯一方法是使用Observable.create。但是,这似乎是一个很低的层次。我将必须为每个新订户实现一个队列列表,并对每个单个订户执行synchronized推送。这真的必要吗,还是已经在rx-java中实现了类似的功能?

最佳答案

您正在寻找的是Subject。它们既充当观察者又充当可观察者。例如,ReplaySubject将重播发送给所有订阅者的所有事件。

Subject<String> replaySubject = ReplaySubject.create();
replaySubject.subscribe(s -> System.out.println(s));

// elsewhere...

replaySubject.onNext("First");
replaySubject.onNext("Second");
replaySubject.onComplete();

10-05 17:55