ruby – 使用EventMachine和RabbitMQ的RPC
我一直在开始使用AMQP gem doc中提供的RabbitMQ
RPC sample code,尝试编写执行同步远程调用的非常简单的代码:
require "amqp" module RPC class Base include EM::Deferrable def rabbit(rabbit_callback) rabbit_loop = Proc.new { AMQP.connect do |connection| AMQP::Channel.new(connection) do |channel| channel.queue("rpc.queue",:exclusive => false,:durable => true) do |requests_queue| self.callback(&rabbit_callback) self.succeed(connection,channel,requests_queue) end # requests_queue end # AMQP.channel end # AMQP.connect Signal.trap("INT") { connection.close { EM.stop } } Signal.trap("TERM") { connection.close { EM.stop } } } if !EM.reactor_running? EM.run do rabbit_loop.call end else rabbit_loop.call end end end class Server < Base def run server_loop = Proc.new do |connection,requests_queue| consumer = AMQP::Consumer.new(channel,requests_queue).consume consumer.on_delivery do |metadata,payload| puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..." channel.default_exchange.publish(Time.now.to_s,:routing_key => metadata.reply_to,:correlation_id => metadata.message_id,:mandatory => true) metadata.ack end end rabbit(server_loop) end end class Client < Base def sync_push(request) result = nil sync_request = Proc.new do |connection,requests_queue| message_id = Kernel.rand(10101010).to_s response_queue = channel.queue("",:exclusive => true,:auto_delete => true) response_queue.subscribe do |headers,payload| if headers.correlation_id == message_id result = payload connection.close { EM.stop } end end EM.add_timer(0.1) do puts "[request] Sending a request...#{request} with id #{message_id}" channel.default_exchange.publish(request,:routing_key => requests_queue.name,:reply_to => response_queue.name,:message_id => message_id) end end rabbit(sync_request) result end end end 这个想法很简单:我想让一个消息队列始终准备好(这是由rabbit方法处理的).每当客户端想要发送请求时,它首先创建响应的临时队列以及消息ID;然后,它将请求发布到主消息队列,并等待临时队列中具有相同消息ID的响应,以便知道该特定请求的答案何时准备就绪.我想,message_id在某种程度上与临时队列是多余的(因为队列也应该是唯一的). 我现在使用此客户端/服务器代码运行虚拟脚本 # server session >> server = RPC::Server.new => #<RPC::Server:0x007faaa23bb5b0> >> server.run Updating client properties [requests] Got a request 3315740. Sending a reply to amq.gen-QCv8nP2dI5Qd6bg2Q1Xhk0... 和 # client session >> client = RPC::Client.new => #<RPC::Client:0x007ffb6be6aed8> >> client.sync_push "test 1" Updating client properties [request] Sending a request...test 1 with id 3315740 => "2012-11-02 21:58:45 +0100" >> client.sync_push "test 2" AMQ::Client::ConnectionClosedError: Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007ffb6b9c83d0 @payload="x002x00nx00x00x00fx00x00x00x00",@channel=1> 有两点我真的不明白: >与EventMachine相关:在客户端代码中,如果我希望我的消息实际发布,为什么我必须调用EM.add_timer?为什么使用EM.next_tick不起作用?我的理解是,在这里调用发布时,“一切”应该是“准备好”. 遗憾的是,很少有在线处理EM / AMQP的代码,所以任何帮助都将深表感谢! 解决方法
挖掘文档,我终于发现我实际上需要
once_declared 回调以确保在客户端开始使用它时队列已准备就绪.
关于连接问题,似乎不知何故,使用EM :: Deferrable导致问题,因此(非常不令人满意)解决方案只包括不包括EM :: Deferrable. require "amqp" module RPC module Base def rabbit(rabbit_callback) rabbit_loop = Proc.new { AMQP.start do |connection| AMQP::Channel.new(connection) do |channel| channel.queue("rpc.queue",:durable => true) do |requests_queue| requests_queue.once_declared do rabbit_callback.call(connection,requests_queue) end end end end Signal.trap("INT") { AMQP.stop { EM.stop } } Signal.trap("TERM") { AMQP.stop { EM.stop } } } if !EM.reactor_running? @do_not_stop_reactor = false EM.run do rabbit_loop.call end else @do_not_stop_reactor = true rabbit_loop.call end end end class Server include Base def run server_loop = Proc.new do |connection,:mandatory => true) metadata.ack end end rabbit(server_loop) end end class Client include Base def sync_push(request) result = nil sync_request = Proc.new do |connection,payload| if headers.correlation_id == message_id result = payload AMQP.stop { EM.stop unless @do_not_stop_reactor } end end response_queue.once_declared do puts "[request] Sending a request...#{request} with id #{message_id}" channel.default_exchange.publish(request,:message_id => message_id) end end rabbit(sync_request) result end end end (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |