加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Akka流如何不断实现?

发布时间:2020-12-16 09:58:23 所属栏目:安全 来源:网络整理
导读:我在 Scala中使用 Akka Streams使用 AWS Java SDK从 AWS SQS队列中进行轮询.我创建了一个 ActorPublisher,它以两秒的间隔使消息出列: class SQSSubscriber(name: String) extends ActorPublisher[Message] { implicit val materializer = ActorMaterializer
我在 Scala中使用 Akka Streams使用 AWS Java SDK从 AWS SQS队列中进行轮询.我创建了一个 ActorPublisher,它以两秒的间隔使消息出列:

class SQSSubscriber(name: String) extends ActorPublisher[Message] {
  implicit val materializer = ActorMaterializer()

  val schedule = context.system.scheduler.schedule(0 seconds,2 seconds,self,"dequeue")

  val client = new AmazonSQSClient()
  client.setRegion(RegionUtils.getRegion("us-east-1"))
  val url = client.getQueueUrl(name).getQueueUrl

  val MaxBufferSize = 100
  var buf = Vector.empty[Message]

  override def receive: Receive = {
    case "dequeue" =>
      val messages = iterableAsScalaIterable(client.receiveMessage(new ReceiveMessageRequest(url).getMessages).toList
      messages.foreach(self ! _)
    case message: Message if buf.size == MaxBufferSize =>
      log.error("The buffer is full")
    case message: Message =>
      if (buf.isEmpty && totalDemand > 0)
        onNext(message)
      else {
        buf :+= message
        deliverBuf()
      }
    case Request(_) =>
      deliverBuf()
    case Cancel =>
      context.stop(self)
  }

  @tailrec final def deliverBuf(): Unit =
    if (totalDemand > 0) {
      if (totalDemand <= Int.MaxValue) {
        val (use,keep) = buf.splitAt(totalDemand.toInt)
        buf = keep
        use foreach onNext
      } else {
        val (use,keep) = buf.splitAt(Int.MaxValue)
        buf = keep
        use foreach onNext
        deliverBuf()
      }
    }
}

在我的应用程序中,我也试图以2秒的间隔运行流程:

val system = ActorSystem("system")
val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name"))
val flow = Flow[Message]
  .map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem }
  .to(Sink.ignore)

system.scheduler.schedule(0 seconds,2 seconds) {
  flow.runWith(sqsSource)(ActorMaterializer()(system))
}

但是,当我运行我的应用程序时,我收到java.util.concurrent.TimeoutException:Futures在[20000毫秒]之后超时,以及由ActorMaterializer引起的后续死信通知.

是否有推荐的方法来持续实现Akka Stream?

解决方法

我认为你不需要每2秒创建一个新的ActorPublisher.这似乎是多余的,浪费了内存.另外,我认为不需要ActorPublisher.从我所知的代码来看,你的实现将有越来越多的Streams查询相同的数据.来自客户端的每条消息将由N个不同的akka?? Streams处理,更糟糕的是,N将随着时间的推移而增长.

无限循环查询的迭代器

您可以使用scala的Iterator从ActorPublisher获得相同的行为.可以创建一个不断查询客户端的Iterator:

//setup the client
val client = {
  val sqsClient = new AmazonSQSClient()
  sqsClient setRegion (RegionUtils getRegion "us-east-1")
  sqsClient
}

val url = client.getQueueUrl(name).getQueueUrl

//single query
def queryClientForMessages : Iterable[Message] = iterableAsScalaIterable {
  client receiveMessage (new ReceiveMessageRequest(url).getMessages)
}

def messageListIteartor : Iterator[Iterable[Message]] = 
  Iterator continually messageListStream

//messages one-at-a-time "on demand",no timer pushing you around
def messageIterator() : Iterator[Message] = messageListIterator flatMap identity

此实现仅在所有先前消息已被使用时查询客户端,因此确实是reactive.无需跟踪具有固定大小的缓冲区.您的解决方案需要一个缓冲区,因为消息(通过计时器)的创建与消息消息(通过println)分离.在我的实施,创作和通过背压消耗量为tightly coupled.

Akka Stream Source

然后,您可以使用此Iterator生成器函数来提供akka流源:

def messageSource : Source[Message,_] = Source fromIterator messageIterator

流动形成

最后,您可以使用此Source来执行println(作为旁注:您的流量值实际上是一个接收器,因为Flow Sink = Sink).使用问题中的流量值:

messageSource runWith flow

一个akka Stream处理所有消息.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读