python 与rabbitmq
<table style="height: 30px; background-color: #afeeee; width: 1266px; ; width: 1266px;" border="0"> |
<span style="color: #008000;">#
<span style="color: #008000;"> n RabbitMQ a message can never be sent directly to the queue,it always needs to go through an exchange.channel.basic_publish(exchange=<span style="color: #800000;">''<span style="color: #000000;">,routing_key=<span style="color: #800000;">'<span style="color: #800000;">hello<span style="color: #800000;">'<span style="color: #000000;">,body=<span style="color: #800000;">'<span style="color: #800000;">Hello World!<span style="color: #800000;">'<span style="color: #000000;">)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> Sent 'Hello World!'<span style="color: #800000;">"<span style="color: #000000;">)
connection.close()
消费者:
<span style="color: #0000ff;">def
callback(ch,method,properties,body):<span style="color: #008000;">#<span style="color: #008000;">回调函数,收到消息后执行的函数,body指消息主题<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Received %r<span style="color: #800000;">" %<span style="color: #000000;"> body)
channel.basic_consume(callback,queue
=<span style="color: #800000;">'<span style="color: #800000;">hello<span style="color: #800000;">'<span style="color: #000000;">,no_ack=True) <span style="color: #008000;">#<span style="color: #008000;">如果设置no_ack=Flase,会把消费的消息重写添加到队列中<span style="color: #0000ff;">print
(<span style="color: #800000;">'<span style="color: #800000;"> [*] Waiting for messages.<span style="color: #800000;">'<span style="color: #000000;">)channel.start_consuming()<span style="color: #008000;">#<span style="color: #008000;">阻塞模式
2.work模式(轮询)
- 在这种模式下,RabbitMQ会默认把p发的消息依次分发给连接该条队列的各个消费者(c),跟负载均衡差类似,如果在消费者段设置了no_ack=Flase(默认),也就是确认消息,如果在回调函数中不手动进行确认,那么该消息将一直存在,此时我们需要在回调函数周手动确认消息接收完毕,此时队列中的消息才会被删除。
- 假如消费者处理消息需要15秒,当消费者断开了,那这个消息处理明显还没处理完,并设置了no_ack=Flase(默认),此时该条消息会发给下一个消费者。
- 上面的效果消费端断了就转到另外一个消费端去了,但是生产者怎么知道消费端断了呢??因为生产者和消费者是通过socket连接的,socket断了,就说明消费端断开了。
生产者:
<span style="color: #800000;">'<span style="color: #800000;">10.0.0.241<span style="color: #800000;">'<span style="color: #000000;">))
channel =<span style="color: #000000;"> connection.channel() <span style="color: #008000;">#<span style="color: #008000;"> 声明queue
channel.queue_declare(queue=<span style="color: #800000;">'<span style="color: #800000;">task_queue<span style="color: #800000;">'<span style="color: #000000;">)
message = <span style="color: #800000;">"<span style="color: #800000;">Hello World! %s<span style="color: #800000;">" %<span style="color: #000000;"> time.time()
channel.basic_publish(exchange=<span style="color: #800000;">''<span style="color: #000000;">,routing_key=<span style="color: #800000;">'<span style="color: #800000;">task_queue<span style="color: #800000;">'<span style="color: #000000;">,body=<span style="color: #000000;">message,)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Sent %r<span style="color: #800000;">" %<span style="color: #000000;"> message)
connection.close()
消费者:
<span style="color: #800000;">'<span style="color: #800000;">10.0.0.241<span style="color: #800000;">'<span style="color: #000000;">))
channel =<span style="color: #000000;"> connection.channel() <span style="color: #0000ff;">def<span style="color: #000000;"> callback(ch,body):
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Received %r<span style="color: #800000;">" %<span style="color: #000000;"> body)
time.sleep(10<span style="color: #000000;">)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Done<span style="color: #800000;">"<span style="color: #000000;">)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;">method.delivery_tag<span style="color: #800000;">"<span style="color: #000000;">,method.delivery_tag)
ch.basic_ack(delivery_tag=method.delivery_tag)<span style="color: #008000;">#<span style="color: #008000;">主动向服务器发确认消息,此时delivery_tag为消费消息的tag号
<span style="color: #000000;">
channel.basic_consume(callback,queue=<span style="color: #800000;">'<span style="color: #800000;">task_queue<span style="color: #800000;">'<span style="color: #000000;">,<span style="color: #008000;">#<span style="color: #008000;"> no_ack=True 如果在回掉函数中手动确认必须把no_ack设置为Flase或者不带该参数
<span style="color: #000000;"> )
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;"> [*] Waiting for messages. To exit press CTRL+C<span style="color: #800000;">'<span style="color: #000000;">)
channel.start_consuming()
?公平的分发消息:
在实际的应用中,每个客户端的消费消息的能力是不一样的,如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。如下图:
消费者:
host=<span style="color: #800000;">'<span style="color: #800000;">10.0.0.241<span style="color: #800000;">'<span style="color: #000000;">))
channel =<span style="color: #000000;"> connection.channel()
channel.queue_declare(queue
=<span style="color: #800000;">'<span style="color: #800000;">task_queue<span style="color: #800000;">'<span style="color: #000000;">)<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;"> [*] Waiting for messages. To exit press CTRL+C<span style="color: #800000;">'<span style="color: #000000;">)
<span style="color: #0000ff;">def<span style="color: #000000;"> callback(ch,body):
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Received %r<span style="color: #800000;">" %<span style="color: #000000;"> body)
time.sleep(body.count(b<span style="color: #800000;">'<span style="color: #800000;">.<span style="color: #800000;">'<span style="color: #000000;">))
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Done<span style="color: #800000;">"<span style="color: #000000;">)
ch.basic_ack(delivery_tag=<span style="color: #000000;">method.delivery_tag)
channel.basic_qos(prefetch_count=1)<span style="color: #008000;">#<span style="color: #008000;">设置消费的条数为1,当当前消费者有一条消息未消费完时,该消费者不会主动接受消息了。
<span style="color: #000000;">channel.basic_consume(callback,queue=<span style="color: #800000;">'<span style="color: #800000;">task_queue<span style="color: #800000;">'<span style="color: #000000;">)
channel.start_consuming()
<table style="height: 30px; background-color: #afeeee; width: 1266px; ; width: 1266px;" border="0">
<tr><td><span style="font-size: 16px;">三、消息持久化</td>
</tr></table>
?当rabbitmq队列中有很多消息,此时rabbitmq server宕机了,会导致数据丢下,那么如何将消息进行持久化呢。分两步:
1.持久化管道:
在生产者和消费者两端声明管道时候加参数:
channel.queue_
2.持久化消息:
在生产者端设置properties参数:
properties=pika
完整的demo:
生产者:
<span style="color: #800000;">'<span style="color: #800000;">localhost<span style="color: #800000;">',5672)) <span style="color: #008000;">#<span style="color: #008000;"> 默认端口5672,可不写
channel =<span style="color: #000000;"> connection.channel()
<span style="color: #008000;">#<span style="color: #008000;">声明queue
channel.queue_declare(queue=<span style="color: #800000;">'<span style="color: #800000;">hello2<span style="color: #800000;">',durable=<span style="color: #000000;">True)
channel.basic_publish(exchange=<span style="color: #800000;">''<span style="color: #000000;">,routing_key=<span style="color: #800000;">'<span style="color: #800000;">hello2<span style="color: #800000;">'<span style="color: #000000;">,body=<span style="color: #800000;">'<span style="color: #800000;">Hello World!<span style="color: #800000;">'<span style="color: #000000;">,properties=<span style="color: #000000;">pika.BasicProperties(
delivery_mode=2,<span style="color: #008000;">#<span style="color: #008000;"> make message persistent
<span style="color: #000000;"> )
)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> Sent 'Hello World!'<span style="color: #800000;">"<span style="color: #000000;">)
connection.close()
消费者:
<span style="color: #800000;">'<span style="color: #800000;">localhost<span style="color: #800000;">'<span style="color: #000000;">))
channel =<span style="color: #000000;"> connection.channel()
channel.queue_declare(queue=<span style="color: #800000;">'<span style="color: #800000;">hello2<span style="color: #800000;">',durable=<span style="color: #000000;">True)
<span style="color: #0000ff;">def<span style="color: #000000;"> callback(ch,body):
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Received %r<span style="color: #800000;">" %<span style="color: #000000;"> body)
time.sleep(10<span style="color: #000000;">)
ch.basic_ack(delivery_tag = method.delivery_tag) <span style="color: #008000;">#<span style="color: #008000;"> 告诉生产者,消息处理完成
<span style="color: #000000;">
channel.basic_qos(prefetch_count=1) <span style="color: #008000;">#<span style="color: #008000;"> 类似权重,按能力分发,如果有一个消息,就不在给你发
channel.basic_consume( <span style="color: #008000;">#<span style="color: #008000;"> 消费消息
callback,<span style="color: #008000;">#<span style="color: #008000;"> 如果收到消息,就调用callback
queue=<span style="color: #800000;">'<span style="color: #800000;">hello2<span style="color: #800000;">'<span style="color: #000000;">,<span style="color: #008000;">#<span style="color: #008000;"> no_ack=True # 一般不写,处理完接收处理结果。宕机则发给其他消费者
<span style="color: #000000;"> )
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;"> [*] Waiting for messages. To exit press CTRL+C<span style="color: #800000;">'<span style="color: #000000;">)
channel.start_consuming()
<table style="height: 30px; background-color: #afeeee; width: 1266px; ; width: 1266px;" border="0">
<tr><td><span style="font-size: 16px;">四、rabbitmq发布/订阅的三种模式</td>
</tr></table>
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,定义的类型有三种:
- fanout: 所有绑定到此exchange的queue都可以接收消息
- direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
- topic: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
1.fanout
fanout模式是纯广播模式,所有绑定了相同的exchange的消费者都能收到来自生产者的一条消息,收取消息时需要queue和exchange绑定,因为消费者不是和exchange直连的,消费者是连在queue上,queue绑定在exchange上,消费者只会在queu里收消息。如下图:
?demo:
发布者:
host=<span style="color: #800000;">'<span style="color: #800000;">localhost<span style="color: #800000;">'<span style="color: #000000;">))
channel =<span style="color: #000000;"> connection.channel()
<span style="color: #008000;">#<span style="color: #008000;"> 注意:这里是广播,不需要声明queue
channel.exchange_declare(exchange=<span style="color: #800000;">'<span style="color: #800000;">logs<span style="color: #800000;">',<span style="color: #008000;">#<span style="color: #008000;"> 声明广播管道
type=<span style="color: #800000;">'<span style="color: #800000;">fanout<span style="color: #800000;">'<span style="color: #000000;">)
<span style="color: #008000;">#<span style="color: #008000;"> message = ' '.join(sys.argv[1:]) or "info: Hello World!"
message = <span style="color: #800000;">"<span style="color: #800000;">info: Hello World!<span style="color: #800000;">"<span style="color: #000000;">
channel.basic_publish(exchange=<span style="color: #800000;">'<span style="color: #800000;">logs<span style="color: #800000;">'<span style="color: #000000;">,routing_key=<span style="color: #800000;">'',<span style="color: #008000;">#<span style="color: #008000;"> 注意此处空,必须有
body=<span style="color: #000000;">message)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Sent %r<span style="color: #800000;">" %<span style="color: #000000;"> message)
connection.close()
订阅者:
host=<span style="color: #800000;">'<span style="color: #800000;">localhost<span style="color: #800000;">'<span style="color: #000000;">))
channel =<span style="color: #000000;"> connection.channel()
channel.exchange_declare(exchange=<span style="color: #800000;">'<span style="color: #800000;">logs<span style="color: #800000;">'<span style="color: #000000;">,type=<span style="color: #800000;">'<span style="color: #800000;">fanout<span style="color: #800000;">'<span style="color: #000000;">)
<span style="color: #008000;">#<span style="color: #008000;"> 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
result = channel.queue_declare(exclusive=<span style="color: #000000;">True)
<span style="color: #008000;">#<span style="color: #008000;"> 获取随机的queue名字
queue_name =<span style="color: #000000;"> result.method.queue
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;">random queuename:<span style="color: #800000;">"<span style="color: #000000;">,queue_name)
channel.queue_bind(exchange=<span style="color: #800000;">'<span style="color: #800000;">logs<span style="color: #800000;">',<span style="color: #008000;">#<span style="color: #008000;"> queue绑定到转发器上
queue=<span style="color: #000000;">queue_name)
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;"> [*] Waiting for logs. To exit press CTRL+C<span style="color: #800000;">'<span style="color: #000000;">)
<span style="color: #0000ff;">def<span style="color: #000000;"> callback(ch,body):
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] %r<span style="color: #800000;">" %<span style="color: #000000;"> body)
channel.basic_consume(callback,queue=<span style="color: #000000;">queue_name,<span style="color: #000000;">)
channel.start_consuming()
?2.direct模式
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列,此时的关键字由参数routing_key指定。模式如下图:
发布者:
<span style="color: #000000;">
index=randint(0,3<span style="color: #000000;">)
log_level=[<span style="color: #800000;">'<span style="color: #800000;">info<span style="color: #800000;">',<span style="color: #800000;">'<span style="color: #800000;">wraning<span style="color: #800000;">',<span style="color: #800000;">'<span style="color: #800000;">error<span style="color: #800000;">',<span style="color: #800000;">'<span style="color: #800000;">nothing<span style="color: #800000;">'<span style="color: #000000;">]
message =<span style="color: #800000;">'<span style="color: #800000;">{}--->Hello World!<span style="color: #800000;">'<span style="color: #000000;">.format(log_level[index])
channel.basic_publish(exchange=<span style="color: #800000;">'<span style="color: #800000;">direct_logs<span style="color: #800000;">'<span style="color: #000000;">,routing_key=log_level[index],<span style="color: #008000;">#<span style="color: #008000;">发消息随机绑定一个关键字
body=<span style="color: #000000;">message)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Sent %r:%r<span style="color: #800000;">" % (log_level[index],message))
订阅者:
queue_name =<span style="color: #000000;"> result.method.queue
<span style="color: #008000;">#<span style="color: #008000;"> 获取运行脚本所有的参数
<span style="color: #000000;">
channel.queue_bind(exchange
=<span style="color: #800000;">'<span style="color: #800000;">direct_logs<span style="color: #800000;">'<span style="color: #000000;">,routing_key=<span style="color: #800000;">'<span style="color: #800000;">info<span style="color: #800000;">')<span style="color: #008000;">#<span style="color: #008000;">只绑定了info关键字,接受只接受info关键字的消息<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;"> [*] Waiting for logs. To exit press CTRL+C<span style="color: #800000;">'<span style="color: #000000;">)
<span style="color: #0000ff;">def<span style="color: #000000;"> callback(ch,body):
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] %r:%r<span style="color: #800000;">" %<span style="color: #000000;"> (method.routing_key,body))
channel.basic_consume(callback,<span style="color: #000000;">)
channel.start_consuming()
3.topic(主题)模式
topic相比于dirct而言,提供了更为详细的消息接受规则,可使用*、#等来匹配关键字来接受消息。
发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse","nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。*可以匹配一个标识符。#可以匹配0个或多个标识符。
例如:#.a会匹配a.a,aa.a,aaa.a等? ? ? ? ? *.a会匹配a.a,b.a,c.a等
?topic消费模式如下图:
demo:
发布者:
routing_key
= sys.argv[1] <span style="color: #0000ff;">if len(sys.argv) > 1 <span style="color: #0000ff;">else <span style="color: #800000;">'<span style="color: #800000;">anonymous.info<span style="color: #800000;">'<span style="color: #000000;">message = <span style="color: #800000;">' <span style="color: #800000;">'.join(sys.argv[2:]) <span style="color: #0000ff;">or <span style="color: #800000;">'<span style="color: #800000;">Hello World!<span style="color: #800000;">'<span style="color: #000000;">
channel.basic_publish(exchange=<span style="color: #800000;">'<span style="color: #800000;">topic_logs<span style="color: #800000;">'<span style="color: #000000;">,routing_key=<span style="color: #000000;">routing_key,body=<span style="color: #000000;">message)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [x] Sent %r:%r<span style="color: #800000;">" %<span style="color: #000000;"> (routing_key,message))
connection.close()
订阅者:
queue_name =<span style="color: #000000;"> result.method.queue
binding_keys
= sys.argv[1<span style="color: #000000;">:]<span style="color: #0000ff;">if <span style="color: #0000ff;">not<span style="color: #000000;"> binding_keys:
sys.stderr.write(<span style="color: #800000;">"<span style="color: #800000;">Usage: %s [binding_key]...n<span style="color: #800000;">" %<span style="color: #000000;"> sys.argv[0])
sys.exit(1<span style="color: #000000;">) <span style="color: #0000ff;">for binding_key <span style="color: #0000ff;">in binding_keys:<span style="color: #008000;">#<span style="color: #008000;">循环绑定routing_key,如果绑定*.info,就接受以.info结尾的routing_key所发的消息。
channel.queue_bind(exchange=<span style="color: #800000;">'<span style="color: #800000;">topic_logs<span style="color: #800000;">'<span style="color: #000000;">,routing_key=<span style="color: #000000;">binding_key) <span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;"> [*] Waiting for logs. To exit press CTRL+C<span style="color: #800000;">'<span style="color: #000000;">)
<span style="color: #0000ff;">def<span style="color: #000000;"> callback(ch,body))
channel.basic_consume(callback,<span style="color: #000000;">)
channel.start_consuming()
<table style="height: 30px; background-color: #afeeee; width: 1266px; ; width: 1266px;" border="0">
<tr><td><span style="font-size: 16px;">五、rabbitmq应用场景(简单RPC)</td>
</tr></table>
RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样
。真正的RPC有更为标准的定义,这里我们可以使用rabbitmq来实现简单的RPC模型,其原理图如下:
上述图中,client和server对于rabbitmq来说都具有两个角色,即:即是生产者又是消费者。client端通过生产者角色发送命令,服务端此时充当消费者接受客户端的命令消息,当接受到消息以后又以生产者角色发送命令结果给客户端,此时客户端是消费者接受客户端的消息。
过程:
- 客户端 Client 设置消息的 routing key 为 Service 的队列 op_q,设置消息的 reply-to 属性为返回的 response 的目标队列 reponse_q,设置其 correlation_id 为以随机UUID,然后将消息发到 exchange。比如channel.basic_publish(exchange='',routing_key='op_q',properties=pika.BasicProperties(reply_to = reponse_q,correlation_id = self.corr_id),body=request)
- Exchange 将消息转发到 Service 的 op_q
- Service 收到该消息后进行处理,然后将response 发到 exchange,并设置消息的 routing_key 为原消息的 reply_to 属性,以及设置其 correlation_id 为原消息的 correlation_id?
- ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id = props.correlation_id),body=str(response))Exchange 将消息转发到 reponse_q
- Client 逐一接受 response_q 中的消息,检查消息的 correlation_id 是否为等于它发出的消息的correlation_id,是的话表明该消息为它需要的response。
代码实现:
clinet:
<span style="color: #0000ff;">def <span style="color: #800080;">init<span style="color: #000000;">(self):
self.connection =<span style="color: #000000;"> pika.BlockingConnection(pika.ConnectionParameters(
host=<span style="color: #800000;">'<span style="color: #800000;">localhost<span style="color: #800000;">'<span style="color: #000000;">))
self.channel </span>=<span style="color: #000000;"> self.connection.channel()
result </span>= self.channel.queue_declare(exclusive=<span style="color: #000000;">True)
self.callback_queue </span>=<span style="color: #000000;"> result.method.queue
self.channel.basic_consume(self.on_response,no_ack</span>=<span style="color: #000000;">True,queue</span>=<span style="color: #000000;">self.callback_queue)
</span><span style="color: #0000ff;">def</span><span style="color: #000000;"> on_response(self,ch,props,body):
</span><span style="color: #0000ff;">if</span> self.corr_id ==<span style="color: #000000;"> props.correlation_id:
self.response </span>=<span style="color: #000000;"> body
</span><span style="color: #0000ff;">def</span><span style="color: #000000;"> call(self,n):
self.response </span>=<span style="color: #000000;"> None
self.corr_id </span>=<span style="color: #000000;"> str(uuid.uuid4())
self.channel.basic_publish(exchange</span>=<span style="color: #800000;">''</span><span style="color: #000000;">,routing_key</span>=<span style="color: #800000;">'</span><span style="color: #800000;">rpc_queue</span><span style="color: #800000;">'</span><span style="color: #000000;">,properties</span>=<span style="color: #000000;">pika.BasicProperties(
reply_to </span>=<span style="color: #000000;"> self.callback_queue,correlation_id </span>=<span style="color: #000000;"> self.corr_id,),body</span>=<span style="color: #000000;">str(n))
</span><span style="color: #0000ff;">while</span> self.response <span style="color: #0000ff;">is</span><span style="color: #000000;"> None:
self.connection.process_data_events()
</span><span style="color: #0000ff;">return</span><span style="color: #000000;"> int(self.response)
fibonacci_rpc =<span style="color: #000000;"> FibonacciRpcClient()
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> Requesting fib(30)<span style="color: #800000;">"<span style="color: #000000;">)
response = fibonacci_rpc.call(30<span style="color: #000000;">)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> [.] Got %r<span style="color: #800000;">" % response)
server:
channel.queue_declare(queue
=<span style="color: #800000;">'<span style="color: #800000;">rpc_queue<span style="color: #800000;">'<span style="color: #000000;">) <span style="color: #0000ff;">def<span style="color: #000000;"> fib(n):<span style="color: #0000ff;">if n ==<span style="color: #000000;"> 0:
<span style="color: #0000ff;">return<span style="color: #000000;"> 0
<span style="color: #0000ff;">elif n == 1<span style="color: #000000;">:
<span style="color: #0000ff;">return 1
<span style="color: #0000ff;">else<span style="color: #000000;">:
<span style="color: #0000ff;">return fib(n-1) + fib(n-2<span style="color: #000000;">)
<span style="color: #0000ff;">def<span style="color: #000000;"> on_request(ch,body):
n =<span style="color: #000000;"> int(body)
</span><span style="color: #0000ff;">print</span>(<span style="color: #800000;">"</span><span style="color: #800000;"> [.] fib(%s)</span><span style="color: #800000;">"</span> %<span style="color: #000000;"> n)
response </span>=<span style="color: #000000;"> fib(n)
ch.basic_publish(exchange</span>=<span style="color: #800000;">''</span><span style="color: #000000;">,routing_key</span>=<span style="color: #000000;">props.reply_to,properties</span>=pika.BasicProperties(correlation_id =<span style="color: #000000;">
props.correlation_id),body</span>=<span style="color: #000000;">str(response))
ch.basic_ack(delivery_tag </span>=<span style="color: #000000;"> method.delivery_tag)
channel.basic_qos(prefetch_count=1<span style="color: #000000;">)
channel.basic_consume(on_request,queue=<span style="color: #800000;">'<span style="color: #800000;">rpc_queue<span style="color: #800000;">'<span style="color: #000000;">)
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;"> Awaiting RPC requests<span style="color: #800000;">"<span style="color: #000000;">)
channel.start_consuming()
(编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!