我正在尝试使用Source.actorRef方法创建akka.stream.scaladsl.Source对象。形式的东西

import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source

case class Weather(zip : String, temp : Double, raining : Boolean)

val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

val sunnySource = weatherSource.filter(!_.raining)
...

我的问题是:如何将数据发送到基于ActorRef的源对象

我以为向源发送消息是某种形式
//does not compile
weatherSource ! Weather("90210", 72.0, false)
weatherSource ! Weather("02139", 32.0, true)

但是weatherSource没有!运算符或tell方法。

documentation不太说明如何使用Source.actorRef,它只是说您可以...

预先感谢您的审核和回复。

最佳答案

您需要一个Flow:

  import akka.stream.OverflowStrategy.fail
  import akka.stream.scaladsl.Source
  import akka.stream.scaladsl.{Sink, Flow}

  case class Weather(zip : String, temp : Double, raining : Boolean)

  val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

  val sunnySource = weatherSource.filter(!_.raining)

  val ref = Flow[Weather]
    .to(Sink.ignore)
    .runWith(sunnySource)

  ref ! Weather("02139", 32.0, true)

请记住,这都是实验性的,可能会改变!

08-03 15:20