Perl:读取fifos非阻塞
发布时间:2020-12-16 06:14:01 所属栏目:大数据 来源:网络整理
导读:我对 https://superuser.com/questions/482953/read-non-blocking-from-multiple-fifos-in-parallel?answertab=oldest#tab-top的原始解决方案将数据副本保存在磁盘上. 我现在已经制作了第二个版本来缓冲内存中的一行. 它工作但它需要所有fifos在它开始之前连
我对
https://superuser.com/questions/482953/read-non-blocking-from-multiple-fifos-in-parallel?answertab=oldest#tab-top的原始解决方案将数据副本保存在磁盘上.
我现在已经制作了第二个版本来缓冲内存中的一行. 它工作但它需要所有fifos在它开始之前连接.这有效: window1$mkfifo {1..100} window1$parcat {1..100} | pv >/dev/null window2$parallel -j0 'cat bigfile > ' ::: * 这不会给出任何输出(因为100没有连接): window1$mkfifo {1..100} window1$parcat {1..100} | pv >/dev/null window2$parallel -j0 'cat bigfile > ' ::: {1..99} 我尝试使用open’<'.这解决了上述问题,但现在它并没有停止在EOF. 我怎么做? 最小版本(不支持大行而不退避等待): #!/usr/bin/perl use Symbol qw(gensym); use IPC::Open3; use POSIX qw(:errno_h); use Fcntl qw(:DEFAULT :flock); for (@ARGV) { open($fh{$_},"<",$_) || die; # set fh non blocking($fh{$_}); my $flags; fcntl($fh{$_},&F_GETFL,$flags) || die $!; # Get the current flags on the filehandle $flags |= &O_NONBLOCK; # Add non-blocking to the flags fcntl($fh{$_},&F_SETFL,$flags) || die $!; # Set the flags on the filehandle } while(keys %fh) { for(keys %fh) { my($string,$something_read) = non_blocking_read($_); print $string; } # Sleep 1 ms select(undef,undef,1/1000); } { my %buffer; sub non_blocking_read { my $file = shift; my $in = $fh{$file}; my $rv = sysread($in,substr($buffer{$file},length $buffer{$file}),327680); if (!$rv) { if($! == EAGAIN) { # Would block: Nothing read return(undef,undef); } else { # This file is done close $in; delete $fh{$file}; my $buf = $buffer{$file}; delete $buffer{$file}; return ($buf,1); } } # Find n for full line my $i = (rindex($buffer{$file},"n")+1); if($i) { # Return full line # Remove full line from $buffer return(substr($buffer{$file},$i),1,$i) = ""); } else { # Something read,but not a full line return("",1); } } } 完整版:重要的代码在前40行:其余的是经过良好测试的代码. #!/usr/bin/perl use Symbol qw(gensym); use IPC::Open3; for (@ARGV) { open($fh{$_},$_) || die; set_fh_non_blocking($fh{$_}); } $ms = 1; while(keys %fh) { for(keys %fh) { my($string,$something_read) = non_blocking_read($_); if($something_read) { $ms = 0.1; print $string; } } $ms = exp_usleep($ms); } { my %buffer; my $ms; sub non_blocking_read { use POSIX qw(:errno_h); my $file = shift; my $in = $fh{$file}; my $rv = read($in,327680); if (!$rv) { if($! == EAGAIN) { # Would block: Nothing read return(undef,undef); } else { # This file is done close $in; delete $fh{$file}; my $buf = $buffer{$file}; delete $buffer{$file}; return ($buf,1); } } #### Well-tested code below # Find n or r for full line my $i = (::rindex64($buffer{$file},"n")+1) || (::rindex64($buffer{$file},"r")+1); if($i) { # Return full line # Remove full line from $buffer return(substr($buffer{$file},$i) = ""); } else { # Something read,but not a full line return("",1); } } } sub rindex64 { # Do rindex on strings > 2GB. # rindex in Perl < v5.22 does not work for > 2GB # Input: # as rindex except STR which must be passed as a reference # Output: # as rindex my $ref = shift; my $match = shift; my $pos = shift; my $block_size = 2**31-1; my $strlen = length($$ref); # Default: search from end $pos = defined $pos ? $pos : $strlen; # No point in doing extra work if we don't need to. if($strlen < $block_size) { return rindex($$ref,$match,$pos); } my $matchlen = length($match); my $ret; my $offset = $pos - $block_size + $matchlen; if($offset < 0) { # The offset is less than a $block_size # Set the $offset to 0 and # Adjust block_size accordingly $block_size = $block_size + $offset; $offset = 0; } while($offset >= 0) { $ret = rindex( substr($$ref,$offset,$block_size),$match); if($ret != -1) { return $ret + $offset; } $offset -= ($block_size - $matchlen - 1); } return -1; } sub exp_usleep { # Sleep this many milliseconds. # Input: # $ms = milliseconds to sleep # Returns: # $ms + 10% my $ms = shift; select(undef,$ms/1000); return (($ms < 1000) ? ($ms * 1.1) : ($ms)); } sub set_fh_non_blocking { # Set filehandle as non-blocking # Inputs: # $fh = filehandle to be blocking # Returns: # N/A my $fh = shift; $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;"; my $flags; fcntl($fh,$flags) || die $!; # Get the current flags on the filehandle $flags |= &O_NONBLOCK; # Add non-blocking to the flags fcntl($fh,$flags) || die $!; # Set the flags on the filehandle } 解决方法
此解决方案打开一个伪作者,一旦收到任何数据就会关闭.它做了正确的事情,除非输入为空时它不会结束:
mkfifo {1..100} parcat {1..100} & parallel -j2 echo works '>' {} ::: {1..100} parcat {1..100} & # Fails (parcat does not exit) parallel -j2 cat /dev/null '>' {} ::: {1..100} 码: #!/usr/bin/perl use Symbol qw(gensym); use IPC::Open3; use POSIX qw(:errno_h); use IO::Select; use strict; my $s = IO::Select->new(); my %fhr; my %fhw; for (@ARGV) { # Open the file with a fake writer that will never write open(my $fhw,"+<",$_) || die; # Open the file for real open(my $fhr,$_) || die; set_fh_non_blocking($fhr); $s->add($fhr); $fhr{$fhr}++; $fhw{$fhr}=$fhw; } my %buffer; while(keys %fhr) { for my $file ($s->can_read(undef)) { my $rv = sysread($file,327680); if (!$rv) { if($! == EAGAIN) { # Would block: Nothing read next; } else { # This file is done $s->remove($file); delete $fhr{$file}; print $buffer{$file}; delete $buffer{$file}; # Closing the $file causes it to block # close $file; next; } } if($fhw{$file}) { # We have received data from $file: # Close the fake writer close $fhw{$file}; delete $fhw{$file}; } # Find n or r for full line my $i = (::rindex64($buffer{$file},"n")+1) || (::rindex64($buffer{$file},"r")+1); if($i) { # Print full line # Remove full line from $buffer print substr($buffer{$file},$i); substr($buffer{$file},$i) = ""; next; } else { # Something read,but not a full line next; } } } sub rindex64 { # Do rindex on strings > 2GB. # rindex in Perl < v5.22 does not work for > 2GB # Input: # as rindex except STR which must be passed as a reference # Output: # as rindex my $ref = shift; my $match = shift; my $pos = shift; my $block_size = 2**31-1; my $strlen = length($$ref); # Default: search from end $pos = defined $pos ? $pos : $strlen; # No point in doing extra work if we don't need to. if($strlen < $block_size) { return rindex($$ref,$pos); } my $matchlen = length($match); my $ret; my $offset = $pos - $block_size + $matchlen; if($offset < 0) { # The offset is less than a $block_size # Set the $offset to 0 and # Adjust block_size accordingly $block_size = $block_size + $offset; $offset = 0; } while($offset >= 0) { $ret = rindex( substr($$ref,$match); if($ret != -1) { return $ret + $offset; } $offset -= ($block_size - $matchlen - 1); } return -1; } sub set_fh_non_blocking { # Set filehandle as non-blocking # Inputs: # $fh = filehandle to be blocking # Returns: # N/A my $fh = shift; $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;"; my $flags; fcntl($fh,$flags) || die $!; # Set the flags on the filehandle } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |