AWS SQS Java.并非所有消息都从SQS队列中检索
发布时间:2020-12-15 04:46:16 所属栏目:Java 来源:网络整理
导读:我一直在尝试使用AWS SDK for Java从SQS队列中检索所有消息的几种方法无济于事.我已经了解了AWS SQS的分布式特性,并且消息存储在不同的服务器上.但我不明白为什么这个架构不会被最终用户隐藏.我需要在Java代码中应用哪些技巧来检索所有消息,并且100%确定没
我一直在尝试使用AWS SDK for
Java从SQS队列中检索所有消息的几种方法无济于事.我已经了解了AWS SQS的分布式特性,并且消息存储在不同的服务器上.但我不明白为什么这个架构不会被最终用户隐藏.我需要在Java代码中应用哪些技巧来检索所有消息,并且100%确定没有人错过?
我试着用“长轮询”: ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl); List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages(); for (Message message : messages) { System.out.println(" Message"); System.out.println(" MessageId: " + message.getMessageId()); System.out.println(" ReceiptHandle: " + message.getReceiptHandle()); System.out.println(" MD5OfBody: " + message.getMD5OfBody()); System.out.println(" Body: " + message.getBody()); for (Entry<String,String> entry : message.getAttributes().entrySet()) { System.out.println(" Attribute"); System.out.println(" Name: " + entry.getKey()); System.out.println(" Value: " + entry.getValue()); } } System.out.println(); 这与请求批处理/客户端缓??冲: // Create the basic Amazon SQS async client AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); // Create the buffered client AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync); CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyTestQueue"); CreateQueueResult res = bufferedSqs.createQueue(createRequest); SendMessageRequest request = new SendMessageRequest(); String body = "test message_" + System.currentTimeMillis(); request.setMessageBody( body ); request.setQueueUrl(res.getQueueUrl()); SendMessageResult sendResult = bufferedSqs.sendMessage(request); ReceiveMessageRequest receiveRq = new ReceiveMessageRequest() .withMaxNumberOfMessages(10) .withQueueUrl(queueUrl); ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq); List<Message> messages = rx.getMessages(); for (Message message : messages) { System.out.println(" Message"); System.out.println(" MessageId: " + message.getMessageId()); System.out.println(" ReceiptHandle: " + message.getReceiptHandle()); System.out.println(" MD5OfBody: " + message.getMD5OfBody()); System.out.println(" Body: " + message.getBody()); for (Entry<String,String> entry : message.getAttributes().entrySet()) { System.out.println(" Attribute"); System.out.println(" Name: " + entry.getKey()); System.out.println(" Value: " + entry.getValue()); } } 但我仍然无法检索所有消息. 任何的想法? AWS论坛对我的帖子保持沉默. 解决方法
从SQS队列接收消息时,需要重复调??用sqs:ReceiveMessage.
在每次调用sqs:ReceiveMessage时,您将从队列中获得0个或更多消息,您需要迭代这些消息.对于每条消息,您还需要调用sqs:DeleteMessage,以便在处理完每条消息后从队列中删除消息. 在上面的“长轮询”示例周围添加一个循环以接收所有消息. for (;;) { ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl); List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages(); for (Message message : messages) { System.out.println(" Message"); System.out.println(" MessageId: " + message.getMessageId()); System.out.println(" ReceiptHandle: " + message.getReceiptHandle()); System.out.println(" MD5OfBody: " + message.getMD5OfBody()); System.out.println(" Body: " + message.getBody()); for (Entry<String,String> entry : message.getAttributes().entrySet()) { System.out.println(" Attribute"); System.out.println(" Name: " + entry.getKey()); System.out.println(" Value: " + entry.getValue()); } } System.out.println(); } 另请注意,您可能会多次收到相同的消息.因此,允许您的工作“重新处理”相同的消息,或检测重复的消息. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |