perl 多线程,实时监控线程数,支持max thread
发布时间:2020-12-15 21:11:15 所属栏目:大数据 来源:网络整理
导读:今天PHP站长网 52php.cn把收集自互联网的代码分享给大家,仅供参考。 #!/usr/bin/perl -w#Description:rerun eod job group by system#Auther:Suzm#Date :2015-06-23use DBI;use Thread;use strict;push( @INC,$ENV{TPMS_
以下代码由PHP站长网 52php.cn收集自互联网 现在PHP站长网小编把它分享给大家,仅供参考 #!/usr/bin/perl -w #Description:rerun eod job group by system #Auther:Suzm #Date :2015-06-23 use DBI; use Thread; use strict; push( @INC,$ENV{TPMS_EOD_PERL_LIB} ); require public_pg; my %dbc_info; my $maxnum = 3; my %threads; my $tx_date; my $path_log = $ENV{TPMS_EOD_LOGPATH}; my $system = get_config('TPMS_SYSTEM_ID'); unless ( defined( $system ) ) { $system = "error"; } #my $system = undef; my $log_name = uc( CITIC::getscript_name($0) . "_" .$system); my $log_file = CITIC::create_logfile( $log_name,$path_log ); my $TPMS_EOD_SID = $ENV{TPMS_EOD_SID}; my $my_sysenv = uc( $ENV{MY_SYSENV} ); select $log_file; $| = 1; #get eod job info,return hash sub getEodJob { my ( $sysid,$dbh ) = @_; my %jobinfo; #my $str = "select * from tpms_eod.T_TPMS_EODTABLE where system='${sysid}'"; my $str = <<EOF; select a.* from tpms_eod.t_tpms_eodtable a inner join tpms_eod.Tpms_Rollback_Bizdate b on a.TX_DATE != b.CURR_BIZ_DATE or (a.TX_DATE = b.CURR_BIZ_DATE and a.FLAG=1) where a.SYSTEM='${sysid}' EOF my $sth; eval { $sth = $dbh->prepare($str); $sth->execute(); }; if ([email?protected]) { CITIC::showtime(); print "An error occurred ([email?protected])n"; CITIC::showtime(); print $dbh->errstr . "n"; $dbh->disconnect(); return %jobinfo; } while ( my @row = $sth->fetchrow_array() ) { $jobinfo{ $row[1] }{"SYSTEM"} = $row[0]; $jobinfo{ $row[1] }{"JOB_SCRIPT"} = $row[2]; $jobinfo{ $row[1] }{"ARGV"} = $row[3]; $jobinfo{ $row[1] }{"START_TIME"} = $row[4]; $jobinfo{ $row[1] }{"END_TIME"} = $row[5]; $jobinfo{ $row[1] }{"TX_DATE"} = $row[6]; $jobinfo{ $row[1] }{"FLAG"} = $row[7]; } return %jobinfo; } #获取配置参数 sub get_config{ my($name) = @_; my $str; open(CONFIG,"$ENV{TPMS_EOD_ETC}/etc.profile"); while(<CONFIG>){ my($key,$value)=split('=',$_); if(uc($key) eq uc($name)){ $str = CITIC::strimstr($value); chomp $str; }else{ next; } } close(CONFIG); return uc($str); } #cmd sub cmd { my $ret; my ( $script,$argv,$tablename,$dbh ) = @_; my $cmd = " perl ${script} $argv"; my $rc = open( INFO,"$cmd 2>&1|" ); unless ($rc) { CITIC::showtime(); print "Can't invoke the Command!n"; return 1; } while (<INFO>) { print $_; #print STDOUT $_; if (eof) { unless (/b(complete|Succeeds)/) { CITIC::showtime(); print "*" x 6 . "$tablename作业执行失败" . "*" x 6 . "n"; $ret = 1; } else { CITIC::showtime(); print "*" x 6 . "$tablename作业执行成功" . "*" x 6 . "n"; $ret = 0; } } } close(INFO); return $ret; } #update job info sub upJobInfo { my ( $dbh,$flag,$final ) = @_; my $upstr; my $timestamp = showtime(); if ( $flag == 1 ) { $upstr = "START_TIME=to_timestamp('$timestamp','YYYY-MM-DD hh24:mi:ss:ff'),TX_DATE=to_date('$tx_date','YYYY-MM-DD')"; } else { $upstr = "END_TIME=to_timestamp('$timestamp',FLAG=$final"; } my $str = "update tpms_eod.T_TPMS_EODTABLE set $upstr where TABLE_NAME='$tablename' and SYSTEM='$system'"; my $sth; eval { $sth = $dbh->prepare($str); $sth->execute(); }; if ([email?protected]) { CITIC::showtime(); print "An error occurred ([email?protected])n"; CITIC::showtime(); print $dbh->errstr . "n"; $dbh->disconnect(); return 1; } return 0; } #get tx_date sub getTxdate { my ($dbh) = @_; my $txdate; my $str = "select to_char(CURR_BIZ_DATE,'YYYY-MM-DD') from tpms_eod.Tpms_Rollback_Bizdate"; my $sth; eval { $sth = $dbh->prepare($str); $sth->execute(); }; if ([email?protected]) { CITIC::showtime(); print "An error occurred ([email?protected])n"; CITIC::showtime(); print $dbh->errstr . "n"; $dbh->disconnect(); return 1; } my $table = $sth->fetchall_arrayref(); $txdate = $table->[0][0]; #$dbh->disconnect(); return $txdate; } #get timestamp sub showtime { my ( $sec,$min,$hour,$day,$mon,$year ) = localtime( time() ); my $current = ""; $sec = sprintf( "%02d",$sec ); $min = sprintf( "%02d",$min ); $hour = sprintf( "%02d",$hour ); $day = sprintf( "%02d",$day ); $mon = sprintf( "%02d",$mon + 1 ); $year += 1900; $current = " $year-$mon-$day" . " $hour" . ":$min" . ":$sec"; return ${current}; } #execute eod job sub executeJob { my ( $dbh,%hash ) = @_; my $i; my $num = scalar( keys %hash ); for my $jobname ( keys %hash ) { CITIC::showtime(); print "*" x 6 . "正在执行$jobname作业" . "*" x 6 . "n"; upJobInfo( $dbh,$jobname,1 ); $i++; $num--; eval { $threads{$jobname} = Thread->new( &;cmd,$hash{$jobname}{'JOB_SCRIPT'},$hash{$jobname}{'ARGV'},$jobname ); #->join(); }; if ([email?protected]) { CITIC::showtime(); print "An error occurred ([email?protected])n"; return 1; } if ( $i > $maxnum - 1 ) { print "进程达到最大数" . Thread->list() . "!!!n"; while (1) { for my $th ( Thread->list() ) { my $tid; if ( $th->done ) { eval { $tid = $th->join() }; foreach my $tb ( keys %threads ) { if ( $threads{$tb}->tid() == $th->tid() ) { if ($tid) { upJobInfo( $dbh,$tb,2,1 ); } else { upJobInfo( $dbh,0 ); } } } $i--; } else { next; } } if ( Thread->list() < $maxnum ) { print "进程小于3,开始新的进程n"; last; } else { print "sleep 10s,waiting for the job finishn"; sleep 10; } } } if ( $num == 0 ) { print "#" x 20 . "所有作业调度完成,等待进程结束!!!" . "#" x 20 . "n"; for my $th1 ( Thread->list() ) { my $tid1; eval { $tid1 = $th1->join() }; foreach my $tb1 ( keys %threads ) { if ( $threads{$tb1}->tid() == $th1->tid() ) { if ($tid1) { upJobInfo( $dbh,$tb1,1 ); } else { upJobInfo( $dbh,0 ); } } } } } } } #程序入口 sub main { #my ($system)[email?protected]_; my $ret = 0; my $dbh; %dbc_info = CITIC::get_dbc_info($TPMS_EOD_SID); unless (%dbc_info) { CITIC::showtime(); print "Failed to get database information!n"; } else { $dbh = CITIC::connect_db( $dbc_info{"ip"},$dbc_info{"port"},$dbc_info{"sid"},$dbc_info{"user"},$dbc_info{"pwd"} ); } unless ($dbh) { $ret = 1; } else { my %jobinfo = getEodJob( $system,$dbh ); if (%jobinfo) { if ( getTxdate($dbh) eq 1 ) { $ret = 1; } else { $tx_date = getTxdate($dbh); executeJob( $dbh,%jobinfo ); } } else { CITIC::showtime(); print "没有要执行的JOB,请确认后再执行!n"; $ret = 1; } # $ret = excute_sql( $dbh,@sql_queue ); } return $ret; } open( STDERR,">&STDOUT" ); #if ( $#ARGV < 0 ) { # print "Please input parameters,for example:n1.system id :CTSn"; # exit(1); #} #$system = uc( $ARGV[0] ); my $ret = main(); if ( $ret == 0 ) { print STDOUT "completen"; } else { print STDOUT "failn"; # CITIC::send_mail( # "[$my_sysenv] $log_name JOB FAILED",#"Hi all,n Job $log_name failed. Please see attached log file for details.nthanks!" # ); } exit($ret); END { CITIC::showtime(); print "return code is $retn"; CITIC::close_logfile($log_file); } 以上内容由PHP站长网【52php.cn】收集整理供大家参考研究 如果以上内容对您有帮助,欢迎收藏、点赞、推荐、分享。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |