java-ee – HornetQ:如何重用XAConnection和XASession
我在尝试在JBoss应用程序中的多个worker上重用XAConnection和XASession时遇到了一些问题.我已经设法将问题简化为一种方法.它应该能够使用相同的连接和会话生成和消费者消息.目前,我的应用程序有很多队列和工作人员,每个工作人员当前正在启动并启动每个自己的连接和会话,而不是共享它.应该不可能吗?
这是我的代码示例: import org.apache.log4j.Logger; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.ejb.Singleton; import javax.ejb.Startup; import javax.jms.*; import javax.jms.Queue; import javax.naming.InitialContext; @Singleton @Startup public class QueueTest { private Logger logger = Logger.getLogger(QueueTest.class); @PostConstruct public void startup() { try { String queue = "queue/Queue1"; String message = "test"; //setting up connection InitialContext iniCtx = new InitialContext(); XAConnectionFactory qcf = (XAConnectionFactory) iniCtx.lookup("java:/JmsXA"); XAConnection connection = qcf.createXAConnection(); connection.start(); logger.debug("creating connection at " + new java.util.Date()); //setting up session XASession session = connection.createXASession(); logger.debug("creating session at " + new java.util.Date()); //find the queue Object queueObj = iniCtx.lookup(queue); Queue jmsQueue = (javax.jms.Queue)queueObj; //adding message to queue javax.jms.MessageProducer producer = session.createProducer(jmsQueue); javax.jms.TextMessage textMessage = session.createTextMessage(message); producer.send(textMessage); producer.close(); logger.debug("Message added to queue"); //receiving message from queue javax.jms.MessageConsumer consumer = session.createConsumer(jmsQueue); javax.jms.TextMessage messageReceived = (javax.jms.TextMessage)consumer.receive(5000); if (messageReceived==null) throw new Exception("No message reveived"); logger.debug("Got message:"+messageReceived.getText()); consumer.close(); } catch(Exception e) { logger.debug("Error: " + e.getMessage(),e); } } @PreDestroy public void shutdown() { } } 它导致此输出: 11:47:17,905 DEBUG [QueueTest] (MSC service thread 1-8) creating connection at Thu Sep 05 11:47:17 CEST 2013 11:47:18,041 DEBUG [QueueTest] (MSC service thread 1-8) creating session at Thu Sep 05 11:47:18 CEST 2013 11:47:18,065 DEBUG [QueueTest] (MSC service thread 1-8) Message added to queue 11:47:23,081 DEBUG [QueueTest] (MSC service thread 1-8) Error: No message reveived 如您所见,消费者未收到任何消息.为什么? 编辑1: package dk.energimidt.uapi.zigbee.services; import org.apache.log4j.Logger; import javax.ejb.Stateless; import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; import javax.jms.Queue; import javax.jms.XAConnection; import javax.jms.XAConnectionFactory; import javax.jms.XASession; import javax.naming.InitialContext; @TransactionAttribute(TransactionAttributeType.REQUIRED) @Stateless public class QueueTestWorkerBean implements QueueTestWorker { private Logger logger = Logger.getLogger(QueueTestWorkerBean.class); public void run() { try { String queue = "queue/Queue1"; String message = "test"; //setting up connection InitialContext iniCtx = new InitialContext(); XAConnectionFactory qcf = (XAConnectionFactory) iniCtx.lookup("java:/JmsXA"); XAConnection connection = qcf.createXAConnection(); connection.start(); logger.debug("creating connection at " + new java.util.Date()); //setting up session XASession session = connection.createXASession(); logger.debug("creating session at " + new java.util.Date()); //find the queue Object queueObj = iniCtx.lookup(queue); Queue jmsQueue = (javax.jms.Queue)queueObj; //adding message to queue javax.jms.MessageProducer producer = session.createProducer(jmsQueue); javax.jms.TextMessage textMessage = session.createTextMessage(message); producer.send(textMessage); producer.close(); session.commit(); logger.debug("Message added to queue"); //receiving message from queue javax.jms.MessageConsumer consumer = session.createConsumer(jmsQueue); javax.jms.TextMessage messageReceived = (javax.jms.TextMessage)consumer.receive(5000); if (messageReceived==null) throw new Exception("No message reveived"); logger.debug("Got message:"+messageReceived.getText()); consumer.close(); connection.close(); } catch(Exception e) { logger.debug("Error: " + e.getMessage(),e); } } } 现在我在Session.Commit()上得到一个例外: 10:46:03,697 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) creating connection at Tue Sep 17 10:46:03 CEST 2013 10:46:04,343 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) creating session at Tue Sep 17 10:46:04 CEST 2013 10:46:04,355 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) Error: XA connection: javax.jms.TransactionInProgressException: XA connection at org.hornetq.ra.HornetQRASession.commit(HornetQRASession.java:386) at QueueTestWorkerBean.run(QueueTestWorkerBean.java:45) [library-1.0.0.jar:] 解决方法
我在那里看到了一些(实际上是2个)混音:
i – 您正在使用XA会话,但您没有声明任何事务边界…通常在会话Bean和MDB上完成.我不确定你能否在这个无国籍人身上做到这一点. 如果您不使用任何声明性事务,则必须手动登记XID. ii – jmsXA是默认的资源适配器连接工厂.它上面有一个游泳池.因此,无论何时创建新会话,您都要从池中取出.当你关闭它时,你将它返回到游泳池. 您可以使用常规连接工厂.就在InVMConnectionFactory中(或者你在独立版中定义的任何东西,在PooledConnectionFactories之外,假设你在JBoss上……然后只使用常规JMS. 即使是常规的连接工厂也可以与XA一起使用,但在这种情况下,您需要确保使用事务管理器的api直接登记它. 如果您使用常规连接工厂,则可以根据需要随时保持连接. 请告诉我它是怎么回事,我会帮助你.我知道你开始了赏金..但我会免费回复:) 我在EJB教程中找不到关于将事务与Singleton一起使用的任何示例. 我建议您通过Statless或Stateful Session Bean使用它,然后将@TransactionAttribute应用于Bean. Java EE 6教程有一些很好的信息: http://docs.oracle.com/javaee/6/tutorial/doc/bncij.html 请注意,在您提交之前,邮件将不可用.因此,如果您在交易中发送消息,您将无法在同一交易中收到消息. 在您的edit1示例中,您正在发送消息并在同一事务上使用它.这将不起作用,因为您首先需要提交生产方法,然后才能使用它.在这种情况下你需要两个事务,因此Edit1被破坏了. 另外:确保在最后关闭Connection.由于您使用的是JmsXA(或池化连接工厂),因此您将通过Application Server自动完成轮询. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |