
Perl: How to do asynchronous WWW::Mechanize with AnyEvent

Published: 2020-12-15 23:33:59 | Category: Big Data | Source: compiled from the web
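I've been doing a fair amount of research on this topic, and while there are some related questions out there, I'm really having a hard time understanding how to do async programming properly with AnyEvent and WWW::Mechanize. I'm trying to stick with Mechanize because it has a clean interface and the built-in functions I expect to need (such as getting all the images on a page). If there's no reliable way to do what I want, I'll start looking at AnyEvent::HTTP, but I figured I'd ask first before moving in that direction.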

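I'm new to AnyEvent programming, but I've done a fair amount of Perl and JavaScript/jQuery async calls with callbacks before. Those make a lot of sense to me, but it's just not clicking for me with AnyEvent and Mech.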

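Here's the code I'm working on, which pulls URLs from an upstream queue. Given a URL, I want to do a get that finds all the images on the page, and then grab all of those images asynchronously.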

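So the pseudocode looks something like this:

1. Grab a URL from the queue
2. Fetch the page
3. Collect all the img URL links
4. Fire off many async calls on the img URLs (for example, storing the images in a backend)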

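I've read (after researching the errors) that I cannot block inside an AnyEvent callback. How do I structure my program to make the async calls without blocking?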

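AE events can only be processed while an AE-aware function is blocking, so I'm using LWP::Protocol::AnyEvent::http. It replaces the normal HTTP backend of LWP (Net::HTTP) with AnyEvent::HTTP, which is AE-aware.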

The worker is created like this:

my $worker = Worker->new(upstream_job_url => URI->new('tcp://127.0.0.1:5555'), run_on_create => 1);

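The async part is sub _recv_msg, which calls _proc_msg.

I already have an AnyEvent loop watching the ZeroMQ socket, as described in the ZeroMQ Perl binding docs.

Any help is much appreciated!

Code: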

package Worker;

use 5.12.0;

use Moose;
use AnyEvent;
use LWP::Protocol::AnyEvent::http;

use ZMQ::LibZMQ3;
use ZMQ::Constants qw/ZMQ_PUSH ZMQ_PULL ZMQ_POLLIN ZMQ_FD/;

use JSON;
use WWW::Mechanize;
use Carp;
use Coro;


has 'max_children' => (
    is => 'rw', isa => 'Int', required => 1, default => sub { 0 }
);

has 'upstream_job_url' => (
    is => 'rw', isa => 'URI',
);

has ['uri', 'sink_url'] => (
    is => 'rw', required => 0,
);

has 'run_on_create' => (
    is => 'rw', isa => 'Bool', default => sub { 1 }
);

has '_receiver' => (
    is => 'rw', isa => 'ZMQ::LibZMQ3::Socket', required => 0
);

sub BUILD {
    my $self = shift;
    $self->start if $self->run_on_create;
}

sub start
{
    my $self = shift;
    $self->_init_zmq();

    my $fh = zmq_getsockopt( $self->_receiver, ZMQ_FD );
    my $w; $w = AnyEvent->io( fh => $fh, poll => "r", cb => sub { $self->_recv_msg } );
    AnyEvent->condvar->recv;
}

sub _init_zmq
{
    my $self = shift;
    my $c = zmq_init() or die "zmq_init: $!\n";
    my $recv = zmq_socket($c, ZMQ_PULL) or die "zmq_socket: $!\n";
    if( zmq_connect($recv, $self->upstream_job_url) != 0 ) {
        croak "zmq_connect: $!\n";
    }
    $self->_receiver($recv);
}

sub _recv_msg
{
    my $self = shift;
    while (my $message = zmq_msg_data(zmq_recvmsg($self->_receiver)) ) {
        my $msg = JSON::from_json($message, {utf8 => 1});
        $self->uri(URI->new($msg->{url}));
        $self->_proc_msg;
    }
}

sub _proc_msg
{
    my $self = shift;
    my $c = async { 
        my $ua = WWW::Mechanize->new;
        $ua->protocols_allowed(['http']);
        print "$$ processing " . $self->uri->as_string . "... ";
        $ua->get($self->uri->as_string);
        if ($ua->success()) {
            say $ua->status . " OK";
        } else { 
            say $ua->status . " NOT OK";
        }
    }; 
    $c->join;
}

1;

As you can see, I was trying Coro in _proc_msg. I've also tried making just the mech calls on their own, but I get this error:

AnyEvent::CondVar: recursive blocking wait attempted at lib/Worker.pm line 91.

because $mech is still blocking inside the callback. I'm not sure how to make the mech call properly from my callback.
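
A minimal sketch of the rule behind that error, assuming only AnyEvent itself is installed: the single place that blocks is the top-level recv, while callbacks start work and return. fake_fetch is a hypothetical stand-in (a timer, not a real HTTP request) and is not part of WWW::Mechanize or any module used in the post.

```perl
use strict;
use warnings;
use AnyEvent;

my $done = AnyEvent->condvar;    # the only condvar this program blocks on
my @log;

# Hypothetical stand-in for a non-blocking fetch: it returns at once and
# fires its callback from the event loop after a short delay.
sub fake_fetch {
    my ($url, $cb) = @_;
    my $t; $t = AnyEvent->timer(after => 0.01, cb => sub {
        undef $t;                # release the watcher
        $cb->("body of $url");
    });
    return;
}

# Stands in for the I/O watcher callback: start the work, then return.
my $kick; $kick = AnyEvent->timer(after => 0, cb => sub {
    undef $kick;
    for my $url (qw(http://example.com/a http://example.com/b)) {
        $done->begin;            # one more request outstanding
        fake_fetch($url, sub {
            push @log, $_[0];
            $done->end;          # the last end releases the recv below
        });
    }
    # No ->recv in here: a blocking wait inside a callback is what
    # typically triggers "recursive blocking wait attempted".
});

$done->recv;                     # top level, outside any callback
print scalar(@log), " fetches finished\n";
```

Calling $done->recv from inside either callback, instead of at the top level, would be the kind of nested blocking wait the error message complains about.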

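At ikegami's request, I've added the driver program that sends the URLs. For testing purposes I just read an RSS feed and send the links to the worker to process. I was mainly curious about the basic structure of AnyEvent code with callbacks, but I'm more than happy to get help on the program in general. Here's the driver code: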

#!/usr/local/bin/perl

use strict;
use warnings;
use v5.12.0;

use lib './lib';

use Config::General;
use Getopt::Long;
use Carp;
use AnyEvent;
use AnyEvent::Feed;
use Parallel::ForkManager;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw(ZMQ_PUSH ZMQ_PULL);
use Worker;

# Debug
use Data::Dumper;
$Data::Dumper::Deparse = 1;

my $config_file = "feeds.cfg";

GetOptions(
    "config|c=s" => \$config_file,   # takes a file name, stored through a reference
    "help|h"     => sub { usage(); exit(0); },
);

sub usage() 
{
    say "TODO";
}

$SIG{INT} = sub { croak; }; $SIG{TERM} = sub { croak; };
$SIG{CHLD} = 'IGNORE';

my $conf = Config::General->new($config_file) or croak "Couldn't open config file '$config_file' $!\n";

my %config = $conf->getall();
my @readers = ();
my @feeds = load_feeds(\%config);   # load_feeds expects a hash reference

my $mgr = Parallel::ForkManager->new( $config{'max_download_children'} ) or croak "Can't create fork manager: $!\n";
my $context = zmq_init() or croak "zmq_init: $!\n";
my $sender = zmq_socket($context, ZMQ_PUSH) or die "zmq_socket: $!\n";

foreach my $feed_cfg (@feeds) {
    my $reader = AnyEvent::Feed->new(url => delete $feed_cfg->{url}, %$feed_cfg);
    push(@readers, $reader); # save, don't go out of scope
}

# Fork Downloader children. These processes will look for incoming data
# in the img_queue and download the images, storing them in nosql
for ( 1 .. $config{'max_download_children'} ) {
    my $pid = $mgr->start; 
    if (!$pid) {
        # Child
        my $worker = Worker->new({
            upstream_job_url => URI->new('tcp://127.0.0.1:5555')
        });
        $mgr->finish;
        say "$$ exiting.";
        exit(0);
    } else {
        # Parent
        say "[forked child $pid] my pid is $$";
    }
}

if (zmq_bind($sender, 'tcp://127.0.0.1:5555') < 0) {
    croak "zmq_bind: $!\n";
}

# Event loop 
AnyEvent->condvar->recv;

sub load_feeds
{
    my $conf = shift;
    my @feeds = ();
    foreach my $feed ( keys %{$conf->{'feeds'}} ) {
        my $feed_ref = $conf->{'feeds'};
        $feed_ref->{$feed}->{'name'} = $feed;
        $feed_ref->{$feed}->{'on_fetch'} = \&fetch_feed_cb;
        push(@feeds, $feed_ref->{$feed});
    }
    return @feeds;
}

sub fetch_feed_cb
{
    my ($feed_reader, $new_entries, $feed, $error) = @_;
    if (defined $error) {
        say "Error fetching feed: $error";
        return;
    }
    say "$$ checking for new feeds";
    for (@$new_entries) {
        my ($hash, $entry) = @$_;
        say "$$ sending " . $entry->link;
        zmq_send($sender, JSON::to_json( { url => $entry->link }, { pretty => 1, utf8 => 1 } ));
    }
}

Here's a sample run:

[forked child 40790] my pid is 40789
[forked child 40791] my pid is 40789
[forked child 40792] my pid is 40789
40789 checking for new feeds
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/f5nNM3zYBt0/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/Ay9V5pIpFBA/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/5XCVvt75ppU/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/mWprjBD3UhM/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/NngMs9pCQew/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/wiUsvafLGFU/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/QMp6gnZpFcA/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/kqUb_rpU5dE/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/tHItKqKhGXg/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/7LleQbVnPmE/
FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99.
FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99.
FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99.
40791 processing http://feedproxy.google.com/~r/PerlNews/~3/Ay9V5pIpFBA/... 
40790 processing http://feedproxy.google.com/~r/PerlNews/~3/f5nNM3zYBt0/... 
40792 processing http://feedproxy.google.com/~r/PerlNews/~3/5XCVvt75ppU/... ^C at /usr/local/perls/perl5162/lib/perl5/site_perl/darwin-thread-multi-2level/AnyEvent/Loop.pm line 231.

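If I don't explicitly 'use Coro;' in Worker.pm, the Coro FATAL errors don't show up. I don't know how async was working before without producing further runtime errors.

Sample config file (feeds.cfg):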

max_download_children = 3
<feeds>
    <feed1>
        url="http://feeds.feedburner.com/PerlNews?format=xml"   
        interval=60
    </feed1>
</feeds>

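I spent a little more time on this today. The error of my ways was calling $c->join. I shouldn't do that, since I can't block in the callback. Coro will schedule the async block, and it will be done when it's done. The only thing I need to make sure of is that I somehow know when all the asyncs have finished, which I think I can figure out. Now the tricky part is figuring out this little mystery: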

sub _recv_msg
{
    my $self = shift;
    while (my $message = zmq_msg_data(zmq_recvmsg($self->_receiver)) ) {
        my $msg = JSON::from_json($message, {utf8 => 1});
        $self->uri(URI->new($msg->{url}));
        $self->_proc_msg;
    }
}

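This while loop causes the async {} threads in _proc_msg to never run. Remove the while loop and handle only the first message, and the coros run. Leave the while loop in place and they never get run. That seems strange to me, and I haven't figured out why yet.

Further update:

zmq_msg_recv was blocking. Also, zmq_send in the parent can block. You have to use ZMQ_NOBLOCK.
I split the worker and main completely into separate programs.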
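
The non-blocking receive described above can be sketched as a drain loop: read until the receive call reports that nothing is pending, then return so the event loop (and any Coro threads) can run again. This is a sketch under stated assumptions: drain_ready and the in-memory queue are hypothetical, and the ZeroMQ wiring shown in the comment assumes that zmq_recvmsg accepts a flags argument and returns undef when no message is ready.

```perl
use strict;
use warnings;

# Drain whatever is readable right now, then hand control back.
# $try_recv is a hypothetical coderef that must return undef immediately
# when nothing is pending, for example (assumption, not from the post):
#   sub { my $m = zmq_recvmsg($socket, ZMQ_NOBLOCK); $m && zmq_msg_data($m) }
sub drain_ready {
    my ($try_recv, $on_message) = @_;
    my $count = 0;
    while (defined(my $payload = $try_recv->())) {
        $on_message->($payload);   # must itself return quickly
        $count++;
    }
    return $count;                 # caller returns to the event loop
}

# In-memory stand-in queue so the sketch runs without ZeroMQ installed.
my @pending = (
    '{"url":"http://example.com/1"}',
    '{"url":"http://example.com/2"}',
);
my @seen;
my $handled = drain_ready(
    sub { shift @pending },        # undef once the queue is empty
    sub { push @seen, $_[0] },
);
print "handled $handled message(s)\n";
```

With a blocking receive in place of $try_recv, the loop would stall on the first empty read and the watcher callback would never return, which matches the behaviour of the while loop described earlier.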

Solution

You can use https://metacpan.org/pod/AnyEvent::HTTP::LWP::UserAgent to make the async calls.

use AnyEvent::HTTP::LWP::UserAgent;
use AnyEvent;

my $ua = AnyEvent::HTTP::LWP::UserAgent->new;
my @urls = (...);
my $cv = AE::cv;
$cv->begin;
foreach my $url (@urls) {
    $cv->begin;
    $ua->get_async($url)->cb(sub {
        my $r = shift->recv;
        print "url $url, content " . $r->content . "\n";
        $cv->end;
    });
}
$cv->end;
$cv->recv;

(Editor: 李大同)
