計算機集羣多任務投遞腳本

    qsub_all.pl:node

#!/usr/bin/perl -w
use strict;
use Getopt::Std;
use Cwd;

my $pwd = getcwd();

my($qsub_opt,@allJobs,$qsubDir,$shell);

use vars qw($opt_d $opt_l $opt_q $opt_N $opt_P $opt_n $opt_b $opt_m $opt_s $opt_r $opt_h);
getopts("d:l:q:N:P:n:b:m:s:rh");

if($opt_h or @ARGV == 0){
    &usage();
    exit;
}

# 生成目錄$qsubDir, 用於存聽任務輸出信息等
$shell = shift;
my $shell_name = (split /\//,$shell)[-1];
$qsubDir = $opt_d || (split /\//,$shell)[-1]."_qsub";
`rm -rf $qsubDir` if(-e $qsubDir);
`mkdir $qsubDir`;
`rm $shell.log` if(-e "$shell.log");
`rm $shell.error` if(-e "$shell.error");
`rm $shell.finished` if(-e "$shell.finished");

# 根據參數生成投遞任務命令
$opt_l = $opt_l || "vf=1G";
$qsub_opt = "qsub -cwd -S /bin/bash -l $opt_l ";
$qsub_opt .= "-q $opt_q " if($opt_q);
$qsub_opt .= "-P $opt_P " if($opt_P);
$qsub_opt .= "-l h=$opt_n" if($opt_n);
$opt_N = $opt_N || "work";

# 默認每一個sh文本放1個命令, 最大同時任務30
# 每隔120秒掃描任務狀態, 最大嘗試投遞次數10
my $lines = $opt_b || 1;
my $maxJob = $opt_m || 30;
my $sleepTime = $opt_s || 120;
my $max_try = 10;
$max_try = 1 if(!$opt_r);

# 根據$shell文檔生成並行運行的命令文檔
my $split_number;
open IS,$shell or die "can\'t open shell.sh: $shell\n";
while(<IS>){
    chomp;
    $split_number++;
    my $num = 1;
    open OUTS,">$qsubDir/$opt_N\_$split_number.sh" or die "can\'t open split shell: $qsubDir/$opt_N\_$split_number.sh\n";
    print OUTS $_;
    while($num < $lines){
        $num++;
        last if(eof(IS));
        chomp(my $command = <IS>);
        print OUTS "\n$command";
    }
    print OUTS "\necho this-work-is-complete\n";
    close OUTS;
    push @allJobs,"$qsubDir/$opt_N\_$split_number.sh";
}
close IS;

&qsub_and_wait();

# 若是最大容許同時投遞任務數目$maxJob小於總投遞任務$split_number, 則投遞$maxJob個任務, 否則投遞$split_number個
# 每隔$sleepTime時間查看任務狀態, 若是在跑任務數目小於$sub_num, 則繼續投遞任務
# 將全部任務的運行結果寫入$shell.log
sub qsub_and_wait{
    chomp(my $user = `whoami`);

    my(%runJob,%error,@wait);
    my $sub_num = $maxJob > $split_number ? $split_number : $maxJob;
    @wait = (1..$split_number);

    my $qnum = 0;
    while(@wait and $qnum < $sub_num){
        my $i = shift @wait;
        print "$qsub_opt -o $qsubDir/$opt_N\_$i.sh.o -e $qsubDir/$opt_N\_$i.sh.e -N $opt_N\_$i\_$shell_name $qsubDir/$opt_N\_$i.sh\n";
        chomp(my $qmess = `$qsub_opt -o $qsubDir/$opt_N\_$i.sh.o -e $qsubDir/$opt_N\_$i.sh.e -N $opt_N\_$i\_$shell_name $qsubDir/$opt_N\_$i.sh`);
        if($qmess =~ /^[Yy]our\sjob\s(\d+)\s\(\".*\"\)\shas\sbeen\ssubmitted.?$/){
            $runJob{$1} = "$qsubDir/$opt_N\_$i.sh";
            $qnum++;
        }else{
            unshift @wait,$i;
        }
    }

    while(@wait or keys %runJob){
        sleep($sleepTime);
        &check_job($user,\%error,\@wait,\%runJob);
        $qnum = keys %runJob;
        while(@wait and $qnum < $sub_num){
            my $i = shift @wait;
            print "$qsub_opt -o $qsubDir/$opt_N\_$i.sh.o -e $qsubDir/$opt_N\_$i.sh.e -N $opt_N\_$i\_$shell_name $qsubDir/$opt_N\_$i.sh\n";
            chomp(my $qmess = `$qsub_opt -o $qsubDir/$opt_N\_$i.sh.o -e $qsubDir/$opt_N\_$i.sh.e -N $opt_N\_$i\_$shell_name $qsubDir/$opt_N\_$i.sh`);
            if($qmess =~ /^[Yy]our\sjob\s(\d+)\s\(\".*\"\)\shas\sbeen\ssubmitted.?$/){
                $runJob{$1} = "$qsubDir/$opt_N\_$i.sh";
                $qnum++;
            }else{
                unshift @wait,$i;
            }
        }
    }

    open OUTL,">>$shell.log" or die "can\'t open shell.log\n";
    if(keys %error){
        print OUTL "There are some job can't run finish, check the shell and qsub again\n";
        for(sort {$a cmp $b} keys %error){
            print OUTL "$_\n";
        }
    }else{
        print OUTL "All jobs are finished correctly\n";
    }
    close OUTL;
}

# 檢查投遞任務的狀態, 運行qstat -xml -u $userName, 得到當前並行任務的ID, 名字, 狀態, 隊列
# 若是狀態爲Eqw,T,跑的節點是dead狀態, 則撤銷這個任務, 若是錯誤次數少於最大限度, 則從新投遞
# 對於已經中止的任務, 若是是完成了, 則從正在跑的任務名單剔除, 否則且在錯誤次數少於最大限度時,從新加入等待名單
sub check_job{
    my($userName,$error,$wait,$run) = @_;
    my %dead;
    &dead_nodes(\%dead);
    my %running;
    my $qsub_stat = `qstat -xml -u $userName`;
    while($qsub_stat =~ /<JB_job_number>(\d+?)<\/JB_job_number>.*?
            <JB_name>(.+?)<\/JB_name>.*?
            <state>(.+?)<\/state>.*?
            <queue_name>(.*?)<\/queue_name>
            /gxs){
        my ($jbnum, $jbname, $jbstat, $jbqueue) = ($1, $2, $3, $4);
        if($jbname =~ /$opt_N\_(\d+)/){
            my $num = $1;
            my $split_shell = $$run{$jbnum};
            if($jbstat eq "Eqw" or $jbstat eq "T" or ($jbqueue =~ /^.+@(.+)\.local$/ and exists $dead{$1})){
                $$error{$split_shell}++;
                `qdel $jbnum`;
                `echo $split_shell has not finished! >>$shell.error`;
                if($$error{$split_shell} < $max_try){
                    `rm $split_shell.[oe]`;
                    unshift @$wait,$num;
                    `echo $split_shell has been reqsub >>$shell.error`;
                }
                delete $$run{$jbnum};
            }
            $running{$jbnum} = undef;
        }
    }

    foreach my $id (sort {$a <=> $b} keys %$run){
        my $split_shell = $$run{$id};
        if(!exists $running{$id}){
            delete $$run{$id};
            chomp(my $log = `tail -1 $split_shell.o`);
            if($log eq "this-work-is-complete"){
                delete($$error{$split_shell});
                `echo $split_shell is finished! >> $shell.finished`;
            }else{
                `echo $split_shell has not finished! >>$shell.error`;
                $$error{$split_shell}++;
                if($$error{$split_shell} < $max_try){
                    `rm $split_shell.[oe]`;
                    my $num = $1 if($split_shell =~ /$opt_N\_(\d+)\.sh/);
                    unshift @$wait,$num;
                    `echo $split_shell has been reqsub >>$shell.error`;
                }
            }
        }
    }
}

# 運行qhost命令, 若是某個節點的LOAD  MEMUSE  SWAPUS MEMTOT SWAPTO其中一個爲-, 則將這個節點設置爲undef
sub dead_nodes{
    my $dead = shift;
    chomp(my @nodeMess = `qhost`);
    shift @nodeMess for(1..3);
    foreach(@nodeMess){
        my @temp = split;
        my $node_name = $temp[0];
        $dead->{$node_name} = undef if($temp[3]=~/-/ || $temp[5]=~/-/ || $temp[7]=~/-/ || $temp[4]=~/-/ || $temp[6]=~/-/);
    }
}

# 輸出幫助信息
sub usage{
    print <<EOD;
usage: perl $0 [options] shell.sh
    Options:
        -d  qsub script and log dir, default ./shell.sh_qsub/  # 生成這個目錄, 保存拆分的小任務以及任務的輸出信息等
        -l  the qsub -l option argument: vf=xxG[,p=xx,...] (default vf=1G)  # qsub的-l參數的內存,cpu部分, 輸入形式爲vf=xxG[,p=xx,...]
        -q  queue list, default all availabile queues  # qsub的-q參數, 指定任務的投遞節點
        -N  set the prefix tag for qsubed jobs, default work  # 指定拆分後小任務的前綴
        -P  project_name, default not  # qsub的-P參數, 指定任務的項目名
        -n  compute node, default all availabile nodes  # qsub的-l參數的節點部分
        -b  set number of lines to form a job, default 1  # 指定每一個拆分後sh文檔有幾個小任務
        -m  set the maximum number of jobs to throw out, default 30  # 指定可同時提交任務的最大數目
        -s  set interval time of checking by qstat, default 120 seconds  # 指定每隔多長時間檢查任務的狀態
        -r  mark to reqsub the job which was finished error, max reqsub 10 times, default not  # # 指定每一個sh任務可錯投遞次數
        -h  show this help  # 顯示幫助信息
EOD
}
相關文章
相關標籤/搜索