有不少開源自動化運維工具都很好用如ansible/salt stack等,徹底不用重複造輪子。只不過,不少運維同窗學習Python以後,苦於沒小項目訓練,本篇演示用Python寫一個批量操做主機的工具,你們空餘時候能夠試着寫寫,完善完善。python
在運維工做中,古老的方式部署環境、上線代碼可能都須要手動在服務器上敲命令,不勝其煩。因此,腳本,自動化工具等仍是頗有必要的。我以爲一個批量操做工具應該考慮如下幾點:服務器
(1)本質上,就是到遠程主機上執行命令並返回結果。併發
(2)作到批量。也就是要併發對多臺機器進行操做。app
(3)將返回的結果,清晰地展現給用戶。運維
一般開源的主機批量管理工具備兩類,一類是有agent,如SaltStack、Puppet等;另外一類是無agent如ansible。雖然咱們不必重複造輪子,可是能夠試着寫一寫,一是加深對這類軟件原理的理解,二是練習Python。建議若是服務器規模在1000臺之內的用無agent的方式也能hold住;若是超過1000臺,用有agent的會好太多。ssh
接下來咱們一塊兒看看怎麼具體實現。函數
到遠程機器上執行命令,並返回結果,至少有兩種方式:一是用paramiko模塊;而是能夠創建機器互信,從中控執行ssh命令。工具
下面我把本身封裝好的代碼貼一下,是基於paramiko模塊封裝的,ssh的你們能夠本身實現:學習
import paramiko class SSHParamiko(object): err = "argument passwd or rsafile can not be None" def __init__(self, host, port, user, passwd=None, rsafile=None): self.h = host self.p = port self.u = user self.w = passwd self.rsa = rsafile def _connect(self): if self.w: return self.pwd_connect() elif self.rsa: return self.rsa_connect() else: raise ConnectionError(self.err) def _transfer(self): if self.w: return self.pwd_transfer() elif self.rsa: return self.rsa_transfer() else: raise ConnectionError(self.err) def pwd_connect(self): conn = paramiko.SSHClient() conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) conn.connect(self.h, self.p, self.u, self.w) return conn def rsa_connect(self): pkey = paramiko.RSAKey.from_private_key_file(self.rsa) conn = paramiko.SSHClient() conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) conn.connect(hostname=self.h, port=self.p, username=self.u, pkey=pkey) return conn def pwd_transfer(self): transport = paramiko.Transport(self.h, self.p) transport.connect(username=self.u, password=self.w) sftp = paramiko.SFTPClient.from_transport(transport) return sftp, transport def rsa_transfer(self): pkey = paramiko.RSAKey.from_private_key_file(self.rsa) transport = paramiko.Transport(self.h, self.p) transport.connect(username=self.u, pkey=pkey) sftp = paramiko.SFTPClient.from_transport(transport) return sftp, transport def run_cmd(self, cmd): conn = self._connect() stdin, stdout, stderr = conn.exec_command(cmd) code = stdout.channel.recv_exit_status() stdout, stderr = stdout.read(), stderr.read() conn.close() if not stderr: return code, stdout.decode() else: return code, stderr.decode() def get_file(self, remote, local): sftp, conn = self._transfer() sftp.get(remote, local) conn.close() def put_file(self, local, remote): sftp, conn = self._transfer() sftp.put(local, remote) conn.close()
固然,代碼還能夠重構一下哈。接下來咱們看下效果:測試
if __name__ == '__main__': h = "個人測試機IP" p = 22 u = "個人用戶名" w = "個人密碼" obj = SSHParamiko(h, p, u, w) r = obj.run_cmd("df -h") print(r[0]) print(r[1])
執行以後的結果是:
0 Filesystem Size Used Avail Use% Mounted on /dev/vda1 40G 3.4G 34G 9% / devtmpfs 3.9G 0 3.9G 0% /dev tmpfs 3.9G 0 3.9G 0% /dev/shm tmpfs 3.9G 410M 3.5G 11% /run tmpfs 3.9G 0 3.9G 0% /sys/fs/cgroup /dev/vdb 300G 12G 289G 4% /search/odin tmpfs 783M 0 783M 0% /run/user/0 tmpfs 783M 0 783M 0% /run/user/1000
能夠清晰看到第一行是命令執行的狀態碼,0表示成功,非0表示失敗;第二行開始就是咱們的命令返回結果。是否是比較清晰呢?
併發執行一般用Python3自帶的線程模塊就行,這裏我用的from concurrent.futures import ThreadPoolExecutor。而且當拿到結果以後,我還作了一些格式化輸出,好比綠色輸出表示成功,紅色輸出表示命令執行失敗,黃色表示提醒等。廢話很少說,直接看代碼吧!
from concurrent.futures import ThreadPoolExecutor class AllRun(object): def __init__(self, ssh_objs, cmds, max_worker=50): self.objs = [o for o in ssh_objs] self.cmds = [c for c in cmds] self.max_worker = max_worker # 最大併發線程數 self.success_hosts = [] # 存放成功機器數目 self.failed_hosts = [] # 存放失敗的機器IP self.mode = None self.func = None def serial_exec(self, obj): """單臺機器上串行執行命令,並返回結果至字典""" result = list() for c in self.cmds: r = obj.run_cmd(c) result.append([c, r]) return obj, result def concurrent_run(self): """併發執行""" future = ThreadPoolExecutor(self.max_worker) for obj in self.objs: try: future.submit(self.serial_exec, obj).add_done_callback(self.callback) except Exception as err: err = self.color_str(err, "red") print(err) future.shutdown(wait=True) def callback(self, future_obj): """回調函數,處理返回結果""" ssh_obj, rlist = future_obj.result() print(self.color_str("{} execute detail:".format(ssh_obj.h), "yellow")) is_success = True for item in rlist: cmd, [code, res] = item info = f"{cmd} | code => {code}\nResult:\n{res}" if code != 0: info = self.color_str(info, "red") is_success = False if ssh_obj.h not in self.failed_hosts: self.failed_hosts.append(ssh_obj.h) else: info = self.color_str(info, "green") print(info) if is_success: self.success_hosts.append(ssh_obj.h) if ssh_obj.h in self.failed_hosts: self.failed_hosts.remove(ssh_obj.h) def overview(self): """展現總的執行結果""" for i in self.success_hosts: print(self.color_str(i, "green")) print("-" * 30) for j in self.failed_hosts: print(self.color_str(j, "red")) info = "Success hosts {}; Failed hosts {}." s, f = len(self.success_hosts), len(self.failed_hosts) info = self.color_str(info.format(s, f), "yellow") print(info) @staticmethod def color_str(old, color=None): """給字符串添加顏色""" if color == "red": new = "\033[31;1m{}\033[0m".format(old) elif color == "yellow": new = "\033[33;1m{}\033[0m".format(old) elif color == "blue": new = "\033[34;1m{}\033[0m".format(old) elif color == "green": new = "\033[36;1m{}\033[0m".format(old) else: new = old return new if __name__ == '__main__': h1 = "adime01.shouji.sjs.ted" p1 = 22 u1 = "odin" w1 = "*****" h = "10.129.206.97" p = 22 u = "root" w = "*****" obj1 = SSHParamiko(h1, p1, u1, w1) obj = SSHParamiko(h, p, u, w) cmds = ["df -h", "ls"] all_obj = AllRun([obj1, obj], cmds) all_obj.concurrent_run() all_obj.overview()
上述代碼運行的結果:
從執行結果來看,高亮顯示,清新明瞭。既顯示了各個主機的各個命令執行狀態碼,返回結果,最後還彙總結果,成功了多少臺機器和失敗了多少臺機器。
咱們還能夠換一下執行的命令,讓命令執行失敗看看:
後期還能夠包裝一下,將主機、密碼、批量執行的命令寫在配置文件中;或再根據須要包裝成命令行工具,在平常運維工做中能夠適當減小人肉敲命令的繁瑣。