使用Ansible + Flask + Celery搭建web平臺。
目錄結構
.
├── ansible_api
│ ├── ansible_playbook_inventory.py
│ ├── ansible_playbook.py
│ ├── ansible_task.py
│ ├── __init__.py
│ └── README.md
├── app.py
├── config
│ ├── Config.ini
│ └── hosts
├── database
│ ├── controller.py
│ ├── db_controller.py
│ └── __init__.py
├── README.md
├── requirements.txt
└── templates
    └── index.html
4 directories, 14 files
Ansible Task API:這一部分與在命令行中執行 ansible 命令的格式相同。
ansible_task.py
#!/usr/bin/env python
# coding:utf-8
"""Run one ad-hoc Ansible module on every host listed in ``config/hosts``.

NOTE(review): the import paths below (``ansible.vars.VariableManager``,
``ansible.inventory.Inventory``) look like the Ansible 2.0-2.3 Python API --
confirm against the installed Ansible version.
"""

import json
from collections import namedtuple

from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase


class ResultCallback(CallbackBase):
    """Callback that keeps the last result object of every host.

    Results are stored per host name in ``host_ok``, ``host_failed`` and
    ``host_unreachable`` so the caller can post-process them after the run.
    """

    def __init__(self, *args):
        # Positional arguments are accepted but not used.
        super(ResultCallback, self).__init__(display=None)
        self.status_ok = json.dumps({}, ensure_ascii=False)
        self.status_fail = json.dumps({}, ensure_ascii=False)
        self.status_unreachable = json.dumps({}, ensure_ascii=False)
        self.status_playbook = ''
        self.status_no_hosts = False
        self.host_ok = {}
        self.host_failed = {}
        self.host_unreachable = {}

    def v2_runner_on_ok(self, result):
        """Record a successful task result for its host."""
        host = result._host.get_name()
        self.runner_on_ok(host, result._result)
        self.host_ok[host] = result

    def v2_runner_on_failed(self, result, ignore_errors=False):
        """Record a failed task result for its host."""
        host = result._host.get_name()
        self.runner_on_failed(host, result._result, ignore_errors)
        self.host_failed[host] = result

    def v2_runner_on_unreachable(self, result):
        """Record an unreachable-host result for its host."""
        host = result._host.get_name()
        self.runner_on_unreachable(host, result._result)
        self.host_unreachable[host] = result


class Task(object):
    """A single ad-hoc task: one module plus its argument string."""

    def __init__(self, module, command):
        """Store the module name and the arguments passed to it."""
        self.module = module
        self.command = command

    def run(self):
        """Execute the task on all hosts of ``config/hosts``.

        The per-host results are collected in ``self.results_callback``.

        Returns:
            Whatever ``TaskQueueManager.run`` returns for the play.
        """
        # These fields mirror the settings normally taken from ansible.cfg.
        Options = namedtuple('Options', [
            'listtags', 'listtasks', 'listhosts', 'syntax', 'connection',
            'module_path', 'forks', 'remote_user', 'private_key_file',
            'ssh_common_args', 'ssh_extra_args', 'sftp_extra_args',
            'scp_extra_args', 'become', 'become_method', 'become_user',
            'verbosity', 'check',
        ])
        variable_manager = VariableManager()
        loader = DataLoader()
        options = Options(
            listtags=False,
            listtasks=False,
            listhosts=False,
            syntax=False,
            connection='smart',
            module_path='/usr/lib/python2.6/site-packages/ansible/modules/',
            forks=100,
            remote_user='root',
            private_key_file=None,
            ssh_common_args=None,
            ssh_extra_args=None,
            sftp_extra_args=None,
            scp_extra_args=None,
            become=False,
            become_method=None,
            become_user='root',
            verbosity=None,
            check=False,
        )
        passwords = dict(vault_pass='secret')

        # Inventory is read from the hosts file shipped with the project.
        inventory = Inventory(loader=loader,
                              variable_manager=variable_manager,
                              host_list='config/hosts')
        variable_manager.set_inventory(inventory)

        # The play consists of exactly one task built from module/command.
        play_source = dict(
            name="Ansible Play",
            hosts='all',
            gather_facts='no',
            tasks=[
                dict(action=dict(module=self.module, args=self.command),
                     register='shell_out'),
            ],
        )
        play = Play().load(play_source,
                           variable_manager=variable_manager,
                           loader=loader)

        # Create the callback before the try block so that get_result()
        # always finds it, even if building the TaskQueueManager fails.
        self.results_callback = ResultCallback()
        tqm = None
        try:
            tqm = TaskQueueManager(
                inventory=inventory,
                variable_manager=variable_manager,
                loader=loader,
                options=options,
                passwords=passwords,
            )
            tqm._stdout_callback = self.results_callback
            return tqm.run(play)
        finally:
            if tqm is not None:
                tqm.cleanup()

    def get_result(self):
        """Run the task and return the per-host outcome as a JSON string.

        The JSON object has the keys ``success``, ``failed`` and
        ``unreachable``. Successful hosts map to ``{'stdout', 'delta'}``;
        failed and unreachable hosts map to the result's ``msg`` and are
        omitted when the result carries no ``msg``.
        """
        self.run()
        result_all = {'success': {}, 'failed': {}, 'unreachable': {}}

        for host, result in self.results_callback.host_ok.items():
            # Use .get(): not every module result contains stdout/delta,
            # and indexing directly would raise KeyError for those modules.
            result_all['success'][host] = {
                'stdout': result._result.get('stdout'),
                'delta': result._result.get('delta'),
            }

        for host, result in self.results_callback.host_failed.items():
            if 'msg' in result._result:
                result_all['failed'][host] = result._result['msg']

        for host, result in self.results_callback.host_unreachable.items():
            if 'msg' in result._result:
                result_all['unreachable'][host] = result._result['msg']

        return json.dumps(result_all, ensure_ascii=False, sort_keys=True,
                          indent=2)


if __name__ == '__main__':
    res = Task('shell', 'hostname')
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(res.get_result())
Ansible playbook,執行playbook文件
ansible_playbook.py
#!/usr/bin/python
# coding:utf8
"""Run an Ansible playbook file against a static inventory.

NOTE(review): the import paths below (``ansible.vars.VariableManager``,
``ansible.inventory.Inventory``) look like the Ansible 2.0-2.3 Python API --
confirm against the installed Ansible version.
"""

import os
import json
from collections import namedtuple

from ansible.inventory import Inventory
from ansible.vars import VariableManager
from ansible.parsing.dataloader import DataLoader
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.plugins.callback import CallbackBase
from ansible.errors import AnsibleParserError


class mycallback(CallbackBase):
    """Callback that keeps the last result object of every host.

    Results are stored per host name in ``host_ok``, ``host_failed`` and
    ``host_unreachable``; ``status_no_hosts`` is set when a play matched
    no hosts.
    """

    def __init__(self, *args):
        # Positional arguments are accepted but not used.
        super(mycallback, self).__init__(display=None)
        self.status_ok = json.dumps({}, ensure_ascii=False)
        self.status_fail = json.dumps({}, ensure_ascii=False)
        self.status_unreachable = json.dumps({}, ensure_ascii=False)
        self.status_playbook = ''
        self.status_no_hosts = False
        self.host_ok = {}
        self.host_failed = {}
        self.host_unreachable = {}

    def v2_runner_on_ok(self, result):
        """Record a successful task result for its host."""
        host = result._host.get_name()
        self.runner_on_ok(host, result._result)
        self.host_ok[host] = result

    def v2_runner_on_failed(self, result, ignore_errors=False):
        """Record a failed task result for its host."""
        host = result._host.get_name()
        self.runner_on_failed(host, result._result, ignore_errors)
        self.host_failed[host] = result

    def v2_runner_on_unreachable(self, result):
        """Record an unreachable-host result for its host."""
        host = result._host.get_name()
        self.runner_on_unreachable(host, result._result)
        self.host_unreachable[host] = result

    def v2_playbook_on_no_hosts_matched(self):
        """Remember that a play matched no hosts."""
        self.playbook_on_no_hosts_matched()
        self.status_no_hosts = True

    def v2_playbook_on_play_start(self, play):
        """Remember the name of the play that just started."""
        self.playbook_on_play_start(play.name)
        self.playbook_path = play.name


