加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

AWS SQS Java.并非所有消息都从SQS队列中检索

发布时间:2020-12-15 04:46:16 所属栏目:Java 来源:网络整理
导读:我一直在尝试使用AWS SDK for Java从SQS队列中检索所有消息的几种方法无济于事.我已经了解了AWS SQS的分布式特性,并且消息存储在不同的服务器上.但我不明白为什么这个架构不会被最终用户隐藏.我需要在Java代码中应用哪些技巧来检索所有消息,并且100%确定没
我一直在尝试使用AWS SDK for Java从SQS队列中检索所有消息的几种方法无济于事.我已经了解了AWS SQS的分布式特性,并且消息存储在不同的服务器上.但我不明白为什么这个架构不会被最终用户隐藏.我需要在Java代码中应用哪些技巧来检索所有消息,并且100%确定没有人错过?

我试着用“长轮询”:

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages) {
System.out.println(" Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
for (Entry<String,String> entry : message.getAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue());
}
}
System.out.println();

这与请求批处理/客户端缓??冲:

// Create the basic Amazon SQS async client
    AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();

    // Create the buffered client
    AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);

    CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyTestQueue");

    CreateQueueResult res = bufferedSqs.createQueue(createRequest);

    SendMessageRequest request = new SendMessageRequest();
    String body = "test message_" + System.currentTimeMillis();
    request.setMessageBody( body );
    request.setQueueUrl(res.getQueueUrl());

    SendMessageResult sendResult = bufferedSqs.sendMessage(request);

    ReceiveMessageRequest receiveRq = new ReceiveMessageRequest()
    .withMaxNumberOfMessages(10)
    .withQueueUrl(queueUrl);
    ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);

    List<Message> messages = rx.getMessages();
    for (Message message : messages) {
    System.out.println(" Message");
    System.out.println(" MessageId: " + message.getMessageId());
    System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
    System.out.println(" MD5OfBody: " + message.getMD5OfBody());
    System.out.println(" Body: " + message.getBody());
    for (Entry<String,String> entry : message.getAttributes().entrySet()) {
    System.out.println(" Attribute");
    System.out.println(" Name: " + entry.getKey());
    System.out.println(" Value: " + entry.getValue());
    }
    }

但我仍然无法检索所有消息.

任何的想法?

AWS论坛对我的帖子保持沉默.

解决方法

从SQS队列接收消息时,需要重复调??用sqs:ReceiveMessage.

在每次调用sqs:ReceiveMessage时,您将从队列中获得0个或更多消息,您需要迭代这些消息.对于每条消息,您还需要调用sqs:DeleteMessage,以便在处理完每条消息后从队列中删除消息.

在上面的“长轮询”示例周围添加一个循环以接收所有消息.

for (;;) {
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
    List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
    for (Message message : messages) {
        System.out.println(" Message");
        System.out.println(" MessageId: " + message.getMessageId());
        System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
        System.out.println(" MD5OfBody: " + message.getMD5OfBody());
        System.out.println(" Body: " + message.getBody());
        for (Entry<String,String> entry : message.getAttributes().entrySet()) {
            System.out.println(" Attribute");
            System.out.println(" Name: " + entry.getKey());
            System.out.println(" Value: " + entry.getValue());
        }
    }
    System.out.println();
}

另请注意,您可能会多次收到相同的消息.因此,允许您的工作“重新处理”相同的消息,或检测重复的消息.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读