java – 无法让ActiveMQ重新发送我的消息
发布时间:2020-12-15 00:41:54 所属栏目:Java 来源:网络整理
导读:我有一个用 Java编写的单线程ActiveMQ使用者.我所要做的就是从队列中接收()一个消息,尝试将其发送到Web服务,如果成功则确认()它.如果Web服务调用失败,我希望消息保留在队列中并在超时后重新发送. 它或多或少都在工作,除了重发部分:每次重新启动我的消费者时
我有一个用
Java编写的单线程ActiveMQ使用者.我所要做的就是从队列中接收()一个消息,尝试将其发送到Web服务,如果成功则确认()它.如果Web服务调用失败,我希望消息保留在队列中并在超时后重新发送.
它或多或少都在工作,除了重发部分:每次重新启动我的消费者时,它会为每个仍然在队列中的消息收到一条消息,但是在发送它们之后,消息永远不会被重新发送. 我的代码看起来像: public boolean init() throws JMSException,FileNotFoundException,IOException { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); RedeliveryPolicy policy = new RedeliveryPolicy(); policy.setInitialRedeliveryDelay(500); policy.setBackOffMultiplier(2); policy.setUseExponentialBackOff(true); connectionFactory.setRedeliveryPolicy(policy); connectionFactory.setUseRetroactiveConsumer(true); // ???? Connection connection = connectionFactory.createConnection(); connection.setExceptionListener(this); connection.start(); session = connection.createSession(transacted,ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); destination = session.createQueue(subject); //??? consumer = session.createConsumer(destination); //consumer.setMessageListener(this); // message listener had same behaviour } private void process() { while(true) { System.out.println("Waiting..."); try { Message message = consumer.receive(); onMessage(message); } catch (JMSException e) { e.printStackTrace(); } try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } @Override public void onMessage(Message message) { System.out.println("onMessage"); messagesReceived++; if (message instanceof TextMessage) { try { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); if(!client.sendMessage(msg)) { System.out.println("Webservice call failed. Keeping message"); //message. } else { message.acknowledge(); } if (transacted) { if ((messagesReceived % batch) == 0) { System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived); session.commit(); } } } catch (JMSException e) { e.printStackTrace(); } } } 我目前没有使用交易(也许我应该这样做?). 我确定我错过了一些简单的东西,很快就会拍打我的额头,但我似乎无法弄清楚这是怎么回事.谢谢! 编辑:我自己也不能回答这个问题: 好的,经过一些实验,事实证明交易是实现这一目标的唯一方法.这是新代码: public boolean init() throws JMSException,url); RedeliveryPolicy policy = new RedeliveryPolicy(); policy.setInitialRedeliveryDelay(1000L); policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES); connectionFactory.setRedeliveryPolicy(policy); connectionFactory.setUseRetroactiveConsumer(true); Connection connection = connectionFactory.createConnection(); connection.setExceptionListener(this); connection.start(); session = connection.createSession(transacted,ActiveMQSession.CLIENT_ACKNOWLEDGE); destination = session.createQueue(subject); consumer = session.createConsumer(destination); } @Override public void onMessage(Message message) { System.out.println("onMessage"); messagesReceived++; if (message instanceof TextMessage) { try { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); if(client.sendMessage(msg)) { if(transacted) { System.out.println("Call succeeded - committing message"); session.commit(); } //message.acknowledge(); } else { if(transacted) { System.out.println("Webservice call failed. Rolling back message"); session.rollback(); } } } catch (JMSException e) { e.printStackTrace(); } } } 现在,重新传送策略中指定的消息每1000毫秒重新发送一次. 希望这有助于其他人! (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
相关内容
推荐文章
站长推荐
热点阅读