java – 我正确实现ActiveMQ吗?实现事务处理会话并重试
我正在尝试使用事务会话来支持回滚的JMS-ActiveMQ实现.
我是ActiveMQ的新手,我已经使用它的 Java库进行了第一次实现. 当我运行我的应用程序时,我看到消息已成功入队并出列.我还可以看到相应的DLQ是自动生成的.但是,我不确定我是否正确配置了redeliverypolicy.截至目前它已在生产者上配置,但有些examples将重新传递策略与监听器容器联系起来,所以我不能完全确定在我的情况下(如果有的话)是否会将有毒消息放在DLQ上.摘要中包含详细注释. 此外,到目前为止我遇到的所有示例都使用Spring.但是,我没有选择使用它需要重新布线整个项目(如果只涉及最小的开销,我会打开). 任何关于如何使用ActiveMQ api在Java中做到这一点的见解将不胜感激. 制片人 public void publishUpdate(final MessageBody payload) ??????????? throws JMSException { ??????? Session session = session(connection()); ??????? try { ??????????? Message message = message(session,payload); ??????????? LOGGER.info("About to put message on queue"); ??????????? producer(session).send(message); ??????????? // without session.commit()-- no messages get put on the queue. ??????????? session.commit();// messages seem to be enqueued now. ??????????? ??????? } catch ( BadRequestException e) { //to avoid badly formed requests? ??????????? LOGGER.info("Badly formed request. Not attempting retry!"); ??????????? return; ??????? } catch (JMSException jmsExcpetion) { ??????????? LOGGER.info("Caught JMSException will retry"); ??????????? session.rollback();// assume rollback is followed by a retry? ??????? } ??????? ??? } private MessageProducer producer(Session session) throws JMSException { return session.createProducer(destination()); } private Connection connection() throws JMSException { ActiveMQConnectionFactory connectionFactory= new ActiveMQConnectionFactory(); Connection connection = connectionFactory.createConnection(); connectionFactory.setRedeliveryPolicy(getRedeliveryPolicy());//redelivery policy with three retries and redelivery time of 1000ms return connection; } private Session session(Connection connection) throws JMSException { Session session = connection.createSession(true,Session.SESSION_TRANSACTED); connection.start(); return session; } 监听器: public class UpdateMessageListener implements MessageListener{ …. public void onMessage(Message message) { String json = null; try { //Does the listener need to do anything to deal with retry? json = ((TextMessage) message).getText(); MessageBody request = SerializeUtils.deserialize(json,MessageBody.class); processTransaction(request.getUpdateMessageBody(),headers);//perform some additional processing. } catch (Throwable e) { LOGGER.error("Error processing request: {}",json); } } } 消费者: private MessageConsumer consumer() throws JMSException { LOGGER.info("Creating consumer"); MessageConsumer consumer = session().createConsumer(destination()); consumer.setMessageListener(new UpdateMessageListener()); //wire listener to consumer return consumer; } private Session session() throws JMSException { Connection connection=connection(); Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);//create an auto-ack from the consumer side? Is this correct? connection.start(); return session; } 如果有必要,我也愿意提供更多代码. 解决方法
你的解决方案有一点缺陷.
根据JMS document,如果在onMessage函数中存在异常,则在Session.AUTO_ACKNOWLEDGE模式下,失败的执行消息将由消息队列(例如ActiveMQ)重新传递.但是这个流程被打破了,因为侦听器在onMessage函数中捕获了Throwable或Exception.数据流如下图所示: ACK_TYPE 如果要实现本地事务,则在消息处理程序执行失败时应该抛出异常.在pesudo代码中,异步重新传递消息使用者可能如下所示: Session session = connection.getSession(consumerId); sessionQueueBuffer.enqueue(message); Runnable runnable = new Ruannale(){ run(){ Consumer consumer = session.getConsumer(consumerId); Message md = sessionQueueBuffer.dequeue(); try{ consumer.messageListener.onMessage(md); ack(md);//send an STANDARD_ACK_TYPE,tell broker success }catch(Exception e){ redelivery();//send redelivery ack. DELIVERED_ACK_TYPE,require broker keep message and redeliver it. } } threadPool.execute(runnable); (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |