springboot整合rabbitmq的示例代码
概述
基本概念 Broker 用来处理数据的消息队列服务器实体 vhost 由RabbitMQ服务器创建的虚拟消息主机,拥有自己的权限机制,一个broker里可以开设多个vhost,用于不同用户的权限隔离,vhost之间是也完全隔离的。 productor 产生用于消息通信的数据 channel 消息通道,在AMQP中可以建立多个channel,每个channel代表一个会话任务。 exchange direct 转发消息到routing-key指定的队列 fanout 转发消息到所有绑定的队列,类似于一种广播发送的方式。 topic 按照规则转发消息,这种规则多为模式匹配,也显得更加灵活 queue
binding 表示交换机和队列之间的关系,在进行绑定时,带有一个额外的参数binding-key,来和routing-key相匹配。 consumer 监听消息队列来进行消息数据的读取 springboot下三种Exchange模式(fanout,direct,topic)实现 pom.xml中引用spring-boot-starter-amqp <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 增加rabbitmq配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest direct direct模式一般情况下只需要定义queue 使用自带交换机(defaultExchange)无需绑定交换机 @Configuration public class RabbitP2PConfigure { public static final String QUEUE_NAME = "p2p-queue"; @Bean public Queue queue() { return new Queue(QUEUE_NAME,true); } } @RunWith(SpringRunner.class) @SpringBootTest(classes = BootCoreTestApplication.class) @Slf4j public class RabbitTest { @Autowired private AmqpTemplate amqpTemplate; /** * 发送 */ @Test public void sendLazy() throws InterruptedException { City city = new City(234556666L,"direct_name","direct_code"); amqpTemplate.convertAndSend(RabbitLazyConfigure.QUEUE_NAME,city); } /** * 领取 */ @Test public void receive() throws InterruptedException { Object obj = amqpTemplate.receiveAndConvert(RabbitLazyConfigure.QUEUE_NAME); Assert.notNull(obj,""); log.debug(obj.toString()); } } 适用场景:点对点 fanout fanout则模式需要将多个queue绑定在同一个交换机上 @Configuration public class RabbitFanoutConfigure { public static final String EXCHANGE_NAME = "fanout-exchange"; public static final String FANOUT_A = "fanout.A"; public static final String FANOUT_B = "fanout.B"; public static final String FANOUT_C = "fanout.C"; @Bean public Queue AMessage() { return new Queue(FANOUT_A); } @Bean public Queue BMessage() { return new Queue(FANOUT_B); } @Bean public Queue CMessage() { return new Queue(FANOUT_C); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_NAME); } @Bean public Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean public Binding bindingExchangeB(Queue BMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean public Binding bindingExchangeC(Queue CMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } } 发送者 @Slf4j public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void sendFanout(Object message) { log.debug("begin send fanout message<" + message + ">"); rabbitTemplate.convertAndSend(RabbitFanoutConfigure.EXCHANGE_NAME,"",message); } } 我们可以通过@RabbitListener监听多个queue来进行消费 @Slf4j @RabbitListener(queues = { RabbitFanoutConfigure.FANOUT_A,RabbitFanoutConfigure.FANOUT_B,RabbitFanoutConfigure.FANOUT_C }) public class Receiver { @RabbitHandler public void receiveMessage(String message) { log.debug("Received <" + message + ">"); } } 适用场景 topic 这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”,Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。 在进行绑定时,要提供一个该队列关心的主题,如“topic.# (“#”表示0个或若干个关键字,“*”表示一个关键字。 ) @Configuration public class RabbitTopicConfigure { public static final String EXCHANGE_NAME = "topic-exchange"; public static final String TOPIC = "topic"; public static final String TOPIC_A = "topic.A"; public static final String TOPIC_B = "topic.B"; @Bean public Queue queueTopic() { return new Queue(RabbitTopicConfigure.TOPIC); } @Bean public Queue queueTopicA() { return new Queue(RabbitTopicConfigure.TOPIC_A); } @Bean public Queue queueTopicB() { return new Queue(RabbitTopicConfigure.TOPIC_B); } @Bean public TopicExchange exchange() { TopicExchange topicExchange = new TopicExchange(EXCHANGE_NAME); topicExchange.setDelayed(true); return new TopicExchange(EXCHANGE_NAME); } @Bean public Binding bindingExchangeTopic(Queue queueTopic,TopicExchange exchange) { return BindingBuilder.bind(queueTopic).to(exchange).with(RabbitTopicConfigure.TOPIC); } @Bean public Binding bindingExchangeTopics(Queue queueTopicA,TopicExchange exchange) { return BindingBuilder.bind(queueTopicA).to(exchange).with("topic.#"); } } 同时去监听三个queue @Slf4j @RabbitListener(queues = { RabbitTopicConfigure.TOPIC,RabbitTopicConfigure.TOPIC_A,RabbitTopicConfigure.TOPIC_B }) public class Receiver { @RabbitHandler public void receiveMessage(String message) { log.debug("Received <" + message + ">"); } } 通过测试我们可以发现 @RunWith(SpringRunner.class) @SpringBootTest(classes = BootCoreTestApplication.class) public class RabbitTest { @Autowired private AmqpTemplate rabbitTemplate; @Test public void sendAll() { rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME,"topic.test","send All"); } @Test public void sendTopic() { rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME,RabbitTopicConfigure.TOPIC,"send Topic"); } @Test public void sendTopicA() { rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME,"send TopicA"); } } 适用场景 延迟队列 延迟消费:
延迟重试:
设置交换机延迟属性为true @Configuration public class RabbitLazyConfigure { public static final String QUEUE_NAME = "lazy-queue-t"; public static final String EXCHANGE_NAME = "lazy-exchange-t"; @Bean public Queue queue() { return new Queue(QUEUE_NAME,true); } @Bean public DirectExchange defaultExchange() { DirectExchange directExchange = new DirectExchange(EXCHANGE_NAME,true,false); directExchange.setDelayed(true); return directExchange; } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(QUEUE_NAME); } } 发送时设置延迟时间即可 @Slf4j public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void sendLazy(Object msg) { log.debug("begin send lazy message<" + msg + ">"); rabbitTemplate.convertAndSend(RabbitLazyConfigure.EXCHANGE_NAME,RabbitLazyConfigure.QUEUE_NAME,msg,message -> { message.getMessageProperties().setHeader("x-delay",10000); return message; } ); } } 结束 各种使用案例请直接查看官方文档 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程小技巧。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |