容易的Linux自動化運維工具之master端(一)

    

原文:https://www.3qos.com/article/101.htmlhtml

做者:容易  日期:2015-03-22python

備註:非本人贊成,請勿轉載linux


        瞭解過很多linux集中管理工具,基本原理都差不太多,只是複雜度和安裝難度不一樣而已,本人習慣一切從簡也喜歡玩,本身寫了個簡單的。不追求一次性管理成千上萬臺服務器,簡單的管理1000臺左右的服務器因該問題不大。基本原則安裝簡單,管理簡單,功能簡單知足平常基本需求便可。至於安全和別的問題,你們能夠本身去折騰。nginx

    本人非專業開發,更多的是站在運維的角度,下降運維人員學習和開發相似工具的門檻,因此其中有幾個功能直接使用了一般系統安裝都會自帶的系統命令實現。git

    該工具實現瞭如下主要的功能安全

一、集中命令發佈服務器

二、集中腳本發佈運維

三、任務實時監控和結果反饋socket

四、命令或腳本執行超時設置,整個任務的超時設置tcp

五、日誌記錄功能等

 備註:腳本發佈須要一個http服務器提供靜態腳本給客戶端下載,例如簡單搭建一個nginx靜態服務器便可。linux集中管理基本都是基於命令和腳本,別的就根據各自喜歡本身去增長吧。

   基本思路master監聽來自客戶端的請求。正常的請求分爲如下幾個類別

一、新曾批量任務,即須要發佈的批量任務

二、客戶端任務請求,客戶端會定時到服務端輪詢是否有屬於本身的任務

三、客戶端任務報告,來次客戶端的任務反饋

原理以下圖

   

       服務端master負責任務發佈,以及任務監控,客戶端主動與服務端創建長鏈接,每隔1秒左右會請求master詢問是否有屬於本身的任務,若是有屬於該客戶端的任務,服務端會將任務信息發送給客戶端,客戶端根據任務的內容執行相關操做,而且將執行結果返回給服務端。這樣作有點傻可是簡單,服務端不須要去作太複雜的鏈接管理,目前服務器仍是屬於阻塞鏈接,後續有機會改改。

   大概原理就這樣,其餘的就不說太多廢話了。

master 端代碼

#-*- coding: UTF-8 -*- 

__author__ = 'tiger'

#!/usr/bin/env python

import zmq, time, sys, os, atexit

from signal import SIGTERM

from threading import Thread

from Queue import Queue

import logging
from logging.handlers import RotatingFileHandler

#定義日誌函數
def mylog(logfile):
    rthandler = RotatingFileHandler(logfile, 'a', maxBytes=50 * 1024 * 1024, backupCount=3)
    formatter = logging.Formatter(
        '%(levelname)s %(thread)d %(threadName)s %(process)d %(funcName)s %(asctime)s %(filename)s[line:%(lineno)d] %(message)s',
        datefmt='%a, %d %b %Y %H:%M:%S')
    rthandler.setFormatter(formatter)
    log = logging.getLogger()
    log.setLevel(logging.INFO)

    log.addHandler(rthandler)

    return log
