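In day-to-day ops and ops-development work, the systems we deal with fall into two main camps.
This article mainly discusses a few things to watch out for on the agent side.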
Advantages
Disadvantages
Characteristics of the non-intrusive (agentless) approach: the typical example is Ansible over SSH
Advantages
Disadvantages
Classic client examples
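Keep the code lean and avoid excessive resource consumption.
The agent's own resource usage can be monitored with Prometheus's client_golang, which by default exports the process's CPU usage, file descriptor count, memory and similar information, helping us pin down where resources are going: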
# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds.
# TYPE process_cpu_seconds_total counter
process_cpu_seconds_total 38913.32
# HELP process_max_fds Maximum number of open file descriptors.
# TYPE process_max_fds gauge
process_max_fds 6.815744e+06
# HELP process_open_fds Number of open file descriptors.
# TYPE process_open_fds gauge
process_open_fds 15
# HELP process_resident_memory_bytes Resident memory size in bytes.
# TYPE process_resident_memory_bytes gauge
process_resident_memory_bytes 1.4659584e+07
# HELP process_start_time_seconds Start time of the process since unix epoch in seconds.
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1.59350253732e+09
# HELP process_virtual_memory_bytes Virtual memory size in bytes.
# TYPE process_virtual_memory_bytes gauge
process_virtual_memory_bytes 1.201352704e+09
# HELP process_virtual_memory_max_bytes Maximum amount of virtual memory available in bytes.
# TYPE process_virtual_memory_max_bytes gauge
process_virtual_memory_max_bytes -1
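To make use of these numbers, something has to read them. The following is a minimal sketch, not part of the original agent, that parses a dump like the one above and derives a file-descriptor usage ratio. The function names `parse_process_metrics` and `fd_usage_ratio` are made up for illustration, and the parser only handles the simple unlabelled `name value` form.

```python
def parse_process_metrics(text):
    """Return {metric_name: float} for the unlabelled samples in an exposition dump."""
    metrics = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines and the "# HELP" / "# TYPE" comment lines.
        if not line or line.startswith("#"):
            continue
        parts = line.split()
        # Only the simple "name value" form is handled; labelled samples are ignored.
        if len(parts) != 2 or "{" in parts[0]:
            continue
        try:
            metrics[parts[0]] = float(parts[1])
        except ValueError:
            continue
    return metrics


def fd_usage_ratio(metrics):
    """Fraction of the fd limit currently in use, or None if it cannot be computed."""
    open_fds = metrics.get("process_open_fds")
    max_fds = metrics.get("process_max_fds")
    if open_fds is None or max_fds is None or max_fds <= 0:
        return None
    return open_fds / max_fds


# A shortened copy of the sample output shown in the article.
SAMPLE = """\
# HELP process_max_fds Maximum number of open file descriptors.
# TYPE process_max_fds gauge
process_max_fds 6.815744e+06
# HELP process_open_fds Number of open file descriptors.
# TYPE process_open_fds gauge
process_open_fds 15
# HELP process_resident_memory_bytes Resident memory size in bytes.
# TYPE process_resident_memory_bytes gauge
process_resident_memory_bytes 1.4659584e+07
"""

if __name__ == "__main__":
    m = parse_process_metrics(SAMPLE)
    print("rss MiB:", m["process_resident_memory_bytes"] / 2 ** 20)
    print("fd usage:", fd_usage_ratio(m))
```

A check like this could feed an alert when the agent's memory or fd count drifts upward between versions.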
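Example: we now need to upgrade the agent from v1.0 to v1.1.
For ansible-playbook, you can refer to my earlier article "Using ansible-playbook to build a fast dnsdist hijacking tool".
We can use the Python code below to wrap a playbook run into a single reusable call; to use it you only need to pass in the IP list, the YAML file, and a dict of extra variables.
Ahem: the typical problem with this approach is that it is bound by the performance of a single Ansible server (many of you have probably suffered through this). Of course, a large IP list can be sharded across multiple ansible-servers and the results merged afterwards.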
t = PlaybookApi([ip], yaml_path, {"conf_dir": conf_dir, "bk_file_name": bk_file_name})
t.run()
from collections import namedtuple

# NOTE: these import paths match the Ansible 2.0-2.3 Python API;
# Ansible 2.4+ moved Inventory and VariableManager to other modules.
from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.utils.vars import load_extra_vars
from ansible.utils.vars import load_options_vars
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.plugins.callback import CallbackBase


class ResultsCollector(CallbackBase):
    """Callback that keeps the latest task result per host, grouped by outcome."""

    def __init__(self, *args, **kwargs):
        super(ResultsCollector, self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        self.host_unreachable[result._host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        self.host_ok[result._host.get_name()] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        self.host_failed[result._host.get_name()] = result


class PlaybookApi(PlaybookExecutor):
    def __init__(self, host_list, yaml_path, extra_vars):
        self.host_list = host_list
        self.yaml_path = yaml_path
        self.callback = ResultsCollector()
        self.extra_vars = extra_vars
        # Build options, loader, variable manager and inventory first;
        # the parent constructor needs all of them.
        self.IpmiPlay()
        super(PlaybookApi, self).__init__(playbooks=[self.yaml_path],
                                          inventory=self.inventory,
                                          variable_manager=self.variable_manager,
                                          loader=self.loader,
                                          options=self.options,
                                          passwords={})
        # Route task results into our collector instead of stdout.
        self._tqm._stdout_callback = self.callback

    def IpmiPlay(self):
        Options = namedtuple('Options',
                             ['listtags', 'listtasks', 'listhosts', 'syntax', 'connection',
                              'module_path', 'forks', 'remote_user', 'private_key_file',
                              'ssh_common_args', 'ssh_extra_args', 'sftp_extra_args',
                              'scp_extra_args', 'become', 'become_method', 'become_user',
                              'verbosity', 'check', 'extra_vars'])
        self.options = Options(listtags=False, listtasks=False, listhosts=False, syntax=False,
                               connection='ssh', module_path=None, forks=10, remote_user='',
                               private_key_file=None, ssh_common_args='', ssh_extra_args='',
                               sftp_extra_args='', scp_extra_args='', become=True,
                               become_method='sudo', become_user='root', verbosity=3,
                               check=False, extra_vars={})
        self.loader = DataLoader()

        # create the variable manager, which will be shared throughout
        # the code, ensuring a consistent view of global variables
        variable_manager = VariableManager()
        variable_manager.extra_vars = load_extra_vars(loader=self.loader, options=self.options)
        variable_manager.options_vars = load_options_vars(self.options)
        self.variable_manager = variable_manager

        # create the inventory, and filter it based on the subset specified (if any)
        self.inventory = Inventory(loader=self.loader,
                                   variable_manager=self.variable_manager,
                                   host_list=self.host_list)
        self.variable_manager.set_inventory(self.inventory)
        # The caller's extra variables override whatever was loaded from options.
        self.variable_manager.extra_vars = self.extra_vars

    def get_result(self):
        self.results_raw = {'success': {}, 'failed': {}, "unreachable": {}}
        for host, result in self.callback.host_ok.items():
            self.results_raw['success'][host] = result
        for host, result in self.callback.host_failed.items():
            self.results_raw['failed'][host] = result
        # Unreachable hosts only carry an error message, so store the string.
        for host, result in self.callback.host_unreachable.items():
            self.results_raw['unreachable'][host] = result._result['msg']
        return self.results_raw


if __name__ == '__main__':
    h = ["127.0.0.1"]
    yaml = "systemd_stop.yaml"
    api = PlaybookApi(h, yaml, {"app": "falcon-judge"})
    api.run()
    res = api.get_result()
    for k, v in res.items():
        for kk, vv in v.items():
            # 'unreachable' values are plain strings, the others are result objects.
            print(kk, getattr(vv, "_result", vv))
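The sharding idea mentioned earlier (split the IP list across several ansible-servers, then merge) can be sketched roughly as follows. This is only an illustration under assumptions: `shard`, `merge_results` and `run_sharded` are hypothetical helpers, and `run_one` stands in for whatever mechanism actually hands a chunk to one ansible-server and returns a dict shaped like the output of `get_result()`.

```python
from concurrent.futures import ThreadPoolExecutor


def shard(hosts, n_shards):
    """Split hosts into at most n_shards near-equal, order-preserving chunks."""
    if n_shards < 1:
        raise ValueError("n_shards must be >= 1")
    size, extra = divmod(len(hosts), n_shards)
    chunks, start = [], 0
    for i in range(n_shards):
        # The first `extra` chunks take one additional host each.
        end = start + size + (1 if i < extra else 0)
        if end > start:
            chunks.append(hosts[start:end])
        start = end
    return chunks


def merge_results(partials):
    """Merge per-shard result dicts into one success/failed/unreachable dict."""
    merged = {"success": {}, "failed": {}, "unreachable": {}}
    for partial in partials:
        for status in merged:
            merged[status].update(partial.get(status, {}))
    return merged


def run_sharded(hosts, n_shards, run_one):
    """Run run_one(chunk) for every shard concurrently and merge the results.

    run_one is a caller-supplied callable (hypothetical here) that executes
    the playbook for one chunk of hosts on one ansible-server.
    """
    chunks = shard(hosts, n_shards)
    if not chunks:
        return merge_results([])
    with ThreadPoolExecutor(max_workers=len(chunks)) as pool:
        return merge_results(pool.map(run_one, chunks))


if __name__ == "__main__":
    # A fake runner that marks every host as successful.
    fake = lambda chunk: {"success": {h: "ok" for h in chunk}}
    print(run_sharded(["10.0.0.1", "10.0.0.2", "10.0.0.3"], 2, fake))
```

Threads are used here only because each shard would mostly wait on a remote server; how chunks actually reach the other ansible-servers is left open.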
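Take the falcon-agent code as an example; the code lives at https://github.com/ning1875/falcon-plus/tree/master/modules/agent. Overall implementation flow:
P.S. Please forgive my spider-crawl handwriting.
Implementation analysis
How is the service restarted once the file has been upgraded? Taking systemd as an example, the agent only needs to send a termination signal to its own process, i.e. kill its own PID, and systemd starts it again: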
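pid := os.Getpid()
thisPro, _ := os.FindProcess(pid)
// os.Kill is SIGKILL, which cannot be caught; use syscall.SIGTERM
// instead if the agent needs a graceful shutdown.
thisPro.Signal(os.Kill)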
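How the agent manages its version: it is set in a const block
// changelog:
// 3.1.3: code refactor
// 3.1.4: bugfix ignore configuration
// 5.0.0: support enabling the /run endpoint via configuration; collect udp traffic data; du the size of a given directory
// 5.1.0: no longer use the checksum mechanism when syncing plugins
// 5.1.1: fix a crash when sending data to multiple transfers
// 5.1.2: ignore mount point when blocks=0
// 6.0.0: agent self-upgrade; add some new monitoring items
// 6.0.1: agent collect level
// 6.0.2: add a per-core monitoring switch (off by default); per-core tag changed to core=core0x; add mem.available.percent
// 6.0.3: add sys.uptime
// 6.0.4: fix the cpu.iowait>100 bug
// 6.0.5: add process collection monitoring, 30s interval
// 6.0.6: adjust built-in collector intervals, disk io and tcp 10s-->30s; agent_version is an integer representing the current version; remove dynamic monitoring methods
// 6.0.7: ntp supports chronyc; service monitoring rpc call interval changed to one minute
// 6.0.8: change metric collection intervals, keeping only cpu at 10s; fix gaps in data points
// 6.0.9: fix collection for dfa/dfb block devices; fix a bug across different versions of ss -s
// 6.1.0: fix the case where the hostname on the machine was changed, converting the ip into the nxx-xx-xx form
const (
	VERSION          = "6.1.0"
	COLLECT_INTERVAL = time.Second
	URL_CHECK_HEALTH = "url.check.health"
	NET_PORT_LISTEN  = "net.port.listen"
	DU_BS            = "du.bs"
	PROC_NUM         = "proc.num"
	UPTIME           = "sys.uptime"
)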
How an administrator starts an upgrade: just send an HTTP request to hbs to turn on the upgrade switch
curl -X POST http://127.0.0.1:6031/agent/upgrade -d '{"wgeturl":"http://${your_cdn_addr}/file/open-falcon","version":"6.0.1","binfile_md5":"35ac8534c0b31237e844ef8ee2bb9b9e"}'
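The `binfile_md5` in that request has to match the binary actually placed on the download server. As a small sketch of how the request body might be prepared, the following computes the digest from a local build and assembles the same three fields the curl command sends. `file_md5` and `build_upgrade_body` are illustrative names, not part of falcon-plus, and the path in the usage note is a placeholder.

```python
import hashlib
import json


def file_md5(path, chunk_size=1 << 20):
    """Return the hex md5 of a file, reading it in chunks to bound memory use."""
    digest = hashlib.md5()
    with open(path, "rb") as fh:
        for block in iter(lambda: fh.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()


def build_upgrade_body(wgeturl, version, binary_path):
    """Build the JSON body for POST /agent/upgrade from a local binary."""
    return json.dumps({
        "wgeturl": wgeturl,
        "version": version,
        "binfile_md5": file_md5(binary_path),
    })
```

For example, `build_upgrade_body("http://${your_cdn_addr}/file/open-falcon", "6.0.1", "/path/to/new/agent/binary")` yields a string that can be passed to `curl -d`; computing the digest from the file avoids pasting a stale md5 by hand.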
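Disadvantages
http-req ---> hbs ---> turn on the upgrade switch ---> check the version number in each agent's heartbeat and check the current hbs upgrade queue ---> send the upgrade instruction to the agent ---> the agent downloads the new binary using the URL and target version from the upgrade command (with backup and rollback logic) ---> once the agent's checks pass, it gets its own pid and sends itself a kill signal ---> the agent exits and is brought back up by systemd, which completes the upgrade ---> the version in the new heartbeat checks out, so no further upgrade is triggered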
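The "download, check, back up, roll back" step on the agent side is where most of the risk sits. The real agent is written in Go and its logic is not shown here, so the following is only a Python sketch of the general technique under assumptions: verify the md5 of the downloaded file, keep a `.bak` copy of the running binary, swap the new file in, and restore the backup if anything fails. `install_binary` and `md5_of` are hypothetical names.

```python
import hashlib
import os
import shutil


def md5_of(path):
    """Return the hex md5 of a file (read in one go; fine for small binaries)."""
    with open(path, "rb") as fh:
        return hashlib.md5(fh.read()).hexdigest()


def install_binary(new_path, current_path, expected_md5):
    """Replace current_path with new_path if its md5 matches; return True on success.

    On a failed swap the previous binary is restored from the backup copy.
    """
    if md5_of(new_path) != expected_md5:
        # Corrupt or wrong download: leave the running binary untouched.
        return False
    backup_path = current_path + ".bak"
    shutil.copy2(current_path, backup_path)      # backup, keeps mode bits
    try:
        shutil.copymode(current_path, new_path)  # keep the executable bit
        os.replace(new_path, current_path)       # atomic on the same filesystem
        if md5_of(current_path) != expected_md5:
            raise OSError("installed binary failed verification")
    except OSError:
        shutil.copy2(backup_path, current_path)  # rollback
        return False
    return True
```

Only after a call like this returns True would the agent signal itself and let systemd start the new binary.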
1. falcon-agent gains new collection metrics; once testing passes, bump the version number in the code, e.g. to 6.0.1 (the current one being 6.0.0)
2. Then put the new build under the download server's path, so that it can be fetched with wget http://${your_cdn_addr}/file/open-falcon/bin_6.0.1
3. Then send the upgrade HTTP request to hbs (there is a safeguard here: it can only be issued from the hbs host itself)
4. Then query the versions of the agents currently heartbeating via the hbs HTTP interface to follow upgrade progress: curl -s http://localhost:6031/agentversions |python -m "json.tool"
5. Meanwhile, connect to the redis cluster and watch the agent_upgrade_set set: redis-cli -h ip -p port -c smembers agent_upgrade_set & scard agent_upgrade_set
6. So far, a concurrency of 2000 is enough to saturate the 10 GbE NIC of a single nginx download server: 1.24 GB/s

## falcon-agent self-upgrade commands
curl -X POST http://127.0.0.1:6031/agent/upgrade -d '{"wgeturl":"http://${your_cdn_addr}/file/open-falcon","version":"6.0.1","binfile_md5":"35ac8534c0b31237e844ef8ee2bb9b9e"}'

curl -X GET http://127.0.0.1:6031/agent/upgrade/nowargs
{"msg":"success","data":{"type":0,"wgeturl":"http://${your_cdn_addr}/file/open-falcon","version":"6.0.1","binfile_md5":"35ac8534c0b31237e844ef8ee2bb9b9e","cfgfile_md5":""}}

curl http://127.0.0.1:6031/agentversions
{"msg":"success","data":{"n3-021-225":"6.0.1"}}

curl -X DELETE http://127.0.0.1:6031/agent/upgrade
{"msg":"success","data":"取消升級成功"}

uri:
  url: http://127.0.0.1:6031/agent/upgrade
  method: POST
  body: {"wgeturl":"http://${your_cdn_addr}/file/open-falcon","version":"6.0.2","binfile_md5":"f5c597f15e379a77d1e2ceeec7bd99a8"}
  status_code: 200
  body_format: json
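Following the progress in step 4 by eye gets tedious with thousands of agents. As a rough sketch, the `/agentversions` response can be summarized like this; the response shape is taken from the sample output above, while `upgrade_progress` and the second hostname in the usage example are made up for illustration.

```python
import json
from collections import Counter


def upgrade_progress(response_text, target_version):
    """Summarize an /agentversions response against the target version."""
    payload = json.loads(response_text)
    versions = payload.get("data") or {}   # {hostname: version}
    counts = Counter(versions.values())
    # Hosts still reporting something other than the target version.
    pending = sorted(h for h, v in versions.items() if v != target_version)
    return {
        "total": len(versions),
        "done": counts.get(target_version, 0),
        "pending": pending,
        "by_version": dict(counts),
    }


if __name__ == "__main__":
    # "n3-021-226" is a hypothetical second host added for the example.
    sample = '{"msg":"success","data":{"n3-021-225":"6.0.1","n3-021-226":"6.0.0"}}'
    print(upgrade_progress(sample, "6.0.1"))
```

Polling this periodically and stopping once `pending` is empty gives a simple completion check for a rollout.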