容易的linux自動化運維工具之client端(二)

原文:https://www.3qos.com/article/101.html

作者:容易  日期:2015-03-17

備註:非本人同意,請勿轉載


    接上一篇 容易的linux集中管理工具之master端(一)

    client端代碼

#-*- coding: UTF-8 -*- 

__author__ = 'tiger'

#!/usr/bin/env python

import zmq, time, sys, os, atexit

from signal import SIGTERM

import random

import logging

#import ConfigParser

import subprocess

from logging.handlers import RotatingFileHandler

import socket

import fcntl

import struct

import signal

  

def mylog(logfile):
    """Attach a rotating file handler to the root logger and return that logger.

    The handler appends to ``logfile``, rotates at 50 MiB and keeps three
    backups. The root logger level is set to INFO.

    Calling this function again with the same file does not add a second
    handler, so records are never written to the file twice.

    :param logfile: path of the log file to write.
    :return: the root ``logging.Logger``.
    """
    log = logging.getLogger()
    log.setLevel(logging.INFO)

    # File handlers store the absolute path of their target in baseFilename,
    # so compare against the absolute form of the requested path.
    wanted = os.path.abspath(logfile)
    for existing in log.handlers:
        if isinstance(existing, RotatingFileHandler) and existing.baseFilename == wanted:
            return log

    rthandler = RotatingFileHandler(logfile, 'a', maxBytes=50 * 1024 * 1024, backupCount=3)
    formatter = logging.Formatter(
        '%(levelname)s %(thread)d %(threadName)s %(process)d %(funcName)s %(asctime)s %(filename)s[line:%(lineno)d] %(message)s',
        datefmt='%a, %d %b %Y %H:%M:%S')
    rthandler.setFormatter(formatter)
    log.addHandler(rthandler)
    return log

  

def get_ip_address(ifname):
    """Return the IPv4 address bound to network interface ``ifname``.

    The address is obtained with the SIOCGIFADDR ioctl (0x8915) on a
    throw-away UDP socket; no packet is sent.

    :param ifname: interface name such as ``'eth0'``; only the first 15
        bytes are used. Text names are encoded to bytes before packing.
    :return: the address in dotted-quad text form.
    :raises IOError/OSError: if the ioctl fails (e.g. the interface does not
        exist or has no IPv4 address).
    """
    # struct.pack('256s', ...) needs a byte string; on Python 2 a plain str
    # already is one, so this only converts unicode/text names.
    if not isinstance(ifname, bytes):
        ifname = ifname.encode('utf-8')
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        packed = fcntl.ioctl(
            s.fileno(),
            0x8915,  # SIOCGIFADDR
            struct.pack('256s', ifname[:15])
        )
    finally:
        # Always release the descriptor, even when the ioctl raises.
        s.close()
    # Bytes 20..23 of the returned buffer are what the original code decodes
    # as the IPv4 address.
    return socket.inet_ntoa(packed[20:24])

  

class ProcessTimeout(Exception):
    """Raised when a command or script runs longer than its allowed time."""

  

def timeout_handler(signum, frame):
    """Signal handler that turns a delivered signal into ProcessTimeout.

    ``signum`` and ``frame`` are the standard signal-handler arguments and
    are not used.
    """
    raise ProcessTimeout()

  

def exec_shell(task, rundir=None, timeout=None):
    """Run ``task`` through the shell and return ``[returncode, output]``.

    :param task: shell command line to execute (run with ``shell=True``).
    :param rundir: working directory for the command, or None to inherit.
    :param timeout: limit in whole seconds enforced with SIGALRM; a falsy
        value disables the limit.
    :return: ``[0, stdout]`` on success, ``[returncode, stderr]`` on a
        non-zero exit, or ``['timeout', message]`` when the limit expired.

    The alarm is armed only after the child has been spawned and is always
    cancelled on the way out, so a failed spawn cannot leave a pending
    SIGALRM behind. A timed-out child is terminated and reaped.
    """
    p = subprocess.Popen(task, bufsize=0, shell=True, cwd=rundir, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout = None
    handler_installed = False
    previous_handler = None
    try:
        if timeout:
            previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
            handler_installed = True
            signal.alarm(timeout)
        stdout, stderr = p.communicate()
        returncode = p.poll()
    except ProcessTimeout:
        p.stdout.close()
        p.stderr.close()
        try:
            p.terminate()
        except OSError:
            # The child exited between the alarm firing and the signal.
            pass
        # Give the child a short grace period, then force-kill, and always
        # wait() so it does not linger as a zombie.
        deadline = time.time() + 2
        while p.poll() is None and time.time() < deadline:
            time.sleep(0.05)
        if p.poll() is None:
            try:
                p.kill()
            except OSError:
                pass
        p.wait()
        stderr = 'Calculation was taking too long, so I killed it dead.\n'
        returncode = 'timeout'
    finally:
        # Cancel any pending alarm and put back the handler we replaced.
        signal.alarm(0)
        if handler_installed and previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
    # Success returns standard output; anything else returns standard error.
    if returncode == 0:
        return [returncode, stdout]
    else:
        return [returncode, stderr]

  

def exec_script(fileserver, script, dirpath, rundir=None, timeout=None):
    """Download a script with ``wget`` and run it, returning ``exec_shell``'s result.

    :param fileserver: URL prefix; ``script['script_name']`` is appended to it.
    :param script: dict with ``'script_name'`` (file to fetch) and
        ``'script_env'`` (interpreter command placed before the file path).
    :param dirpath: directory prefix the script is downloaded into; it is
        concatenated directly with the script name.
    :param rundir: working directory passed to ``exec_shell``.
    :param timeout: limit passed to ``exec_shell`` for both the download and
        the run.
    :return: the download's ``[returncode, output]`` if the download failed,
        otherwise the script's ``[returncode, output]``. If removing the
        downloaded file fails, the error text is appended to the output.
    """
    try:
        from shlex import quote  # Python 3
    except ImportError:
        from pipes import quote  # Python 2

    fileurl = fileserver + script['script_name']
    filename = dirpath + script['script_name']
    # Quote every path/URL so spaces or shell metacharacters in the working
    # directory or script name cannot split or alter the command line.
    # script_env is left unquoted because it may carry interpreter arguments.
    getfile = 'wget -N -P ' + quote(dirpath) + ' ' + quote(fileurl)
    task = script['script_env'] + ' ' + quote(filename)

    getfile_res = exec_shell(getfile, rundir=rundir, timeout=timeout)
    if getfile_res[0] != 0:
        return getfile_res

    task_res = exec_shell(task, rundir=rundir, timeout=timeout)
    try:
        os.remove(filename)
    except Exception as err:
        note = str(err)
        # Command output may be a byte string; keep the concatenation
        # type-consistent instead of raising on a bytes/text mix.
        if isinstance(task_res[1], bytes) and not isinstance(note, bytes):
            note = note.encode('utf-8', 'replace')
        task_res[1] = task_res[1] + note
    return task_res

  

def _connect_master(sock_file):
    """Open a fresh REQ socket to the master; return ``(context, sock, poller)``."""
    context = zmq.Context()
    sock = context.socket(zmq.REQ)
    # LINGER 0: close() discards unsent messages instead of blocking on them.
    sock.setsockopt(zmq.LINGER, 0)
    sock.connect(sock_file)
    poller = zmq.Poller()
    poller.register(sock, zmq.POLLIN)
    return context, sock, poller


def _reconnect_master(context, sock, sock_file):
    """Tear down the current connection, wait 1-5 s, and connect again."""
    sock.close()
    context.term()
    time.sleep(random.randint(1, 5))
    return _connect_master(sock_file)


def ioloop(sock_file, homedir, mip, stdout):
    """Poll the master for tasks forever, run them, and report the results.

    :param sock_file: ZeroMQ endpoint of the master.
    :param homedir: base directory; ``homedir + '/run/'`` is the default
        working directory and the script download directory.
    :param mip: this client's IP, sent in every request.
    :param stdout: log file path handed to ``mylog``.

    Each iteration sends ``{'req_type': 'task', 'ip': mip}`` and waits up to
    20 s for a reply. A reply whose ``'rep_type'`` is ``'newtask'`` is read
    for ``id``, ``task``, ``type``, ``cmdtimeout`` and ``rundir`` (plus
    ``env`` and ``fileserver`` when ``type`` is ``'s'``), executed, and
    answered with a ``'report'`` request. This function does not return.

    NOTE(review): ``recv_pyobj`` unpickles whatever the peer sends; only
    connect to a trusted master.
    NOTE(review): the ``recv_pyobj`` calls that follow a report are not
    guarded by the poller and block until the master answers.
    """
    # Default directory in which commands and scripts are executed.
    dirpath = homedir + '/run/'
    log = mylog(stdout)
    context, sock, poller = _connect_master(sock_file)

    while 1:
        # Ask the master for work.
        try:
            sock.send_pyobj({'req_type': 'task', 'ip': mip})
        except Exception as err:
            log.warning(str(err))
            context, sock, poller = _reconnect_master(context, sock, sock_file)
            continue

        # No reply within 20 s: drop the socket and reconnect.
        if not poller.poll(20 * 1000):
            log.warning("master server connect time out,will colse current socket,try again.")
            context, sock, poller = _reconnect_master(context, sock, sock_file)
            continue

        rep = sock.recv_pyobj()
        try:
            rep_type = rep['rep_type']
        except Exception as err:
            log.info(str(err))
            time.sleep(random.uniform(0.8, 1.2))
            continue

        if rep_type != 'newtask':
            time.sleep(random.uniform(0.8, 1.2))
            continue

        # Reset per task so the error path below never reads an unbound name
        # or the id of a previous job.
        job_id = None
        try:
            job_id = rep['id']
            job_task = rep['task']
            job_type = rep['type']
            cmd_timeout = rep['cmdtimeout']
            rundir = rep['rundir']
            log.warning('start new job ' + str(rep))
        except Exception as err:
            if job_id:
                sock.send_pyobj(
                    {'id': job_id, 'code': '99', 'info': str(err), 'ip': mip, 'req_type': 'report'})
                sock.recv_pyobj()
            time.sleep(random.uniform(0.8, 1.2))
            log.warning(str(err) + str(rep))
            continue

        # The master sends the literal string 'None' to request the default.
        work_dir = dirpath if rundir == 'None' else rundir

        if job_type == 's':
            # Script task: needs the interpreter and the file server as well.
            try:
                script_env = rep['env']
                script = {'script_name': job_task, 'script_env': script_env}
                fileserver = rep['fileserver']
                res = exec_script(fileserver, script, dirpath, rundir=work_dir, timeout=cmd_timeout)
            except Exception as err:
                log.warning(str(err))
                continue
        else:
            # Every other task type is run as a shell command.
            res = exec_shell(job_task, rundir=work_dir, timeout=cmd_timeout)

        # Report the outcome; req_type 'report' marks it as a task report.
        sock.send_pyobj({'id': job_id, 'code': res[0], 'info': res[1], 'ip': mip, 'req_type': 'report'})
        sock.recv_pyobj()
        log.info(str({'id': job_id, 'code': res[0], 'info': res[1], 'ip': mip}))
        time.sleep(random.uniform(0.8, 1.2))

  

  

def ip_check(ip):
    """Return True if ``ip`` is a dotted-quad IPv4 string, else False.

    The string must consist of exactly four dot-separated parts, each made
    of digits only and with a value in the range 0..255.
    """
    parts = ip.split('.')
    if len(parts) != 4:
        return False
    try:
        # isdigit() already rules out signs, so only the upper bound matters.
        return all(part.isdigit() and int(part) <= 255 for part in parts)
    except ValueError:
        # Digit-like characters that int() cannot parse.
        return False

  

  

class Daemon:
    """Double-fork UNIX daemon with pidfile-based start/stop/restart.

    Subclasses override ``_run`` with the work to perform once the process
    has been daemonized.
    """

    def __init__(self, pidfile, sock_file, homedir, mip, stdin='/dev/null', stdout='/dev/null',
                 stderr='/dev/null'):
        """Store the configuration; nothing is opened or forked here.

        :param pidfile: path of the file that holds the daemon's pid.
        :param sock_file: stored for use by ``_run`` implementations.
        :param homedir: stored for use by ``_run`` implementations.
        :param mip: stored for use by ``_run`` implementations.
        :param stdin: file the daemon's standard input is redirected from.
        :param stdout: file the daemon's standard output is appended to.
        :param stderr: file the daemon's standard error is appended to.
        """
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.pidfile = pidfile
        self.homedir = homedir
        self.sock_file = sock_file
        self.mip = mip

    def _read_pid(self):
        """Return the pid stored in the pidfile, or None if missing/unparsable."""
        try:
            with open(self.pidfile, 'r') as pf:
                return int(pf.read().strip())
        except (IOError, ValueError):
            return None

    def _daemonize(self):
        """Detach from the caller: fork twice, redirect stdio, write the pidfile."""
        # First fork: the parent exits so the child is no longer its job.
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)
        # Leave the controlling terminal.
        os.setsid()
        # Change the working directory.
        os.chdir("/")
        # Reset the file creation mask.
        os.umask(0)
        # Second fork: prevents the process from re-acquiring a terminal.
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)
        sys.stdout.flush()
        sys.stderr.flush()
        si = open(self.stdin, 'r')
        so = open(self.stdout, 'a+')
        # Unbuffered, hence binary mode; only the descriptor is used below.
        se = open(self.stderr, 'ab+', 0)
        # Redirect standard input/output/error.
        os.dup2(si.fileno(), sys.stdin.fileno())
        os.dup2(so.fileno(), sys.stdout.fileno())
        os.dup2(se.fileno(), sys.stderr.fileno())
        # Remove the pidfile when the process exits normally.
        atexit.register(self.delpid)
        pid = str(os.getpid())
        with open(self.pidfile, 'w+') as pf:
            pf.write("%s\n" % pid)

    def delpid(self):
        """Remove the pidfile if it is still present."""
        if os.path.exists(self.pidfile):
            os.remove(self.pidfile)

    def start(self):
        """Daemonize and call ``_run``; exit with status 1 if a pid is recorded."""
        # Check for a pidfile to see if the daemon already runs
        pid = self._read_pid()
        if pid:
            message = "pidfile %s already exist. Daemon already running?\n"
            sys.stderr.write(message % self.pidfile)
            sys.exit(1)
        # Start the daemon
        self._daemonize()
        self._run()

    def stop(self):
        """Send SIGTERM to the recorded pid until it is gone, then drop the pidfile.

        Returns without error when no usable pid is recorded. Exits with
        status 1 if signalling fails for any reason other than the process
        no longer existing.
        """
        import errno

        # Get the pid from the pidfile
        pid = self._read_pid()
        if not pid:
            message = "pidfile %s does not exist. Daemon not running?\n"
            sys.stderr.write(message % self.pidfile)
            return  # not an error in a restart
        # Try killing the daemon process
        try:
            while 1:
                os.kill(pid, SIGTERM)
                time.sleep(0.1)
        except OSError as err:
            # Compare the error number rather than the locale-dependent text.
            if err.errno == errno.ESRCH:
                if os.path.exists(self.pidfile):
                    os.remove(self.pidfile)
            else:
                sys.stdout.write(str(err) + "\n")
                sys.exit(1)

    def restart(self):
        """Stop the daemon if it is running, then start it again."""
        self.stop()
        self.start()

    def _run(self):
        """Hook for subclasses; the base implementation does nothing."""
        pass

  

class MyDaemon(Daemon):
    """Daemon whose ``_run`` hook executes the client's ``ioloop``."""

    def _run(self):
        """Run ``ioloop`` with this daemon's endpoint, home dir, IP and log path."""
        ioloop(sock_file=self.sock_file,
               homedir=self.homedir,
               mip=self.mip,
               stdout=self.stdout)

  

def main():
    """Command-line entry point: ``start``, ``stop`` or ``restart`` the client.

    Exit status: 0 after the requested action, 2 for a usage error or an
    unknown command, 3 if the IP address of the interface cannot be read.

    Arguments are validated before anything else so that a bad invocation
    prints its message without creating directories or probing the network
    interface.
    """
    if len(sys.argv) != 2:
        print("usage: %s start|stop|restart" % sys.argv[0])
        sys.exit(2)
    command = sys.argv[1]
    if command not in ('start', 'stop', 'restart'):
        print("Unknown command")
        sys.exit(2)

    # Runtime directories live under the current working directory.
    homedir = os.getcwd()
    for i in ('log', 'run'):
        path = homedir + '/' + i
        if not os.path.exists(path):
            os.makedirs(path, 0o755)
    stdout = homedir + '/log' + '/oaos_client.log'
    stderr = homedir + '/log' + '/oaos_client.err'
    pidfile = homedir + '/run' + '/oaos_client.pid'
    # TCP endpoint of the master.
    sock_file = "tcp://192.168.4.194:7777"
    # Interface whose IP identifies this client to the master.
    ifname = 'eth0'
    try:
        mip = get_ip_address(ifname)
    except Exception as err:
        print(err)
        sys.exit(3)

    daemon = MyDaemon(pidfile, sock_file, homedir, mip, stdout=stdout, stderr=stderr)
    actions = {'start': daemon.start, 'stop': daemon.stop, 'restart': daemon.restart}
    actions[command]()
    sys.exit(0)

  

  

# Run the command-line entry point only when this file is executed as a script.
if __name__ == "__main__":

    main()
相關文章
相關標籤/搜索