加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 使用Spark DStream作为Akka流的源的惯用方法

发布时间:2020-12-16 18:41:51 所属栏目:安全 来源:网络整理
导读:我正在构建一个REST API,它在Spark集群中启动一些计算,并使用一个分块的结果流进行响应.鉴于Spark流与计算结果,我可以使用 dstream.foreachRDD() 从Spark发送数据.我正在使用akka-http发送分块的HTTP响应: val requestHandler: HttpRequest = HttpResponse
我正在构建一个REST API,它在Spark集群中启动一些计算,并使用一个分块的结果流进行响应.鉴于Spark流与计算结果,我可以使用

dstream.foreachRDD()

从Spark发送数据.我正在使用akka-http发送分块的HTTP响应:

val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(HttpMethods.GET,Uri.Path("/data"),_,_) =>
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`,source))
}

为简单起见,我试图首先使纯文本工作,稍后将添加JSON编组.

但是使用Spark DStream作为Akka流源的惯用方法是什么?我认为我应该可以通过套接字来实现它,但由于Spark驱动程序和REST端点位于相同的JVM上,因此开放套接字似乎有点过分.

解决方法

编辑:此答案仅适用于旧版本的spark和akka. PH88的答案是最新版本的正确方法.

您可以使用提供源的中间akka.actor.Actor(类似于this question).下面的解决方案不是“反应性”,因为底层的Actor需要维护RDD消息的缓冲区,如果下游的http客户端没有足够快地消耗块,则可能会丢弃这些消息.但是无论实现细节如何都会出现此问题,因为您无法将akka流背压的“限制”连接到DStream以减慢数据速度.这是因为DStream没有实现org.reactivestreams.Publisher.

基本拓扑是:

DStream --> Actor with buffer --> Source

要构建此toplogy,您必须创建一个类似于实现here的Actor:

//JobManager definition is provided in the link
val actorRef = actorSystem actorOf JobManager.props

基于JobManager创建流ByteStrings(消息)的源.另外,将ByteString转换为HttpEntity.ChunkStreamPart,这是HttpResponse所需要的:

import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.HttpEntity
import akka.util.ByteString

type Message = ByteString

val messageToChunkPart = 
  Flow[Message].map(HttpEntity.ChunkStreamPart(_))

//Actor with buffer --> Source
val source : Source[HttpEntity.ChunkStreamPart,Unit] = 
  Source(ActorPublisher[Message](actorRef)) via messageToChunkPart

将Spark DStream链接到Actor,以便将每个incomining RDD转换为Iterable of ByteString,然后转发给Actor:

import org.apache.spark.streaming.dstream.Dstream
import org.apache.spark.rdd.RDD

val dstream : DStream = ???

//This function converts your RDDs to messages being sent
//via the http response
def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ???

def sendMessageToActor(message : Message) = actorRef ! message

//DStream --> Actor with buffer
dstream foreachRDD {rddToMessages(_) foreach sendMessageToActor}

为HttpResponse提供源:

val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(HttpMethods.GET,source))
}

注意:dstream foreachRDD行和HttpReponse之间的时间/代码应该非常少,因为在执行foreach行之后,Actor的内部缓冲区将立即开始填充来自DStream的ByteString消息.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读