PHP 框架 Hyperf 实现处理超时未支付订单和延时队列
发布时间:2020-12-13 21:03:52  所属栏目:PHP教程  来源:网络整理 
            导读:本文介绍如何在 Hyperf 中借助 RabbitMQ 的消息 TTL 与死信交换机实现延时队列(AmqpBuilder.php、DelayProducer.php),并用它来处理超时未支付的订单。
                
                
                
            | 
                         延时队列 
 AmqpBuilder.php <?php
declare(strict_types = 1);
namespace App\Components\Amqp;
use Hyperf\Amqp\Builder\Builder;
use Hyperf\Amqp\Builder\QueueBuilder;
class AmqpBuilder extends QueueBuilder
{
    /**
     * Merge extra queue arguments into the arguments already held by this builder.
     *
     * The merge is done with array_merge(), so for string keys a value passed
     * here replaces an existing value stored under the same key.
     *
     * NOTE(review): array_merge() only accepts arrays, so an AMQPTable instance
     * would presumably be rejected here even though the annotation allows it —
     * confirm before passing one.
     *
     * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments
     *
     * @return \Hyperf\Amqp\Builder\Builder
     */
    public function setArguments($arguments) : Builder
    {
        $merged = array_merge($this->arguments, $arguments);
        $this->arguments = $merged;

        return $this;
    }

    /**
     * Configure this builder as a delay queue.
     *
     * Sets the message TTL and the dead-letter exchange / routing key as queue
     * arguments, then sets the queue name.
     *
     * @param string $queueName             Name of the queue to declare.
     * @param int    $xMessageTtl           Message time-to-live in seconds (converted to milliseconds here).
     * @param string $xDeadLetterExchange   Value for the x-dead-letter-exchange argument.
     * @param string $xDeadLetterRoutingKey Value for the x-dead-letter-routing-key argument.
     *
     * @return $this
     */
    public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
    {
        // x-message-ttl is expressed in milliseconds.
        $ttlMilliseconds = $xMessageTtl * 1000;

        $delayArguments = [
            'x-message-ttl'             => ['I', $ttlMilliseconds],
            'x-dead-letter-exchange'    => ['S', $xDeadLetterExchange],
            'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
        ];

        $this->setArguments($delayArguments);
        $this->setQueue($queueName);

        return $this;
    }
}
 DelayProducer.php <?php
declare(strict_types = 1);
namespace App\Components\Amqp;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
class DelayProducer extends Builder
{
    /**
     * Publish a message through the delay queue described by $queueBuilder.
     *
     * The work is delegated to produceMessage() via the retry() helper, which
     * is called with 1 as its first argument.
     *
     * @param ProducerMessageInterface $producerMessage Message to publish.
     * @param AmqpBuilder              $queueBuilder    Queue definition (name and arguments) to declare and bind.
     * @param bool                     $confirm         Whether to publish on a confirm channel.
     * @param int                      $timeout         Timeout passed to wait_for_pending_acks_returns().
     *
     * @return bool
     * @throws Throwable
     */
    public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout) {
            // Forward every argument: produceMessage() needs the queue
            // definition and the confirm flag, not just the message and timeout.
            return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
        });
    }

    /**
     * Declare the queue and exchange, bind them, and publish the message.
     *
     * @param ProducerMessageInterface $producerMessage Message to publish.
     * @param AmqpBuilder              $queueBuilder    Queue definition to declare and bind.
     * @param bool                     $confirm         Whether to publish on a confirm channel.
     * @param int                      $timeout         Timeout passed to wait_for_pending_acks_returns().
     *
     * @return bool When $confirm is true, whether the ack handler fired; otherwise always true.
     * @throws Throwable Any failure is rethrown after the connection has been reconnected.
     */
    private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        $result = false;
        $this->injectMessageProperty($producerMessage);
        $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
        $pool = $this->getConnectionPool($producerMessage->getPoolName());
        /** @var \Hyperf\Amqp\Connection $connection */
        $connection = $pool->get();
        if ($confirm) {
            $channel = $connection->getConfirmChannel();
        } else {
            $channel = $connection->getChannel();
        }
        // Flip $result once the broker acknowledges the publish.
        $channel->set_ack_handler(function () use (&$result) {
            $result = true;
        });
        try {
            // Delay-queue handling.
            $exchangeBuilder = $producerMessage->getExchangeBuilder();
            // Declare the queue.
            $channel->queue_declare(
                $queueBuilder->getQueue(),
                $queueBuilder->isPassive(),
                $queueBuilder->isDurable(),
                $queueBuilder->isExclusive(),
                $queueBuilder->isAutoDelete(),
                $queueBuilder->isNowait(),
                $queueBuilder->getArguments(),
                $queueBuilder->getTicket()
            );
            // Declare the exchange.
            $channel->exchange_declare(
                $exchangeBuilder->getExchange(),
                $exchangeBuilder->getType(),
                $exchangeBuilder->isPassive(),
                $exchangeBuilder->isDurable(),
                $exchangeBuilder->isAutoDelete(),
                $exchangeBuilder->isInternal(),
                $exchangeBuilder->isNowait(),
                $exchangeBuilder->getArguments(),
                $exchangeBuilder->getTicket()
            );
            // Bind the queue to the exchange.
            $channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            // Publish to the same exchange / routing key the queue was just
            // bound with, so the message is routed into the delay queue.
            $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (Throwable $exception) {
            // Reconnect the connection before release.
            $connection->reconnect();
            throw $exception;
        } finally {
            $connection->release();
        }

        return $confirm ? $result : true;
    }

    /**
     * Copy routingKey / exchange from the message class's Producer annotation
     * onto the message, when the annotation exists and the value is truthy.
     *
     * @param ProducerMessageInterface $producerMessage
     */
    private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
    {
        if (class_exists(AnnotationCollector::class)) {
            /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
            $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
            if ($annotation) {
                $annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey);
                $annotation->exchange && $producerMessage->setExchange($annotation->exchange);
            }
        }
    }
}
 处理超时订单 
 OrderQueueProducer.php <?php
declare(strict_types = 1);
namespace App\Amqp\Producer;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder\ExchangeBuilder;
use Hyperf\Amqp\Message\ProducerMessage;
/**
 * Producer message published to the order exchange.
 *
 * @Producer(exchange="order_exchange",routingKey="order_exchange")
 */
class OrderQueueProducer extends ProducerMessage
{
    /**
     * Store the given data as the message payload, unchanged.
     *
     * @param mixed $data
     */
    public function __construct($data)
    {
        $this->payload = $data;
    }

    /**
     * Return the parent's exchange builder without modification.
     *
     * @return ExchangeBuilder
     */
    public function getExchangeBuilder() : ExchangeBuilder
    {
        // TODO: Change the autogenerated stub
        $exchangeBuilder = parent::getExchangeBuilder();

        return $exchangeBuilder;
    }
}
 OrderQueueConsumer.php
<?php
declare(strict_types = 1);
namespace App\Amqp\Consumer;
use AppServiceCityTransportOrderService;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
/**
 * Consumer bound to the dead-letter exchange / routing key that the delay
 * queue forwards expired messages to.
 *
 * @Consumer(exchange="delay_exchange",routingKey="delay_route",queue="delay_queue",name ="OrderQueueConsumer",nums=1)
 */
class OrderQueueConsumer extends ConsumerMessage
{
    /**
     * Handle one message taken from delay_queue.
     *
     * @param mixed $data Message payload.
     *
     * @return string One of the Result constants telling the framework how to settle the message.
     */
    public function consume($data) : string
    {
        // Business logic goes here.

        // The method declares a string return type, so it must return a
        // result; falling off the end would raise a TypeError at runtime.
        return Result::ACK;
    }

    /**
     * Whether this consumer is enabled.
     *
     * @return bool Always true.
     */
    public function isEnable() : bool
    {
        return true;
    }
}
 // Demo: declare a delay queue named 'order_exchange' whose messages expire
 // after 1 second and are then dead-lettered to 'delay_exchange' with routing
 // key 'delay_route' (the exchange / routing key OrderQueueConsumer is bound to).
 // NOTE(review): ApplicationContext is not imported in this snippet —
 // presumably Hyperf's ApplicationContext; add the matching `use` line.
 $builder = new AmqpBuilder();
 $builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');
 $que = ApplicationContext::getContainer()->get(DelayProducer::class);
 var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000,90000)]), $builder));
 更多学习内容请访问: 腾讯T3-T4标准精品PHP架构师教程目录大全,只要你看完保证薪资上升一个台阶(持续更新) ? (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!  | 
                  
