scala – Akka流如何不断实现?
我在
Scala中使用
Akka Streams使用
AWS Java SDK从
AWS SQS队列中进行轮询.我创建了一个
ActorPublisher,它以两秒的间隔使消息出列:
class SQSSubscriber(name: String) extends ActorPublisher[Message] { implicit val materializer = ActorMaterializer() val schedule = context.system.scheduler.schedule(0 seconds,2 seconds,self,"dequeue") val client = new AmazonSQSClient() client.setRegion(RegionUtils.getRegion("us-east-1")) val url = client.getQueueUrl(name).getQueueUrl val MaxBufferSize = 100 var buf = Vector.empty[Message] override def receive: Receive = { case "dequeue" => val messages = iterableAsScalaIterable(client.receiveMessage(new ReceiveMessageRequest(url).getMessages).toList messages.foreach(self ! _) case message: Message if buf.size == MaxBufferSize => log.error("The buffer is full") case message: Message => if (buf.isEmpty && totalDemand > 0) onNext(message) else { buf :+= message deliverBuf() } case Request(_) => deliverBuf() case Cancel => context.stop(self) } @tailrec final def deliverBuf(): Unit = if (totalDemand > 0) { if (totalDemand <= Int.MaxValue) { val (use,keep) = buf.splitAt(totalDemand.toInt) buf = keep use foreach onNext } else { val (use,keep) = buf.splitAt(Int.MaxValue) buf = keep use foreach onNext deliverBuf() } } } 在我的应用程序中,我也试图以2秒的间隔运行流程: val system = ActorSystem("system") val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name")) val flow = Flow[Message] .map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem } .to(Sink.ignore) system.scheduler.schedule(0 seconds,2 seconds) { flow.runWith(sqsSource)(ActorMaterializer()(system)) } 但是,当我运行我的应用程序时,我收到java.util.concurrent.TimeoutException:Futures在[20000毫秒]之后超时,以及由ActorMaterializer引起的后续死信通知. 是否有推荐的方法来持续实现Akka Stream? 解决方法
我认为你不需要每2秒创建一个新的ActorPublisher.这似乎是多余的,浪费了内存.另外,我认为不需要ActorPublisher.从我所知的代码来看,你的实现将有越来越多的Streams查询相同的数据.来自客户端的每条消息将由N个不同的akka?? Streams处理,更糟糕的是,N将随着时间的推移而增长.
无限循环查询的迭代器 您可以使用scala的Iterator从ActorPublisher获得相同的行为.可以创建一个不断查询客户端的Iterator: //setup the client val client = { val sqsClient = new AmazonSQSClient() sqsClient setRegion (RegionUtils getRegion "us-east-1") sqsClient } val url = client.getQueueUrl(name).getQueueUrl //single query def queryClientForMessages : Iterable[Message] = iterableAsScalaIterable { client receiveMessage (new ReceiveMessageRequest(url).getMessages) } def messageListIteartor : Iterator[Iterable[Message]] = Iterator continually messageListStream //messages one-at-a-time "on demand",no timer pushing you around def messageIterator() : Iterator[Message] = messageListIterator flatMap identity 此实现仅在所有先前消息已被使用时查询客户端,因此确实是reactive.无需跟踪具有固定大小的缓冲区.您的解决方案需要一个缓冲区,因为消息(通过计时器)的创建与消息消息(通过println)分离.在我的实施,创作和通过背压消耗量为tightly coupled. Akka Stream Source 然后,您可以使用此Iterator生成器函数来提供akka流源: def messageSource : Source[Message,_] = Source fromIterator messageIterator 流动形成 最后,您可以使用此Source来执行println(作为旁注:您的流量值实际上是一个接收器,因为Flow Sink = Sink).使用问题中的流量值: messageSource runWith flow 一个akka Stream处理所有消息. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |