java – Kafka – 使用高级消费者的延迟队列实现
想使用高级消费者api来实施延迟消费者
大意: >通过键生成消息(每个msg包含创建时间戳),这样可以确保每个分区已按生成时间排序消息. while (it.hasNext()) { val msg = it.next().message() //checks timestamp in msg to see delay period exceeded while (!delayedPeriodPassed(msg)) { waitSomeTime() //Thread.sleep or something.... } //certain that the msg was delayed and can now be handled Try { process(msg) } //the msg process will never fail the consumer consumer.commitOffsets //commit each msg } 对此实施有一些担忧: >提交每个偏移可能会减慢ZK 谢谢! 解决方法
有一种方法可以使用不同的主题来推送所有要延迟的消息.如果所有延迟的消息都应该在相同的时间延迟之后处理,这将是相当简单的:
while(it.hasNext()) { val message = it.next().message() if(shouldBeDelayed(message)) { val delay = 24 hours val delayTo = getCurrentTime() + delay putMessageOnDelayedQueue(message,delay,delayTo) } else { process(message) } consumer.commitOffset() } 所有常规邮件现在都将尽快处理,而那些需要延迟的邮件将被放在另一个主题上. 好的是,我们知道延迟主题的头部的消息是应该首先处理的消息,因为它的delayTo值将是最小的.因此,我们可以设置另一个读取头信息的消费者,检查时间戳是否在过去,如果处理消息并提交偏移量.如果不是,它不会提交偏移量,而是直到那时睡觉: while(it.hasNext()) { val delayedMessage = it.peek().message() if(delayedMessage.delayTo < getCurrentTime()) { val readMessage = it.next().message process(readMessage.originalMessage) consumer.commitOffset() } else { delayProcessingUntil(delayedMessage.delayTo) } } 如果有不同的延迟时间,您可以对延迟进行分区(例如24小时,12小时,6小时).如果延迟时间比这更复杂,那么延迟时间就会更加动态.您可以通过引入两个延迟主题来解决这个问题.读取所有消息关闭延迟主题A并处理所有延迟过去的消息.其他的你只是找到一个最接近的delayTo,然后把它们放在主题B上.睡觉,直到最近的一个应该被处理,并且完全相反,即处理来自主题B的消息,并将一次不应该被处理回主题A. 回答您的具体问题(有些已在您的问题的评论中解决)
您可以考虑切换到Kafka中存储偏移量(可从0.8.2获取的功能,检查消费者配置中的offsets.storage属性)
我相信如果它不能与偏移量存储进行通信,例如.正如你所说,使用幂等信息解决了这个问题.
这不会是上述概述的解决方案的问题,除非消息本身的处理超过会话超时.
再次用上面的方法,你不需要设置一个很长的会话超时.
总是有) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
- java – 我可以调用位于另一个jar文件中的jar库中的类文件吗
- java – 来自多个源的Spring MVC复杂模型人口
- java – 用于云服务“沙盒”的SecurityManager
- java 下执行mysql 批量插入的几种方法及用时
- Java,MultiPart:确定上传的多部分是否是一种图像
- java – 一个按钮的Android背景文本图标
- java – 类Cast异常:com.sun.org.apache.xerces.internal
- 使用Java将SOAP消息格式转换为Socket消息格式转换,反之亦然
- java – 如何在JTable中动态设置RowHeight
- java – 在Windows上安装IBM JRE(非IBM机器)