class ansible_playbook(object):
    """Wrapper that runs one playbook and collects per-host results."""

    def __init__(self, playbook, host_list='/etc/ansible/hosts',
                 ansible_cfg=None, passwords=None):
        """Prepare options, loader, variable manager and inventory.

        Args:
            playbook: path of the playbook file to run.
            host_list: inventory source handed to ``Inventory``.
            ansible_cfg: optional config path exported as ANSIBLE_CONFIG.
            passwords: optional dict passed to ``PlaybookExecutor``;
                defaults to an empty dict.
        """
        self.playbook_path = playbook
        # A fresh dict per instance instead of a shared mutable default.
        self.passwords = {} if passwords is None else passwords
        Options = namedtuple('Options', [
            'listtags', 'listtasks', 'listhosts', 'syntax', 'connection',
            'module_path', 'forks', 'remote_user', 'private_key_file',
            'ssh_common_args', 'ssh_extra_args', 'sftp_extra_args',
            'scp_extra_args', 'become', 'become_method', 'become_user',
            'verbosity', 'check',
        ])
        self.options = Options(
            listtags=False,
            listtasks=False,
            listhosts=False,
            syntax=False,
            connection='smart',
            module_path='/usr/lib/python2.6/site-packages/ansible/modules/',
            forks=100,
            remote_user='root',
            private_key_file=None,
            ssh_common_args=None,
            ssh_extra_args=None,
            sftp_extra_args=None,
            scp_extra_args=None,
            become=False,
            become_method=None,
            become_user='root',
            verbosity=None,
            check=False,
        )
        if ansible_cfg is not None:
            os.environ["ANSIBLE_CONFIG"] = ansible_cfg
        self.variable_manager = VariableManager()
        self.loader = DataLoader()
        self.inventory = Inventory(loader=self.loader,
                                   variable_manager=self.variable_manager,
                                   host_list=host_list)
        # Register the inventory with the variable manager.
        # NOTE(review): added to match the set_inventory() call in the
        # official Ansible 2.x API example -- confirm for the target version.
        self.variable_manager.set_inventory(self.inventory)
        # Created here so get_result() works even when run() returns early.
        self.results_callback = mycallback()

    def run(self):
        """Execute the playbook.

        Returns:
            A ``(code, results)`` tuple. ``code`` is 1000 when the playbook
            file does not exist, 1001 on ``AnsibleParserError``, 1002 when
            no hosts matched, otherwise the value of ``PlaybookExecutor.run``.
            ``results`` is a dict describing the outcome.
        """
        # Stop immediately when the playbook file is missing; previously the
        # error was prepared but execution carried on regardless.
        if not os.path.exists(self.playbook_path):
            code = 1000
            results = {'playbook': self.playbook_path,
                       'msg': self.playbook_path + ' playbook is not exist',
                       'flag': False}
            return code, results

        pbex = PlaybookExecutor(playbooks=[self.playbook_path],
                                inventory=self.inventory,
                                variable_manager=self.variable_manager,
                                loader=self.loader,
                                options=self.options,
                                passwords=self.passwords)
        self.results_callback = mycallback()
        pbex._tqm._stdout_callback = self.results_callback

        try:
            code = pbex.run()
        except AnsibleParserError:
            code = 1001
            results = {'playbook': self.playbook_path,
                       'msg': self.playbook_path + ' playbook have syntax error',
                       'flag': False}
            return code, results

        if self.results_callback.status_no_hosts:
            code = 1002
            results = {'playbook': self.playbook_path,
                       'msg': self.results_callback.status_no_hosts,
                       'flag': False,
                       'executed': False}
            return code, results

        # Normal completion: the original never defined `results` here.
        # NOTE(review): 'flag' treats a zero return code as success -- confirm
        # against PlaybookExecutor.run for the installed Ansible version.
        results = {'playbook': self.playbook_path,
                   'msg': self.playbook_path + ' playbook executed',
                   'flag': code == 0,
                   'executed': True}
        return code, results

    def get_result(self):
        """Return the collected per-host results as a JSON string.

        Successful hosts map to their full result dict; failed and
        unreachable hosts map to the result's ``msg`` and are omitted when
        the result carries no ``msg``.
        """
        result_all = {'success': {}, 'failed': {}, 'unreachable': {}}

        for host, result in self.results_callback.host_ok.items():
            result_all['success'][host] = result._result

        for host, result in self.results_callback.host_failed.items():
            if 'msg' in result._result:
                result_all['failed'][host] = result._result['msg']

        for host, result in self.results_callback.host_unreachable.items():
            if 'msg' in result._result:
                result_all['unreachable'][host] = result._result['msg']

        return json.dumps(result_all, ensure_ascii=False, sort_keys=True,
                          indent=2)


if __name__ == '__main__':
    play_book = ansible_playbook(playbook='/etc/ansible/site.yml')
    play_book.run()
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(play_book.get_result())
Ansible playbook支持動態inventory傳入
ansible_playbook_inventory.py
#!/usr/bin/python
# coding:utf8
"""Run an Ansible playbook against an inventory built from Python data.

NOTE(review): the import paths below (``ansible.vars.VariableManager``,
``ansible.inventory.Inventory``) look like the Ansible 2.0-2.3 Python API --
confirm against the installed Ansible version.
"""

import os
import json
from collections import namedtuple

from ansible.inventory import Inventory
from ansible.vars import VariableManager
from ansible.parsing.dataloader import DataLoader
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.plugins.callback import CallbackBase
from ansible.errors import AnsibleParserError
from ansible.inventory.group import Group
from ansible.inventory.host import Host


class mycallback(CallbackBase):
    """Callback that keeps the last result object of every host.

    Results are stored per host name in ``host_ok``, ``host_failed`` and
    ``host_unreachable``; ``status_no_hosts`` is set when a play matched
    no hosts.
    """

    def __init__(self, *args):
        # Positional arguments are accepted but not used.
        super(mycallback, self).__init__(display=None)
        self.status_ok = json.dumps({}, ensure_ascii=False)
        self.status_fail = json.dumps({}, ensure_ascii=False)
        self.status_unreachable = json.dumps({}, ensure_ascii=False)
        self.status_playbook = ''
        self.status_no_hosts = False
        self.host_ok = {}
        self.host_failed = {}
        self.host_unreachable = {}

    def v2_runner_on_ok(self, result):
        """Record a successful task result for its host."""
        host = result._host.get_name()
        self.runner_on_ok(host, result._result)
        self.host_ok[host] = result

    def v2_runner_on_failed(self, result, ignore_errors=False):
        """Record a failed task result for its host."""
        host = result._host.get_name()
        self.runner_on_failed(host, result._result, ignore_errors)
        self.host_failed[host] = result

    def v2_runner_on_unreachable(self, result):
        """Record an unreachable-host result for its host."""
        host = result._host.get_name()
        self.runner_on_unreachable(host, result._result)
        self.host_unreachable[host] = result

    def v2_playbook_on_no_hosts_matched(self):
        """Remember that a play matched no hosts."""
        self.playbook_on_no_hosts_matched()
        self.status_no_hosts = True

    def v2_playbook_on_play_start(self, play):
        """Remember the name of the play that just started."""
        self.playbook_on_play_start(play.name)
        self.playbook_path = play.name


