關於returner的基礎使用請參考returner文章。python
繼續上面的話題,這裏編寫c/s端來採集數據。
mysql
繼續下面話題以前,你須要瞭解event、returner、zmq協議框架。web
步驟:sql
一、在syndic上運行客戶端程序,用來收集數據,其實就是master-minion架構。數據庫
二、收集的數據首先寫入本地log中,其次發送到頂級master端。多線程
三、頂級master運行服務端程序,用來接收數據,並寫入本地數據庫。架構
四、確保數據不丟失,採用zmq協議框架,使用REQ/REP套接字。app
先說說客戶端:框架
一、使用event去過濾事件,包含new_job和ret事件。
socket
二、關於new_job的信息使用單線程發送。
三、關於ret事件使用多線程發送。
client.py 程序以下
#!/usr/bin/env python # coding: utf-8 import sys import threading import salt.utils.event import msgpack import zmq import config class JobProcess(object): ''' return new job's jid and id to master ''' def __init__(self,jid,minions): self.jid = jid self.minions = minions # a list that include lots of minion id def run(self): context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://%s:%d" % (config.server,config.server_port)) # use msgpack convert data for id in self.minions: data = {'jid':self.jid,'id':id} packed_data = msgpack.packb(data) socket.send(packed_data) message = socket.recv() socket.close() context.term() class ResultThread(threading.Thread): ''' return the minions's result to master ''' def __init__(self,fun_args,jid,result,success,fun,id): self.fun_args = fun_args # a list self.jid = jid self.result = result # not return self.success = success # flag self.fun = fun self.id = id super(ResultThread,self).__init__() def run(self): context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://%s:%d" % (config.server,config.server_port)) data = {'fun_args':self.fun_args,'jid':self.jid, 'success':self.success,'fun':self.fun,'id':self.id} self.write_to_log(data) packed_data = msgpack.packb(data) socket.send(packed_data) message = socket.recv() socket.close() context.term() # write data to local log # if send failed, we can check the log to check the exe result def write_to_log(self,data): log_file = config.access_log with open(log_file,'a+') as fp: fp.writelines([str(data),'\n']) def filter_events(): ''' filter the return result ''' sock_dir = config.sock_dir event = salt.utils.event.MasterEvent(sock_dir) try: for eachevent in event.iter_events(full=True): ret = eachevent['data'] # get the minions' return result if 'salt/job/' in eachevent['tag']: if ret.has_key('id') and ret.has_key('return'): # Igonre saltutil.find_job event if ret['fun'] == "saltutil.find_job": continue # use thread handle data result_thd = ResultThread(ret['fun_args'],ret['jid'], ret['return'],ret['success'],ret['fun'],ret['id']) result_thd.start() # get new job's info: jid and minions elif eachevent['tag'] == 'new_job': # Igonre saltutil.find_job event if ret['fun'] == "saltutil.find_job": continue # handle data job_pro = JobProcess(ret['jid'],ret['minions']) job_pro.run() # other event else: pass except BaseException,e: if str(e): err = str(e) else: err = 'Terminated' print err with open(config.error_log,'a+') as fp: fp.writelines([err,'\n']) sys.exit(2) def main(): filter_events() if __name__ == '__main__': main()
配置文件config.py以下
#!/usr/bin/env python # coding: utf-8 # listen server ip and port server = '192.168.110.150' server_port = 10000 # the mysql server info host = 'localhost' user = 'salt' passwd = 'salt' db = 'salt' port = 3306 # the master local sock dir sock_dir = '/opt/app/salt/run/master' # log access_log = '/tmp/salt_access.log' error_log = '/tmp/salt_error.log'
filter_events函數是使用event過濾new_job和ret事件的。new_job事件採用JobProcess類處理,使用單進程。ret事件採用ResultThread類處理,使用多線程。
這裏有三個東西解釋下:
第一個就是爲何採用zmq協議框架。首先是不用考慮服務端和客戶端的啓動順序,其次就是server掛了,客戶端運行,只要啓動server,原先的數據並不會丟失。若是使用傳統socket,server掛了,就須要重啓客戶端再啓動服務端,數據會丟失。
第二個就是爲何須要採集new_job信息,咱們知道new_job包含jid和有關的minions,咱們先把這些數據入庫,而後根據ret信息將jid+id對應記錄更新。可是有些minion沒有啓動的話,是沒有ret信息的,若是此時不先把new_job信息入庫,那沒有返回ret信息的minion就不能統計到。
第三個就是爲何new_job採用單線程而ret採用多線程發送。你想一想,若是new_job也是用多線程,server端掛了,那麼此時在client端會堆積不少線程,會致使"Too many files open",從而致使client被迫退出。而若是採用單線程發送的話,server掛了,那麼此時客戶端會阻塞在recv上,就不會堆積不少線程了。
服務端:
一、接受客戶端的數據
二、new_job信息採用單線程寫入數據庫,ret信息使用多線程寫入數據庫
三、確保new_job信息先寫入,ret信息用於更新記錄
四、根據信息是否有success屬性判斷信息類別
server.py以下
#!/usr/bin/env python # coding: utf-8 import zmq import msgpack import MySQLdb import threading from contextlib import contextmanager from sys import exit import config @contextmanager def _get_serv(commit=False): ''' Return a mysql cursor ''' conn = MySQLdb.connect(host=config.host, user=config.user, passwd=config.passwd, db=config.db,port=config.port) cursor = conn.cursor() try: yield cursor except MySQLdb.DatabaseError as err: error, = err.args sys.stderr.write(error.message) cursor.execute("ROLLBACK") raise err else: if commit: cursor.execute("COMMIT") else: cursor.execute("ROLLBACK") finally: conn.close() class HandleProcess(object): ''' insert jid and id to mysql ''' def __init__(self,data): self.data = data def run(self): with _get_serv(commit=True) as cur: sql = "select jid,id from salt_returns where id=%s and jid=%s" num = cur.execute(sql,(self.data['id'],self.data['jid'])) if num: pass else: sql = "insert into salt_returns(id,jid) values(%s,%s)" cur.execute(sql,(self.data['id'],self.data['jid'])) class HandleThread(threading.Thread): ''' update the result to mysql ''' def __init__(self,data): self.data = data super(HandleThread,self).__init__() def run(self): with _get_serv(commit=True) as cur: sql = "select jid,id from salt_returns where id=%s and jid=%s" num = cur.execute(sql,(self.data['id'],self.data['jid'])) # the fun_args is a list ,need convert to str fun_args = str(self.data['fun_args']) # if jid and id is exist # then update the data if num: sql = "update salt_returns set fun_args=%s,success=%s,fun=%s where id=%s and jid=%s" cur.execute(sql,(fun_args,self.data['success'], self.data['fun'],self.data['id'],self.data['jid'])) def handle_data(message): unpack_data = msgpack.unpackb(message) if unpack_data.has_key('success'): thd = HandleThread(unpack_data) thd.start() else: pro = HandleProcess(unpack_data) pro.run() def main(): context = zmq.Context() socket = context.socket(zmq.REP) try: socket.bind("tcp://%s:%d" % (config.server, config.server_port)) except zmq.error.ZMQError,msg: print 'Bind failed:' + str(msg) with open(config.error_log,'a+') as fp: fp.writelines(['Bind failed: ',str(msg),'\n']) socket.close() context.term() exit(1) while True: try: message = socket.recv() handle_data(message) socket.send('OK') except BaseException,e: if str(e): err = str(e) else: err = 'Terminated' print err with open(config.error_log,'a+') as fp: fp.writelines([err,'\n']) socket.close() context.term() exit(2) if __name__ == "__main__": main()
這個結構能夠用於大型的master-syndic-minion框架,只要稍微修改下數據的寫入就行。
之後這個會用於web界面的執行結果統計。