加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

perl 多线程,实时监控线程数,支持max thread

发布时间:2020-12-15 21:11:15 所属栏目:大数据 来源:网络整理
导读:今天PHP站长网 52php.cn把收集自互联网的代码分享给大家,仅供参考。 #!/usr/bin/perl -w#Description:rerun eod job group by system#Auther:Suzm#Date :2015-06-23use DBI;use Thread;use strict;push( @INC,$ENV{TPMS_

以下代码由PHP站长网 52php.cn收集自互联网

现在PHP站长网小编把它分享给大家,仅供参考

#!/usr/bin/perl -w
#Description:rerun eod job group by system
#Author:Suzm
#Date  :2015-06-23

use DBI;
use Thread;
use strict;

# Add the project library directory to the module search path at run time,
# then load the shared helper module (the CITIC:: functions used throughout
# this script are presumably defined there -- TODO confirm).
push @INC, $ENV{TPMS_EOD_PERL_LIB};
require public_pg;

# ---- script-wide state ----------------------------------------------------
my %dbc_info;       # database connection settings, filled in by main()
my $maxnum = 3;     # upper bound on concurrently running job threads
my %threads;        # job name => thread object created by executeJob()
my $tx_date;        # business date string, set by main()
my $path_log = $ENV{TPMS_EOD_LOGPATH};

# System id from the profile; the literal "error" is used when the lookup
# yields undef.
my $system = get_config('TPMS_SYSTEM_ID');
$system = "error" unless defined $system;

#my $system = undef;

# The log name is "<script name>_<system>", upper-cased.
my $log_name     = uc( CITIC::getscript_name($0) . "_" . $system );
my $log_file     = CITIC::create_logfile( $log_name, $path_log );
my $TPMS_EOD_SID = $ENV{TPMS_EOD_SID};
my $my_sysenv    = uc( $ENV{MY_SYSENV} );

# Make the log handle the default target of print() and turn on autoflush
# for it.
select $log_file;
$| = 1;

# Fetch the EOD jobs that still have to be run for one system.
#
# Selects the rows of tpms_eod.t_tpms_eodtable for $sysid whose TX_DATE
# differs from CURR_BIZ_DATE in tpms_eod.Tpms_Rollback_Bizdate, or whose
# TX_DATE equals it while FLAG is 1.
#
# Params : $sysid - value matched against a.SYSTEM (passed as a bind value)
#          $dbh   - connected DBI database handle
# Returns: a hash keyed by the second selected column (used by the callers
#          as the job/table name); each value is a hash ref with the keys
#          SYSTEM, JOB_SCRIPT, ARGV, START_TIME, END_TIME, TX_DATE and FLAG.
#          On a query error the hash is empty and $dbh has been disconnected.
sub getEodJob {
	my ( $sysid, $dbh ) = @_;
	my %jobinfo;

	# Non-interpolating heredoc: the system id is supplied through the
	# placeholder instead of being pasted into the SQL text.
	my $str = <<'EOF';
select a.* from tpms_eod.t_tpms_eodtable a
inner join tpms_eod.Tpms_Rollback_Bizdate b
on a.TX_DATE != b.CURR_BIZ_DATE
or (a.TX_DATE = b.CURR_BIZ_DATE and a.FLAG=1)
where a.SYSTEM=?
EOF
	my $sth;
	eval {
		# "or die" makes a failure reach the handler below even when the
		# handle was opened without RaiseError.
		$sth = $dbh->prepare($str) or die "prepare failed\n";
		$sth->execute($sysid)      or die "execute failed\n";
	};
	if ($@) {
		CITIC::showtime();
		print "An error occurred ($@)\n";
		CITIC::showtime();
		my $errstr = $dbh->errstr;
		print( ( defined $errstr ? $errstr : '' ) . "\n" );
		$dbh->disconnect();
		return %jobinfo;
	}

	# Columns are read by position, so this relies on the column order
	# that "select a.*" produces for the table.
	while ( my @row = $sth->fetchrow_array() ) {
		$jobinfo{ $row[1] } = {
			"SYSTEM"     => $row[0],
			"JOB_SCRIPT" => $row[2],
			"ARGV"       => $row[3],
			"START_TIME" => $row[4],
			"END_TIME"   => $row[5],
			"TX_DATE"    => $row[6],
			"FLAG"       => $row[7],
		};
	}
	return %jobinfo;
}

# Read one configuration parameter from $ENV{TPMS_EOD_ETC}/etc.profile.
#
# The file is scanned line by line for "KEY=VALUE"; keys are compared
# case-insensitively and, when a key occurs several times, the last
# occurrence wins.
#
# Params : $name - parameter name to look up
# Returns: the upper-cased value after CITIC::strimstr() and chomp, or undef
#          when the file cannot be opened or the key is not present (so a
#          caller's defined() check can detect the miss).
sub get_config {
	my ($name) = @_;
	my $str;
	my $file = "$ENV{TPMS_EOD_ETC}/etc.profile";

	open( my $config, '<', $file ) or do {
		warn "Can't open $file: $!\n";
		return;
	};
	while ( my $line = <$config> ) {

		# A limit of 2 keeps any '=' that is part of the value.
		my ( $key, $value ) = split( /=/, $line, 2 );
		next unless defined $value;            # line without '='
		next unless uc($key) eq uc($name);

		# strimstr is assumed to strip surrounding whitespace -- TODO confirm.
		$str = CITIC::strimstr($value);
		chomp $str;
	}
	close($config);

	return unless defined $str;
	return uc($str);
}

# Run one job script in a child "perl" process and report how it ended.
#
# The child's STDOUT and STDERR are merged and echoed line by line to the
# currently selected output handle. The job counts as successful only when
# the LAST line of its output contains "complete" or "Succeeds" starting at
# a word boundary.
#
# Params : $script    - path of the perl script to run
#          $argv      - argument string appended to the command (may be undef)
#          $tablename - job name, used in the status message only
#          $dbh       - accepted for compatibility; not used here
# Returns: 0 on success, 1 on failure (including "could not start" and
#          "produced no output at all").
sub cmd {
	my ( $script, $argv, $tablename, $dbh ) = @_;
	$argv = '' unless defined $argv;

	# NOTE(review): the command line is handed to the shell as one string
	# (needed for "2>&1" and for multi-word $argv); $script and $argv must
	# therefore come from a trusted source.
	my $cmd = "perl ${script} $argv";
	my $info;
	unless ( open( $info, '-|', "$cmd 2>&1" ) ) {
		CITIC::showtime();
		print "Can't invoke the Command!\n";
		return 1;
	}

	my $last_line;
	while ( my $line = <$info> ) {
		print $line;
		$last_line = $line;
	}
	close($info);

	# No output at all leaves $last_line undefined and is treated as failure.
	my $ret =
	  ( defined $last_line && $last_line =~ /\b(?:complete|Succeeds)/ ) ? 0 : 1;

	CITIC::showtime();
	if ($ret) {
		print "*" x 6 . "${tablename}作业执行失败" . "*" x 6 . "\n";
	}
	else {
		print "*" x 6 . "${tablename}作业执行成功" . "*" x 6 . "\n";
	}
	return $ret;
}

# Record the start or the end of a job in tpms_eod.T_TPMS_EODTABLE.
#
# Params : $dbh       - connected DBI database handle
#          $tablename - job name, matched against TABLE_NAME
#          $flag      - 1: job is starting (sets START_TIME and TX_DATE);
#                       anything else: job has ended (sets END_TIME and FLAG)
#          $final     - value stored in FLAG when the job has ended
# Uses the file-level $tx_date and $system.
# Returns: 0 on success, 1 on bad arguments or on a database error (in the
#          latter case $dbh has been disconnected).
#
# NOTE(review): this copy of the file had lost $tablename from the parameter
# list although the body and the callers use it, and the END_TIME expression
# was truncated; the format mask below is assumed to be the same as the one
# used for START_TIME -- confirm against the original script.
sub upJobInfo {
	my ( $dbh, $tablename, $flag, $final ) = @_;

	unless ( defined $tablename && defined $flag ) {
		CITIC::showtime();
		print "upJobInfo: missing table name or flag\n";
		return 1;
	}

	my $timestamp = showtime();
	my $upstr;
	my @bind;

	if ( $flag == 1 ) {
		$upstr =
"START_TIME=to_timestamp(?,'YYYY-MM-DD hh24:mi:ss:ff'),TX_DATE=to_date(?,'YYYY-MM-DD')";
		@bind = ( $timestamp, $tx_date );
	}
	else {
		unless ( defined $final ) {
			CITIC::showtime();
			print "upJobInfo: missing final flag for $tablename\n";
			return 1;
		}
		$upstr =
"END_TIME=to_timestamp(?,'YYYY-MM-DD hh24:mi:ss:ff'),FLAG=?";
		@bind = ( $timestamp, $final );
	}

	# All values travel as bind parameters; only fixed SQL text is assembled.
	my $str =
"update tpms_eod.T_TPMS_EODTABLE set $upstr where TABLE_NAME=? and SYSTEM=?";
	push @bind, $tablename, $system;

	my $sth;
	eval {
		# "or die" makes a failure reach the handler below even when the
		# handle was opened without RaiseError.
		$sth = $dbh->prepare($str) or die "prepare failed\n";
		$sth->execute(@bind)       or die "execute failed\n";
	};
	if ($@) {
		CITIC::showtime();
		print "An error occurred ($@)\n";
		CITIC::showtime();
		my $errstr = $dbh->errstr;
		print( ( defined $errstr ? $errstr : '' ) . "\n" );
		$dbh->disconnect();
		return 1;
	}
	return 0;
}

# Read the current business date from tpms_eod.Tpms_Rollback_Bizdate.
#
# Params : $dbh - connected DBI database handle
# Returns: CURR_BIZ_DATE of the first returned row formatted as
#          'YYYY-MM-DD'; undef when the query returns no rows; the value 1
#          when the query fails (in that case $dbh has been disconnected).
sub getTxdate {
	my ($dbh) = @_;
	my $str =
	  "select to_char(CURR_BIZ_DATE,'YYYY-MM-DD') from tpms_eod.Tpms_Rollback_Bizdate";
	my $sth;
	eval {
		# "or die" makes a failure reach the handler below even when the
		# handle was opened without RaiseError.
		$sth = $dbh->prepare($str) or die "prepare failed\n";
		$sth->execute()            or die "execute failed\n";
	};
	if ($@) {
		CITIC::showtime();
		print "An error occurred ($@)\n";
		CITIC::showtime();
		my $errstr = $dbh->errstr;
		print( ( defined $errstr ? $errstr : '' ) . "\n" );
		$dbh->disconnect();
		return 1;
	}

	# Only the first column of the first row is of interest.
	my $table  = $sth->fetchall_arrayref();
	my $txdate = $table->[0][0];

	#$dbh->disconnect();
	return $txdate;
}

# Format the current local time.
# Returns: the string " YYYY-MM-DD HH:MM:SS" (note the single leading space),
#          with every field except the year zero-padded to two digits.
sub showtime {
	my @now = localtime( time() );
	my ( $ss, $mi, $hh, $dd, $mm, $yyyy ) = @now[ 0 .. 5 ];

	# localtime() counts months from 0 and years from 1900.
	return sprintf(
		" %d-%02d-%02d %02d:%02d:%02d",
		$yyyy + 1900, $mm + 1, $dd, $hh, $mi, $ss
	);
}

# (private helper) Join one job thread and store the outcome of its job.
#
# Params : $dbh - connected DBI database handle
#          $th  - thread object to join
# Returns: the FLAG value written: 0 when the thread's cmd() returned 0,
#          1 otherwise. A join error or an undefined result counts as
#          failure so that a crashed job is never recorded as successful.
sub _finishJob {
	my ( $dbh, $th ) = @_;
	my $rc;
	eval { $rc = $th->join() };
	my $final = ( $@ || !defined $rc || $rc ) ? 1 : 0;

	# Map the thread back to its job name through the file-level %threads.
	for my $tb ( keys %threads ) {
		next unless $threads{$tb}->tid() == $th->tid();
		upJobInfo( $dbh, $tb, 2, $final );
	}
	return $final;
}

