问题描述
我需要遍历一个形状像树的 API.例如,目录结构或讨论线程.它可以通过以下流程建模:
I need to traverse an API that is shaped like a tree. For example, a directory structure or threads of discussion. It can be modeled via the following flow:
type ItemId = Int
type Data = String
case class Item(data: Data, kids: List[ItemId])
def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString
// 0 => [1, 9]
// 1 => [10, 19]
// 2 => [20, 29]
// ...
// 9 => [90, 99]
// _ => []
// NB. I don't have access to this function, only the itemFlow.
def nested(id: ItemId): List[ItemId] =
if (id == 0) (1 to 9).toList
else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
else Nil
val itemFlow: Flow[ItemId, Item, NotUsed] =
Flow.fromFunction(id => Item(randomData, nested(id)))
如何遍历这些数据?我得到了以下工作:
How can I traverse this data? I got the following working:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val loop =
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val source = b.add(Flow[Int])
val merge = b.add(Merge[Int](2))
val fetch = b.add(itemFlow)
val bcast = b.add(Broadcast[Item](2))
val kids = b.add(Flow[Item].mapConcat(_.kids))
val data = b.add(Flow[Item].map(_.data))
val buffer = Flow[Int].buffer(100, OverflowStrategy.dropHead)
source ~> merge ~> fetch ~> bcast ~> data
merge <~ buffer <~ kids <~ bcast
FlowShape(source.in, data.out)
}
val flow = Flow.fromGraph(loop)
Await.result(
Source.single(0).via(flow).runWith(Sink.foreach(println)),
Duration.Inf
)
system.terminate()
但是,由于我使用的是带有缓冲区的流,因此流将永远不会完成.
However, since I'm using a flow with a buffer, the Stream will never complete.
当上游完成并且缓冲的元素被排空时完成
我阅读了图周期、活性和死锁 部分多次,我仍在努力寻找答案.
I read the Graph cycles, liveness, and deadlocks section multiple times and I'm still struggling to find an answer.
这将创建一个活锁:
import java.util.concurrent.atomic.AtomicInteger
def unfold[S, E](seed: S, flow: Flow[S, E, NotUsed])(loop: E => List[S]): Source[E, NotUsed] = {
// keep track of how many element flows,
val remaining = new AtomicInteger(1) // 1 = seed
// should be > max loop(x)
val bufferSize = 10000
val (ref, publisher) =
Source.actorRef[S](bufferSize, OverflowStrategy.fail)
.toMat(Sink.asPublisher(true))(Keep.both)
.run()
ref ! seed
Source.fromPublisher(publisher)
.via(flow)
.map{x =>
loop(x).foreach{ c =>
remaining.incrementAndGet()
ref ! c
}
x
}
.takeWhile(_ => remaining.decrementAndGet > 0)
}
我添加了一个 git repo 来测试您的解决方案 https://github.com/MasseGuillaume/source-unfold
I added a git repo to test your solution https://github.com/MasseGuillaume/source-unfold
推荐答案
我通过编写自己的 GraphStage 解决了这个问题.
I solved this problem by writing my own GraphStage.
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scala.concurrent.ExecutionContext
import scala.collection.mutable
import scala.util.{Success, Failure, Try}
import scala.collection.mutable
def unfoldTree[S, E](seeds: List[S],
flow: Flow[S, E, NotUsed],
loop: E => List[S],
bufferSize: Int)(implicit ec: ExecutionContext): Source[E, NotUsed] = {
Source.fromGraph(new UnfoldSource(seeds, flow, loop, bufferSize))
}
object UnfoldSource {
implicit class MutableQueueExtensions[A](private val self: mutable.Queue[A]) extends AnyVal {
def dequeueN(n: Int): List[A] = {
val b = List.newBuilder[A]
var i = 0
while (i < n) {
val e = self.dequeue
b += e
i += 1
}
b.result()
}
}
}
class UnfoldSource[S, E](seeds: List[S],
flow: Flow[S, E, NotUsed],
loop: E => List[S],
bufferSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("UnfoldSource.out")
override val shape: SourceShape[E] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
// Nodes to expand
val frontier = mutable.Queue[S]()
frontier ++= seeds
// Nodes expanded
val buffer = mutable.Queue[E]()
// Using the flow to fetch more data
var inFlight = false
// Sink pulled but the buffer was empty
var downstreamWaiting = false
def isBufferFull() = buffer.size >= bufferSize
def fillBuffer(): Unit = {
val batchSize = Math.min(bufferSize - buffer.size, frontier.size)
val batch = frontier.dequeueN(batchSize)
inFlight = true
val toProcess =
Source(batch)
.via(flow)
.runWith(Sink.seq)(materializer)
val callback = getAsyncCallback[Try[Seq[E]]]{
case Failure(ex) => {
fail(out, ex)
}
case Success(es) => {
val got = es.size
inFlight = false
es.foreach{ e =>
buffer += e
frontier ++= loop(e)
}
if (downstreamWaiting && buffer.nonEmpty) {
val e = buffer.dequeue
downstreamWaiting = false
sendOne(e)
} else {
checkCompletion()
}
()
}
}
toProcess.onComplete(callback.invoke)
}
override def preStart(): Unit = {
checkCompletion()
}
def checkCompletion(): Unit = {
if (!inFlight && buffer.isEmpty && frontier.isEmpty) {
completeStage()
}
}
def sendOne(e: E): Unit = {
push(out, e)
checkCompletion()
}
def onPull(): Unit = {
if (buffer.nonEmpty) {
sendOne(buffer.dequeue)
} else {
downstreamWaiting = true
}
if (!isBufferFull && frontier.nonEmpty) {
fillBuffer()
}
}
setHandler(out, this)
}
}
这篇关于如何从递归生成值的流创建 akka-stream 源?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!