PHP 框架 Hyperf 实现处理超时未支付订单和延时队列
发布时间:2020-12-13 21:03:52 所属栏目:PHP教程 来源:网络整理
导读:延时队列 DelayProducer.php AmqpBuilder.php AmqpBuilder.php <?php declare(strict_types = 1); namespace App\Components\Amqp; use Hyperf\Amqp\Builder\Builder; use Hyperf\Amqp\Builder\QueueBuilder; class AmqpBuilder extends QueueBuilder { /** * @param …
延时队列
AmqpBuilder.php
<?php

declare(strict_types=1);

namespace App\Components\Amqp;

use Hyperf\Amqp\Builder\Builder;
use Hyperf\Amqp\Builder\QueueBuilder;

/**
 * Queue builder that can describe a "delay" queue: a queue whose messages
 * expire after a TTL and are then dead-lettered to another exchange.
 */
class AmqpBuilder extends QueueBuilder
{
    /**
     * Add queue arguments on top of the ones already set.
     *
     * Array arguments are merged into the existing array (later keys win).
     * array_merge() only accepts arrays, so when either side is not an array
     * (e.g. an AMQPTable instance) the new value simply replaces the old one.
     *
     * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments
     *
     * @return \Hyperf\Amqp\Builder\Builder
     */
    public function setArguments($arguments): Builder
    {
        if (is_array($this->arguments) && is_array($arguments)) {
            $this->arguments = array_merge($this->arguments, $arguments);
        } else {
            $this->arguments = $arguments;
        }

        return $this;
    }

    /**
     * Configure this builder as a delay queue.
     *
     * @param string $queueName             name of the queue that holds messages until they expire
     * @param int    $xMessageTtl           message time-to-live, in seconds
     * @param string $xDeadLetterExchange   exchange that receives expired messages
     * @param string $xDeadLetterRoutingKey routing key used when dead-lettering
     *
     * @return $this
     */
    public function setDelayedQueue(
        string $queueName,
        int $xMessageTtl,
        string $xDeadLetterExchange,
        string $xDeadLetterRoutingKey
    ): self {
        $this->setArguments([
            // x-message-ttl is expressed in milliseconds; the caller passes seconds.
            'x-message-ttl' => ['I', $xMessageTtl * 1000],
            'x-dead-letter-exchange' => ['S', $xDeadLetterExchange],
            'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
        ]);
        $this->setQueue($queueName);

        return $this;
    }
}
DelayProducer.php
<?php

declare(strict_types=1);

namespace App\Components\Amqp;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;

/**
 * Producer that declares a delay queue (described by an AmqpBuilder), binds it
 * to the message's exchange and then publishes the message.
 */
class DelayProducer extends Builder
{
    /**
     * Publish a message through the given delay queue, retrying once on failure.
     *
     * @param ProducerMessageInterface $producerMessage message to publish
     * @param AmqpBuilder              $queueBuilder    delay queue definition
     * @param bool                     $confirm         use a confirm channel and report the broker ack
     * @param int                      $timeout         seconds to wait for pending acks/returns
     *
     * @return bool
     * @throws Throwable
     */
    public function produce(
        ProducerMessageInterface $producerMessage,
        AmqpBuilder $queueBuilder,
        bool $confirm = false,
        int $timeout = 5
    ): bool {
        return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout) {
            return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
        });
    }

    /**
     * Declare queue and exchange, bind them, and publish the message.
     *
     * @param ProducerMessageInterface $producerMessage message to publish
     * @param AmqpBuilder              $queueBuilder    delay queue definition
     * @param bool                     $confirm         use a confirm channel and report the broker ack
     * @param int                      $timeout         seconds to wait for pending acks/returns
     *
     * @return bool the ack result when $confirm is true, otherwise always true
     * @throws Throwable
     */
    private function produceMessage(
        ProducerMessageInterface $producerMessage,
        AmqpBuilder $queueBuilder,
        bool $confirm = false,
        int $timeout = 5
    ): bool {
        $result = false;

        $this->injectMessageProperty($producerMessage);

        $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
        $pool = $this->getConnectionPool($producerMessage->getPoolName());
        /** @var \Hyperf\Amqp\Connection $connection */
        $connection = $pool->get();
        $channel = $confirm ? $connection->getConfirmChannel() : $connection->getChannel();

        // Flipped to true by the ack handler when the broker acknowledges the publish.
        $channel->set_ack_handler(function () use (&$result) {
            $result = true;
        });

        try {
            $exchangeBuilder = $producerMessage->getExchangeBuilder();

            // Declare the delay queue.
            $channel->queue_declare(
                $queueBuilder->getQueue(),
                $queueBuilder->isPassive(),
                $queueBuilder->isDurable(),
                $queueBuilder->isExclusive(),
                $queueBuilder->isAutoDelete(),
                $queueBuilder->isNowait(),
                $queueBuilder->getArguments(),
                $queueBuilder->getTicket()
            );

            // Declare the exchange the message is published to.
            $channel->exchange_declare(
                $exchangeBuilder->getExchange(),
                $exchangeBuilder->getType(),
                $exchangeBuilder->isPassive(),
                $exchangeBuilder->isDurable(),
                $exchangeBuilder->isAutoDelete(),
                $exchangeBuilder->isInternal(),
                $exchangeBuilder->isNowait(),
                $exchangeBuilder->getArguments(),
                $exchangeBuilder->getTicket()
            );

            // Bind the delay queue to the exchange.
            $channel->queue_bind(
                $queueBuilder->getQueue(),
                $producerMessage->getExchange(),
                $producerMessage->getRoutingKey()
            );

            // Publish: basic_publish takes (message, exchange, routing key).
            $channel->basic_publish(
                $message,
                $producerMessage->getExchange(),
                $producerMessage->getRoutingKey()
            );
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (Throwable $exception) {
            // Reconnect the connection before release.
            $connection->reconnect();
            throw $exception;
        } finally {
            $connection->release();
        }

        return $confirm ? $result : true;
    }

    /**
     * Copy routingKey / exchange from the message class's @Producer annotation
     * onto the message, when the annotation is present and the value is non-empty.
     *
     * @param ProducerMessageInterface $producerMessage
     */
    private function injectMessageProperty(ProducerMessageInterface $producerMessage): void
    {
        if (! class_exists(AnnotationCollector::class)) {
            return;
        }

        /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
        $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
        if ($annotation) {
            $annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey);
            $annotation->exchange && $producerMessage->setExchange($annotation->exchange);
        }
    }
}
处理超时订单
OrderQueueProducer.php
<?php