#任務監控函數
def job_monitor(job_queue, add_queue, homedir, log):
    #ob_start,是表明job id號。
    #ip_list,是新任務涉及的全部IP地址列表
    global job_start
    global ip_list
    #任務報告會寫到該目錄下以job id生成的log文件中,每一個任務會單獨生成一個
    job_path = homedir + '/' + 'job/'
    while 1:
        #判斷是否有新的任務,若是有執行後續的操做,沒有將被阻塞一直到有新任務爲止
        job_info = add_queue.get()
        #定義變量,job_status存放批量任務因此客戶端的響應信息和狀態
        job_status = {}
        #日誌文件的絕對路徑和日誌文件名
        job_log = job_path + str(job_start) + '.log'
        #進行日誌寫操做,標記任務的開始
        file_object = open(job_log, 'w+')
        file_object.write('job ' + str(job_start) + ' start\n')
        file_object.flush()
        job_status.update(job_info['iplist'])
        job_objects_report = 0
        log.info('job_start status:%s' % job_start)
        #新任務開始時,job_start作爲全局變量會自動標記爲job id,不然爲0
        while job_start:
            #時間監控,判斷任務超時使用

            end_time = time.time()

            #嘗試獲取任務的返回結果,採用非阻塞方式,若是沒有任務報告,將會等待1秒後繼續嘗試一直到任務超時。

            try:

                ip_info = job_queue.get_nowait()

                #有任務報告時,首先判斷任務的ID是否與此次的任務ID一致,若是一致寫入日誌,而且更新對應的client的響應狀態。

                if ip_info['id'] == job_start:

                    job_objects_report = job_objects_report + 1

                    try:

                        job_status[ip_info['ip']] = [ip_info['code'], ip_info['info']]

                        log.info(ip_info)

                        file_object.write(ip_info['ip'] + ' code:' + str(ip_info['code']) + '\n')

                        file_object.write(ip_info['info'].strip())

                        file_object.write('\n')

                        file_object.flush()

                    except Exception, err:

                        log.warn(str(err))

                else:

                    continue

                #若是所有客戶端都響應了任務請求,提示任務結束刷新相關任務狀態。

                if job_objects_report >= job_info['count']:

                    log.info('job commpletel,job status:' + str(job_status))

                    file_object.write('job ' + str(job_start) + ' end\n')

                    file_object.close()

                    job_objects_report = 0

                    job_status = {}

                    job_start = 0

            #假如獲取任務報告,失敗時,判斷超時狀態,若是超時了,結束本次任務,刷新相關狀態信息,跳出循環監聽新的任務。

            except:

                if (end_time - job_info['start_time']) >= job_info['jobtimeout']:

                    #job_status.update(ip_list)

                    log.info('job_timeout,job status info:' + str(job_status))

                    #若是客戶端沒有返回任務報告,客戶端對應的任務狀態爲N

                    for key, value in job_status.items():

                        if value == 'N':

                            file_object.write(key + ' code:N\n')

                    #將相關信息寫入日誌,刷新任務的狀態信息,推出循環

                    file_object.write('job ' + str(job_start) + ' has error please check.\n')

                    file_object.write('job ' + str(job_start) + ' end\n')

                    file_object.close()

                    ip_list = {}

                    job_status = {}

                    job_objects_report = 0

                    job_start = 0

                else:

                    time.sleep(1)

                continue

  

#負責任務接收,發佈以及與客戶端的鏈接

def ioloop(sock_file, job_queue, add_queue, log):

    #job_command定義任務的具體信息

    job_command = {}

    #job_start定義的是任務的ID,沒有任務時job_start爲0

    global job_start

    #定義的是新任務因此服務器地址列表

    global ip_list

    #建立監聽

    context = zmq.Context()

    socks = context.socket(zmq.REP)

    socks.bind(sock_file)

    while 1:

        #獲取客戶端請求信息。

        req = socks.recv_pyobj()

        #獲取客戶端請求的類別

        try:

            req_type = req['req_type']

        except Exception, err:

            socks.send_pyobj({'rep_type':'N'})

            log.info(str(err))

            continue

        #若是是task類別,表明是客戶端任務請求,而且獲取客戶端的IP信息

        if req_type == 'task':

            try:

                req_ip = req['ip']

            except Exception, err:

                log.info(str(err))

                socks.send_pyobj({'rep_type':'N'})

                continue

            #若是客戶端請求的ip地址,在任務列表中,將須要執行的任務信息發送給客戶端,不然提示沒有相關任務

            if req_ip in ip_list.keys():

                ip_list.pop(req_ip)

                socks.send_pyobj(job_command)

            else:

                socks.send_pyobj({'rep_type':'N'})

        #若是是report,表明是客戶端的任務響應報告

        elif req_type == 'report':

        #判斷job返回狀態信息,列表列表,包括job_ib,job_code(0表明ok,其餘表明失敗)

        #若是響應的id號等於本次執行的任務id,將報告信息經過queue傳遞到任務監控線程進行處理,其餘的忽略。

            try:

                if req['id'] == job_start:

                    job_queue.put(req)

                    socks.send_pyobj({'rep_type':'report','id':req['id'],'status':'ok'})

                else:

                    socks.send_pyobj({'rep_type':'N'})

                    log.info('report time out.' + str(req))

            except Exception, err:

                log.info('add report queue error:' + str(err))

                socks.send_pyobj({'rep_type':'N'})

        #增長任務,當job_start!=0時,表明有任務執行,不容許添加新任務,後續改進

        #信任務的信息爲字典對象,job_info表明任務的具體信息,例如須要執行的命令以及相關超時信息和任務ID等,

        #iplist表明須要執行本次任務的全部客戶端IP

        #以下示例

        #ip_list={'192.168.4.195':'N','192.168.4.196':'N','192.168.4.194':'N'}

        #{'id': job_id, 'type': job_type, 'task': job_task, 'jobtimeout': job_jobtimeout,

        #            'cmdtimeout': job_cmdtimeout, 'env': job_env, 'fileserver': job_fileserver, 'rep_type': 'newtask',

        #            'rundir': job_rundir}

        elif req_type == 'addjob':

            if job_start == 0:

                try:

                    job_command = req['job_info']

                    job_start = job_command['id']

                    ip_list = req['iplist']

                    job_info = {}

                    job_info['start_time'] = time.time()

                    job_info['iplist'] = req['iplist']

                    job_info['count'] = len(req['iplist'])

                    job_info['jobtimeout'] = job_command['jobtimeout']

                    job_info['id'] = job_command['id']

                    job_info['iplist'] = req['iplist']

                    add_queue.put(job_info)

                    socks.send_pyobj('job add ok,job log job/' + str(job_info['id']) + '.log')

                    log.info('job add ok,iplist:' + str(ip_list) + ',job_command:' + str(job_command))

                except Exception, err:

                    socks.send_pyobj('job add error ' + str(err))

                    log.warn(str(err))

            elif job_start != 0:

                socks.send_pyobj('job add error,job running.')

                log.info('job add error,job running.')

            else:

                socks.send_pyobj('job add error,parameter error.')

                log.info('job add error,parameter error.')

        else:

            socks.send_pyobj({'rep_type':'N'})

  

#檢查IP地址是否正常,暫時未使用

def ip_check(ip):

    q = ip.split('.')

    return len(q) == 4 and len(filter(lambda x: x >= 0 and x <= 255, \

                                      map(int, filter(lambda x: x.isdigit(), q)))) == 4

  

#將master啓動爲守護進程

