This article covers the question "How to get started with Akka Streams?" together with an answer that may serve as a useful reference for readers facing the same problem.

Problem description

The Akka Streams library already comes with quite a wealth of documentation.

Recommended answer

A Source can be created in multiple ways:

scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...

scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val s = Source(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future

In the above cases we fed the Source with finite data, which means the streams will terminate eventually. One should not forget that Reactive Streams are lazy and asynchronous by default, so one explicitly has to request the evaluation of the stream. In Akka Streams this is done through the run* methods. runForeach is no different from the well known foreach function - the run prefix makes explicit that we are asking for an evaluation of the stream. Since finite data is boring, we continue with an infinite source:

scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5

With the take method we can create an artificial stop point that prevents us from evaluating indefinitely. Since actor support is built-in, we can also easily feed the stream with messages that are sent to an actor:

def run(actor: ActorRef) = {
  Future { Thread.sleep(300); actor ! 1 }
  Future { Thread.sleep(200); actor ! 2 }
  Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
  .actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
  .mapMaterializedValue(run)

scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1

We can see that the Futures are executed asynchronously on different threads, which explains the result. In the above example a buffer for the incoming elements is not necessary, therefore with OverflowStrategy.fail we can configure the stream to fail on a buffer overflow. Especially through this actor interface we can feed the stream from any data source. It doesn't matter whether the data is created by the same thread, by a different one, by another process or whether it comes from a remote system over the Internet.
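
All of the snippets above assume that an actor system and a materializer are already in scope, which the REPL session doesn't show. A minimal setup could look like the following sketch; the names are illustrative and, in Akka 2.6 and later, an implicit ActorSystem alone is enough and the ActorMaterializer is no longer needed:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.Future

implicit val system = ActorSystem("akka-streams-intro")  // illustrative name
implicit val materializer = ActorMaterializer()          // pre-2.6 style materializer
import system.dispatcher                                 // ExecutionContext for the Futures used above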

Sink

A Sink is basically the opposite of a Source. It is the endpoint of a stream and therefore consumes data. A Sink has a single input channel and no output channel. Sinks are especially needed when we want to specify the behavior of the data collector in a reusable way and without evaluating the stream. The already known run* methods do not give us these properties, therefore it is preferred to use a Sink instead.

A short example of a Sink in action:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3

Connecting a Source to a Sink can be done with the to method. It returns a so-called RunnableGraph which, as we will see later, is a special form of a Flow - a stream that can be executed by just calling its run() method.
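
As a small aside that is not part of the original answer: next to to there is also the runWith shorthand, which connects and runs in one step and, unlike to(...).run(), hands back the Sink's materialized value. A sketch, assuming the implicit setup from earlier:

// to(...).run() keeps the Source's materialized value (NotUsed here),
// runWith(...) keeps the Sink's (a Future[Done] for Sink.foreach).
val left:  akka.NotUsed                       = Source(1 to 3).to(Sink.foreach[Int](println)).run()
val right: scala.concurrent.Future[akka.Done] = Source(1 to 3).runWith(Sink.foreach[Int](println))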

It is of course possible to forward all values that arrive at a sink to an actor:

val actor = system.actorOf(Props(new Actor {
  override def receive = {
    case msg => println(s"actor received: $msg")
  }
}))

scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...

scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed

Flow

Data sources and sinks are great if you need a connection between Akka Streams and an existing system, but one cannot really do anything with them alone. Flows are the last missing piece of the Akka Streams base abstraction. They act as a connector between different streams and can be used to transform their elements.

If a Flow is connected to a Source a new Source is the result. Likewise, a Flow connected to a Sink creates a new Sink. And a Flow connected with both a Source and a Sink results in a RunnableFlow. Therefore, they sit between the input and the output channel but by themselves do not correspond to one of the flavors as long as they are not connected to either a Source or a Sink.
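
To make these combinations a bit more tangible, here is a small type-level sketch (not from the original answer) of what each connection yields:

import akka.NotUsed
import akka.stream.scaladsl.{Flow, RunnableGraph, Sink, Source}

// A Flow attached to a Source yields a new Source, attached to a Sink a new Sink,
// and with both ends connected it yields a RunnableGraph.
val newSource: Source[Int, NotUsed]   = Source(1 to 3).via(Flow[Int].map(_ + 1))
val newSink:   Sink[Int, NotUsed]     = Flow[Int].map(_ + 1).to(Sink.ignore)
val runnable:  RunnableGraph[NotUsed] = Source(1 to 3).via(Flow[Int].map(_ + 1)).to(Sink.ignore)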

In order to get a better understanding of Flows, we will have a look at some examples:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6

Via the via method we can connect a Source with a Flow. We need to specify the input type because the compiler can't infer it for us. As we can already see in this simple example, the flows invert and doubler are completely independent from any data producers and consumers. They only transform the data and forward it to the output channel. This means that we can reuse a flow among multiple streams:

scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3

scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1

s1 and s2 represent completely new streams - they do not share any data through their building blocks.
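
Flows do not only sit between a Source and a Sink, they also compose with each other into new, reusable flows. A short sketch that reuses invert and doubler from above (not part of the original answer):

// via on a Flow yields a new Flow; the composite is just as reusable as its parts.
val invertAndDouble: Flow[Int, Int, NotUsed] = invert.via(doubler)
val s3 = Source(1 to 3).via(invertAndDouble).to(sink)   // would print -2, -4, -6 once run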

Unbounded Data Streams

Before we move on we should first revisit some of the key aspects of Reactive Streams. An unbounded number of elements can arrive at any point and can put a stream in different states. Besides a runnable stream, which is the usual state, a stream may get stopped, either through an error or through a signal that denotes that no further data will arrive. A stream can be modeled in a graphical way by marking events on a timeline, as is the case here:

We have already seen runnable flows in the examples of the previous section. We get a RunnableGraph whenever a stream can actually be materialized, which means that a Sink is connected to a Source. So far we always materialized to the value NotUsed, which can be seen in the types:

val source: Source[Int, NotUsed] = Source(1 to 3)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x)

For Source and Sink the second type parameter and for Flow the third type parameter denote the materialized value. Throughout this answer, the full meaning of materialization shall not be explained. However, further details about materialization can be found in the official documentation. For now the only thing we need to know is that the materialized value is what we get when we run a stream. Since we were only interested in side effects so far, we got NotUsed as the materialized value. The exception to this was the materialization of a sink, which gave us back a Future, since this value can denote when the stream that is connected to the sink has ended. So far, the previous code examples were nice for explaining the concept but they were also boring because we only dealt with finite streams or with very simple infinite ones. To make it more interesting, a fully asynchronous and unbounded stream shall be explained in the following.
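
If we do want to keep a particular materialized value, the *Mat variants of the combinators let us choose it explicitly. The following sketch (assuming the setup from earlier, including system.dispatcher) is enough for the examples in this answer:

import akka.Done
import akka.stream.scaladsl.Keep
import scala.concurrent.Future

// `to` keeps the left (Source) value by default; Keep.right keeps the Sink's Future[Done] instead.
val graph: RunnableGraph[Future[Done]] =
  Source(1 to 3).toMat(Sink.foreach[Int](println))(Keep.right)

val done: Future[Done] = graph.run()
done.foreach(_ => println("stream finished"))   // needs an ExecutionContext in scope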

ClickStream Example

As an example, we want to have a stream that captures click events. To make it more challenging, let's say we also want to group click events that happen shortly after each other. This way we could easily discover double, triple or tenfold clicks. Furthermore, we want to filter out all single clicks. Take a deep breath and imagine how you would solve that problem in an imperative manner. I bet no one would be able to implement a solution that works correctly on the first try. In a reactive fashion this problem is trivial to solve. In fact, the solution is so simple and straightforward to implement that we can even express it in a diagram that directly describes the behavior of the code:

The gray boxes are functions that describe how one stream is transformed into another. With the throttle function we accumulate clicks within 250 milliseconds; the map and filter functions should be self-explanatory. The colored orbs represent an event and the arrows depict how they flow through our functions. Later in the processing steps we get fewer and fewer elements flowing through our stream, since we group them together and filter them out. The code for this image would look something like this:

val multiClickStream = clickStream
    .throttle(250.millis)
    .map(clickEvents => clickEvents.length)
    .filter(numberOfClicks => numberOfClicks >= 2)

The whole logic can be represented in only four lines of code! In Scala, we could write it even shorter:

val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)

The definition of clickStream is a little bit more complex but this is only the case because the example program runs on the JVM, where capturing click events is not easily possible. Another complication is that Akka by default doesn't provide the throttle function. Instead we had to write it ourselves. Since this function is (as is the case for the map or filter functions) reusable across different use cases, I don't count these lines toward the number of lines we needed to implement the logic. In imperative languages, however, it is normal that logic can't be reused that easily and that the different logical steps all happen in one place instead of being applied sequentially, which means that we probably would have tangled up our code with the throttling logic. The full code example is available as a gist and shall not be discussed here any further.
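
For readers who would rather not write a custom throttle stage, a rough approximation of the grouping step can be built from the built-in groupedWithin operator. This is only a sketch under that assumption, not the code of the original example; clickStream stands for some Source of click events:

import scala.concurrent.duration._

// Collect the clicks that arrive within a 250 ms window, then count and filter them,
// roughly mimicking the custom throttle step described above.
val multiClickStream = clickStream
  .groupedWithin(Int.MaxValue, 250.millis)
  .map(_.length)
  .filter(_ >= 2)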

SimpleWebServer Example

What should be discussed instead is another example. While the click stream is a nice example to let Akka Streams handle a real world problem, it lacks the power to show parallel execution in action. The next example shall represent a small web server that can handle multiple requests in parallel. The web server shall be able to accept incoming connections and receive byte sequences from them that represent printable ASCII characters. These byte sequences or strings should be split at all newline characters into smaller parts. After that, the server shall respond to the client with each of the split lines. Alternatively, it could do something else with the lines and give a special answer token, but we want to keep it simple in this example and therefore don't introduce any fancy features. Remember, the server needs to be able to handle multiple requests at the same time, which basically means that no request is allowed to block any other request from further execution. Solving all of these requirements can be hard in an imperative way - with Akka Streams however, we shouldn't need more than a few lines to solve any of them. First, let's have an overview of the server itself:

Basically, there are only three main building blocks. The first one needs to accept incoming connections. The second one needs to handle incoming requests and the third one needs to send a response. Implementing all of these three building blocks is only a little bit more complicated than implementing the click stream:

import akka.Done
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source, Tcp}
import scala.concurrent.Future
import scala.util.{Failure, Success}

def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
  import system.dispatcher

  val connectionHandler: Sink[Tcp.IncomingConnection, Future[Done]] =
    Sink.foreach[Tcp.IncomingConnection] { conn =>
      println(s"Incoming connection from: ${conn.remoteAddress}")
      conn.handleWith(serverLogic)
    }

  val incomingConnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
    Tcp().bind(address, port)

  val binding: Future[Tcp.ServerBinding] =
    incomingConnections.to(connectionHandler).run()

  binding onComplete {
    case Success(b) =>
      println(s"Server started, listening on: ${b.localAddress}")
    case Failure(e) =>
      println(s"Server could not be bound to $address:$port: ${e.getMessage}")
  }
}
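
As a side note that is not part of the original snippet, the surrounding program that calls mkServer could be as small as the following sketch; the object name, address and port are made up for illustration:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

object SimpleWebServer extends App {
  implicit val system = ActorSystem("simple-web-server")   // illustrative name
  implicit val materializer = ActorMaterializer()

  // serverLogic, which mkServer uses internally, is defined further below.
  mkServer("127.0.0.1", 6666)
}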

The function mkServer takes, besides the address and the port of the server, an actor system and a materializer as implicit parameters. The control flow of the server is represented by binding, which takes a source of incoming connections and forwards them to a sink of incoming connections. Inside connectionHandler, which is our sink, we handle every connection with the flow serverLogic, which will be described later. binding returns a Future, which completes when the server has been started or the start failed, which could be the case when the port is already taken by another process. The code, however, doesn't completely reflect the graphic, as we can't see a building block that handles responses. The reason for this is that the connection already provides this logic by itself. It is a bidirectional flow and not just a unidirectional one like the flows we have seen in the previous examples. As was the case for materialization, such complex flows shall not be explained here. The official documentation has plenty of material covering more complex flow graphs. For now it is enough to know that Tcp.IncomingConnection represents a connection that knows how to receive requests and how to send responses. The part that is still missing is the serverLogic building block. It can look like this:

Once again, we are able to split the logic into several simple building blocks that all together form the flow of our program. First we want to split our sequence of bytes into lines, which we have to do whenever we find a newline character. After that, the bytes of each line need to be converted to a string, because working with raw bytes is cumbersome. In general we could receive a binary stream of a complicated protocol, which would make working with the incoming raw data extremely challenging. Once we have a readable string, we can create an answer. For simplicity reasons the answer can be anything in our case. In the end, we have to convert our answer back to a sequence of bytes that can be sent over the wire. The code for the entire logic may look like this:

val serverLogic: Flow[ByteString, ByteString, NotUsed] = {
  val delimiter = Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true)

  val receiver = Flow[ByteString].map { bytes =>
    val message = bytes.utf8String
    println(s"Server received: $message")
    message
  }

  val responder = Flow[String].map { message =>
    val answer = s"Server hereby responds to message: $message\n"
    ByteString(answer)
  }

  Flow[ByteString]
    .via(delimiter)
    .via(receiver)
    .via(responder)
}

We already know that serverLogic is a flow that takes a ByteString and has to produce a ByteString. With delimiter we can split a ByteString into smaller parts - in our case this needs to happen whenever a newline character occurs. receiver is the flow that takes all of the split byte sequences and converts them to strings. This is of course a dangerous conversion, since only printable ASCII characters should be converted to a string, but for our needs it is good enough. responder is the last component and is responsible for creating an answer and converting the answer back to a sequence of bytes. As opposed to the graphic we didn't split this last component in two, since the logic is trivial. At the end, we connect all of the flows through the via function. At this point one may ask whether we took care of the multi-user property that was mentioned at the beginning. And indeed we did, even though it may not be obvious immediately. By looking at this graphic it should become clearer:

The serverLogic component is nothing but a flow that contains smaller flows. This component takes an input, which is a request, and produces an output, which is the response. Since flows can be constructed multiple times and they all work independently of each other, we achieve our multi-user property through this nesting. Every request is handled within its own flow and therefore a short running request can overtake a previously started long running request. In case you wondered, the definition of serverLogic that was shown previously can of course be written a lot shorter by inlining most of its inner definitions:

val serverLogic = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(msg => s"Server hereby responds to message: $msg\n")
  .map(ByteString(_))

A test of the web server may look like this:

$ # Client
$ echo "Hello World\nHow are you?" | netcat 127.0.0.1 6666
Server hereby responds to message: Hello World
Server hereby responds to message: How are you?

In order for the above code example to function correctly, we first need to start the server, which is depicted by the startServer script:

$ # Server
$ ./startServer 127.0.0.1 6666
[DEBUG] Server started, listening on: /127.0.0.1:6666
[DEBUG] Incoming connection from: /127.0.0.1:37972
[DEBUG] Server received: Hello World
[DEBUG] Server received: How are you?

The full code example of this simple TCP server can be found here. We are not only able to write a server with Akka Streams but also the client. It may look like this:

val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(println)
  .map(_ ⇒ StdIn.readLine("> "))
  .map(_+"\n")
  .map(ByteString(_))

connection.join(flow).run()

The full code of the TCP client can be found here. The code looks quite similar but, in contrast to the server, we don't have to manage incoming connections anymore.
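
One small extra that is not in the original client: outgoingConnection materializes a Future[Tcp.OutgoingConnection], so by keeping that value with joinMat we can find out when the connection has actually been established. A hedged sketch:

import akka.stream.scaladsl.{Keep, Tcp}
import scala.concurrent.Future

// Keep.left keeps the connection's materialized value instead of the flow's NotUsed.
val connected: Future[Tcp.OutgoingConnection] =
  connection.joinMat(flow)(Keep.left).run()

connected.foreach(c => println(s"Connected to ${c.remoteAddress}"))   // needs an ExecutionContext in scope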

Complex Graphs

In the previous sections we have seen how we can construct simple programs out of flows. However, in reality it is often not enough to rely only on already built-in functions to construct more complex streams. If we want to be able to use Akka Streams for arbitrary programs we need to know how to build our own custom control structures and combinable flows that allow us to tackle the complexity of our applications. The good news is that Akka Streams was designed to scale with the needs of its users, and in order to give you a short introduction to the more complex parts of Akka Streams, we add some more features to our client/server example.

One thing we can't do yet is closing a connection. At this point it starts to get a little bit more complicated because the stream API we have seen so far doesn't allow us to stop a stream at an arbitrary point. However, there is the GraphStage abstraction, which can be used to create arbitrary graph processing stages with any number of input or output ports. Let's first have a look at the server side, where we introduce a new component, called closeConnection:

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

val closeConnection = new GraphStage[FlowShape[String, String]] {
  val in = Inlet[String]("closeConnection.in")
  val out = Outlet[String]("closeConnection.out")

  override val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush() = grab(in) match {
        case "q" ⇒
          push(out, "BYE")
          completeStage()
        case msg ⇒
          push(out, s"Server hereby responds to message: $msg\n")
      }
    })
    setHandler(out, new OutHandler {
      override def onPull() = pull(in)
    })
  }
}

This API looks a lot more cumbersome than the flow API. No wonder - we have to do a lot of imperative steps here. In exchange, we have more control over the behavior of our streams. In the above example, we only specify one input and one output port and make them available to the system by overriding the shape value. Furthermore we define a so-called InHandler and an OutHandler, which are responsible for receiving and emitting elements, respectively. If you looked closely at the full click stream example you should already recognize these components. In the InHandler we grab an element and, if it is a string with the single character 'q', we want to close the stream. In order to give the client a chance to find out that the stream will get closed soon, we emit the string "BYE" and then immediately complete the stage. The closeConnection component can be combined with a stream via the via method, which was introduced in the section about flows.
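
To make that last sentence concrete, here is a sketch of how closeConnection could be wired in with via. The name internalLogic is the one used in the next code block; everything else mirrors the earlier serverLogic:

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Framing}
import akka.util.ByteString

// Frame on newlines, decode to String, let closeConnection answer or terminate, encode again.
val internalLogic: Flow[ByteString, ByteString, NotUsed] =
  Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .via(closeConnection)
    .map(ByteString(_))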

Besides being able to close connections, it would also be nice if we could show a welcome message to a newly created connection. In order to do this we once again have to go a little bit further:

def serverLogic
    (conn: Tcp.IncomingConnection)
    (implicit system: ActorSystem)
    : Flow[ByteString, ByteString, NotUsed]
    = Flow.fromGraph(GraphDSL.create() { implicit b ⇒
  import GraphDSL.Implicits._
  val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
  val logic = b.add(internalLogic)
  val concat = b.add(Concat[ByteString]())
  welcome ~> concat.in(0)
  logic.outlet ~> concat.in(1)

  FlowShape(logic.in, concat.out)
})

The function serverLogic now takes the incoming connection as a parameter. Inside of its body we use a DSL that allows us to describe complex stream behavior. With welcome we create a stream that can only emit one element - the welcome message. logic is what was described as serverLogic in the previous section. The only notable difference is that we added closeConnection to it. Now actually comes the interesting part of the DSL. The GraphDSL.create function makes a builder b available, which is used to express the stream as a graph. With the ~> function it is possible to connect input and output ports with each other. The Concat component that is used in the example can concatenate elements and is here used to prepend the welcome message in front of the other elements that come out of internalLogic. In the last line, we only make the input port of the server logic and the output port of the concatenated stream available because all the other ports shall remain an implementation detail of the serverLogic component. For an in-depth introduction to the graph DSL of Akka Streams, visit the corresponding section in the official documentation. The full code example of the complex TCP server and of a client that can communicate with it can be found here. Whenever you open a new connection from the client you should see a welcoming message and by typing "q" on the client you should see a message that tells you that the connection has been canceled.
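
Putting it together - a sketch rather than the full gist: since serverLogic now needs the connection itself, the connection handler from the earlier mkServer would simply pass it along (an implicit actor system and materializer are assumed to be in scope, as in mkServer):

import akka.stream.scaladsl.{Sink, Tcp}

// Every incoming connection gets its own instance of serverLogic,
// which now receives the connection in order to build the welcome message.
val connectionHandler = Sink.foreach[Tcp.IncomingConnection] { conn =>
  println(s"Incoming connection from: ${conn.remoteAddress}")
  conn.handleWith(serverLogic(conn))
}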

There are still some topics which weren't covered by this answer. Especially materialization may scare one reader or another, but I'm sure with the material that is covered here everyone should be able to take the next steps by themselves. As already said, the official documentation is a good place to continue learning about Akka Streams.

That is all for this article on how to get started with Akka Streams. Hopefully the recommended answer above is helpful to readers.
