## Job execution flow

First, here is how the official documentation describes the workflow on the master side:
The Salt master works by always publishing commands to all connected minions and the minions decide if the command is meant for them by checking themselves against the command target. The typical lifecycle of a salt job from the perspective of the master might be as follows:

1. A command is issued on the CLI. For example, `salt my_minion test.ping`.
2. The `salt` command uses LocalClient to generate a request to the salt master by connecting to the ReqServer on TCP:4506 and issuing the job.
3. The salt-master ReqServer sees the request and passes it to an available MWorker over workers.ipc.
4. A worker picks up the request and handles it. First, it checks to ensure that the requested user has permissions to issue the command. Then, it sends the publish command to all connected minions. For the curious, this happens in `ClearFuncs.publish()`.
5. The worker announces on the master event bus that it is about to publish a job to connected minions. This happens by placing the event on the master event bus (master_event_pull.ipc) where the EventPublisher picks it up and distributes it to all connected event listeners on master_event_pub.ipc.
6. The message to the minions is encrypted and sent to the Publisher via IPC on publish_pull.ipc.
7. Connected minions have a TCP session established with the Publisher on TCP port 4505 where they await commands. When the Publisher receives the job over publish_pull, it sends the jobs across the wire to the minions for processing.
8. After the minions receive the request, they decrypt it and perform any requested work, if they determine that they are targeted to do so.
9. When the minion is ready to respond, it publishes the result of its job back to the master by sending the encrypted result back to the master on TCP 4506 where it is again picked up by the ReqServer and forwarded to an available MWorker for processing. (Again, this happens by passing this message across workers.ipc to an available worker.)
10. When the MWorker receives the job it decrypts it and fires an event onto the master event bus (master_event_pull.ipc). (Again for the curious, this happens in `AESFuncs._return()`.)
11. The EventPublisher sees this event and re-publishes it on the bus to all connected listeners of the master event bus (on master_event_pub.ipc). This is where the LocalClient has been waiting, listening to the event bus for minion replies. It gathers the job and stores the result.
12. When all targeted minions have replied or the timeout has been exceeded, the salt client displays the results of the job to the user on the CLI.
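To make the lifecycle concrete, here is a toy, single-process sketch of it. It is not Salt code: there are no sockets, no encryption and no worker processes, and the classes `EventBus`, `ToyMinion` and `ToyMaster` are hypothetical. The event tags imitate the `salt/job/<jid>/new` and `salt/job/<jid>/ret/<minion_id>` format, which is an assumption made for illustration. What it does show is the central design choice: the master broadcasts every job to every minion, each minion decides for itself whether it is targeted, and the results travel back over an event bus that the CLI listens on.

```python
import fnmatch
import itertools


class EventBus:
    """Toy stand-in for the master event bus (master_event_pull.ipc -> master_event_pub.ipc)."""

    def __init__(self):
        self.listeners = []

    def subscribe(self, callback):
        self.listeners.append(callback)

    def fire_event(self, data, tag):
        # Like the EventPublisher: every event goes to every listener.
        for callback in self.listeners:
            callback(tag, data)


class ToyMinion:
    def __init__(self, minion_id):
        self.id = minion_id

    def on_publish(self, load):
        # Step 8: every minion sees every publish and checks the target itself
        # (only glob targeting is modelled here).
        if not fnmatch.fnmatch(self.id, load['tgt']):
            return None
        # Hypothetical executor: only test.ping is "implemented".
        result = True if load['fun'] == 'test.ping' else 'unknown function'
        return {'id': self.id, 'jid': load['jid'], 'return': result}


class ToyMaster:
    def __init__(self, minions, bus):
        self.minions = minions
        self.bus = bus
        self._next_jid = itertools.count(1)

    def publish(self, load):
        # Steps 4-7: assign a jid, announce the job, broadcast it to all minions.
        load = dict(load, jid=str(next(self._next_jid)))
        self.bus.fire_event(load, 'salt/job/{}/new'.format(load['jid']))
        for minion in self.minions:
            ret = minion.on_publish(load)
            if ret is not None:
                self._return(ret)  # step 9: the reply comes back to the master
        return load['jid']

    def _return(self, ret):
        # Steps 10-11: fire the return onto the bus for any listener (e.g. the CLI).
        self.bus.fire_event(ret, 'salt/job/{}/ret/{}'.format(ret['jid'], ret['id']))


bus = EventBus()
returns = {}


def local_client(tag, data):
    # Step 11: the CLI side only cares about 'ret' events.
    if '/ret/' in tag:
        returns[data['id']] = data['return']


bus.subscribe(local_client)
master = ToyMaster([ToyMinion('web1'), ToyMinion('web2'), ToyMinion('db1')], bus)
jid = master.publish({'fun': 'test.ping', 'tgt': 'web*', 'arg': []})
print(jid, returns)  # 1 {'web1': True, 'web2': True}
```

`db1` receives the publish too but silently ignores it, which is exactly the "minions decide if the command is meant for them" behaviour described in the quote.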
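## Source code analysis

The following sections walk through the classes the master uses when executing a salt module. Read the source alongside the flow above.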
### salt.master.Master

The ReqServer is created in `run_reqserver()`:
```python
def run_reqserver(self):
    reqserv = ReqServer(
        self.opts,
        self.key,
        self.master_key)
    reqserv.run()
```
### salt.master.ReqServer

Open `salt.master.ReqServer`:
```python
class ReqServer(object):
    '''
    Starts up the master request server, minions send results to this
    interface.
    '''
    def __init__(self, opts, key, mkey):
        '''
        Create a request server

        :param dict opts: The salt options dictionary
        :key dict: The user starting the server and the AES key
        :mkey dict: The user starting the server and the RSA key

        :rtype: ReqServer
        :returns: Request server
        '''
        self.opts = opts
        self.master_key = mkey
        # Prepare the AES key
        self.key = key

    def __bind(self):
        '''
        Binds the reply server
        '''
        dfn = os.path.join(self.opts['cachedir'], '.dfn')
        if os.path.isfile(dfn):
            try:
                os.remove(dfn)
            except os.error:
                pass

        self.process_manager = salt.utils.process.ProcessManager(name='ReqServer_ProcessManager')

        req_channels = []
        for transport, opts in iter_transport_opts(self.opts):
            chan = salt.transport.server.ReqServerChannel.factory(opts)
            chan.pre_fork(self.process_manager)
            req_channels.append(chan)

        for ind in range(int(self.opts['worker_threads'])):
            self.process_manager.add_process(MWorker,
                                             args=(self.opts,
                                                   self.master_key,
                                                   self.key,
                                                   req_channels,
                                                   ),
                                             )
        self.process_manager.run()

    def run(self):
        '''
        Start up the ReqServer
        '''
        try:
            self.__bind()
        except KeyboardInterrupt:
            log.warn('Stopping the Salt Master')
            raise SystemExit('\nExiting on Ctrl-c')

    def destroy(self):
        if hasattr(self, 'clients') and self.clients.closed is False:
            self.clients.setsockopt(zmq.LINGER, 1)
            self.clients.close()
        if hasattr(self, 'workers') and self.workers.closed is False:
            self.workers.setsockopt(zmq.LINGER, 1)
            self.workers.close()
        if hasattr(self, 'context') and self.context.closed is False:
            self.context.term()
        # Also stop the workers
        if hasattr(self, 'process_manager'):
            self.process_manager.kill_children()

    def __del__(self):
        self.destroy()
```
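The code is fairly simple. The main work happens in `__bind()`: it creates a request channel for each configured transport and then, based on `worker_threads` in the configuration file, starts that many MWorker workers. Despite the option's name, each MWorker is added with `ProcessManager.add_process`, so it runs as a separate process rather than a thread.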
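The pattern in `__bind()` (one shared request channel, `worker_threads` workers competing for the requests arriving on it) can be sketched with the standard library. This is a simplified stand-in, not Salt's implementation: a `queue.Queue` plays the role of workers.ipc, and threads are used only to keep the example self-contained, whereas Salt starts each MWorker as a process through ProcessManager. `handle_request`, `worker_loop` and `run_req_server` are hypothetical names.

```python
import queue
import threading


def handle_request(worker_id, request):
    # Hypothetical handler standing in for MWorker._handle_payload.
    return {'worker': worker_id, 'cmd': request['cmd']}


def worker_loop(worker_id, requests, replies):
    # Every worker blocks on the shared queue; whichever is free takes the next request.
    while True:
        request = requests.get()
        if request is None:  # shutdown sentinel
            break
        replies.put(handle_request(worker_id, request))


def run_req_server(worker_threads, incoming):
    requests, replies = queue.Queue(), queue.Queue()
    workers = [
        threading.Thread(target=worker_loop, args=(i, requests, replies))
        for i in range(worker_threads)
    ]
    for worker in workers:
        worker.start()
    for request in incoming:
        requests.put(request)
    for _ in workers:
        requests.put(None)  # one sentinel per worker, queued after all real requests
    for worker in workers:
        worker.join()
    # All workers have exited, so qsize() is stable here.
    return [replies.get() for _ in range(replies.qsize())]


results = run_req_server(worker_threads=5, incoming=[{'cmd': 'publish'}] * 20)
print(len(results))  # 20
```

Because the queue is FIFO and the sentinels are enqueued last, every real request is handled before any worker shuts down.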
### salt.master.MWorker

The `salt.master.MWorker` class likewise receives requests in its `__bind()` method:
```python
def __bind(self):
    '''
    Bind to the local port
    '''
    # using ZMQIOLoop since we *might* need zmq in there
    zmq.eventloop.ioloop.install()
    self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
    for req_channel in self.req_channels:
        req_channel.post_fork(self._handle_payload, io_loop=self.io_loop)  # TODO: cleaner? Maybe lazily?
    self.io_loop.start()
```
The key line is `req_channel.post_fork(self._handle_payload, io_loop=self.io_loop)`: it hands every request the channel receives to `self._handle_payload`. Let's look at `_handle_payload`:
```python
@tornado.gen.coroutine
def _handle_payload(self, payload):
    '''
    The _handle_payload method is the key method used to figure out what
    needs to be done with communication to the server

    Example cleartext payload generated for 'salt myminion test.ping':

    {'enc': 'clear',
     'load': {'arg': [],
              'cmd': 'publish',
              'fun': 'test.ping',
              'jid': '',
              'key': 'alsdkjfa.,maljf-==adflkjadflkjalkjadfadflkajdflkj',
              'kwargs': {'show_jid': False, 'show_timeout': False},
              'ret': '',
              'tgt': 'myminion',
              'tgt_type': 'glob',
              'user': 'root'}}

    :param dict payload: The payload route to the appropriate handler
    '''
    key = payload['enc']
    load = payload['load']
    ret = {'aes': self._handle_aes,
           'clear': self._handle_clear}[key](load)
    raise tornado.gen.Return(ret)
```
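The last lines show the dispatch: if `key` (the payload's `enc` field) is `'aes'`, `self._handle_aes` is called, which handles the results returned by minions; if it is `'clear'`, `self._handle_clear` is called, which handles cleartext commands such as the `publish` request issued by the `salt` CLI.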
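Stripped of tornado and ZeroMQ, the combination of `post_fork(self._handle_payload, ...)` and the dictionary dispatch looks roughly like the sketch below. It uses `asyncio` in place of tornado coroutines, and `ToyReqChannel` and `ToyMWorker` are hypothetical classes; the real channel also receives, decrypts and replies over a socket, all of which is omitted here.

```python
import asyncio


class ToyReqChannel:
    """Hypothetical stand-in for a ReqServerChannel; it only replays canned payloads."""

    def __init__(self, payloads):
        self.payloads = payloads
        self.payload_handler = None

    def post_fork(self, payload_handler):
        # Register the callback, as MWorker.__bind does with self._handle_payload.
        self.payload_handler = payload_handler

    async def serve(self):
        replies = []
        for payload in self.payloads:
            # Every incoming request is handed to the registered coroutine.
            replies.append(await self.payload_handler(payload))
        return replies


class ToyMWorker:
    def _handle_clear(self, load):
        return 'clear:' + load['cmd']  # e.g. 'publish' sent by the CLI

    def _handle_aes(self, load):
        return 'aes:' + load['cmd']  # e.g. '_return' sent by a minion

    async def _handle_payload(self, payload):
        # Pick the handler by payload['enc']; an unknown value raises KeyError.
        handler = {'aes': self._handle_aes,
                   'clear': self._handle_clear}[payload['enc']]
        return handler(payload['load'])


channel = ToyReqChannel([
    {'enc': 'clear', 'load': {'cmd': 'publish'}},
    {'enc': 'aes', 'load': {'cmd': '_return'}},
])
channel.post_fork(ToyMWorker()._handle_payload)
print(asyncio.run(channel.serve()))  # ['clear:publish', 'aes:_return']
```

As in the quoted Salt code, an `enc` value other than `'aes'` or `'clear'` makes the dictionary lookup raise `KeyError`.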
Here is the `self._handle_clear` method:
```python
def _handle_clear(self, load):
    '''
    Process a cleartext command

    :param dict load: Cleartext payload
    :return: The result of passing the load to a function in ClearFuncs corresponding to
             the command specified in the load's 'cmd' key.
    '''
    log.trace('Clear payload received with command {cmd}'.format(**load))
    if load['cmd'].startswith('__'):
        return False
    return getattr(self.clear_funcs, load['cmd'])(load), {'fun': 'send_clear'}
```
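The important part is the last line: it calls the method of `self.clear_funcs` whose name is given by `load['cmd']`, after rejecting any name that starts with `__`. When a salt module is executed, `load['cmd']` is `publish`. `self.clear_funcs` is an instance of `salt.master.ClearFuncs`, which is covered below.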
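The name-based routing in `_handle_clear` can be reduced to a few lines. In this sketch `ToyClearFuncs` is a hypothetical class with a single public command, and `handle_clear` is a standalone version of the method above, written only for illustration.

```python
class ToyClearFuncs:
    """Hypothetical stand-in for salt.master.ClearFuncs with one public command."""

    def publish(self, load):
        return {'jid': '1', 'minions': [load['tgt']]}


def handle_clear(clear_funcs, load):
    # Refuse dunder names so a client cannot reach e.g. __init__ or __class__.
    if load['cmd'].startswith('__'):
        return False
    # Route by name: load['cmd'] == 'publish' -> clear_funcs.publish(load)
    return getattr(clear_funcs, load['cmd'])(load), {'fun': 'send_clear'}


funcs = ToyClearFuncs()
print(handle_clear(funcs, {'cmd': 'publish', 'tgt': 'web1'}))
# ({'jid': '1', 'minions': ['web1']}, {'fun': 'send_clear'})
print(handle_clear(funcs, {'cmd': '__class__'}))  # False
```

Note that this guard only rejects names beginning with two underscores, so single-underscore helpers remain reachable by name. Later Salt releases tightened this check after CVE-2020-11651, which involved ClearFuncs methods being callable without authentication.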
The `self._handle_aes` method is similar to `self._handle_clear`:
```python
def _handle_aes(self, data):
    '''
    Process a command sent via an AES key

    :param str load: Encrypted payload
    :return: The result of passing the load to a function in AESFuncs corresponding to
             the command specified in the load's 'cmd' key.
    '''
    if 'cmd' not in data:
        log.error('Received malformed command {0}'.format(data))
        return {}
    log.trace('AES payload received with command {0}'.format(data['cmd']))
    if data['cmd'].startswith('__'):
        return False
    return self.aes_funcs.run_func(data['cmd'], data)
```
When a salt-minion returns the result of a command, `data['cmd']` is `_return`. Reading the source of `run_func` shows that it ends up calling the `_return` method of `salt.master.AESFuncs`, which is covered below.
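### salt.master.ClearFuncs

The first part of `ClearFuncs.publish` performs authentication. Once authentication succeeds, it generates an event announcing that a message is about to be sent: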
```python
payload = self._prep_pub(minions, jid, clear_load, extra)
```
The core of `self._prep_pub` is this line:
```python
self.event.fire_event(new_job_load, tagify([clear_load['jid'], 'new'], 'job'))
```
Finally, the message is sent to the minions:
```python
self._send_pub(payload)
```
The `self._send_pub` method is very simple; it hands the load to the underlying message transport:
```python
def _send_pub(self, load):
    '''
    Take a load and send it across the network to connected minions
    '''
    for transport, opts in iter_transport_opts(self.opts):
        chan = salt.transport.server.PubServerChannel.factory(opts)
        chan.publish(load)
```
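The two pieces of `publish` shown here, the event tag built with `tagify` and the loop over transports in `_send_pub`, can be sketched as follows. The `tagify` below is a simplified re-implementation written for this post, assuming it joins `salt`, the prefix and the suffix parts with `/` (yielding tags such as `salt/job/<jid>/new`); check `salt.utils.event.tagify` for its exact behaviour. `ToyPubChannel` and `send_pub` are hypothetical stand-ins for `PubServerChannel` and `_send_pub`.

```python
def tagify(suffix='', prefix='', base='salt'):
    """Simplified re-implementation for illustration: join the non-empty parts with '/'."""
    parts = [base, prefix]
    if isinstance(suffix, (list, tuple)):
        parts.extend(suffix)
    else:
        parts.append(suffix)
    return '/'.join(str(part) for part in parts if part)


class ToyPubChannel:
    """Hypothetical stand-in for PubServerChannel: records what it would send."""

    def __init__(self, transport):
        self.transport = transport
        self.sent = []

    def publish(self, load):
        self.sent.append(load)


def send_pub(load, channels):
    # Like ClearFuncs._send_pub: publish the same load on every configured transport.
    for chan in channels:
        chan.publish(load)


jid = '20170101120000123456'
print(tagify([jid, 'new'], 'job'))          # salt/job/20170101120000123456/new
print(tagify([jid, 'ret', 'web1'], 'job'))  # salt/job/20170101120000123456/ret/web1

channels = [ToyPubChannel('zeromq'), ToyPubChannel('tcp')]
send_pub({'fun': 'test.ping', 'jid': jid}, channels)
print([len(c.sent) for c in channels])      # [1, 1]
```

Keeping the job id inside the tag lets any listener filter events for one job with a simple prefix match.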
### salt.master.AESFuncs

Here is the source of the `_return` method:
```python
def _return(self, load):
    '''
    Handle the return data sent from the minions.

    Takes the return, verifies it and fires it on the master event bus.
    Typically, this event is consumed by the Salt CLI waiting on the other
    end of the event bus but could be heard by any listener on the bus.

    :param dict load: The minion payload
    '''
    try:
        salt.utils.job.store_job(
            self.opts, load, event=self.event, mminion=self.mminion)
    except salt.exceptions.SaltCacheError:
        log.error('Could not store job information for load: {0}'.format(load))
```
As you can see, the main work is done in `salt.utils.job.store_job`, whose core is this block:
```python
if event:
    # If the return data is invalid, just ignore it
    log.info('Got return from {id} for job {jid}'.format(**load))
    event.fire_event(load, tagify([load['jid'], 'ret', load['id']], 'job'))
    event.fire_ret_load(load)
```
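In other words, it fires the minion's return onto the event bus, where the LocalClient is waiting for it (step 11 above).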
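On the other side of the bus, the LocalClient waits for these events until every targeted minion has answered or the timeout expires (steps 11 and 12). A rough sketch of that wait follows, assuming the `salt/job/<jid>/ret/<minion_id>` tag format that the `tagify` call above appears to produce, and using a `queue.Queue` as a hypothetical stand-in for the event subscription; `gather_returns` is not a Salt function.

```python
import queue
import time


def gather_returns(events, jid, expected_minions, timeout):
    """Collect return events for one jid until every targeted minion has replied
    or the timeout expires."""
    expected = set(expected_minions)
    prefix = 'salt/job/{}/ret/'.format(jid)
    returns = {}
    deadline = time.monotonic() + timeout
    while set(returns) != expected:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # timeout: report whatever has arrived
        try:
            tag, data = events.get(timeout=remaining)
        except queue.Empty:
            break
        # Ignore events for other jobs and for minions that were not targeted.
        if tag.startswith(prefix) and data['id'] in expected:
            returns[data['id']] = data['return']
    return returns


bus = queue.Queue()  # stand-in for a subscription to master_event_pub.ipc
bus.put(('salt/job/42/new', {'jid': '42'}))
bus.put(('salt/job/42/ret/web1', {'id': 'web1', 'return': True}))
bus.put(('salt/job/99/ret/web2', {'id': 'web2', 'return': True}))  # a different job
print(gather_returns(bus, '42', ['web1', 'web2'], timeout=0.2))  # {'web1': True}
```

Here `web2` never answers job 42, so the call returns after the 0.2 second timeout with only the replies that did arrive, mirroring how the CLI prints partial results when some minions time out.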
## Summary

This post only outlines the overall flow. It does not go into how data moves between the message queues; I may cover that in a separate post later.