qsub_all.pl:node
#!/usr/bin/perl -w use strict; use Getopt::Std; use Cwd; my $pwd = getcwd(); my($qsub_opt,@allJobs,$qsubDir,$shell); use vars qw($opt_d $opt_l $opt_q $opt_N $opt_P $opt_n $opt_b $opt_m $opt_s $opt_r $opt_h); getopts("d:l:q:N:P:n:b:m:s:rh"); if($opt_h or @ARGV == 0){ &usage(); exit; } # 生成目錄$qsubDir, 用於存聽任務輸出信息等 $shell = shift; my $shell_name = (split /\//,$shell)[-1]; $qsubDir = $opt_d || (split /\//,$shell)[-1]."_qsub"; `rm -rf $qsubDir` if(-e $qsubDir); `mkdir $qsubDir`; `rm $shell.log` if(-e "$shell.log"); `rm $shell.error` if(-e "$shell.error"); `rm $shell.finished` if(-e "$shell.finished"); # 根據參數生成投遞任務命令 $opt_l = $opt_l || "vf=1G"; $qsub_opt = "qsub -cwd -S /bin/bash -l $opt_l "; $qsub_opt .= "-q $opt_q " if($opt_q); $qsub_opt .= "-P $opt_P " if($opt_P); $qsub_opt .= "-l h=$opt_n" if($opt_n); $opt_N = $opt_N || "work"; # 默認每一個sh文本放1個命令, 最大同時任務30 # 每隔120秒掃描任務狀態, 最大嘗試投遞次數10 my $lines = $opt_b || 1; my $maxJob = $opt_m || 30; my $sleepTime = $opt_s || 120; my $max_try = 10; $max_try = 1 if(!$opt_r); # 根據$shell文檔生成並行運行的命令文檔 my $split_number; open IS,$shell or die "can\'t open shell.sh: $shell\n"; while(<IS>){ chomp; $split_number++; my $num = 1; open OUTS,">$qsubDir/$opt_N\_$split_number.sh" or die "can\'t open split shell: $qsubDir/$opt_N\_$split_number.sh\n"; print OUTS $_; while($num < $lines){ $num++; last if(eof(IS)); chomp(my $command = <IS>); print OUTS "\n$command"; } print OUTS "\necho this-work-is-complete\n"; close OUTS; push @allJobs,"$qsubDir/$opt_N\_$split_number.sh"; } close IS; &qsub_and_wait(); # 若是最大容許同時投遞任務數目$maxJob小於總投遞任務$split_number, 則投遞$maxJob個任務, 否則投遞$split_number個 # 每隔$sleepTime時間查看任務狀態, 若是在跑任務數目小於$sub_num, 則繼續投遞任務 # 將全部任務的運行結果寫入$shell.log sub qsub_and_wait{ chomp(my $user = `whoami`); my(%runJob,%error,@wait); my $sub_num = $maxJob > $split_number ? $split_number : $maxJob; @wait = (1..$split_number); my $qnum = 0; while(@wait and $qnum < $sub_num){ my $i = shift @wait; print "$qsub_opt -o $qsubDir/$opt_N\_$i.sh.o -e $qsubDir/$opt_N\_$i.sh.e -N $opt_N\_$i\_$shell_name $qsubDir/$opt_N\_$i.sh\n"; chomp(my $qmess = `$qsub_opt -o $qsubDir/$opt_N\_$i.sh.o -e $qsubDir/$opt_N\_$i.sh.e -N $opt_N\_$i\_$shell_name $qsubDir/$opt_N\_$i.sh`); if($qmess =~ /^[Yy]our\sjob\s(\d+)\s\(\".*\"\)\shas\sbeen\ssubmitted.?$/){ $runJob{$1} = "$qsubDir/$opt_N\_$i.sh"; $qnum++; }else{ unshift @wait,$i; } } while(@wait or keys %runJob){ sleep($sleepTime); &check_job($user,\%error,\@wait,\%runJob); $qnum = keys %runJob; while(@wait and $qnum < $sub_num){ my $i = shift @wait; print "$qsub_opt -o $qsubDir/$opt_N\_$i.sh.o -e $qsubDir/$opt_N\_$i.sh.e -N $opt_N\_$i\_$shell_name $qsubDir/$opt_N\_$i.sh\n"; chomp(my $qmess = `$qsub_opt -o $qsubDir/$opt_N\_$i.sh.o -e $qsubDir/$opt_N\_$i.sh.e -N $opt_N\_$i\_$shell_name $qsubDir/$opt_N\_$i.sh`); if($qmess =~ /^[Yy]our\sjob\s(\d+)\s\(\".*\"\)\shas\sbeen\ssubmitted.?$/){ $runJob{$1} = "$qsubDir/$opt_N\_$i.sh"; $qnum++; }else{ unshift @wait,$i; } } } open OUTL,">>$shell.log" or die "can\'t open shell.log\n"; if(keys %error){ print OUTL "There are some job can't run finish, check the shell and qsub again\n"; for(sort {$a cmp $b} keys %error){ print OUTL "$_\n"; } }else{ print OUTL "All jobs are finished correctly\n"; } close OUTL; } # 檢查投遞任務的狀態, 運行qstat -xml -u $userName, 得到當前並行任務的ID, 名字, 狀態, 隊列 # 若是狀態爲Eqw,T,跑的節點是dead狀態, 則撤銷這個任務, 若是錯誤次數少於最大限度, 則從新投遞 # 對於已經中止的任務, 若是是完成了, 則從正在跑的任務名單剔除, 否則且在錯誤次數少於最大限度時,從新加入等待名單 sub check_job{ my($userName,$error,$wait,$run) = @_; my %dead; &dead_nodes(\%dead); my %running; my $qsub_stat = `qstat -xml -u $userName`; while($qsub_stat =~ /<JB_job_number>(\d+?)<\/JB_job_number>.*? <JB_name>(.+?)<\/JB_name>.*? <state>(.+?)<\/state>.*? <queue_name>(.*?)<\/queue_name> /gxs){ my ($jbnum, $jbname, $jbstat, $jbqueue) = ($1, $2, $3, $4); if($jbname =~ /$opt_N\_(\d+)/){ my $num = $1; my $split_shell = $$run{$jbnum}; if($jbstat eq "Eqw" or $jbstat eq "T" or ($jbqueue =~ /^.+@(.+)\.local$/ and exists $dead{$1})){ $$error{$split_shell}++; `qdel $jbnum`; `echo $split_shell has not finished! >>$shell.error`; if($$error{$split_shell} < $max_try){ `rm $split_shell.[oe]`; unshift @$wait,$num; `echo $split_shell has been reqsub >>$shell.error`; } delete $$run{$jbnum}; } $running{$jbnum} = undef; } } foreach my $id (sort {$a <=> $b} keys %$run){ my $split_shell = $$run{$id}; if(!exists $running{$id}){ delete $$run{$id}; chomp(my $log = `tail -1 $split_shell.o`); if($log eq "this-work-is-complete"){ delete($$error{$split_shell}); `echo $split_shell is finished! >> $shell.finished`; }else{ `echo $split_shell has not finished! >>$shell.error`; $$error{$split_shell}++; if($$error{$split_shell} < $max_try){ `rm $split_shell.[oe]`; my $num = $1 if($split_shell =~ /$opt_N\_(\d+)\.sh/); unshift @$wait,$num; `echo $split_shell has been reqsub >>$shell.error`; } } } } } # 運行qhost命令, 若是某個節點的LOAD MEMUSE SWAPUS MEMTOT SWAPTO其中一個爲-, 則將這個節點設置爲undef sub dead_nodes{ my $dead = shift; chomp(my @nodeMess = `qhost`); shift @nodeMess for(1..3); foreach(@nodeMess){ my @temp = split; my $node_name = $temp[0]; $dead->{$node_name} = undef if($temp[3]=~/-/ || $temp[5]=~/-/ || $temp[7]=~/-/ || $temp[4]=~/-/ || $temp[6]=~/-/); } } # 輸出幫助信息 sub usage{ print <<EOD; usage: perl $0 [options] shell.sh Options: -d qsub script and log dir, default ./shell.sh_qsub/ # 生成這個目錄, 保存拆分的小任務以及任務的輸出信息等 -l the qsub -l option argument: vf=xxG[,p=xx,...] (default vf=1G) # qsub的-l參數的內存,cpu部分, 輸入形式爲vf=xxG[,p=xx,...] -q queue list, default all availabile queues # qsub的-q參數, 指定任務的投遞節點 -N set the prefix tag for qsubed jobs, default work # 指定拆分後小任務的前綴 -P project_name, default not # qsub的-P參數, 指定任務的項目名 -n compute node, default all availabile nodes # qsub的-l參數的節點部分 -b set number of lines to form a job, default 1 # 指定每一個拆分後sh文檔有幾個小任務 -m set the maximum number of jobs to throw out, default 30 # 指定可同時提交任務的最大數目 -s set interval time of checking by qstat, default 120 seconds # 指定每隔多長時間檢查任務的狀態 -r mark to reqsub the job which was finished error, max reqsub 10 times, default not # # 指定每一個sh任務可錯投遞次數 -h show this help # 顯示幫助信息 EOD }