declare(strict_types=1);

namespace App\Amqp\Producer;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder\ExchangeBuilder;
use Hyperf\Amqp\Message\ProducerMessage;

/**
 * Message published when an order is created; exchange and routing key are
 * taken from the annotation below.
 *
 * @Producer(exchange="order_exchange",routingKey="order_exchange")
 */
class OrderQueueProducer extends ProducerMessage
{
    /**
     * @param mixed $data message payload (the demo passes an array with an `order_sn` key)
     */
    public function __construct($data)
    {
        $this->payload = $data;
    }

    /**
     * Exchange definition used when declaring the exchange.
     *
     * Currently just delegates to the parent; override point for customising
     * the exchange if that becomes necessary.
     */
    public function getExchangeBuilder(): ExchangeBuilder
    {
        return parent::getExchangeBuilder();
    }
}
OrderQueueConsumer.php
<?php

declare(strict_types=1);

namespace App\Amqp\Consumer;

// NOTE(review): namespace separators were lost in the published article; this
// path is reconstructed and the import is not used below — confirm or adjust.
use App\Service\CityTransport\OrderService;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;

/**
 * Consumes messages that were dead-lettered out of the delay queue.
 *
 * @Consumer(exchange="delay_exchange",routingKey="delay_route",queue="delay_queue",name ="OrderQueueConsumer",nums=1)
 */
class OrderQueueConsumer extends ConsumerMessage
{
    /**
     * Handle one expired-order message.
     *
     * @param mixed $data decoded message payload
     *
     * @return string one of the Result::* constants
     */
    public function consume($data): string
    {
        // Business logic goes here (e.g. handle the order that was not paid in time).

        // The declared return type is string, so a result must always be returned.
        return Result::ACK;
    }

    /**
     * This consumer is always enabled.
     */
    public function isEnable(): bool
    {
        return true;
    }
}
Demo
// Describe the delay queue: messages wait in queue 'order_exchange' for 1 second,
// then are dead-lettered to 'delay_exchange' with routing key 'delay_route'.
$builder = new AmqpBuilder();
$builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');

// Resolve the producer from the container and publish a sample order message.
$que = ApplicationContext::getContainer()->get(DelayProducer::class);
var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder));
更多学习内容请访问:腾讯T3-T4标准精品PHP架构师教程目录大全,只要你看完保证薪资上升一个台阶(持续更新)(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时联系站长删除相关内容!