php – 从RabbitMq消费不确认消息
我创建了一个简单的发布者和消费者,使用basic.consume在队列上预订.
我的消费者承认在作业运行时没有异常的消息.每当我遇到一个例外情况,我都不会回信,早点回来.只有确认的消息从队列中消失,这样才能正常工作. 我该如何处理这个用例? 安装代码 $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName('my-exchange'); $exchange->setType('fanout'); $exchange->declare(); $queue = new AMQPQueue($channel); $queue->setName('my-queue'); $queue->declare(); $queue->bind('my-exchange'); 消费者代码 $queue->consume(array($this,'callback')); public function callback(AMQPEnvelope $msg) { try { //Do some business logic } catch (Exception $ex) { //Log exception return; } return $queue->ack($msg->getDeliveryTag()); } 生产者代码 $exchange->publish('message');
如果消息未被确认并且应用程序发生故障,则将自动重新发送,并且将信封上的重新传递属性设置为true(除非使用no-ack = true标志消费).
UPD: 您必须在catch块中使用重新发送标志来消息 try { //Do some business logic } catch (Exception $ex) { //Log exception return $queue->nack($msg->getDeliveryTag(),AMQP_REQUEUE); } 当RabbitMQ和AMQP协议中没有实现重新传递计数时,请注意无限制的Nacked消息. 如果你不想混淆这样的消息,只是想添加一些延迟,你可能想在nack方法调用之前添加一些sleep()或usleep(),但这根本不是一个好主意. 有多种技术来处理循环重传问题: 依靠Dead Letter Exchanges >职业:可靠,标准,清晰 2.使用per message or per queue TTL >职业:容易实施,也是标准,明确 示例(注意,对于队列ttl,我们只传递数字和消息ttl – 任何将是数字字符串): 2.1每消息ttl: $queue = new AMQPQueue($channel); $queue->setName('my-queue'); $queue->declareQueue(); $queue->bind('my-exchange'); $exchange->publish( 'message at ' . microtime(true),null,AMQP_NOPARAM,array( 'expiration' => '1000' ) ); 2.2.每队列ttl: $queue = new AMQPQueue($channel); $queue->setName('my-queue'); $queue->setArgument('x-message-ttl',1000); $queue->declareQueue(); $queue->bind('my-exchange'); $exchange->publish('message at ' . microtime(true)); 3.在消息正文或标题中保留重新传送者计数或者重新设置重定向器号码(也称为IP堆栈中的跳数限制或ttl) >专业人士:在应用程序级别上给予您对消息生命周期的额外控制 码: $queue = new AMQPQueue($channel); $queue->setName('my-queue'); $queue->declareQueue(); $queue->bind('my-exchange'); $exchange->publish( 'message at ' . microtime(true),array( 'headers' => array( 'ttl' => 100 ) ) ); $queue->consume( function (AMQPEnvelope $msg,AMQPQueue $queue) use ($exchange) { $headers = $msg->getHeaders(); echo $msg->isRedelivery() ? 'redelivered' : 'origin',' '; echo $msg->getDeliveryTag(),' '; echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl',' '; echo $msg->getBody(),PHP_EOL; try { //Do some business logic throw new Exception('business logic failed'); } catch (Exception $ex) { //Log exception if (isset($headers['ttl'])) { // with ttl logic if ($headers['ttl'] > 0) { $headers['ttl']--; $exchange->publish($msg->getBody(),$msg->getRoutingKey(),array('headers' => $headers)); } return $queue->ack($msg->getDeliveryTag()); } else { // without ttl logic return $queue->nack($msg->getDeliveryTag(),AMQP_REQUEUE); // or drop it without requeue } } return $queue->ack($msg->getDeliveryTag()); } ); 可能还有其他一些方法可以更好地控制消息重传流. 结论:没有银弹解决方案.你必须决定什么解决方案适合你的需要最好或找出其他的东西,但不要忘记在这里分享) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |