scala – 使用Spark DStream作为Akka流的源的惯用方法
我正在构建一个REST API,它在Spark集群中启动一些计算,并使用一个分块的结果流进行响应.鉴于Spark流与计算结果,我可以使用
dstream.foreachRDD() 从Spark发送数据.我正在使用akka-http发送分块的HTTP响应: val requestHandler: HttpRequest => HttpResponse = { case HttpRequest(HttpMethods.GET,Uri.Path("/data"),_,_) => HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`,source)) } 为简单起见,我试图首先使纯文本工作,稍后将添加JSON编组. 但是使用Spark DStream作为Akka流源的惯用方法是什么?我认为我应该可以通过套接字来实现它,但由于Spark驱动程序和REST端点位于相同的JVM上,因此开放套接字似乎有点过分. 解决方法
编辑:此答案仅适用于旧版本的spark和akka. PH88的答案是最新版本的正确方法.
您可以使用提供源的中间akka.actor.Actor(类似于this question).下面的解决方案不是“反应性”,因为底层的Actor需要维护RDD消息的缓冲区,如果下游的http客户端没有足够快地消耗块,则可能会丢弃这些消息.但是无论实现细节如何都会出现此问题,因为您无法将akka流背压的“限制”连接到DStream以减慢数据速度.这是因为DStream没有实现 基本拓扑是: DStream --> Actor with buffer --> Source 要构建此toplogy,您必须创建一个类似于实现here的Actor: //JobManager definition is provided in the link val actorRef = actorSystem actorOf JobManager.props 基于JobManager创建流ByteStrings(消息)的源.另外,将ByteString转换为HttpEntity.ChunkStreamPart,这是HttpResponse所需要的: import akka.stream.actor.ActorPublisher import akka.stream.scaladsl.Source import akka.http.scaladsl.model.HttpEntity import akka.util.ByteString type Message = ByteString val messageToChunkPart = Flow[Message].map(HttpEntity.ChunkStreamPart(_)) //Actor with buffer --> Source val source : Source[HttpEntity.ChunkStreamPart,Unit] = Source(ActorPublisher[Message](actorRef)) via messageToChunkPart 将Spark DStream链接到Actor,以便将每个incomining RDD转换为Iterable of ByteString,然后转发给Actor: import org.apache.spark.streaming.dstream.Dstream import org.apache.spark.rdd.RDD val dstream : DStream = ??? //This function converts your RDDs to messages being sent //via the http response def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ??? def sendMessageToActor(message : Message) = actorRef ! message //DStream --> Actor with buffer dstream foreachRDD {rddToMessages(_) foreach sendMessageToActor} 为HttpResponse提供源: val requestHandler: HttpRequest => HttpResponse = { case HttpRequest(HttpMethods.GET,source)) } 注意:dstream foreachRDD行和HttpReponse之间的时间/代码应该非常少,因为在执行foreach行之后,Actor的内部缓冲区将立即开始填充来自DStream的ByteString消息. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |