加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

perl – AnyEvent :: RabbitMQ问题与封闭渠道有关

发布时间:2020-12-16 06:26:37 所属栏目:大数据 来源:网络整理
导读:我正在编写一个用于将消息发布到消息队列中的主程序( RabbitMQ).该程序使用Perl 5编写,使用 AnyEvent::RabbitMQ与RabbitMQ进行通信. 以下最小示例(针对我遇到的问题)将在第二个命令上失败,该命令通过相同的通道发送,并显示错误“Channel closed”. use stric
我正在编写一个用于将消息发布到消息队列中的主程序( RabbitMQ).该程序使用Perl 5编写,使用 AnyEvent::RabbitMQ与RabbitMQ进行通信.

以下最小示例(针对我遇到的问题)将在第二个命令上失败,该命令通过相同的通道发送,并显示错误“Channel closed”.

use strictures 2;

use AnyEvent::RabbitMQ;

main();

############################################################################
sub main {
  _log( debug => 'main' );
  my $condvar = AnyEvent->condvar;
  my $ar      = AnyEvent::RabbitMQ->new;
  $ar->load_xml_spec;
  _log( debug => 'Connecting to RabbitMQ...' );
  $ar->connect(
    host            => 'localhost',port            => 5672,user            => 'guest',pass            => 'guest',vhost           => '/',timeout         => 1,tls             => 0,on_success      => sub { _on_connect_success( $condvar,$ar,@_ ) },on_failure      => sub { _error( $condvar,'failure',on_read_failure => sub { _error( $condvar,'read_failure',on_return       => sub { _error( $condvar,'return',on_close        => sub { _error( $condvar,'close',);
  $condvar->recv;
  $ar->close;
  return;
}

############################################################################
sub _on_connect_success {
  my ( $condvar,$new_ar ) = @_;
  _log( debug => 'Connected to RabbitMQ.' );
  _open_channel( $condvar,$new_ar );
  return;
}

############################################################################
sub _open_channel {
  my ( $condvar,$ar ) = @_;
  _log( debug => 'Opening RabbitMQ channel...' );
  $ar->open_channel(
    on_success => sub { _on_open_channel_success( $condvar,);
  return;
}

############################################################################
sub _on_open_channel_success {
  my ( $condvar,$channel ) = @_;
  _log( debug => 'Opened RabbitMQ channel.' );
  _declare_queue( $condvar,$channel );
  return;
}

############################################################################
sub _declare_queue {
  my ( $condvar,$channel ) = @_;
  _log( debug => 'Declaring RabbitMQ queue...' );
  $channel->declare_queue(
    queue       => 'test',auto_delete => 1,passive     => 0,durable     => 0,exclusive   => 0,no_ack      => 1,ticket      => 0,on_success =>
      sub { _on_declare_queue_success( $condvar,$channel,);
  return;
}

############################################################################
sub _on_declare_queue_success {
  my ( $condvar,$channel ) = @_;
  _log( debug => 'Declared RabbitMQ queue.' );
  _bind_queue( $condvar,$channel );
  return;
}

############################################################################
sub _bind_queue {
  my ( $condvar,$channel ) = @_;
  _log( debug => 'Binding RabbitMQ queue...' );
  $channel->bind_queue(
    queue       => 'test',exchange    => '',routing_key => '',on_success => sub { _on_bind_queue_success( $condvar,);
  return;
}

############################################################################
sub _on_bind_queue_success {
  my ( $condvar,$channel ) = @_;
  _log( debug => 'Binded RabbitMQ queue.' );
  _log( info  => 'Master ready to publish messages.' );
  _publish_message( $condvar,'Hello,world!' );
  return;
}

############################################################################
sub _publish_message {
  my ( $condvar,$message ) = @_;
  _log( debug => "Publishing RabbitMQ message ($message)..." );
  $channel->publish(
    queue       => 'test',body        => $message,header      => {},mandatory   => 0,immediate   => 0,on_success =>
      sub { _on_publish_message_success( $condvar,on_ack          => sub { _error( $condvar,'ack',on_nack         => sub { _error( $condvar,'nack',);
  return;
}

############################################################################
sub _on_publish_message_success {
  my ( $condvar,$channel ) = @_;
  _log( debug => "Published RabbitMQ message." );
  sleep 1;
  _publish_message( $condvar,world! Again ' . time );
  return;
}

############################################################################
sub _error {
  my ( $condvar,$type,@error ) = @_;
  _log( error => sprintf '%s - %s',join ',',@error );
  $condvar->send( $condvar,@error );
  return;
}

############################################################################
sub _log {
  my ( $level,$message ) = @_;
  my @time = gmtime time;
  $time[5] += 1900;
  $time[4] += 1;
  my $time = sprintf '%04d-%02d-%02dT%02d:%02d:%02d+00:00',@time[ 5,4,3,2,1,0 ];
  my @caller0    = caller(0);
  my @caller1    = caller(1);
  my $subroutine = $caller1[3];
  $subroutine =~ s/^$caller0[0]:://;
  print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])n";
  return;
}

该计划应:

>连接到RabbitMQ
>打开RabbitMQ频道
>声明一个simpe队列(名为“test”)
>绑定到该队列(名为“test”)
>发布消息(“Hello,world!”)
>成功发布消息后,等待一秒钟并发布另一条消息

该程序(主程序)不应使用消息.还有其他程序可以完成这项工作.

最小的例子(见上文)将产生以下输出:

2015-08-12T13:02:07+00:00 [debug] main at minimal.pl line 9 (main; from minimal.pl line 5)
2015-08-12T13:02:07+00:00 [debug] Connecting to RabbitMQ... at minimal.pl line 13 (main; from minimal.pl line 5)
2015-08-12T13:02:07+00:00 [debug] Connected to RabbitMQ. at minimal.pl line 36 (_on_connect_success; from minimal.pl line 22)
2015-08-12T13:02:07+00:00 [debug] Opening RabbitMQ channel... at minimal.pl line 44 (_open_channel; from minimal.pl line 37)
2015-08-12T13:02:07+00:00 [debug] Opened RabbitMQ channel. at minimal.pl line 58 (_on_open_channel_success; from minimal.pl line 46)
2015-08-12T13:02:07+00:00 [debug] Declaring RabbitMQ queue... at minimal.pl line 66 (_declare_queue; from minimal.pl line 59)
2015-08-12T13:02:07+00:00 [debug] Declared RabbitMQ queue. at minimal.pl line 88 (_on_declare_queue_success; from minimal.pl line 76)
2015-08-12T13:02:07+00:00 [debug] Binding RabbitMQ queue... at minimal.pl line 96 (_bind_queue; from minimal.pl line 89)
2015-08-12T13:02:07+00:00 [error] failure - Channel closed at minimal.pl line 155 (_error; from minimal.pl line 102)
2015-08-12T13:02:07+00:00 [error] close - Net::AMQP::Frame::Method=HASH(0x38fe1c8) at minimal.pl line 155 (_error; from minimal.pl line 50)

为什么AnyEvent :: RabbitMQ或RabbitMQ本身关闭了频道(不是连接或我错过了什么)?

解决方法

如果您查看RabbitMQ服务器日志,您将看到如下内容:

{amqp_error,access_refused,”operation not permitted on the default exchange”,’queue.bind’}

显然,它不允许您在默认交换上绑定队列.所以你需要先声明并绑定自己的交换.

sub _declare_exchange {
  my ( $condvar,$channel ) = @_;
  _log( debug => 'Declaring RabbitMQ exchange...' );
  $channel->declare_exchange(
    exchange        => 'testest',type            => 'fanout',on_success =>
      sub { _on_declare_exchange_success( $condvar,);
  return;
}

############################################################################
sub _on_declare_exchange_success {
  my ( $condvar,$channel ) = @_;
  _log( debug => 'Declared RabbitMQ exchange.' );
  _bind_exchange( $condvar,$channel );
  return;
}

############################################################################
sub _bind_exchange {
  my ( $condvar,$channel ) = @_;
  _log( debug => 'Binding RabbitMQ exchange...' );
  $channel->bind_exchange(
    source      => 'testest',destination => 'testest',on_success => sub { _on_bind_exchange_success( $condvar,);
  return;
}

一旦你设置了这些子,请告诉你的程序使用这个自定义交换.

sub _on_open_channel_success {
  my ( $condvar,$channel ) = @_;
  _log( debug => 'Opened RabbitMQ channel.' );
  $channel->confirm;
  _declare_exchange( $condvar,$channel );
  return;
}

当您向队列发送消息时,必须使用$channel->确认以使RabbitMQ回答确认.如果你不这样做,成功处理程序永远不会被调用,因为没有成功的响应回来.

然后在_bind_queue中,您需要将交换添加到bind_queue()调用.

$channel->bind_queue(
    queue       => 'test',exchange    => 'testest',# <-- here
    routing_key => '',# ...
  );

使用publish()调用需要在_publish_message中完成相同的操作.在那里你也应该用实际处理确认的东西替换on_ack处理程序.我认为你打算这样做,但有一个复制/粘贴错误1.

$channel->publish(
  queue       => 'test',# <-- here
  routing_key => '',# ...
  on_ack          => sub { 
  _on_publish_message_success( $condvar,@_ );
  },);

还有一件事是,当您使用AnyEvent时,_on_publish_message_success中的睡眠调用不是一个好主意,因为这会停止整个程序.请改用AE::timer.

my $t; 
$t = AE::timer(1,sub {
  _publish_message( $condvar,world! Again ' . time );
  undef $t;
});

这是包含所有更改的完整代码.

use strictures 2;

use AnyEvent::RabbitMQ;

main();

############################################################################
sub main {
  _log( debug => 'main' );
  my $condvar = AnyEvent->condvar;
  my $ar      = AnyEvent::RabbitMQ->new;
  $ar->load_xml_spec;
  _log( debug => 'Connecting to RabbitMQ...' );
  $ar->connect(
    host            => 'localhost',vhost           => '/guest',$channel );
  return;
}

############################################################################
sub _declare_exchange {
  my ( $condvar,);
  return;
}

############################################################################
sub _on_bind_exchange_success {
  my ( $condvar,$channel ) = @_;
  _log( debug => 'Binded RabbitMQ exchange.' );
  _declare_queue( $condvar,$channel );
  return;
}


############################################################################
sub _declare_queue {
  my ( $condvar,on_ack          => sub { 
        _on_publish_message_success( $condvar,@_ );
#        _error( $condvar,@_ )    
    },$channel ) = @_;
  _log( debug => "Published RabbitMQ message." );
  my $t; $t=AE::timer(1,sub {
      _publish_message( $condvar,world! Again ' . time );
      undef $t;
  });
  return;
}

############################################################################
sub _error {
  my ( $condvar,0 ];
  my @caller0    = caller(0);
  my @caller1    = caller(1);
  my $subroutine = $caller1[3];
  $subroutine =~ s/^$caller0[0]:://;
  print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])n";
  return;
}

1)在某些地方你需要为同事买一杯啤酒:)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读