class MyInventory(Inventory):
    """Build an Ansible inventory from an in-memory host description.

    The populated inventory is exposed as ``self.inventory``; the base-class
    initialiser is not invoked on this wrapper object itself.
    """

    def __init__(self, resource, loader, variable_manager):
        """Create an empty inventory and fill it from ``resource``.

        ``resource`` is either a dict of groups, for example::

            {
                "group1": {
                    "hosts": [{"hostname": "10.0.0.0", "port": "22",
                               "username": "test", "password": "pass"}, ...],
                    "vars": {"var1": value1, "var2": value2, ...}
                }
            }

        or a plain list of host dicts, in which case every host is placed
        in the group ``default_group``::

            [{"hostname": "10.0.0.0", "port": "22",
              "username": "test", "password": "pass"}, ...]
        """
        self.resource = resource
        self.inventory = Inventory(loader=loader,
                                   variable_manager=variable_manager,
                                   host_list=[])
        self.gen_inventory()

    def my_add_group(self, hosts, groupname, groupvars=None):
        """Create group ``groupname``, add ``hosts`` to it and register it.

        Args:
            hosts: iterable of host dicts; ``None`` is treated as empty.
            groupname: name of the group to create.
            groupvars: optional dict of variables set on the group.
        """
        my_group = Group(name=groupname)

        # Group-level variables, when provided.
        if groupvars:
            # .items() works on both Python 2 and 3 (iteritems is 2-only).
            for key, value in groupvars.items():
                my_group.set_variable(key, value)

        # A group definition without a "hosts" entry yields None here.
        for host in hosts or []:
            hostname = host.get("hostname")
            hostport = host.get("port")
            my_host = Host(name=hostname, port=hostport)

            # Connection variables derived from the well-known keys.
            connection_vars = (
                ('ansible_ssh_host', host.get('ip', hostname)),
                ('ansible_ssh_port', hostport),
                ('ansible_ssh_user', host.get("username")),
                ('ansible_ssh_pass', host.get("password")),
                ('ansible_ssh_private_key_file', host.get("ssh_key")),
            )
            for var_name, var_value in connection_vars:
                # Skip absent keys so no explicit None host variable is
                # injected for a setting the caller never supplied.
                if var_value is not None:
                    my_host.set_variable(var_name, var_value)

            # Every remaining key becomes an ordinary host variable.
            for key, value in host.items():
                if key not in ["hostname", "port", "username", "password"]:
                    my_host.set_variable(key, value)

            my_group.add_host(my_host)

        self.inventory.add_group(my_group)

    def gen_inventory(self):
        """Populate ``self.inventory`` from ``self.resource``."""
        if isinstance(self.resource, list):
            self.my_add_group(self.resource, 'default_group')
        elif isinstance(self.resource, dict):
            for groupname, hosts_and_vars in self.resource.items():
                self.my_add_group(hosts_and_vars.get("hosts"),
                                  groupname,
                                  hosts_and_vars.get("vars"))


class ansible_playbook(object):
    """Wrapper that runs one playbook against a dynamic inventory."""

    def __init__(self, playbook, hosts, ansible_cfg=None, passwords=None):
        """Prepare options, loader, variable manager and inventory.

        Args:
            playbook: path of the playbook file to run.
            hosts: host description accepted by ``MyInventory``.
            ansible_cfg: optional config path exported as ANSIBLE_CONFIG.
            passwords: optional dict passed to ``PlaybookExecutor``;
                defaults to an empty dict.
        """
        self.playbook_path = playbook
        self.hosts = hosts
        # A fresh dict per instance instead of a shared mutable default.
        self.passwords = {} if passwords is None else passwords
        Options = namedtuple('Options', [
            'listtags', 'listtasks', 'listhosts', 'syntax', 'connection',
            'module_path', 'forks', 'remote_user', 'private_key_file',
            'ssh_common_args', 'ssh_extra_args', 'sftp_extra_args',
            'scp_extra_args', 'become', 'become_method', 'become_user',
            'verbosity', 'check',
        ])
        self.options = Options(
            listtags=False,
            listtasks=False,
            listhosts=False,
            syntax=False,
            connection='smart',
            module_path='/usr/lib/python2.6/site-packages/ansible/modules/',
            forks=100,
            remote_user='root',
            private_key_file=None,
            ssh_common_args=None,
            ssh_extra_args=None,
            sftp_extra_args=None,
            scp_extra_args=None,
            become=False,
            become_method=None,
            become_user='root',
            verbosity=None,
            check=False,
        )
        if ansible_cfg is not None:
            os.environ["ANSIBLE_CONFIG"] = ansible_cfg
        self.variable_manager = VariableManager()
        self.loader = DataLoader()
        self.inventory = MyInventory(self.hosts, self.loader,
                                     self.variable_manager).inventory
        self.variable_manager.set_inventory(self.inventory)
        # Created here so get_result() works even when run() returns early.
        self.results_callback = mycallback()

    def run(self):
        """Execute the playbook.

        Returns:
            A ``(code, results)`` tuple. ``code`` is 1000 when the playbook
            file does not exist, 1001 on ``AnsibleParserError``, 1002 when
            no hosts matched, otherwise the value of ``PlaybookExecutor.run``.
            ``results`` is a dict describing the outcome.
        """
        # Stop immediately when the playbook file is missing; previously the
        # error was prepared but execution carried on regardless.
        if not os.path.exists(self.playbook_path):
            code = 1000
            results = {'playbook': self.playbook_path,
                       'msg': self.playbook_path + ' playbook is not exist',
                       'flag': False}
            return code, results

        pbex = PlaybookExecutor(playbooks=[self.playbook_path],
                                inventory=self.inventory,
                                variable_manager=self.variable_manager,
                                loader=self.loader,
                                options=self.options,
                                passwords=self.passwords)
        self.results_callback = mycallback()
        pbex._tqm._stdout_callback = self.results_callback

        try:
            code = pbex.run()
        except AnsibleParserError:
            code = 1001
            results = {'playbook': self.playbook_path,
                       'msg': self.playbook_path + ' playbook have syntax error',
                       'flag': False}
            return code, results

        if self.results_callback.status_no_hosts:
            code = 1002
            results = {'playbook': self.playbook_path,
                       'msg': self.results_callback.status_no_hosts,
                       'flag': False,
                       'executed': False}
            return code, results

        # Normal completion: the original never defined `results` here.
        # NOTE(review): 'flag' treats a zero return code as success -- confirm
        # against PlaybookExecutor.run for the installed Ansible version.
        results = {'playbook': self.playbook_path,
                   'msg': self.playbook_path + ' playbook executed',
                   'flag': code == 0,
                   'executed': True}
        return code, results

    def get_result(self):
        """Return the collected per-host results as a JSON string.

        Successful hosts map to their full result dict; failed and
        unreachable hosts map to the result's ``msg`` and are omitted when
        the result carries no ``msg``.
        """
        result_all = {'success': {}, 'failed': {}, 'unreachable': {}}

        for host, result in self.results_callback.host_ok.items():
            result_all['success'][host] = result._result

        for host, result in self.results_callback.host_failed.items():
            if 'msg' in result._result:
                result_all['failed'][host] = result._result['msg']

        for host, result in self.results_callback.host_unreachable.items():
            if 'msg' in result._result:
                result_all['unreachable'][host] = result._result['msg']

        return json.dumps(result_all, ensure_ascii=False, sort_keys=True,
                          indent=2)


if __name__ == '__main__':
    # Inventory data supplied at run time instead of a hosts file.
    hosts = {
        "group1": {
            "hosts": [{"hostname": "192.168.30.141", "port": "22",
                       "username": "root", "password": "oracle"}],
            "vars": {"var1": "value1"}
        }
    }
    play_book = ansible_playbook(
        playbook='/etc/ansible/playbook_yaml/user.yml', hosts=hosts)
    play_book.run()
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(play_book.get_result())
到目前爲止,Ansible API 的三個文件已經準備好了,下一步開始準備數據庫的信息錄入。