perl – AnyEvent :: RabbitMQ问题与封闭渠道有关
我正在编写一个用于将消息发布到消息队列中的主程序(
RabbitMQ).该程序使用Perl 5编写,使用
AnyEvent::RabbitMQ与RabbitMQ进行通信.
以下最小示例(针对我遇到的问题)将在第二个命令上失败,该命令通过相同的通道发送,并显示错误“Channel closed”. use strictures 2; use AnyEvent::RabbitMQ; main(); ############################################################################ sub main { _log( debug => 'main' ); my $condvar = AnyEvent->condvar; my $ar = AnyEvent::RabbitMQ->new; $ar->load_xml_spec; _log( debug => 'Connecting to RabbitMQ...' ); $ar->connect( host => 'localhost',port => 5672,user => 'guest',pass => 'guest',vhost => '/',timeout => 1,tls => 0,on_success => sub { _on_connect_success( $condvar,$ar,@_ ) },on_failure => sub { _error( $condvar,'failure',on_read_failure => sub { _error( $condvar,'read_failure',on_return => sub { _error( $condvar,'return',on_close => sub { _error( $condvar,'close',); $condvar->recv; $ar->close; return; } ############################################################################ sub _on_connect_success { my ( $condvar,$new_ar ) = @_; _log( debug => 'Connected to RabbitMQ.' ); _open_channel( $condvar,$new_ar ); return; } ############################################################################ sub _open_channel { my ( $condvar,$ar ) = @_; _log( debug => 'Opening RabbitMQ channel...' ); $ar->open_channel( on_success => sub { _on_open_channel_success( $condvar,); return; } ############################################################################ sub _on_open_channel_success { my ( $condvar,$channel ) = @_; _log( debug => 'Opened RabbitMQ channel.' ); _declare_queue( $condvar,$channel ); return; } ############################################################################ sub _declare_queue { my ( $condvar,$channel ) = @_; _log( debug => 'Declaring RabbitMQ queue...' ); $channel->declare_queue( queue => 'test',auto_delete => 1,passive => 0,durable => 0,exclusive => 0,no_ack => 1,ticket => 0,on_success => sub { _on_declare_queue_success( $condvar,$channel,); return; } ############################################################################ sub _on_declare_queue_success { my ( $condvar,$channel ) = @_; _log( debug => 'Declared RabbitMQ queue.' ); _bind_queue( $condvar,$channel ); return; } ############################################################################ sub _bind_queue { my ( $condvar,$channel ) = @_; _log( debug => 'Binding RabbitMQ queue...' ); $channel->bind_queue( queue => 'test',exchange => '',routing_key => '',on_success => sub { _on_bind_queue_success( $condvar,); return; } ############################################################################ sub _on_bind_queue_success { my ( $condvar,$channel ) = @_; _log( debug => 'Binded RabbitMQ queue.' ); _log( info => 'Master ready to publish messages.' ); _publish_message( $condvar,'Hello,world!' ); return; } ############################################################################ sub _publish_message { my ( $condvar,$message ) = @_; _log( debug => "Publishing RabbitMQ message ($message)..." ); $channel->publish( queue => 'test',body => $message,header => {},mandatory => 0,immediate => 0,on_success => sub { _on_publish_message_success( $condvar,on_ack => sub { _error( $condvar,'ack',on_nack => sub { _error( $condvar,'nack',); return; } ############################################################################ sub _on_publish_message_success { my ( $condvar,$channel ) = @_; _log( debug => "Published RabbitMQ message." ); sleep 1; _publish_message( $condvar,world! Again ' . time ); return; } ############################################################################ sub _error { my ( $condvar,$type,@error ) = @_; _log( error => sprintf '%s - %s',join ',',@error ); $condvar->send( $condvar,@error ); return; } ############################################################################ sub _log { my ( $level,$message ) = @_; my @time = gmtime time; $time[5] += 1900; $time[4] += 1; my $time = sprintf '%04d-%02d-%02dT%02d:%02d:%02d+00:00',@time[ 5,4,3,2,1,0 ]; my @caller0 = caller(0); my @caller1 = caller(1); my $subroutine = $caller1[3]; $subroutine =~ s/^$caller0[0]:://; print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])n"; return; } 该计划应: >连接到RabbitMQ 该程序(主程序)不应使用消息.还有其他程序可以完成这项工作. 最小的例子(见上文)将产生以下输出: 2015-08-12T13:02:07+00:00 [debug] main at minimal.pl line 9 (main; from minimal.pl line 5) 2015-08-12T13:02:07+00:00 [debug] Connecting to RabbitMQ... at minimal.pl line 13 (main; from minimal.pl line 5) 2015-08-12T13:02:07+00:00 [debug] Connected to RabbitMQ. at minimal.pl line 36 (_on_connect_success; from minimal.pl line 22) 2015-08-12T13:02:07+00:00 [debug] Opening RabbitMQ channel... at minimal.pl line 44 (_open_channel; from minimal.pl line 37) 2015-08-12T13:02:07+00:00 [debug] Opened RabbitMQ channel. at minimal.pl line 58 (_on_open_channel_success; from minimal.pl line 46) 2015-08-12T13:02:07+00:00 [debug] Declaring RabbitMQ queue... at minimal.pl line 66 (_declare_queue; from minimal.pl line 59) 2015-08-12T13:02:07+00:00 [debug] Declared RabbitMQ queue. at minimal.pl line 88 (_on_declare_queue_success; from minimal.pl line 76) 2015-08-12T13:02:07+00:00 [debug] Binding RabbitMQ queue... at minimal.pl line 96 (_bind_queue; from minimal.pl line 89) 2015-08-12T13:02:07+00:00 [error] failure - Channel closed at minimal.pl line 155 (_error; from minimal.pl line 102) 2015-08-12T13:02:07+00:00 [error] close - Net::AMQP::Frame::Method=HASH(0x38fe1c8) at minimal.pl line 155 (_error; from minimal.pl line 50) 为什么AnyEvent :: RabbitMQ或RabbitMQ本身关闭了频道(不是连接或我错过了什么)? 解决方法
如果您查看RabbitMQ服务器日志,您将看到如下内容:
显然,它不允许您在默认交换上绑定队列.所以你需要先声明并绑定自己的交换. sub _declare_exchange { my ( $condvar,$channel ) = @_; _log( debug => 'Declaring RabbitMQ exchange...' ); $channel->declare_exchange( exchange => 'testest',type => 'fanout',on_success => sub { _on_declare_exchange_success( $condvar,); return; } ############################################################################ sub _on_declare_exchange_success { my ( $condvar,$channel ) = @_; _log( debug => 'Declared RabbitMQ exchange.' ); _bind_exchange( $condvar,$channel ); return; } ############################################################################ sub _bind_exchange { my ( $condvar,$channel ) = @_; _log( debug => 'Binding RabbitMQ exchange...' ); $channel->bind_exchange( source => 'testest',destination => 'testest',on_success => sub { _on_bind_exchange_success( $condvar,); return; } 一旦你设置了这些子,请告诉你的程序使用这个自定义交换. sub _on_open_channel_success { my ( $condvar,$channel ) = @_; _log( debug => 'Opened RabbitMQ channel.' ); $channel->confirm; _declare_exchange( $condvar,$channel ); return; } 当您向队列发送消息时,必须使用$channel->确认以使RabbitMQ回答确认.如果你不这样做,成功处理程序永远不会被调用,因为没有成功的响应回来. 然后在_bind_queue中,您需要将交换添加到bind_queue()调用. $channel->bind_queue( queue => 'test',exchange => 'testest',# <-- here routing_key => '',# ... ); 使用publish()调用需要在_publish_message中完成相同的操作.在那里你也应该用实际处理确认的东西替换on_ack处理程序.我认为你打算这样做,但有一个复制/粘贴错误1. $channel->publish( queue => 'test',# <-- here routing_key => '',# ... on_ack => sub { _on_publish_message_success( $condvar,@_ ); },); 还有一件事是,当您使用AnyEvent时,_on_publish_message_success中的睡眠调用不是一个好主意,因为这会停止整个程序.请改用 my $t; $t = AE::timer(1,sub { _publish_message( $condvar,world! Again ' . time ); undef $t; }); 这是包含所有更改的完整代码. use strictures 2; use AnyEvent::RabbitMQ; main(); ############################################################################ sub main { _log( debug => 'main' ); my $condvar = AnyEvent->condvar; my $ar = AnyEvent::RabbitMQ->new; $ar->load_xml_spec; _log( debug => 'Connecting to RabbitMQ...' ); $ar->connect( host => 'localhost',vhost => '/guest',$channel ); return; } ############################################################################ sub _declare_exchange { my ( $condvar,); return; } ############################################################################ sub _on_bind_exchange_success { my ( $condvar,$channel ) = @_; _log( debug => 'Binded RabbitMQ exchange.' ); _declare_queue( $condvar,$channel ); return; } ############################################################################ sub _declare_queue { my ( $condvar,on_ack => sub { _on_publish_message_success( $condvar,@_ ); # _error( $condvar,@_ ) },$channel ) = @_; _log( debug => "Published RabbitMQ message." ); my $t; $t=AE::timer(1,sub { _publish_message( $condvar,world! Again ' . time ); undef $t; }); return; } ############################################################################ sub _error { my ( $condvar,0 ]; my @caller0 = caller(0); my @caller1 = caller(1); my $subroutine = $caller1[3]; $subroutine =~ s/^$caller0[0]:://; print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])n"; return; } 1)在某些地方你需要为同事买一杯啤酒:) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |