在 ansible.cfg 中開啓 callback 功能:
# These settings go in the [defaults] section of ansible.cfg.
# Comments are kept on their own lines: an inline "#" after a value is not
# treated as a comment by ansible.cfg and would become part of the value.

# Directory that holds the custom callback plugin files.
callback_plugins = /usr/share/ansible/plugins/callback
bin_ansible_callbacks = True
# Name of the callback to enable, as defined by CALLBACK_NAME in the plugin.
callback_whitelist = timer
官網示例:
# cat /usr/local/lib/python3.5/dist-packages/ansible/plugins/callback/timer.py  #默認路徑(下面列出的是 timer 插件)
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

from datetime import datetime

from ansible.plugins.callback import CallbackBase


class CallbackModule(CallbackBase):
    """
    This callback module tells you how long your plays ran for.
    """
    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'aggregate'
    # Callback name as referenced from the configuration file.
    CALLBACK_NAME = 'timer'
    CALLBACK_NEEDS_WHITELIST = True

    def __init__(self):
        super(CallbackModule, self).__init__()
        # The clock starts when the plugin object is created.
        self.start_time = datetime.now()

    def days_hours_minutes_seconds(self, runtime):
        """Split a timedelta into a (days, hours, minutes, seconds) tuple.

        ``runtime.seconds`` already excludes whole days, so only hours,
        minutes and seconds have to be separated here.
        """
        hours, remainder = divmod(runtime.seconds, 3600)
        # The previous formula subtracted only the minutes from the total,
        # so any run longer than an hour reported more than 59 seconds.
        minutes, seconds = divmod(remainder, 60)
        return runtime.days, hours, minutes, seconds

    def playbook_on_stats(self, stats):
        """Forward to the v2 hook of the same event."""
        self.v2_playbook_on_stats(stats)

    def v2_playbook_on_stats(self, stats):
        """Display the time elapsed since this plugin was instantiated."""
        end_time = datetime.now()
        runtime = end_time - self.start_time
        self._display.display(
            "Playbook run took %s days, %s hours, %s minutes, %s seconds"
            % (self.days_hours_minutes_seconds(runtime))
        )
效果圖:
ansible 自帶的 log 插件(log_plays):
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import os
import time
import json

from ansible.module_utils._text import to_bytes
from ansible.plugins.callback import CallbackBase


class CallbackModule(CallbackBase):
    """
    logs playbook results, per host, in /var/log/ansible/hosts
    """
    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'notification'
    # Callback name; it has to be listed in the ansible configuration file.
    # Per the original author this can be skipped when
    # CALLBACK_NEEDS_WHITELIST below is False.
    CALLBACK_NAME = 'log_plays'
    CALLBACK_NEEDS_WHITELIST = True

    TIME_FORMAT = "%b %d %Y %H:%M:%S"
    MSG_FORMAT = "%(now)s - %(category)s - %(data)s\n\n"

    def __init__(self):
        super(CallbackModule, self).__init__()

        # Make sure the per-host log directory exists before anything is logged.
        log_dir = "/var/log/ansible/hosts"
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

    def log(self, host, category, data):
        """Append one timestamped record to the log file named after `host`.

        Dict payloads are serialised to JSON (with any 'invocation' entry
        moved in front of the rest); every other payload is written as given.
        """
        if type(data) is dict:
            if '_ansible_verbose_override' in data:
                # avoid logging extraneous data
                data = 'omitted'
            else:
                payload = data.copy()
                invocation = payload.pop('invocation', None)
                data = json.dumps(payload)
                if invocation is not None:
                    data = json.dumps(invocation) + " => %s " % data

        target = os.path.join("/var/log/ansible/hosts", host)
        timestamp = time.strftime(self.TIME_FORMAT, time.localtime())
        fields = {'now': timestamp, 'category': category, 'data': data}
        record = to_bytes(self.MSG_FORMAT % fields)
        with open(target, "ab") as handle:
            handle.write(record)

    def runner_on_failed(self, host, res, ignore_errors=False):
        self.log(host, 'FAILED', res)

    def runner_on_ok(self, host, res):
        self.log(host, 'OK', res)

    def runner_on_skipped(self, host, item=None):
        self.log(host, 'SKIPPED', '...')

    def runner_on_unreachable(self, host, res):
        self.log(host, 'UNREACHABLE', res)

    def runner_on_async_failed(self, host, res, jid):
        self.log(host, 'ASYNC_FAILED', res)

    def playbook_on_import_for_host(self, host, imported_file):
        self.log(host, 'IMPORTED', imported_file)

    def playbook_on_not_import_for_host(self, host, missing_file):
        self.log(host, 'NOTIMPORTED', missing_file)
以上示例會將執行結果寫入到 /var/log/ansible/hosts 目錄下以 host 爲名的文件裏。
#修改log_plays腳本並進行簡單說明
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import os
import time
import json

from ansible.module_utils._text import to_bytes
from ansible.plugins.callback import CallbackBase


class CallbackModule(CallbackBase):
    """
    Modified log_plays: displays a trimmed-down JSON summary of each result
    instead of writing the full result to /var/log/ansible/hosts.
    """
    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'notification'
    # This is the callback name; it has to be listed in the configuration file.
    CALLBACK_NAME = 'log_plays'
    # When True, the listing above is mandatory.
    CALLBACK_NEEDS_WHITELIST = True

    # Output formats; adjust freely.
    TIME_FORMAT = "%b %d %Y %H:%M:%S"
    MSG_FORMAT = "%(now)s - %(category)s - %(data)s\n\n"
    MSG_FORMAT1 = "%(data)s\n\n"

    def __init__(self):
        super(CallbackModule, self).__init__()
        # Kept from the stock plugin so a file-writing log() can be restored
        # without further changes.
        if not os.path.exists("/var/log/ansible/hosts"):
            os.makedirs("/var/log/ansible/hosts")

    def log(self, host, category, data):
        """Display the parsed summary of one result on screen.

        The stock plugin wrote the record to a per-host file; this variant
        shows it through the display object instead. A database writer could
        be plugged in here in the same way.

        host and category are accepted for compatibility with the hook
        methods below but are not part of the displayed summary.
        """
        # The display call takes text, so the summary dict is dumped to JSON.
        summary = json.dumps(self.option_result(data))
        self._display.display(summary)

    def option_result(self, msg):
        """Extract the fields of interest from a raw result.

        msg is normally the result dict of a task. The SKIPPED / IMPORTED
        hooks pass plain strings, and not every result carries the
        stderr/start/end keys, so anything missing is reported as None
        instead of raising.

        Returns a dict with the keys 'stderr_lines', 'start_time',
        'end_time' and 'stderr'.
        """
        source = msg if isinstance(msg, dict) else {}
        result = {}
        result['stderr_lines'] = source.get('stderr_lines')
        result['start_time'] = source.get('start')
        result['end_time'] = source.get('end')
        result['stderr'] = source.get('stderr')
        return result

    def runner_on_failed(self, host, res, ignore_errors=False):
        self.log(host, 'FAILED', res)

    def runner_on_ok(self, host, res):
        self.log(host, 'OK', res)

    def runner_on_skipped(self, host, item=None):
        self.log(host, 'SKIPPED', '...')

    def runner_on_unreachable(self, host, res):
        self.log(host, 'UNREACHABLE', res)

    def runner_on_async_failed(self, host, res, jid):
        self.log(host, 'ASYNC_FAILED', res)

    def playbook_on_import_for_host(self, host, imported_file):
        self.log(host, 'IMPORTED', imported_file)

    def playbook_on_not_import_for_host(self, host, missing_file):
        self.log(host, 'NOTIMPORTED', missing_file)
結果(沒有經過解析處理的完整內容):
經過解析:
#然後可以自定義腳本,將結果通過郵件發送給管理員,或者寫入到數據庫,此處不再詳細介紹
首先看一下簡單示例(ad-hoc),ansible 版本爲 2.3.1:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase


class ResultsCallback(CallbackBase):
    """Custom callback that keeps the latest task result of every host.

    Results are sorted into three dicts (ok / unreachable / failed), each
    keyed by host name; a later result for the same host replaces the
    earlier one.
    """

    def __init__(self, *args, **kwargs):
        # Initialise the parent class first.
        super(ResultsCallback, self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        self.host_unreachable[result._host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        self.host_ok[result._host.get_name()] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        self.host_failed[result._host.get_name()] = result


# Options carries the settings ansible runs with (connection, become, ...).
# A namedtuple is a tuple whose fields can also be read as attributes,
# e.g. options.connection.
Options = namedtuple(
    'Options',
    ['connection', 'module_path', 'forks', 'become', 'become_method',
     'become_user', 'check', 'diff'],
)

# initialize needed objects
loader = DataLoader()
options = Options(
    connection='ssh',
    module_path='/path/to/mymodules',
    forks=100,
    become=None,
    become_method=None,
    become_user=None,
    check=False,
    diff=False,
)
passwords = dict(vault_pass='secret')

# Instantiate our ResultCallback for handling results as they come in
results_callback = ResultsCallback()

# Manages variables (host, group, extra vars, ...).
variable_manager = VariableManager()

# Load the inventory. Per the original author, host_list may be a hosts file
# or a list of IPs such as ['xxx.xxx.xx.x', 'xx.xx.xx.xx'].
# NOTE(review): the playbook example further down also calls
# variable_manager.set_inventory(inventory) -- confirm whether it is needed here.
inventory = Inventory(
    loader=loader,
    variable_manager=variable_manager,
    host_list='/etc/ansible/hosts',
)

# Describe the play to run.
play_source = dict(
    name="Ansible Play",
    hosts='test',
    gather_facts='no',
    tasks=[
        dict(action=dict(module='shell', args='ls /home'),
             register='shell_out'),
        dict(action=dict(module='debug',
                         args=dict(msg='{{shell_out.stdout}}'))),
    ],
)
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)

# actually run it
tqm = None
try:
    tqm = TaskQueueManager(
        inventory=inventory,
        variable_manager=variable_manager,
        loader=loader,
        options=options,
        passwords=passwords,
        stdout_callback=results_callback,
    )
    result = tqm.run(play)
finally:
    if tqm is not None:
        tqm.cleanup()

# results_callback.host_ok.items() yields (host, TaskResult) pairs, e.g.
# ('192.168.132.130', <ansible.executor.task_result.TaskResult object at 0x...>).
# Of a TaskResult, the _host and _result attributes are the ones used here.

# Gather the raw result dict of every host, grouped by outcome.
outcome_buckets = (
    ('success', results_callback.host_ok),
    ('failed', results_callback.host_failed),
    ('unreachable', results_callback.host_unreachable),
)
result_raw = {'success': {}, 'failed': {}, 'unreachable': {}}
for outcome, bucket in outcome_buckets:
    for host, task_result in bucket.items():
        result_raw[outcome][host] = task_result._result

print(result_raw)
結果
{'success': {'10.10.10.11': {'changed': False, 'msg': 'detail\nerror.logs\nmanage.py\nMyDevOps\noperations\nreports\nscanhosts\nstatic\ntemplates\ntest_ansible.py\nusers', '_ansible_verbose_always': True, 'failed': False, '_ansible_no_log': False},
'192.168.132.130': {'changed': False, 'msg': 'detail\nerror.logs\nmanage.py\nMyDevOps\noperations\nreports\nscanhosts\nstatic\ntemplates\ntest_ansible.py\nusers', '_ansible_verbose_always': True, 'failed': False, '_ansible_no_log': False}}, 'unreachable': {}, 'failed': {}}
playbook 自定義示例
#!/usr/bin/env python
import json
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.plugins.callback import CallbackBase


class ResultsCallback(CallbackBase):
    """Collect the latest task result per host, plus the final play recap.

    task_ok / task_failed / task_unreachable / task_skipped map a host name
    to the last TaskResult of that kind; task_stats maps a host name to its
    summary dict from the end-of-run statistics.
    """

    def __init__(self, *args, **kwargs):
        super(ResultsCallback, self).__init__(*args, **kwargs)
        self.task_ok = {}
        self.task_unreachable = {}
        self.task_failed = {}
        self.task_skipped = {}
        self.task_stats = {}

    def v2_runner_on_unreachable(self, result):
        self.task_unreachable[result._host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        self.task_ok[result._host.get_name()] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        self.task_failed[result._host.get_name()] = result

    def v2_runner_on_skipped(self, result, *args, **kwargs):
        self.task_skipped[result._host.get_name()] = result

    def v2_playbook_on_stats(self, stats):
        # The statistics hook is v2_playbook_on_stats and receives the
        # aggregate stats object. The earlier "v2_runner_on_stats" name is
        # not an event Ansible dispatches, so 'status' always stayed empty.
        for host in stats.processed.keys():
            self.task_stats[host] = stats.summarize(host)


# Loads and parses YAML / JSON content.
loader = DataLoader()
variable_manager = VariableManager()
results_callback = ResultsCallback()

# Build the inventory. Per the original author, host_list accepts either
#   1: a hosts file, or
#   2: a list of IPs;
# the hosts file is what is passed here.
inventory = Inventory(loader=loader, variable_manager=variable_manager,
                      host_list='/etc/ansible/hosts')
variable_manager.set_inventory(inventory)
passwords = None

# NOTE(review): 'ack_pass' looks like a typo for 'ask_pass'; it is left
# unchanged here -- confirm before renaming.
Options = namedtuple(
    'Options',
    ['connection', 'remote_user', 'verbosity', 'ack_pass', 'module_path',
     'forks', 'become', 'become_method', 'become_user', 'check',
     'listhosts', 'listtasks', 'listtags', 'syntax'],
)
# Initialise the objects the executor needs.
options = Options(connection='smart', remote_user='root', ack_pass=None,
                  forks=100, verbosity=5, module_path=None, become=True,
                  become_method='sudo', become_user='root', check=None,
                  listhosts=None, listtasks=None, listtags=None, syntax=None)

playbook = PlaybookExecutor(playbooks=['/root/test.yml'], inventory=inventory,
                            variable_manager=variable_manager, loader=loader,
                            options=options, passwords=passwords)
# Route results into our callback instead of the default stdout callback.
playbook._tqm._stdout_callback = results_callback
result = playbook.run()

# Gather everything the callback captured, grouped by outcome.
result_raw = {'success': {}, 'failed': {}, 'unreachable': {},
              'skipped': {}, 'status': {}}
for host, result in results_callback.task_ok.items():
    result_raw['success'][host] = result._result
for host, result in results_callback.task_failed.items():
    result_raw['failed'][host] = result._result
for host, result in results_callback.task_unreachable.items():
    result_raw['unreachable'][host] = result._result
for host, result in results_callback.task_skipped.items():
    result_raw['skipped'][host] = result._result
# task_stats already holds plain summary dicts, not TaskResult objects.
for host, summary in results_callback.task_stats.items():
    result_raw['status'][host] = summary

print(result_raw)
打印結果
{'status': {}, 'failed': {'10.10.10.11': {'changed': True, 'rc': 127, 'stdout_lines': [], 'stderr_lines': ['/bin/sh: ifcsonfig: 未找到命令'], 'end': '2018-06-05 16:47:46.441073', 'delta': '0:00:00.011661', 'invocation': {'module_args': {'chdir': None, 'warn': True, 'executable': None, '_uses_shell': True, 'creates': None, '_raw_params': 'ifcsonfig', 'removes': None}}, 'stderr': '/bin/sh: ifcsonfig: 未找到命令', '_ansible_parsed': True, 'start': '2018-06-05 16:47:46.429412', 'cmd': 'ifcsonfig', '_ansible_no_log': False, 'failed': True, 'stdout': ''}}, 'success': {}, 'skipped': {}, 'unreachable': {}}
#!/usr/bin/env python
# Class-based wrapper around the ad-hoc and playbook examples
# (uses the ansible.vars.VariableManager / ansible.inventory.Inventory API).
import json
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.plugins.callback import CallbackBase
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager


class ResultsCallback(CallbackBase):
    """Collect the latest task result per host, plus the final play recap.

    task_ok / task_failed / task_unreachable / task_skipped map a host name
    to the last TaskResult of that kind; task_stats maps a host name to its
    summary dict from the end-of-run statistics.
    """

    def __init__(self, *args, **kwargs):
        super(ResultsCallback, self).__init__(*args, **kwargs)
        self.task_ok = {}
        self.task_unreachable = {}
        self.task_failed = {}
        self.task_skipped = {}
        self.task_stats = {}

    def v2_runner_on_unreachable(self, result):
        self.task_unreachable[result._host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        self.task_ok[result._host.get_name()] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        self.task_failed[result._host.get_name()] = result

    def v2_runner_on_skipped(self, result, *args, **kwargs):
        self.task_skipped[result._host.get_name()] = result

    def v2_playbook_on_stats(self, stats):
        # The statistics hook is v2_playbook_on_stats and receives the
        # aggregate stats object. The earlier "v2_runner_on_stats" name is
        # not an event Ansible dispatches, so 'status' always stayed empty.
        for host in stats.processed.keys():
            self.task_stats[host] = stats.summarize(host)


class Runner(object):
    """Run an ad-hoc play or a playbook and return the results as a dict."""

    def __init__(self, *args, **kwargs):
        # Initialisation: build the objects shared by both run methods.
        self.loader = DataLoader()
        self.variable_manager = VariableManager()
        self.results_callback = ResultsCallback()
        self.inventory = Inventory(loader=self.loader,
                                   variable_manager=self.variable_manager,
                                   host_list='/etc/ansible/hosts')
        self.variable_manager.set_inventory(self.inventory)
        self.passwords = None
        # NOTE(review): 'ack_pass' looks like a typo for 'ask_pass'; it is
        # left unchanged here -- confirm before renaming.
        Options = namedtuple(
            'Options',
            ['connection', 'remote_user', 'verbosity', 'ack_pass',
             'module_path', 'forks', 'become', 'become_method',
             'become_user', 'check', 'listhosts', 'listtasks', 'listtags',
             'syntax'],
        )
        # Initialise the options object the executors need.
        self.options = Options(connection='smart', remote_user='root',
                               ack_pass=None, forks=100, verbosity=5,
                               module_path=None, become=True,
                               become_method='sudo', become_user='root',
                               check=None, listhosts=None, listtasks=None,
                               listtags=None, syntax=None)

    def _collect_results(self):
        """Turn what the current callback captured into plain dicts."""
        callback = self.results_callback
        result_raw = {'success': {}, 'failed': {}, 'unreachable': {},
                      'skipped': {}, 'status': {}}
        buckets = (
            ('success', callback.task_ok),
            ('failed', callback.task_failed),
            ('unreachable', callback.task_unreachable),
            ('skipped', callback.task_skipped),
        )
        for outcome, bucket in buckets:
            for host, task_result in bucket.items():
                result_raw[outcome][host] = task_result._result
        # task_stats already holds plain summary dicts, not TaskResult objects.
        result_raw['status'].update(callback.task_stats)
        return result_raw

    def run_ad_hoc(self):
        """Run the built-in two-task play and return its results.

        Returns a dict with the keys 'success', 'failed', 'unreachable',
        'skipped' and 'status', each mapping host name to result data.
        """
        # Fresh callback per run, so results of an earlier run on this
        # Runner cannot leak into this report.
        self.results_callback = ResultsCallback()
        play_source = dict(
            name="Ansible Play",
            hosts='test',
            gather_facts='no',
            tasks=[
                dict(action=dict(module='shell', args='ls /home'),
                     register='shell_out'),
                dict(action=dict(module='debug',
                                 args=dict(msg='{{shell_out.stdout}}'))),
            ],
        )
        play = Play().load(play_source,
                           variable_manager=self.variable_manager,
                           loader=self.loader)
        tqm = None
        try:
            tqm = TaskQueueManager(
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                options=self.options,
                passwords=self.passwords,
                stdout_callback=self.results_callback,
            )
            tqm.run(play)
        finally:
            if tqm is not None:
                tqm.cleanup()
        return self._collect_results()

    def run_playbook(self):
        """Run /root/test.yml and return its results.

        Returns a dict with the keys 'success', 'failed', 'unreachable',
        'skipped' and 'status', each mapping host name to result data.
        """
        # Fresh callback per run, so results of an earlier run on this
        # Runner cannot leak into this report.
        self.results_callback = ResultsCallback()
        playbook = PlaybookExecutor(playbooks=['/root/test.yml'],
                                    inventory=self.inventory,
                                    variable_manager=self.variable_manager,
                                    loader=self.loader,
                                    options=self.options,
                                    passwords=self.passwords)
        # Route results into our callback instead of the default stdout callback.
        playbook._tqm._stdout_callback = self.results_callback
        playbook.run()
        return self._collect_results()


c = Runner()
print(c.run_ad_hoc(), c.run_playbook())
#!/usr/bin/env python
# Same wrapper, written against the ansible.inventory.manager.InventoryManager /
# ansible.vars.manager.VariableManager API.
import json
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.plugins.callback import CallbackBase
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager


class ResultsCallback(CallbackBase):
    """Collect the latest task result per host, plus the final play recap.

    task_ok / task_failed / task_unreachable / task_skipped map a host name
    to the last TaskResult of that kind; task_stats maps a host name to its
    summary dict from the end-of-run statistics.
    """

    def __init__(self, *args, **kwargs):
        super(ResultsCallback, self).__init__(*args, **kwargs)
        self.task_ok = {}
        self.task_unreachable = {}
        self.task_failed = {}
        self.task_skipped = {}
        self.task_stats = {}

    def v2_runner_on_unreachable(self, result):
        self.task_unreachable[result._host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        self.task_ok[result._host.get_name()] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        self.task_failed[result._host.get_name()] = result

    def v2_runner_on_skipped(self, result, *args, **kwargs):
        self.task_skipped[result._host.get_name()] = result

    def v2_playbook_on_stats(self, stats):
        # The statistics hook is v2_playbook_on_stats and receives the
        # aggregate stats object. The earlier "v2_runner_on_stats" name is
        # not an event Ansible dispatches, so 'status' always stayed empty.
        for host in stats.processed.keys():
            self.task_stats[host] = stats.summarize(host)


class Runner(object):
    """Run an ad-hoc play or a playbook and return the results as a dict."""

    def __init__(self, *args, **kwargs):
        # Initialisation: build the objects shared by both run methods.
        self.loader = DataLoader()
        self.results_callback = ResultsCallback()
        self.inventory = InventoryManager(loader=self.loader,
                                          sources=['/etc/ansible/hosts'])
        self.variable_manager = VariableManager(loader=self.loader,
                                                inventory=self.inventory)
        self.passwords = None
        # NOTE(review): 'ack_pass' looks like a typo for 'ask_pass'; it is
        # left unchanged here -- confirm before renaming.
        Options = namedtuple(
            'Options',
            ['connection', 'remote_user', 'ask_sudo_pass', 'verbosity',
             'ack_pass', 'module_path', 'forks', 'become', 'become_method',
             'become_user', 'check', 'listhosts', 'listtasks', 'listtags',
             'syntax', 'sudo_user', 'sudo', 'diff'],
        )
        # Initialise the options object the executors need.
        self.options = Options(connection='smart', remote_user=None,
                               ack_pass=None, sudo_user=None, forks=5,
                               sudo=None, ask_sudo_pass=False, verbosity=5,
                               module_path=None, become=None,
                               become_method=None, become_user=None,
                               check=False, diff=False, listhosts=None,
                               listtasks=None, listtags=None, syntax=None)

    def _collect_results(self):
        """Turn what the current callback captured into plain dicts."""
        callback = self.results_callback
        result_raw = {'success': {}, 'failed': {}, 'unreachable': {},
                      'skipped': {}, 'status': {}}
        buckets = (
            ('success', callback.task_ok),
            ('failed', callback.task_failed),
            ('unreachable', callback.task_unreachable),
            ('skipped', callback.task_skipped),
        )
        for outcome, bucket in buckets:
            for host, task_result in bucket.items():
                result_raw[outcome][host] = task_result._result
        # task_stats already holds plain summary dicts, not TaskResult objects.
        result_raw['status'].update(callback.task_stats)
        return result_raw

    def run_ad_hoc(self):
        """Run the built-in shell play and return its results.

        Returns a dict with the keys 'success', 'failed', 'unreachable',
        'skipped' and 'status', each mapping host name to result data.
        """
        # Fresh callback per run, so results of an earlier run on this
        # Runner cannot leak into this report.
        self.results_callback = ResultsCallback()
        play_source = dict(
            name="Ansible Play ad-hoc",
            hosts='test',
            gather_facts='no',
            tasks=[
                dict(action=dict(module='shell', args='ls /home'),
                     register='shell_out'),
                # Optional follow-up task that echoes the registered output:
                # dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}')))
            ],
        )
        play = Play().load(play_source,
                           variable_manager=self.variable_manager,
                           loader=self.loader)
        tqm = None
        try:
            tqm = TaskQueueManager(
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                options=self.options,
                passwords=self.passwords,
                stdout_callback=self.results_callback,
            )
            tqm.run(play)
        finally:
            if tqm is not None:
                tqm.cleanup()
        return self._collect_results()

    def run_playbook(self):
        """Run /root/test.yml and return its results.

        Returns a dict with the keys 'success', 'failed', 'unreachable',
        'skipped' and 'status', each mapping host name to result data.
        """
        # Fresh callback per run, so results of an earlier run on this
        # Runner cannot leak into this report.
        self.results_callback = ResultsCallback()
        playbook = PlaybookExecutor(playbooks=['/root/test.yml'],
                                    inventory=self.inventory,
                                    variable_manager=self.variable_manager,
                                    loader=self.loader,
                                    options=self.options,
                                    passwords=self.passwords)
        # Route results into our callback instead of the default stdout callback.
        playbook._tqm._stdout_callback = self.results_callback
        playbook.run()
        return self._collect_results()


c = Runner()
print(c.run_ad_hoc(), c.run_playbook())