1.消息概述
??可通过消息服务中间件来提升系统异步通信、扩展解耦能力
??消息服务中两个重要概念:消息代理(message broker)和目的地(destination)当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
消息队列主要有两种形式的目的地:
??队列(queue):点对点消息通信(point-to-point)
??主题(topic):发布(publish)/订阅(subscribe)消息通信
注:通过ActiveMQ的学习即可知道以上的概念
?
??在未引入消息中间件的情况下,响应时间并不能降到最低;在引入消息中间件时,响应时间由150ms降低为55ms;
?
??在秒杀系统中,我们可以引入消息队列进行流量削峰。如,5件商品,100人抢购,如果抢购完了,则后面抢购的消息全部拒绝。
(1).点对点式(队列)
??消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列
??消息只有唯一的发送者和接受者,但并不是说只能有一个接收者
(2).发布订阅式(主题)
??发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息.类比微信公众号
(3).JMS(Java Message Service)
??JAVA消息服务(Java Message Service),基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现
(4).AMQP(Advanced Message Queuing Protocol)
??高级消息队列协议(Advanced Message Queuing Protocol),也是一个消息代理的规范,兼容JMS.RabbitMQ是AMQP的实现;
(5).JMS和AMQP区别
?
(6).Spring支持
??spring-jms提供了对JMS的支持;
??spring-rabbit提供了对AMQP的支持;
??需要ConnectionFactory的实现来连接消息代理;
??提供JmsTemplate、RabbitTemplate来发送消息;
??@JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息;
??@EnableJms、@EnableRabbit开启支持;
(7).Spring Boot自动配置
??JmsAutoConfiguration
??RabbitAutoConfiguration
2.RabbitMQ简介
??RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。
(1).核心概念
??Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
??Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
??Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型:direct(默认),fanout,topic,和headers,不同类型的Exchange转发消息的策略有所区别.
??Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
??Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和Queue的绑定可以是多对多的关系。
??Connection:网络连接,比如一个TCP连接。
??Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
??Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
??Virtual Host虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是/。
??Broker:表示消息队列服务器实体。
?
(2).RabbitMQ运行机制
[1].AMQP(Advanced Message Queuing Protocol)消息路由
??AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
?
[2].Direct Exchange类型
??Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers;headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
?
[3].Fanout Exchange类型?
[4].Topic Exchange类型
?
??如,usa.news和usa.weather会匹配到usa.#上;而usa.weather和europe.weather就会匹配到#.weather;类似于模糊匹配
3.RabbitMQ整合
(1).搭建docker
[1].安装docker
https://www.cnblogs.com/HOsystem/p/13789551.html
|
[2].加速docker
https://www.cnblogs.com/HOsystem/p/13789551.html
|
(2).搭建rabbitmq
??在docker上pull rabbitmq
[root@hosystem ~]# docker pull rabbitmq
Using default tag: latest
latest: Pulling from library/rabbitmq
171857c49d0f: Pull complete
419640447d26: Pull complete
61e52f862619: Pull complete
856781f94405: Pull complete
125d5ee3d600: Pull complete
42de77c4d197: Pull complete
4d65f87814dd: Pull complete
f6c0bf06039f: Pull complete
01671add1b7b: Pull complete
088ff84cf8cb: Pull complete
Digest: sha256:3da3bcd2167a1fc9bdbbc40ec0ae2b195df5df05e3c10c64569c969cb3d86435
Status: Downloaded newer image for rabbitmq:latest
docker.io/library/rabbitmq:latest
[root@hosystem ~]# docker images
REPOSITORY ?????????TAG ????????????????IMAGE ID ???????????CREATED ????????????SIZE
redis ??????????????latest ?????????????62f1d3402b78 ???????4 days ago ?????????104MB
rabbitmq ???????????latest ?????????????ea2bf0a30abf ???????4 weeks ago ????????156MB
hello-world ????????latest ?????????????bf756fb1ae65 ???????10 months ago ??????13.3kB
|
??通过docker启动rabbitmq
[root@hosystem ~]# docker images
REPOSITORY ?????????TAG ????????????????IMAGE ID ???????????CREATED ????????????SIZE
rabbitmq ???????????latest ?????????????ea2bf0a30abf ???????4 weeks ago ????????156MB
[root@hosystem ~]# docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq ea2bf0a30abf
e687835a6ea784d55717dc402d5d447d62e486e78f6c770ec703dfdec3d64f16
[root@hosystem ~]#
|
??-d:表示后台启动
??-p:进行端口映射
??--name:重新名,修改成我们想要的名字?
??访问rabbitmq,因为我的ip为192.168.188.198所以只要在浏览器上输入192.168.188.198:15672即可;账号:guest 密码:guest?
?
注:rabbitMQ启动后用web访问显示服务器拒绝访问,用以下方法解决
#添加防火墙规则
[root@hosystem ~]# firewall-cmd --permanent --zone=public --add-port=15672/tcp
success
[root@hosystem ~]# firewall-cmd --reload
success
|
#https://blog.csdn.net/tl1242616458/article/details/105586984
[root@hosystem ~]# docker exec -it myrabbitmq /bin/bash
[root@e687835a6ea7:/]# rabbitmq-plugins enable rabbitmq_management
|
(3).rabbitmq web操作
?
[1].添加exchange
??按照上图依次添加,exchange.direct、exchange.fanout、exchange.topic这三个exchange.效果如下
?
[2].添加queues
?
[3].绑定关系
??点击需要的exchange,进去后在bingdings里填写与之绑定的queues。
①.direct bindings
?
②.fanout bindings
?
③.topic bindings
?
[4].发送消息
①exchange.direct
?
②exchange.fanout
?
③.exchange.topic
?
??我们发送key为hello.news的消息,因为我们topic有#.news,所以只要有#.news都可以接收
?
[5].获取消息
①.hos queues
?
②.hosystem.news queues
?
?
(4).IDEA整合RabbitMQ
[1].创建工程
①.引入rabbit
<dependency>
???<groupId>org.springframework.amqp</groupId>
???<artifactId>spring-rabbit-test</artifactId>
???<scope>test</scope>
</dependency>
|
②.application.yml
#rabbitmq配置信息
spring.rabbitmq.host=192.168.188.198
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#spring.rabbitmq.port=
#spring.rabbitmq.virtual-host=
|
③.测试RabbitMQ
??AmqpAdmin:管理组件
??RabbitTemplate:消息发送处理组件
[2].查看RabbitAutoConfiguration
?
[3].查看CachingConnectionFactory
?
[4].查看RabbitProperties
??RabbitProperties是封装RabbitMQ相关配置的类
?
[5].查看RabbitTemplate
??RabbitTemplate是用于RabbitMQ发送和接收消息
?
[6].查看AmqpAdmin
??AmqpAdmin是RabbitMQ系统管理功能组件
?
[7].application.properties
??配置rabbitmq参数
#rabbitmq配置信息
spring.rabbitmq.host=192.168.188.198
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#spring.rabbitmq.port=
#spring.rabbitmq.virtual-host=
|
[8].发送消息
??发送消息到rabbitmq,默认使用java-serialized序列化
@SpringBootTest
class Springboot02AmqpApplicationTests {
?
@Autowired
RabbitTemplate rabbitTemplate;
?
/**
* 单播(点对点)
*/
@Test
void contextLoads() {
//message需要自己定义;定义消息体内容和消息头(org.springframework.amqp.core.Message())
// rabbitTemplate.send(exchange,routekey,message);
?
//object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq
// rabbitTemplate.convertAndSend(exchange,message);
Map<String,Object> map = new HashMap<>();
map.put("msg","test1");
map.put("data",Arrays.asList("helloworld",123,true));
//对象被默认序列化(java-serialized-object)后发送
// rabbitTemplate.convertAndSend("exchange.direct","hos.news",map);
?
}
|
?
[9].接收消息
/**
?* ?接收rabbitmsq消息
?* ?将数据转为json发送出去(private MessageConverter messageConverter = new SimpleMessageConverter();)
?*/
@Test
public void receive(){
????Object o = rabbitTemplate.receiveAndConvert("hos.news");
????System.out.println(o.getClass());
????System.out.println(o);
????Object o1 = rabbitTemplate.receiveAndConvert("hos.news");
????System.out.println(o1.getClass());
????System.out.println(o1);
?
}
|
?
[10].数据转json
??将数据转为json发送出去MessageConverter messageConverter = new SimpleMessageConverter();(org.springframework.amqp.support.converter.MessageConverter)
①.MyAMQPConfig.java
package com.hosystem.amqp.config;
?
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
?
@Configuration
public class MyAMQPConfig {
?
????@Bean
????public MessageConverter messageConverter(){
????????return new Jackson2JsonMessageConverter();
????}
}
|
②.Book.java
package com.hosystem.amqp.bean;
?
public class Book {
?
????private String bookName;
????private String author;
?
????public Book() {
????}
?
????public Book(String bookName,String author) {
?
????????this.bookName = bookName;
????????this.author = author;
????}
?
????public String getBookName() {
????????return bookName;
????}
?
????public String getAuthor() {
????????return author;
????}
?
????public void setBookName(String bookName) {
????????this.bookName = bookName;
????}
?
????public void setAuthor(String author) {
????????this.author = author;
????}
}
|
③.自定义对象
??使用自定义对象发送给rabbitmq
@SpringBootTest
class Springboot02AmqpApplicationTests {
?
@Autowired
RabbitTemplate rabbitTemplate;
?
/**
* 单播(点对点)
*/
@Test
void contextLoads() {
//message需要自己定义;定义消息体内容和消息头(org.springframework.amqp.core.Message())
// rabbitTemplate.send(exchange,true));
//发送自定义对象
????????rabbitTemplate.convertAndSend("exchange.direct",new Book("Linux","linux"));
?
}
}
|
④.接收json数据
@SpringBootTest
class Springboot02AmqpApplicationTests {
?
???@Autowired
???RabbitTemplate rabbitTemplate;
?
?
????/**
?????* ?接收rabbitmsq消息
?????* ?将数据转为json发送出去(private MessageConverter messageConverter = new SimpleMessageConverter();)
?????*/
????@Test
????public void receive(){
????????Object o = rabbitTemplate.receiveAndConvert("hos.news");
????????System.out.println(o.getClass());
????????System.out.println(o);
????}
|
?
[11].广播发送消息
package com.hosystem.amqp;
?
@SpringBootTest
class Springboot02AmqpApplicationTests {
?
???@Autowired
???RabbitTemplate rabbitTemplate;
?
????/**
?????* ?广播
?????*/
????@Test
????public void sendMsg(){
????????rabbitTemplate.convertAndSend("exchange.fanout","",new Book("python书籍","python作者"));
????}
}
|
[12].@EnableRabbit + @RabbitListener
??@EnableRabbit + @RabbitListener 监听消息队列内容
①.BookService.java
package com.hosystem.amqp.service;
?
import com.hosystem.amqp.bean.Book;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
?
@Service
public class BookService {
?
????@RabbitListener(queues = "hos.news")
????public void receive(Book book){
????????System.out.println("收到消息:"+book);
????}
?
????@RabbitListener(queues = "hos")
????public void receive02(Message message){
????????System.out.println(message.getBody());
????????System.out.println(message.getMessageProperties());
????}
}
|
②.Springboot02AmqpApplication.java
@EnableRabbit //开启基于注解的rabbitmq模式
@SpringBootApplication
public class Springboot02AmqpApplication {
?
???public static void main(String[] args) {
??????SpringApplication.run(Springboot02AmqpApplication.class,args);
???}
?
}
|
[13].AmqpAdmin
??AmqpAdmin(org.springframework.amqp.core.AmqpAdmin):RabbitMQ系统管理功能组件。
??AmqpAdmin:创建和删除Queue、exchange、binding
@SpringBootTest
class Springboot02AmqpApplicationTests {
?
???@Autowired
???RabbitTemplate rabbitTemplate;
?
???// ???????@Bean
????// ???????@ConditionalOnSingleCandidate(ConnectionFactory.class)
????// ???????@ConditionalOnProperty(
????// ???????????prefix = "spring.rabbitmq",
????// ???????????name = {"dynamic"},
????// ???????????matchIfMissing = true
????// ???????)
????// ???????@ConditionalOnMissingBean
????// ???????public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
????// ???????????return new RabbitAdmin(connectionFactory);
????// ???????}
???@Autowired
???AmqpAdmin amqpAdmin;
?
???//创建exchange queues binding
???@Test
???public void createExchange(){
???????//org.springframework.amqp.core.Exchange
????????//org.springframework.amqp.core.DirectExchange
????????//创建exchange
// ???????amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
// ???????System.out.println("创建成功");
?
????????//创建queues
????????//org.springframework.amqp.core.AmqpAdmin
// ???????amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
?
????????//org.springframework.amqp.core.Binding
????????//创建binding
????????amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null));
????}
}
|
? (编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|