加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

MQ发布预定心得

发布时间:2020-12-17 01:02:45 所属栏目:安全 来源:网络整理
导读:发布预定模式在MQ中提供了两种实现方式,在我原来的测试中每个预订者都是非持久的,也就是说只有当预订者与MQ保持连接时才能获得消息,而当连接断开时预订者不会接受到消息。如果以线程的方式来实现时刻保持连接的话,是可以的,但是现在采用的技术是以WebSe

发布预定模式在MQ中提供了两种实现方式,在我原来的测试中每个预订者都是非持久的,也就是说只有当预订者与MQ保持连接时才能获得消息,而当连接断开时预订者不会接受到消息。如果以线程的方式来实现时刻保持连接的话,是可以的,但是现在采用的技术是以WebService服务来提供,对方发出一个请求,我返回一个请求的形式的话,使用线程时刻监听的这种方式能否可行是一个未知的问题。因此在查阅资料后,得知JMS的实现中有持久订阅这个模型后,问题有了初步的解决方向。

持久订阅的意思如其名,也就是说在预订者与MQ断开连接时,所有发布到预定主题的消息并不会丢失,当预定者与MQ再次连接后,可以接收所有断开连接时的消息。具体实现机制是用一个唯一的名称来标示预订者,在预订者断开连接后,MQ仍然会把所有发布的消息放到以这个唯一的名称来标示的地方存储,在预订者想要再次连接时,只要给出这个唯一的名称,就可以恢复原来的连接并取到所有在连接断开时发布的消息。具体实现代码如下:

?????? public synchronized void openDurable() {

????????????? try{??????????????

???????????????????? factoryFactory =JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);

???????????????????? connectionFactory =factoryFactory.createConnectionFactory();

?????????????

???????????????????? connectionFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE,WMQConstants.WMQ_CM_BINDINGS);

???????????????????? connectionFactory.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER,MQ_QUEUEMANAGER_NAME);

????????????????????

???????????????????? connection =connectionFactory.createConnection();

????????????????????

???????????????????? connection.setClientID(clientID);

???????????????????? boolean transacted=false;

???????????????????? session =connection.createSession(transacted,Session.AUTO_ACKNOWLEDGE);

?

???????????????????? String subject = topicName;

???????????????????? topic =session.createTopic(subject);

?

???????????????????? consumer =session.createDurableSubscriber(topic,subName);

???????????????????? connection.start();???????????????

???????????????????? isOpen = true;??????????????

????????????? } catch (JMSException e) {

???????????????????? e.printStackTrace();

????????????? }

?????? }

注意其中颜色标注的两部分,第一句话connection.setClientID(clientID);给连接一个clientID的名称,此内容是持久预定唯一标示的一部分。第二句话consumer = session.createDurableSubscriber(topic,subName);使用Session生成了一个持久预定者,此方法由两个参数,第一个参数标示预订者对应的主题,第二个参数标示预订者的名称,此名称与前面的连接名称clientID组合就成了持久预定的唯一标示。在连接断开后,用户只要再次连接,并配置同样的clientID与subName,就可以接受到所有连接断开期间的消息了。

在MQ中,持久预定的实现形式是采用了一个动态队列,每当用户新申请一个持久预定时,MQ会在队列管理中动态新增一个本地队列,此队列是系统维护的,有一段长的随机数字代码标示它。在一般情况下看不到它,只有选择“看到系统对象时”才可以看到。当持久预定者与MQ断开连接时,所有发布到预订者主题的消息都会放到队列中存储,当预订者再次连接时,就可以取得在队列中存储的消息了。

持久预定的实现不仅需要存储消息,更要在系统中维护所有持久预定对象以及与之对应的动态队列,因此必须有一种机制去管理它,不然会对系统的资源造成损失与消耗。在JMS规范中,session有一个函数unsubscribe(subName),此函数的作用就是在持久订阅不再需要时,可以删除它以保证系统资源的不损失。具体实现代码如下:

?????? publicsynchronized void unsubscribeSubscriber() {

????????????? try{

???????????????????? if(isOpen == false)

???????????????????? {

??????????????????????????? factoryFactory= JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);

??????????????????????????? connectionFactory= factoryFactory.createConnectionFactory();

????????????????????

??????????????????????????? connectionFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE,WMQConstants.WMQ_CM_BINDINGS);

??????????????????????????? connectionFactory.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER,MQ_QUEUEMANAGER_NAME);

???????????????????????????

??????????????????????????? connection= connectionFactory.createConnection();

???????????????????????????

??????????????????????????? connection.setClientID(clientID);

??????????????????????????? connection.start();?

???????????????????????????

??????????????????????????? booleantransacted=false;

??????????????????????????? session= connection.createSession(transacted,Session.AUTO_ACKNOWLEDGE);

?

??????????????????????????? session.unsubscribe(subName);

?????????????????????????????????????????

??????????????????????????? isOpen= true;

???????????????????????????

??????????????????????????? //关闭连接

??????????????????????????? destination= null;

?

??????????????????????????? session.close();

??????????????????????????? session= null;

???????????????????????????

??????????????????????????? connection.close();

??????????????????????????? connection= null;

?

??????????????????????????? connectionFactory= null;

???????????????????? ?????? factoryFactory = null;

??????????????????????????? isOpen= false;????????????????????

???????????????????? }???????????

????????????? }catch (JMSException e) {

???????????????????? e.printStackTrace();

????????????? }

?????? }

?

?????? 在我们的WebService服务中,提供了三种接口,生成持久预定,获得持久消息,注销持久预定,在用户需要调用发布预定服务时,不同的用户机,不同的访问程序需要生成持久预定,并维护预定名称等信息,在使用完预定服务后,必须注销持久预定以保护系统的资源。如果他们没有做的话,MQ中会积累越来越多的临时队列以及队列中存储的消息,最终系统会因不堪重负而宕掉。

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读