class Daemon:

    def __init__(self, pidfile, homedir, sock_file, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):

        self.stdin = stdin

        self.stdout = stdout

        self.stderr = stderr

        self.pidfile = pidfile

        self.homedir = homedir

        self.sock_file = sock_file

  

    def _daemonize(self):

  

        #脫離父進程

        try:

            pid = os.fork()

            if pid > 0:

                sys.exit(0)

        except OSError, e:

            sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))

            sys.exit(1)

            #脫離終端

        os.setsid()

        #修改當前工做目錄

        os.chdir("/")

        #重設文件建立權限

        os.umask(0)

        #第二次fork,禁止進程從新打開控制終端

        try:

            pid = os.fork()

            if pid > 0:

                sys.exit(0)

        except OSError, e:

            sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))

            sys.exit(1)

        sys.stdout.flush()

        sys.stderr.flush()

        si = file(self.stdin, 'r')

        so = file(self.stdout, 'a+')

        se = file(self.stderr, 'a+', 0)

        #重定向標準輸入/輸出/錯誤

        os.dup2(si.fileno(), sys.stdin.fileno())

        os.dup2(so.fileno(), sys.stdout.fileno())

        os.dup2(se.fileno(), sys.stderr.fileno())

        #註冊程序退出時的函數,即刪掉pid文件

        atexit.register(self.delpid)

        pid = str(os.getpid())

        file(self.pidfile, 'w+').write("%s\n" % pid)

  

    def delpid(self):

        os.remove(self.pidfile)

  

    def start(self):

        # Check for a pidfile to see if the daemon already runs

        try:

            pf = file(self.pidfile, 'r')

            pid = int(pf.read().strip())

            pf.close()

        except IOError:

            pid = None

        if pid:

            message = "pidfile %s already exist. Daemon already running?\n"

            sys.stderr.write(message % self.pidfile)

            sys.exit(1)

            # Start the daemon

        self._daemonize()

        self._run()

  

    def stop(self):

        # Get the pid from the pidfile

        try:

            pf = file(self.pidfile, 'r')

            pid = int(pf.read().strip())

            pf.close()

        except IOError:

            pid = None

        if not pid:

            message = "pidfile %s does not exist. Daemon not running?\n"

            sys.stderr.write(message % self.pidfile)

            return # not an error in a restart

            # Try killing the daemon process

        try:

            while 1:

                os.kill(pid, SIGTERM)

                time.sleep(0.1)

        except OSError, err:

            err = str(err)

            if err.find("No such process") > 0:

                if os.path.exists(self.pidfile):

                    os.remove(self.pidfile)

            else:

                print str(err)

                sys.exit(1)

  

    def restart(self):

        self.stop()

        self.start()
    def _run(self):

        pass

#繼承Daemon重寫_run函數實現本身的守護進程

class MyDaemon(Daemon):

    def _run(self, ):

        #定義日誌函數

        log = mylog(self.stdout)

        global job_start

        global ip_list

        job_start = 0

        ip_list = {}

        #定義任務Q,job_queue用來與任務監控線程傳遞客戶端的任務報告

        job_queue = Queue(maxsize=500)

        #用來告訴任務監控線程有新的任務

        add_queue = Queue(maxsize=5)

        #啓動任務監控線程

        worker = Thread(target=job_monitor, args=(job_queue, add_queue, self.homedir, log))

        worker.setDaemon(True)

        worker.start()

        #啓動任務監聽服務,負責處理來次監控處理進程的請求,而且將監控對象的信息返回給監控進程處理。

        ioloop(self.sock_file, job_queue, add_queue, log)

  

#程序的啓動入口

def main():

    #定義了程序所需的相關目錄,日誌文件名等信息

    homedir = os.getcwd()

    for i in ('log', 'run', 'job'):

        path = homedir + '/' + i

        if not os.path.exists(path):

            os.makedirs(path, 0755)

    stdout = homedir + '/log' + '/oaos_master.log'

    stderr = homedir + '/log' + '/oaos_master.err'

    pidfile = homedir + '/run' + '/oaos_master.pid'

    #定義了監聽端口

    sock_file = "tcp://192.168.4.194:7777"

    daemon = MyDaemon(pidfile, homedir, sock_file, stdout=stdout, stderr=stderr)

    if len(sys.argv) == 2:

        if 'start' == sys.argv[1]:

            daemon.start()

        elif 'stop' == sys.argv[1]:

            daemon.stop()

        elif 'restart' == sys.argv[1]:

            daemon.restart()

        else:

            print "Unknown command"

            sys.exit(2)

        sys.exit(0)

    else:

        print "usage: %s start|stop|restart" % sys.argv[0]

        sys.exit(2)

if __name__ == "__main__":
    main()

add_job和client端代碼請見下一篇

相關文章
相關標籤/搜索