我有一个通过套接字协议(protocol)连接到服务的Observable。与套接字的连接通过客户端库进行。我使用的客户端库具有java.util.Observer,可以注册将其推送到其中的事件

final class MyObservable extends Observable[MyEvent] {

  def subscribe(subscriber: Subscriber[MyEvent]) = {
    // connect to the Socket (Step: 1)
    // get the responses that are pushed (Step: 2)
    // transform them into MyEvent type (Step: 3)
  }
}

我有两个我不明白的开放性问题。

如何在订户中获得步骤3的结果?

每次当我获得MyEvent时,具有如下所示的订阅者,我都会看到正在创建一个新的连接。最终,每个传入事件都将运行步骤1,步骤2和步骤3。
val myObservable = new MyObservale()
myObservable.subscribe()

最佳答案

除非我误解了您的问题,否则您只需致电onNext:

def subscribe(subscriber: Subscriber[MyEvent]) = {
  // connect to the Socket (Step: 1)
  // get the responses that are pushed (Step: 2)
  // transform them into MyEvent type (Step: 3)

  // finally notify the subscriber:
  subscriber.onNext(myEventFromStep3)
}

和订阅的代码将执行以下操作:
myObservable.subscribe(onNext = println(_))

10-08 01:29