MQ发布预定心得
发布预定模式在MQ中提供了两种实现方式,在我原来的测试中每个预订者都是非持久的,也就是说只有当预订者与MQ保持连接时才能获得消息,而当连接断开时预订者不会接受到消息。如果以线程的方式来实现时刻保持连接的话,是可以的,但是现在采用的技术是以WebService服务来提供,对方发出一个请求,我返回一个请求的形式的话,使用线程时刻监听的这种方式能否可行是一个未知的问题。因此在查阅资料后,得知JMS的实现中有持久订阅这个模型后,问题有了初步的解决方向。 持久订阅的意思如其名,也就是说在预订者与MQ断开连接时,所有发布到预定主题的消息并不会丢失,当预定者与MQ再次连接后,可以接收所有断开连接时的消息。具体实现机制是用一个唯一的名称来标示预订者,在预订者断开连接后,MQ仍然会把所有发布的消息放到以这个唯一的名称来标示的地方存储,在预订者想要再次连接时,只要给出这个唯一的名称,就可以恢复原来的连接并取到所有在连接断开时发布的消息。具体实现代码如下: ?????? public synchronized void openDurable() { ????????????? try{?????????????? ???????????????????? factoryFactory =JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER); ???????????????????? connectionFactory =factoryFactory.createConnectionFactory(); ????????????? ???????????????????? connectionFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE,WMQConstants.WMQ_CM_BINDINGS); ???????????????????? connectionFactory.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER,MQ_QUEUEMANAGER_NAME); ???????????????????? ???????????????????? connection =connectionFactory.createConnection(); ???????????????????? ???????????????????? connection.setClientID(clientID); ???????????????????? boolean transacted=false; ???????????????????? session =connection.createSession(transacted,Session.AUTO_ACKNOWLEDGE); ? ???????????????????? String subject = topicName; ???????????????????? topic =session.createTopic(subject); ? ???????????????????? consumer =session.createDurableSubscriber(topic,subName); ???????????????????? connection.start();??????????????? ???????????????????? isOpen = true;?????????????? ????????????? } catch (JMSException e) { ???????????????????? e.printStackTrace(); ????????????? } ?????? } 注意其中颜色标注的两部分,第一句话connection.setClientID(clientID);给连接一个clientID的名称,此内容是持久预定唯一标示的一部分。第二句话consumer = session.createDurableSubscriber(topic,subName);使用Session生成了一个持久预定者,此方法由两个参数,第一个参数标示预订者对应的主题,第二个参数标示预订者的名称,此名称与前面的连接名称clientID组合就成了持久预定的唯一标示。在连接断开后,用户只要再次连接,并配置同样的clientID与subName,就可以接受到所有连接断开期间的消息了。 在MQ中,持久预定的实现形式是采用了一个动态队列,每当用户新申请一个持久预定时,MQ会在队列管理中动态新增一个本地队列,此队列是系统维护的,有一段长的随机数字代码标示它。在一般情况下看不到它,只有选择“看到系统对象时”才可以看到。当持久预定者与MQ断开连接时,所有发布到预订者主题的消息都会放到队列中存储,当预订者再次连接时,就可以取得在队列中存储的消息了。 持久预定的实现不仅需要存储消息,更要在系统中维护所有持久预定对象以及与之对应的动态队列,因此必须有一种机制去管理它,不然会对系统的资源造成损失与消耗。在JMS规范中,session有一个函数unsubscribe(subName),此函数的作用就是在持久订阅不再需要时,可以删除它以保证系统资源的不损失。具体实现代码如下: ?????? publicsynchronized void unsubscribeSubscriber() { ????????????? try{ ???????????????????? if(isOpen == false) ???????????????????? { ??????????????????????????? factoryFactory= JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER); ??????????????????????????? connectionFactory= factoryFactory.createConnectionFactory(); ???????????????????? ??????????????????????????? connectionFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE,WMQConstants.WMQ_CM_BINDINGS); ??????????????????????????? connectionFactory.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER,MQ_QUEUEMANAGER_NAME); ??????????????????????????? ??????????????????????????? connection= connectionFactory.createConnection(); ??????????????????????????? ??????????????????????????? connection.setClientID(clientID); ??????????????????????????? connection.start();? ??????????????????????????? ??????????????????????????? booleantransacted=false; ??????????????????????????? session= connection.createSession(transacted,Session.AUTO_ACKNOWLEDGE); ? ??????????????????????????? session.unsubscribe(subName); ????????????????????????????????????????? ??????????????????????????? isOpen= true; ??????????????????????????? ??????????????????????????? //关闭连接 ??????????????????????????? destination= null; ? ??????????????????????????? session.close(); ??????????????????????????? session= null; ??????????????????????????? ??????????????????????????? connection.close(); ??????????????????????????? connection= null; ? ??????????????????????????? connectionFactory= null; ???????????????????? ?????? factoryFactory = null; ??????????????????????????? isOpen= false;???????????????????? ???????????????????? }??????????? ????????????? }catch (JMSException e) { ???????????????????? e.printStackTrace(); ????????????? } ?????? } ? ?????? 在我们的WebService服务中,提供了三种接口,生成持久预定,获得持久消息,注销持久预定,在用户需要调用发布预定服务时,不同的用户机,不同的访问程序需要生成持久预定,并维护预定名称等信息,在使用完预定服务后,必须注销持久预定以保护系统的资源。如果他们没有做的话,MQ中会积累越来越多的临时队列以及队列中存储的消息,最终系统会因不堪重负而宕掉。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |