加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

python 与rabbitmq

发布时间:2020-12-16 23:56:15 所属栏目:Python 来源:网络整理
导读:table style="height: 30px; background-color: #afeeee; width: 1266px; ; width: 1266px;" border="0" tr tdspan style="font-size: 16px;" 一、rabbitmq简介、安装 /td /tr /table 简介: MQ全称为Message Queue,?(MQ)是一种应用程序对应用程序的通信方
<tr>
<td><span style="font-size: 16px;">一、rabbitmq简介、安装</td>
</tr></table>

简介:

MQ全称为Message Queue,?(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现的产品,遵循Mozilla Public License开源协议,RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收messag。

安装(linux)

1、安装erlang
以root身份执行下面命令
?
yum install erlang xmlto
?
2、安装epel源
rpm -ivh http://download.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
wget -O /etc/yum.repos.d/epel-erlang.repo http://repos.fedorapeople.org/repos/peter/erlang/epel-erlang.repo
?
3、安装rabbitmq rpm包
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.1.5/rabbitmq-server-3.1.5-1.noarch.rpm??? ?
rpm -ivh? rabbitmq-server-3.1.5-1.noarch.rpm
?
4、启动rabbitmq,并验证启动情况?
rabbitmq-server --detached &ps aux |grep rabbitmq
?
5、以服务的方式启动
service rabbitmq-server start
?
6、检查端口5672是否打开
/sbin/iptables -I INPUT -p tcp --dport 5672 -j ACCEPT
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables restart??? ?
/etc/init.d/iptables status
?
7、启用维护插件
rabbitmq-plugins enable rabbitmq_management?
?
8、重启rabbitmq
service rabbitmq-server restart
?
9、登录
http://192.168.110.60:15672/ 用户名密码 guest
?
无法登陆解决办法
vim /etc/rabbitmq/rabbitmq.config
写入信息,并保存
[{rabbit,[{loopback_users,[]}]}].
?
?
?其他相关:
?
1、服务器启动与关闭
启动:service rabbitmq-server start
关闭:service rabbitmq-server stop
重启:service rabbitmq-server restart
?
2、用户管理
新增 rabbitmqctl add_user admin admin
删除 rabbitmqctl delete_user admin
修改 rabbitmqctl change_password admin admin123
?
用户列表 rabbitmqctl? list_users
设置角色 rabbitmqctl set_user_tags admin administrator monitoring policymaker management
?
设置用户权限 rabbitmqctl? set_permissions? -p? VHostPath? admin? ConfP? WriteP? ReadP
查询所有权限 rabbitmqctl? list_permissions? [-p? VHostPath]
指定用户权限 rabbitmqctl? list_user_permissions? admin
清除用户权限 rabbitmqctl? clear_permissions? [-p VHostPath]? admin
?
tips:
设置远程用户密码
?

<table style="height: 30px; background-color: #afeeee; width: 1266px; ; width: 1266px;" border="0">

二、rabbitmq python API
?详细的api请查看rabbitmq官网:http://www.rabbitmq.com/devtools.html
安装:pip install pika
1.简单的消费者生产者模型

生产者:
==,port=5672)) channel = connection.channel() channel.queue_declare(queue=<span style="color: #800000;">'<span style="color: #800000;">hello<span style="color: #800000;">')<span style="color: #008000;">#<span style="color: #008000;">声明queue

<span style="color: #008000;">#<span style="color: #008000;"> n RabbitMQ a message can never be sent directly to the queue,it always needs to go through an exchange.
channel.basic_publish(exchange=<span style="color: #800000;">''<span style="color: #000000;">,routing_key=<span style="color: #800000;">'<span style="color: #800000;">hello<span style="color: #800000;">'<span style="color: #000000;">,body=<span style="color: #800000;">'<span style="color: #800000;">Hello World!<span style="color: #800000;">'<span style="color: #000000;">)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> Sent 'Hello World!'<span style="color: #800000;">"<span style="color: #000000;">)
connection.close()

消费者:

==,port=5672)) channel = connection.channel() =)

<span style="color: #0000ff;">def callback(ch,method,properties,body):<span style="color: #008000;">#<span style="color: #008000;">回调函数,收到消息后执行的函数,body指消息主题
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Received %r<span style="color: #800000;">" %<span style="color: #000000;"> body)

channel.basic_consume(callback,queue=<span style="color: #800000;">'<span style="color: #800000;">hello<span style="color: #800000;">'<span style="color: #000000;">,no_ack=True) <span style="color: #008000;">#<span style="color: #008000;">如果设置no_ack=Flase,会把消费的消息重写添加到队列中

<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;"> [*] Waiting for messages.<span style="color: #800000;">'<span style="color: #000000;">)
channel.start_consuming()<span style="color: #008000;">#<span style="color: #008000;">阻塞模式

2.work模式(轮询)

  • 在这种模式下,RabbitMQ会默认把p发的消息依次分发给连接该条队列的各个消费者(c),跟负载均衡差类似,如果在消费者段设置了no_ack=Flase(默认),也就是确认消息,如果在回调函数中不手动进行确认,那么该消息将一直存在,此时我们需要在回调函数周手动确认消息接收完毕,此时队列中的消息才会被删除。
  • 假如消费者处理消息需要15秒,当消费者断开了,那这个消息处理明显还没处理完,并设置了no_ack=Flase(默认),此时该条消息会发给下一个消费者。
  • 上面的效果消费端断了就转到另外一个消费端去了,但是生产者怎么知道消费端断了呢??因为生产者和消费者是通过socket连接的,socket断了,就说明消费端断开了。

生产者:

connection =<span style="color: #000000;"> pika.BlockingConnection(pika.ConnectionParameters(
<span style="color: #800000;">'
<span style="color: #800000;">10.0.0.241
<span style="color: #800000;">'
<span style="color: #000000;">))
channel
=<span style="color: #000000;"> connection.channel()

<span style="color: #008000;">#<span style="color: #008000;"> 声明queue
channel.queue_declare(queue=<span style="color: #800000;">'<span style="color: #800000;">task_queue<span style="color: #800000;">'<span style="color: #000000;">)

message = <span style="color: #800000;">"<span style="color: #800000;">Hello World! %s<span style="color: #800000;">" %<span style="color: #000000;"> time.time()
channel.basic_publish(exchange=<span style="color: #800000;">''<span style="color: #000000;">,routing_key=<span style="color: #800000;">'<span style="color: #800000;">task_queue<span style="color: #800000;">'<span style="color: #000000;">,body=<span style="color: #000000;">message,)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Sent %r<span style="color: #800000;">" %<span style="color: #000000;"> message)
connection.close()

消费者:

connection =<span style="color: #000000;"> pika.BlockingConnection(pika.ConnectionParameters(
<span style="color: #800000;">'
<span style="color: #800000;">10.0.0.241
<span style="color: #800000;">'
<span style="color: #000000;">))
channel
=<span style="color: #000000;"> connection.channel()

<span style="color: #0000ff;">def<span style="color: #000000;"> callback(ch,body):
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Received %r<span style="color: #800000;">" %<span style="color: #000000;"> body)
time.sleep(10<span style="color: #000000;">)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Done<span style="color: #800000;">"<span style="color: #000000;">)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;">method.delivery_tag<span style="color: #800000;">"<span style="color: #000000;">,method.delivery_tag)
ch.basic_ack(delivery_tag=method.delivery_tag)<span style="color: #008000;">#<span style="color: #008000;">主动向服务器发确认消息,此时delivery_tag为消费消息的tag号
<span style="color: #000000;">

channel.basic_consume(callback,queue=<span style="color: #800000;">'<span style="color: #800000;">task_queue<span style="color: #800000;">'<span style="color: #000000;">,<span style="color: #008000;">#<span style="color: #008000;"> no_ack=True 如果在回掉函数中手动确认必须把no_ack设置为Flase或者不带该参数
<span style="color: #000000;"> )

<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;"> [*] Waiting for messages. To exit press CTRL+C<span style="color: #800000;">'<span style="color: #000000;">)
channel.start_consuming()

?公平的分发消息:

在实际的应用中,每个客户端的消费消息的能力是不一样的,如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。如下图:

消费者:

connection =<span style="color: #000000;"> pika.BlockingConnection(pika.ConnectionParameters(
host
=<span style="color: #800000;">'
<span style="color: #800000;">10.0.0.241
<span style="color: #800000;">'
<span style="color: #000000;">))
channel
=<span style="color: #000000;"> connection.channel()

channel.queue_declare(queue=<span style="color: #800000;">'<span style="color: #800000;">task_queue<span style="color: #800000;">'<span style="color: #000000;">)
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;"> [*] Waiting for messages. To exit press CTRL+C<span style="color: #800000;">'<span style="color: #000000;">)

<span style="color: #0000ff;">def<span style="color: #000000;"> callback(ch,body):
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Received %r<span style="color: #800000;">" %<span style="color: #000000;"> body)
time.sleep(body.count(b<span style="color: #800000;">'<span style="color: #800000;">.<span style="color: #800000;">'<span style="color: #000000;">))
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Done<span style="color: #800000;">"<span style="color: #000000;">)
ch.basic_ack(delivery_tag=<span style="color: #000000;">method.delivery_tag)

channel.basic_qos(prefetch_count=1)<span style="color: #008000;">#<span style="color: #008000;">设置消费的条数为1,当当前消费者有一条消息未消费完时,该消费者不会主动接受消息了。
<span style="color: #000000;">channel.basic_consume(callback,queue=<span style="color: #800000;">'<span style="color: #800000;">task_queue<span style="color: #800000;">'<span style="color: #000000;">)

channel.start_consuming()

<table style="height: 30px; background-color: #afeeee; width: 1266px; ; width: 1266px;" border="0">

<tr>
<td><span style="font-size: 16px;">三、消息持久化</td>
</tr></table>

?当rabbitmq队列中有很多消息,此时rabbitmq server宕机了,会导致数据丢下,那么如何将消息进行持久化呢。分两步:

1.持久化管道:

在生产者和消费者两端声明管道时候加参数:

channel.queue_

2.持久化消息:

在生产者端设置properties参数:

properties=pika

完整的demo:

生产者:

connection =<span style="color: #000000;"> pika.BlockingConnection(pika.ConnectionParameters(
<span style="color: #800000;">'<span style="color: #800000;">localhost<span style="color: #800000;">',5672)) <span style="color: #008000;">#<span style="color: #008000;"> 默认端口5672,可不写
channel =<span style="color: #000000;"> connection.channel()
<span style="color: #008000;">#<span style="color: #008000;">声明queue
channel.queue_declare(queue=<span style="color: #800000;">'<span style="color: #800000;">hello2<span style="color: #800000;">',durable=<span style="color: #000000;">True)
channel.basic_publish(exchange=<span style="color: #800000;">''<span style="color: #000000;">,routing_key=<span style="color: #800000;">'<span style="color: #800000;">hello2<span style="color: #800000;">'<span style="color: #000000;">,body=<span style="color: #800000;">'<span style="color: #800000;">Hello World!<span style="color: #800000;">'<span style="color: #000000;">,properties=<span style="color: #000000;">pika.BasicProperties(
delivery_mode=2,<span style="color: #008000;">#<span style="color: #008000;"> make message persistent
<span style="color: #000000;"> )
)

<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> Sent 'Hello World!'<span style="color: #800000;">"<span style="color: #000000;">)
connection.close()

消费者:

connection =<span style="color: #000000;"> pika.BlockingConnection(pika.ConnectionParameters(
<span style="color: #800000;">'
<span style="color: #800000;">localhost
<span style="color: #800000;">'<span style="color: #000000;">))
channel =<span style="color: #000000;"> connection.channel()
channel.queue_declare(queue=<span style="color: #800000;">'<span style="color: #800000;">hello2<span style="color: #800000;">',durable=<span style="color: #000000;">True)

<span style="color: #0000ff;">def<span style="color: #000000;"> callback(ch,body):
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Received %r<span style="color: #800000;">" %<span style="color: #000000;"> body)
time.sleep(10<span style="color: #000000;">)
ch.basic_ack(delivery_tag = method.delivery_tag) <span style="color: #008000;">#<span style="color: #008000;"> 告诉生产者,消息处理完成
<span style="color: #000000;">
channel.basic_qos(prefetch_count=1) <span style="color: #008000;">#<span style="color: #008000;"> 类似权重,按能力分发,如果有一个消息,就不在给你发
channel.basic_consume( <span style="color: #008000;">#<span style="color: #008000;"> 消费消息
callback,<span style="color: #008000;">#<span style="color: #008000;"> 如果收到消息,就调用callback
queue=<span style="color: #800000;">'<span style="color: #800000;">hello2<span style="color: #800000;">'<span style="color: #000000;">,<span style="color: #008000;">#<span style="color: #008000;"> no_ack=True # 一般不写,处理完接收处理结果。宕机则发给其他消费者
<span style="color: #000000;"> )

<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;"> [*] Waiting for messages. To exit press CTRL+C<span style="color: #800000;">'<span style="color: #000000;">)
channel.start_consuming()

<table style="height: 30px; background-color: #afeeee; width: 1266px; ; width: 1266px;" border="0">

<tr>
<td><span style="font-size: 16px;">四、rabbitmq发布/订阅的三种模式</td>
</tr></table>

之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,定义的类型有三种:

  • fanout: 所有绑定到此exchange的queue都可以接收消息
  • direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
  • topic: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

1.fanout

fanout模式是纯广播模式,所有绑定了相同的exchange的消费者都能收到来自生产者的一条消息,收取消息时需要queue和exchange绑定,因为消费者不是和exchange直连的,消费者是连在queue上,queue绑定在exchange上,消费者只会在queu里收消息。如下图:

?demo:

发布者:

connection =<span style="color: #000000;"> pika.BlockingConnection(pika.ConnectionParameters(
host
=<span style="color: #800000;">'
<span style="color: #800000;">localhost
<span style="color: #800000;">'<span style="color: #000000;">))
channel =<span style="color: #000000;"> connection.channel()
<span style="color: #008000;">#<span style="color: #008000;"> 注意:这里是广播,不需要声明queue
channel.exchange_declare(exchange=<span style="color: #800000;">'<span style="color: #800000;">logs<span style="color: #800000;">',<span style="color: #008000;">#<span style="color: #008000;"> 声明广播管道
type=<span style="color: #800000;">'<span style="color: #800000;">fanout<span style="color: #800000;">'<span style="color: #000000;">)

<span style="color: #008000;">#<span style="color: #008000;"> message = ' '.join(sys.argv[1:]) or "info: Hello World!"
message = <span style="color: #800000;">"<span style="color: #800000;">info: Hello World!<span style="color: #800000;">"<span style="color: #000000;">
channel.basic_publish(exchange=<span style="color: #800000;">'<span style="color: #800000;">logs<span style="color: #800000;">'<span style="color: #000000;">,routing_key=<span style="color: #800000;">'',<span style="color: #008000;">#<span style="color: #008000;"> 注意此处空,必须有
body=<span style="color: #000000;">message)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Sent %r<span style="color: #800000;">" %<span style="color: #000000;"> message)
connection.close()

订阅者:

connection =<span style="color: #000000;"> pika.BlockingConnection(pika.ConnectionParameters(
host
=<span style="color: #800000;">'<span style="color: #800000;">localhost<span style="color: #800000;">'<span style="color: #000000;">))
channel =<span style="color: #000000;"> connection.channel()

channel.exchange_declare(exchange=<span style="color: #800000;">'<span style="color: #800000;">logs<span style="color: #800000;">'<span style="color: #000000;">,type=<span style="color: #800000;">'<span style="color: #800000;">fanout<span style="color: #800000;">'<span style="color: #000000;">)
<span style="color: #008000;">#<span style="color: #008000;"> 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
result = channel.queue_declare(exclusive=<span style="color: #000000;">True)
<span style="color: #008000;">#<span style="color: #008000;"> 获取随机的queue名字
queue_name =<span style="color: #000000;"> result.method.queue
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;">random queuename:<span style="color: #800000;">"<span style="color: #000000;">,queue_name)

channel.queue_bind(exchange=<span style="color: #800000;">'<span style="color: #800000;">logs<span style="color: #800000;">',<span style="color: #008000;">#<span style="color: #008000;"> queue绑定到转发器上
queue=<span style="color: #000000;">queue_name)

<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;"> [*] Waiting for logs. To exit press CTRL+C<span style="color: #800000;">'<span style="color: #000000;">)

<span style="color: #0000ff;">def<span style="color: #000000;"> callback(ch,body):
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] %r<span style="color: #800000;">" %<span style="color: #000000;"> body)

channel.basic_consume(callback,queue=<span style="color: #000000;">queue_name,<span style="color: #000000;">)

channel.start_consuming()

?2.direct模式

RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列,此时的关键字由参数routing_key指定。模式如下图:

发布者:

random = pika.PlainCredentials(,) connection ==,port=5672,virtual_host=,credentials==channel.exchange_declare(exchange=<span style="color: #800000;">'<span style="color: #800000;">direct_logs<span style="color: #800000;">'<span style="color: #000000;">,type=<span style="color: #800000;">'<span style="color: #800000;">direct<span style="color: #800000;">')<span style="color: #008000;">#<span style="color: #008000;">声明type类型
<span style="color: #000000;">
index
=randint(0,3<span style="color: #000000;">)
log_level
=[<span style="color: #800000;">'
<span style="color: #800000;">info
<span style="color: #800000;">'
,<span style="color: #800000;">'
<span style="color: #800000;">wraning
<span style="color: #800000;">'
,<span style="color: #800000;">'
<span style="color: #800000;">error
<span style="color: #800000;">'
,<span style="color: #800000;">'
<span style="color: #800000;">nothing
<span style="color: #800000;">'
<span style="color: #000000;">]
message
=<span style="color: #800000;">'
<span style="color: #800000;">{}--->Hello World!
<span style="color: #800000;">'
<span style="color: #000000;">.format(log_level[index])
channel.basic_publish(exchange=<span style="color: #800000;">'<span style="color: #800000;">direct_logs<span style="color: #800000;">'<span style="color: #000000;">,routing_key=log_level[index],<span style="color: #008000;">#<span style="color: #008000;">发消息随机绑定一个关键字
body=<span style="color: #000000;">message)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Sent %r:%r<span style="color: #800000;">" % (log_level[index],message))

订阅者:

= pika.PlainCredentials(,type=result = channel.queue_declare(exclusive=True)<span style="color: #008000;">#<span style="color: #008000;">随机生成队列名字,断开后删除
queue_name =<span style="color: #000000;"> result.method.queue
<span style="color: #008000;">#
<span style="color: #008000;"> 获取运行脚本所有的参数

<span style="color: #000000;">

channel.queue_bind(exchange=<span style="color: #800000;">'<span style="color: #800000;">direct_logs<span style="color: #800000;">'<span style="color: #000000;">,routing_key=<span style="color: #800000;">'<span style="color: #800000;">info<span style="color: #800000;">')<span style="color: #008000;">#<span style="color: #008000;">只绑定了info关键字,接受只接受info关键字的消息

<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;"> [*] Waiting for logs. To exit press CTRL+C<span style="color: #800000;">'<span style="color: #000000;">)

<span style="color: #0000ff;">def<span style="color: #000000;"> callback(ch,body):
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] %r:%r<span style="color: #800000;">" %<span style="color: #000000;"> (method.routing_key,body))

channel.basic_consume(callback,<span style="color: #000000;">)

channel.start_consuming()

3.topic(主题)模式

topic相比于dirct而言,提供了更为详细的消息接受规则,可使用*、#等来匹配关键字来接受消息。

发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse","nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。*可以匹配一个标识符。#可以匹配0个或多个标识符。

例如:#.a会匹配a.a,aa.a,aaa.a等? ? ? ? ? *.a会匹配a.a,b.a,c.a等

?topic消费模式如下图:

demo:

发布者:

= pika.PlainCredentials(,==,virtual_host=,credentials==channel.exchange_declare(exchange=<span style="color: #800000;">'<span style="color: #800000;">topic_logs<span style="color: #800000;">'<span style="color: #000000;">,type=<span style="color: #800000;">'<span style="color: #800000;">topic<span style="color: #800000;">'<span style="color: #000000;">)

routing_key = sys.argv[1] <span style="color: #0000ff;">if len(sys.argv) > 1 <span style="color: #0000ff;">else <span style="color: #800000;">'<span style="color: #800000;">anonymous.info<span style="color: #800000;">'<span style="color: #000000;">
message = <span style="color: #800000;">' <span style="color: #800000;">'.join(sys.argv[2:]) <span style="color: #0000ff;">or <span style="color: #800000;">'<span style="color: #800000;">Hello World!<span style="color: #800000;">'<span style="color: #000000;">
channel.basic_publish(exchange=<span style="color: #800000;">'<span style="color: #800000;">topic_logs<span style="color: #800000;">'<span style="color: #000000;">,routing_key=<span style="color: #000000;">routing_key,body=<span style="color: #000000;">message)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Sent %r:%r<span style="color: #800000;">" %<span style="color: #000000;"> (routing_key,message))
connection.close()

订阅者:

= pika.PlainCredentials(,credentials====result = channel.queue_declare(exclusive=<span style="color: #000000;">True)
queue_name
=<span style="color: #000000;"> result.method.queue

binding_keys = sys.argv[1<span style="color: #000000;">:]
<span style="color: #0000ff;">if <span style="color: #0000ff;">not<span style="color: #000000;"> binding_keys:
sys.stderr.write(<span style="color: #800000;">"<span style="color: #800000;">Usage: %s [binding_key]...n<span style="color: #800000;">" %<span style="color: #000000;"> sys.argv[0])
sys.exit(1<span style="color: #000000;">)

<span style="color: #0000ff;">for binding_key <span style="color: #0000ff;">in binding_keys:<span style="color: #008000;">#<span style="color: #008000;">循环绑定routing_key,如果绑定*.info,就接受以.info结尾的routing_key所发的消息。
channel.queue_bind(exchange=<span style="color: #800000;">'<span style="color: #800000;">topic_logs<span style="color: #800000;">'<span style="color: #000000;">,routing_key=<span style="color: #000000;">binding_key)

<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;"> [*] Waiting for logs. To exit press CTRL+C<span style="color: #800000;">'<span style="color: #000000;">)

<span style="color: #0000ff;">def<span style="color: #000000;"> callback(ch,body))

channel.basic_consume(callback,<span style="color: #000000;">)

channel.start_consuming()

<table style="height: 30px; background-color: #afeeee; width: 1266px; ; width: 1266px;" border="0">

<tr>
<td><span style="font-size: 16px;">五、rabbitmq应用场景(简单RPC)</td>
</tr></table>

RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。真正的RPC有更为标准的定义,这里我们可以使用rabbitmq来实现简单的RPC模型,其原理图如下:

上述图中,client和server对于rabbitmq来说都具有两个角色,即:即是生产者又是消费者。client端通过生产者角色发送命令,服务端此时充当消费者接受客户端的命令消息,当接受到消息以后又以生产者角色发送命令结果给客户端,此时客户端是消费者接受客户端的消息。

过程:

  • 客户端 Client 设置消息的 routing key 为 Service 的队列 op_q,设置消息的 reply-to 属性为返回的 response 的目标队列 reponse_q,设置其 correlation_id 为以随机UUID,然后将消息发到 exchange。比如channel.basic_publish(exchange='',routing_key='op_q',properties=pika.BasicProperties(reply_to = reponse_q,correlation_id = self.corr_id),body=request)
  • Exchange 将消息转发到 Service 的 op_q
  • Service 收到该消息后进行处理,然后将response 发到 exchange,并设置消息的 routing_key 为原消息的 reply_to 属性,以及设置其 correlation_id 为原消息的 correlation_id?
  • ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id = props.correlation_id),body=str(response))Exchange 将消息转发到 reponse_q
  • Client 逐一接受 response_q 中的消息,检查消息的 correlation_id 是否为等于它发出的消息的correlation_id,是的话表明该消息为它需要的response。

代码实现:

clinet:

<span style="color: #0000ff;">class<span style="color: #000000;"> FibonacciRpcClient(object):
<span style="color: #0000ff;">def
<span style="color: #800080;">init<span style="color: #000000;">(self):
self.connection =<span style="color: #000000;"> pika.BlockingConnection(pika.ConnectionParameters(
host=<span style="color: #800000;">'<span style="color: #800000;">localhost<span style="color: #800000;">'<span style="color: #000000;">))

    self.channel </span>=<span style="color: #000000;"&gt; self.connection.channel()

    result </span>= self.channel.queue_declare(exclusive=<span style="color: #000000;"&gt;True)
    self.callback_queue </span>=<span style="color: #000000;"&gt; result.method.queue

    self.channel.basic_consume(self.on_response,no_ack</span>=<span style="color: #000000;"&gt;True,queue</span>=<span style="color: #000000;"&gt;self.callback_queue)

</span><span style="color: #0000ff;"&gt;def</span><span style="color: #000000;"&gt; on_response(self,ch,props,body):
    </span><span style="color: #0000ff;"&gt;if</span> self.corr_id ==<span style="color: #000000;"&gt; props.correlation_id:
        self.response </span>=<span style="color: #000000;"&gt; body

</span><span style="color: #0000ff;"&gt;def</span><span style="color: #000000;"&gt; call(self,n):
    self.response </span>=<span style="color: #000000;"&gt; None
    self.corr_id </span>=<span style="color: #000000;"&gt; str(uuid.uuid4())
    self.channel.basic_publish(exchange</span>=<span style="color: #800000;"&gt;''</span><span style="color: #000000;"&gt;,routing_key</span>=<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;rpc_queue</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;,properties</span>=<span style="color: #000000;"&gt;pika.BasicProperties(
                                     reply_to </span>=<span style="color: #000000;"&gt; self.callback_queue,correlation_id </span>=<span style="color: #000000;"&gt; self.corr_id,),body</span>=<span style="color: #000000;"&gt;str(n))
    </span><span style="color: #0000ff;"&gt;while</span> self.response <span style="color: #0000ff;"&gt;is</span><span style="color: #000000;"&gt; None:
        self.connection.process_data_events()
    </span><span style="color: #0000ff;"&gt;return</span><span style="color: #000000;"&gt; int(self.response)

fibonacci_rpc =<span style="color: #000000;"> FibonacciRpcClient()

<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> Requesting fib(30)<span style="color: #800000;">"<span style="color: #000000;">)
response = fibonacci_rpc.call(30<span style="color: #000000;">)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [.] Got %r<span style="color: #800000;">" % response)

server:

==channel =<span style="color: #000000;"> connection.channel()

channel.queue_declare(queue=<span style="color: #800000;">'<span style="color: #800000;">rpc_queue<span style="color: #800000;">'<span style="color: #000000;">)

<span style="color: #0000ff;">def<span style="color: #000000;"> fib(n):
<span style="color: #0000ff;">if n ==<span style="color: #000000;"> 0:
<span style="color: #0000ff;">return<span style="color: #000000;"> 0
<span style="color: #0000ff;">elif n == 1<span style="color: #000000;">:
<span style="color: #0000ff;">return 1
<span style="color: #0000ff;">else<span style="color: #000000;">:
<span style="color: #0000ff;">return fib(n-1) + fib(n-2<span style="color: #000000;">)

<span style="color: #0000ff;">def<span style="color: #000000;"> on_request(ch,body):
n =<span style="color: #000000;"> int(body)

</span><span style="color: #0000ff;"&gt;print</span>(<span style="color: #800000;"&gt;"</span><span style="color: #800000;"&gt; [.] fib(%s)</span><span style="color: #800000;"&gt;"</span> %<span style="color: #000000;"&gt; n)
response </span>=<span style="color: #000000;"&gt; fib(n)

ch.basic_publish(exchange</span>=<span style="color: #800000;"&gt;''</span><span style="color: #000000;"&gt;,routing_key</span>=<span style="color: #000000;"&gt;props.reply_to,properties</span>=pika.BasicProperties(correlation_id =<span style="color: #000000;"&gt; 
                                                     props.correlation_id),body</span>=<span style="color: #000000;"&gt;str(response))
ch.basic_ack(delivery_tag </span>=<span style="color: #000000;"&gt; method.delivery_tag)

channel.basic_qos(prefetch_count=1<span style="color: #000000;">)
channel.basic_consume(on_request,queue=<span style="color: #800000;">'<span style="color: #800000;">rpc_queue<span style="color: #800000;">'<span style="color: #000000;">)

<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> Awaiting RPC requests<span style="color: #800000;">"<span style="color: #000000;">)
channel.start_consuming()

?

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读