本文介绍了使用Spark DStream作为Akka流源的惯用方式的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在构建一个REST API,该API在Spark集群中开始一些计算,并以分块的结果流作为响应。给定具有计算结果的Spark流,我可以使用

I'm building a REST API that starts some calculation in a Spark cluster and responds with a chunked stream of the results. Given the Spark stream with calculation results, I can use

dstream.foreachRDD()

将数据发送到Spark之外。我正在使用akka-http发送分块的HTTP响应:

to send the data out of Spark. I'm sending the chunked HTTP response with akka-http:

val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}

为简单起见,我试图先使纯文本工作,稍后再添加JSON编组。

For simplicity, I'm trying to get plain text working first, will add JSON marshalling later.

但是使用的惯用方式是什么Spark DStream作为Akka流的源?我以为我应该可以通过套接字来执行此操作,但由于Spark驱动程序和REST端点位于同一JVM上,因此仅打开套接字就可以了,因为这似乎有点过头了。

But what is the idiomatic way of using the Spark DStream as a Source for the Akka stream? I figured I should be able to do it via a socket but since the Spark driver and the REST endpoint are sitting on the same JVM opening a socket just for this seems a bit of an overkill.

推荐答案

编辑:此答案仅适用于旧版本的spark和akka。 PH88的答案是使用最新版本的正确方法。

您可以使用中间 akka.actor.Actor 可以提供来源(类似于)。下面的解决方案不是被动的,因为底层的Actor需要维护RDD消息的缓冲区,如果下游http客户端没有足够快地消耗块,则该缓冲区可能会被丢弃。但是,无论实现细节如何,都会出现此问题,因为您无法将akka流反压的限制连接到DStream上以减慢数据速度。这是由于DStream无法实现。

You can use an intermediate akka.actor.Actor that feeds a Source (similar to this question). The solution below is not "reactive" because the underlying Actor would need to maintain a buffer of RDD messages that may be dropped if the downstream http client isn't consuming chunks quickly enough. But this problem occurs regardless of the implementation details since you cannot connect the "throttling" of the akka stream back-pressure to the DStream in order to slow down the data. This is due to the fact that DStream does not implement org.reactivestreams.Publisher .

基本拓扑是:

DStream --> Actor with buffer --> Source

要构建此拓扑,您必须创建类似于实现:

To construct this toplogy you have to create an Actor similar to the implementation here :

//JobManager definition is provided in the link
val actorRef = actorSystem actorOf JobManager.props

基于JobManager创建ByteString(消息)流源。另外,将 ByteString 转换为 HttpEntity.ChunkStreamPart ,这是HttpResponse要求的:

Create a stream Source of ByteStrings (messages) based on the JobManager. Also, convert the ByteString to HttpEntity.ChunkStreamPart which is what the HttpResponse requires:

import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.HttpEntity
import akka.util.ByteString

type Message = ByteString

val messageToChunkPart =
  Flow[Message].map(HttpEntity.ChunkStreamPart(_))

//Actor with buffer --> Source
val source : Source[HttpEntity.ChunkStreamPart, Unit] =
  Source(ActorPublisher[Message](actorRef)) via messageToChunkPart

将Spark DStream链接到Actor,以便将每个传入的RDD转换为ByteString的Iterable,然后转发给Actor:

Link the Spark DStream to the Actor so that each incomining RDD is converted to an Iterable of ByteString and then forwarded to the Actor:

import org.apache.spark.streaming.dstream.Dstream
import org.apache.spark.rdd.RDD

val dstream : DStream = ???

//This function converts your RDDs to messages being sent
//via the http response
def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ???

def sendMessageToActor(message : Message) = actorRef ! message

//DStream --> Actor with buffer
dstream foreachRDD {rddToMessages(_) foreach sendMessageToActor}

将源提供给HttpResponse:

Provide the Source to the HttpResponse:

val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}

注意:在 dstream foreachRDD 行和HttpReponse之间应该只有很少的时间/代码,因为Actor的内部缓冲区将立即开始用ByteString填充 foreach 行执行后,DStream中就会出现一条消息。

Note: there should be very little time/code between the dstream foreachRDD line and the HttpReponse since the Actor's internal buffer will immediately begin to fill with ByteString message coming from the DStream after the foreach line is executed.

这篇关于使用Spark DStream作为Akka流源的惯用方式的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-20 13:49
查看更多