# Run every job in %hash, each in its own thread executing cmd(), keeping
# fewer than $maxnum threads alive between launches.
#
# Params : $dbh  - connected DBI database handle used for status updates
#          %hash - job name => { JOB_SCRIPT => ..., ARGV => ..., ... }
# Returns: 0 once all jobs have been started and joined, 1 when a thread
#          could not be created (threads already started are not joined in
#          that case).
sub executeJob {
	my ( $dbh, %hash ) = @_;
	my $i   = 0;                        # threads started and not yet joined
	my $num = scalar( keys %hash );     # jobs still waiting to be started

	for my $jobname ( keys %hash ) {
		CITIC::showtime();
		print "*" x 6 . "正在执行${jobname}作业" . "*" x 6 . "\n";
		upJobInfo( $dbh, $jobname, 1 );
		$i++;
		$num--;
		eval {
			$threads{$jobname} = Thread->new(
				\&cmd,
				$hash{$jobname}{'JOB_SCRIPT'},
				$hash{$jobname}{'ARGV'},
				$jobname
			);

			#->join();
		};
		if ($@) {
			CITIC::showtime();
			print "An error occurred ($@)\n";
			return 1;
		}

		# Limit reached: poll every 10 seconds until a slot is free.
		if ( $i > $maxnum - 1 ) {
			print "进程达到最大数" . scalar( Thread->list() ) . "!!!\n";
			while (1) {
				for my $th ( Thread->list() ) {
					next unless $th->done;
					_finishJob( $dbh, $th );
					$i--;
				}
				if ( scalar( Thread->list() ) < $maxnum ) {
					print "进程小于3,开始新的进程\n";
					last;
				}
				else {
					print "sleep 10s,waiting for the job finish\n";
					sleep 10;
				}
			}
		}

		# Last job has been started: wait for everything still running.
		if ( $num == 0 ) {
			print "#" x 20
			  . "所有作业调度完成,等待进程结束!!!"
			  . "#" x 20 . "\n";
			for my $th1 ( Thread->list() ) {
				_finishJob( $dbh, $th1 );
			}
		}
	}
	return 0;
}

# Program entry point: connect to the database, collect the pending jobs for
# the file-level $system, look up the business date and run the jobs.
#
# Fills the file-level %dbc_info and $tx_date.
# Returns: 0 when the jobs were handed to executeJob(); 1 when the
#          connection settings, the connection, the job list or the business
#          date could not be obtained. The return value of executeJob() is
#          not taken into account.
sub main {

	#my ($system)=@_;
	%dbc_info = CITIC::get_dbc_info($TPMS_EOD_SID);
	unless (%dbc_info) {
		CITIC::showtime();
		print "Failed to get database information!\n";
		return 1;
	}

	my $dbh = CITIC::connect_db(
		$dbc_info{"ip"},   $dbc_info{"port"}, $dbc_info{"sid"},
		$dbc_info{"user"}, $dbc_info{"pwd"}
	);
	return 1 unless $dbh;

	my %jobinfo = getEodJob( $system, $dbh );
	unless (%jobinfo) {
		CITIC::showtime();
		print "没有要执行的JOB,请确认后再执行!\n";
		return 1;
	}

	# Ask for the business date once; getTxdate() signals a query error
	# with the value 1 and a missing row with undef.
	my $txdate = getTxdate($dbh);
	unless ( defined $txdate ) {
		CITIC::showtime();
		print "Failed to get tx_date!\n";
		return 1;
	}
	return 1 if $txdate eq '1';

	$tx_date = $txdate;
	executeJob( $dbh, %jobinfo );

	#	$ret = excute_sql( $dbh,@sql_queue );

	return 0;
}

# Send STDERR to wherever STDOUT currently points, so diagnostics end up in
# the same stream as the final status line.
open( STDERR, '>&', \*STDOUT )
  or warn "Can't redirect STDERR to STDOUT: $!\n";

#if ( $#ARGV < 0 ) {
#	print "Please input parameters,for example:\n1.system id :CTS\n";
#	exit(1);
#}

#$system = uc( $ARGV[0] );

# Run the job and print the final status on STDOUT explicitly (plain print()
# goes to the selected log handle). NOTE(review): cmd() in this file decides
# success from a job's last output line containing "complete"; this line
# presumably serves the same protocol for whoever launches this script.
my $ret = main();
if ( $ret == 0 ) {
	print STDOUT "complete\n";
}
else {
	print STDOUT "fail\n";

#	CITIC::send_mail(
#		"[$my_sysenv] $log_name JOB FAILED",
#		"Hi all,\n      Job $log_name failed. Please see attached log file for details.\nthanks!"
#	);
}
exit($ret);

# Runs at interpreter shutdown: log the return code and close the log file.
END {
	CITIC::showtime();

	# $ret is still undefined if the script stopped before main() returned.
	print "return code is " . ( defined $ret ? $ret : '' ) . "\n";
	CITIC::close_logfile($log_file);
}

以上内容由PHP站长网【52php.cn】收集整理供大家参考研究

如果以上内容对您有帮助,欢迎收藏、点赞、推荐、分享。

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时联系站长删除相关内容!

    推荐文章
      热点阅读