Java中消息队列任务的平滑关闭详解
前言 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 消息队列应用场景 消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景。 本文主要给大家介绍的是关于Java中消息队列任务平滑关闭的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧。 1.问题背景 对于消息队列任务的监听,我们一般使用Java写一个独立的程序,在Linux服务器上运行。当订阅者程序启动后,会通过消息队列客户端接收消息,放入线程池中并发的处理。 那么问题来了,当我们修改程序后,需要重新启动时,如何保证消息都能够被处理呢? 一些开源的消息队列中间件,会提供ACK机制(消息确认机制),当订阅者处理完消息后,会通知服务端删除对应消息,如果订阅者出现异常,服务端未收到确认消费,则会重试发送。 那如果消息队列中间件没有提供ACK机制,或者为了高吞度量的考虑关闭了ACK功能,如何最大可能保证消息都能够被处理呢? 正常来说,订阅者程序关闭后,消息会在队列中堆积,等待订阅者下次订阅消费,所以未接收的消息是不会丢失的。可能出现的问题就是在关闭的一瞬间,已经从消息队列中取出,但还没有被处理的消息。 因此我们需要一套平滑关闭的机制,保证在重启的时候,已接收的消息可以得到正常处理。 2.问题分析 平滑关闭的思路如下:
关闭消息订阅:消息队列的客户端都会提供关闭连接的方法,具体可以自行查看API。 关闭线程池:Java的ThreadPoolExecutor线程池提供 那么问题又来了,我们如何通知到程序,需要执行关闭操作呢? 在Linux中,进程的关闭是通过信号传递的,我们可以用kill -9 pid关闭进程,除了-9之外,我们可以通过 kill -l,查看kill命令的其它信号量。 这里提供两种关闭方法:
补充说明:addShutdownHook方法和handle方法中如果再调用 伪代码分别如下 Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { //关闭订阅者 //关闭线程池 //退出 } }); //注册linux kill信号量 kill -12 Signal sig = new Signal("USR2"); Signal.handle(sig,new SignalHandler() { @Override public void handle(Signal signal) { //关闭订阅者 //关闭线程池 //退出 } }); 模拟Demo 下面通过一个demo模拟相关逻辑操作 首先模拟一个生产者,每秒生产5个消息 然后模拟一个订阅者,收到消息后,放入线程池进行处理,线程池固定4个线程,每个线程处理时间1秒,这样线程池每秒会积压1个消息。 package com.lujianing.demo; import sun.misc.Signal; import sun.misc.SignalHandler; import java.util.concurrent.*; /** * @author lujianing01@58.com * @Description: * @date 2016/11/14 */ public class MsgClient { //模拟消费线程池 同时4个线程处理 private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4); //模拟消息生产任务 private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(); //用于判断是否关闭订阅 private static volatile boolean isClose = false; public static void main(String[] args) throws InterruptedException { //注册钩子方法 Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { close(); } }); BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100); producer(queue); consumer(queue); } //模拟消息队列生产者 private static void producer(final BlockingQueue queue){ //每200毫秒向队列中放入一个消息 SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() { public void run() { queue.offer(""); } },0L,200L,TimeUnit.MILLISECONDS); } //模拟消息队列消费者 生产者每秒生产5个 消费者4个线程消费1个1秒 每秒积压1个 private static void consumer(final BlockingQueue queue) throws InterruptedException { while (!isClose){ getPoolBacklogSize(); //从队列中拿到消息 final String msg = (String)queue.take(); //放入线程池处理 if(!THREAD_POOL.isShutdown()) { THREAD_POOL.execute(new Runnable() { public void run() { try { //System.out.println(msg); TimeUnit.MILLISECONDS.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } } //查看线程池堆积消息个数 private static long getPoolBacklogSize(){ long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount(); System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog)); return backlog; } private static void close(){ System.out.println("收到kill消息,执行关闭操作"); //关闭订阅消费 isClose = true; //关闭线程池,等待线程池积压消息处理 THREAD_POOL.shutdown(); //判断线程池是否关闭 while (!THREAD_POOL.isTerminated()) { try { //每200毫秒 判断线程池积压数量 getPoolBacklogSize(); TimeUnit.MILLISECONDS.sleep(200L); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("订阅者关闭,线程池处理完毕"); } static { String osName = System.getProperty("os.name").toLowerCase(); if(osName != null && osName.indexOf("window") == -1) { //注册linux kill信号量 kill -12 Signal sig = new Signal("USR2"); Signal.handle(sig,new SignalHandler() { @Override public void handle(Signal signal) { close(); } }); } } } 当我们在服务上运行时,通过控制台可以看到相关的输出信息,demo中输出了线程池的积压消息个数 java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient 另打开一个终端,通过ps命令查看进程号,或者通过nohup启动Java进程拿到进程id ps -fe|grep MsgClient 当我们执行 3.总结 其实不单单消息队列任务,在常见的RPC服务中也会见到类似的功能,比如58的SCF,在源码中,也会分别注册了USR2信号量和addShutdownHook钩子方法。 在重启脚本中,首先会发送 好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对编程小技巧的支持。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
- Java中启动线程start和run的两种方法
- java – 如何将常量重定向到Eclipse中的枚举?
- java – 为什么System.nanoTime()需要4400纳秒
- 如何使用Reflection使用泛型实例化java.util.ArrayList
- “消息传递/共享状态”困境(并发和分发)是否采取了“Holywa
- java – 在ElasticSearch中获取SearchResponse的结果
- JVM虚拟机查找类文件的顺序方法
- java.lang.IllegalArgumentException:由于无效的密钥而无法
- java – 在Spring注释中使用静态变量
- 是否有一个开源java库与图像扫描仪连接?