scala – Akka,SQS和Camel的消费者民意调查
我正在研究的项目需要从SQS读取消息,我决定使用Akka来分发这些消息的处理.
由于SQS是Camel支持的,并且内置了在Consumer类中使用Akka的功能,我想最好以这种方式实现端点和读取消息,尽管我没有看到很多人这样做的例子. 我的问题是我不能足够快地轮询我的队列以保持我的队列空,或接近空.我最初的想法是,我可以让消费者从SQS以X / s的速率接收来自Camel的消息.从那里,我可以简单地创建更多的消费者,以达到我需要处理消息的速度. 我的消费者: import akka.camel.{CamelMessage,Consumer} import akka.actor.{ActorRef,ActorPath} class MyConsumer() extends Consumer { def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)" var count = 0 def receive = { case msg: CamelMessage => { count += 1 } case _ => { println("Got something else") } } override def postStop(){ println("Count for actor: " + count) } } 如图所示,我设置了delay = 1以及& maxMessagesPerPoll = 10以提高消息速率,但是我无法使用相同的端点生成多个消费者. 我在文档中读到,默认情况下,端点被认为不支持多个使用者.我相信这也适用于SQS端点,因为产生多个消费者只会给我一个消费者,在运行系统一分钟之后,输出消息是Count for actor:x而不是其他输出Count for actor: 0. 如果这是有用的;我能够在单个消费者的当前实现中读取大约33条消息/秒. 这是从Akka的SQS队列中读取消息的正确方法吗?如果是这样,我有没有办法让它向外扩展,以便我可以将消息消耗率提高到接近900消息/秒的速度? 解决方法
可悲的是,Camel目前不支持并行消费SQS上的消息.
http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html 为了解决这个问题,我编写了自己的Actor,使用aws-java-sdk轮询批量消息SQS. def receive = { case BeginPolling => { // re-queue sending asynchronously self ! BeginPolling // traverse the response val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry] val messages = sqs.receiveMessage(receiveMessageRequest).getMessages messages.toList.foreach { node => { deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId,node.getReceiptHandle)) //log.info("Node body: {}",node.getBody) filterSupervisor ! node.getBody } } if(deleteEntryList.size() > 0){ val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName,deleteMessageList) sqs.deleteMessageBatch(deleteMessageBatchRequest) } } case _ => { log.warning("Unknown message") } } 虽然我不确定这是否是最好的实现,并且它当然可以改进,以便请求不会经常命中空队列,但它确实适合我当前需要能够轮询来自同一队列的消息. 从SQS获得大约133(消息/秒)/演员. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |