線上有一套k8s集羣,部署了不少應用。如今須要對一些基礎服務作一些常規檢測,好比:html
上面這麼多要求,有一大部分,都須要遠程執行命令。那麼如何使用python來執行遠程命令呢?node
使用paramiko模塊便可!python
安裝paramiko模塊mysql
pip3 install paramiko
這裏,我封裝了一個函數ssh2,代碼以下:linux
import paramiko def ssh2(ip, username, passwd, cmd): """ 使用ssh鏈接遠程服務器執行命令 :param username: 用戶名 :param passwd: 密碼 :param cmd: 執行的命令 :return: """ try: ssh = paramiko.SSHClient() # 建立一個新的SSHClient實例 # 設置host key,若是在"known_hosts"中沒有保存相關的信息,SSHClient 默認行爲是拒絕鏈接,會提示yes/no ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(ip, 22, username, passwd, timeout=1) # 鏈接遠程服務器,超時時間1秒 stdin, stdout, stderr = ssh.exec_command(cmd) # 執行命令 out = stdout.readlines() # 執行結果,readlines會返回列表 ssh.close() # 關閉ssh鏈接 return out except Exception as e: print(e) return False
執行此函數,會返回一個列表。由於是使用readlines,將結果轉換爲列表了!redis
若是執行時,輸出:sql
Please use EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed point encoding.
paramiko 2.4.2 依賴 cryptography,而最新的cryptography==2.5裏有一些棄用的API。json
刪掉cryptography,安裝2.4.2,就不會報錯了。bootstrap
pip uninstall cryptography pip install cryptography==2.4.2
本文參考連接:api
https://blog.51cto.com/wangfeng7399/2376115
咱們須要獲取多臺服務器的時間,而且還須要對比時間差。那麼最簡單辦法,就是獲取時間戳,它是一段數字,那麼數字之間,就能夠作減法了!
使用 date +%s 命令,就能夠獲取時間戳了
root@localhost:~# date +%s 1547546824
怎麼去對比,每一天服務器的時間戳呢?上面已經獲取到時間戳了,關鍵問題是,如何對比?
這裏須要構造一個數據字典,將每一臺服務器的ip以及時間戳存儲一下,數據格式以下:
{ 'ip地址': 時間戳, ... }
首先從字典裏面取出第一個值,因爲python 3.5是無需的,因此取出的數據,每次可能不同。不過沒有關係,取出以後,再刪除便可!
而後將取出的第一個值,和字典中的其餘值,作對比便可!注意:時間戳要轉換爲int類型才行!
#!/usr/bin/env python3 # coding: utf-8 import json import paramiko def ssh2(ip, username, passwd, cmd): """ 使用ssh鏈接遠程服務器執行命令 :param username: 用戶名 :param passwd: 密碼 :param cmd: 執行的命令 :return: """ try: ssh = paramiko.SSHClient() # 建立一個新的SSHClient實例 # 設置host key,若是在"known_hosts"中沒有保存相關的信息,SSHClient 默認行爲是拒絕鏈接,會提示yes/no ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(ip, 22, username, passwd, timeout=1) # 鏈接遠程服務器,超時時間1秒 stdin, stdout, stderr = ssh.exec_command(cmd) # 執行命令 out = stdout.readlines() # 執行結果,readlines會返回列表 ssh.close() # 關閉ssh鏈接 return out except Exception as e: print(e) return False # 服務器列表 ip_list = ["192.168.0.172","192.168.0.173"] username = "root" passwd = "root" cmd = "date +%s" # 獲取時間戳命令 result_dic = {} # 執行結果 for ip in ip_list: # 遍歷ip列表 if not result_dic.get(ip): # 添加鍵值對 res = ssh2(ip,username,passwd,cmd) if res: res = json.loads(res[0]) result_dic[ip] = res print("添加鍵值,ip: ", ip,"value: ",res) else: print("ssh鏈接服務器失敗,ip: %s" %ip) print("結果字典",result_dic) # 獲取第一個ip以及時間戳 first_ip = "" first_val = "" for i in result_dic: # 遍歷結果字典 # 獲取第一個ip以及時間戳 first_ip = i res = ssh2(i,username,passwd,cmd) first_val = json.loads(res[0]) print("第一個ip: ", i,"value: ",first_val) result_dic.pop(i) # 刪除key,避免下面的for循環重複 break for ip in result_dic: d_value = int(first_val) - int(result_dic[ip]) # 判斷差值上下不超過2 if d_value <= -2 or d_value >= 2: print("錯誤, 上下差值超過2 !!!","ip:",ip,"value:",d_value) else: print("正常",ip,"差值爲: ",d_value)
執行腳本,輸出以下:
添加鍵值,ip: 192.168.0.172 value: 1547540144 添加鍵值,ip: 192.168.0.173 value: 1547540145 ... 第一個ip: 192.168.0.172 value: 1547540144 正常 192.168.0.173 差值爲: 1
查看k8s版本,使用命令 kubectl version
root@localhost:~# kubectl version Client Version: version.Info{Major:"1", Minor:"11", GitVersion:"v1.11.2", GitCommit:"bb9ffb1654d4a729bb4cec18ff088eacc153c239", GitTreeState:"clean", BuildDate:"2018-08-07T23:17:28Z", GoVersion:"go1.10.3", Compiler:"gc", Platform:"linux/amd64"}
那麼要獲取到 "v1.11.2" ,還須要進一步過濾
root@localhost:~# kubectl version | awk '{print $5}' | cut -d ':' -f 2 | head -1 | cut -d ',' -f 1 "v1.11.2"
使用上面的ssh2函數以後,輸出的值,是這樣的
["v1.11.2\n"]
這裏會有一個換行符,爲了不這種問題,使用 json.loads() 反序列一下,就能夠還原爲 v1.11.2,連雙引號也沒有了!
kube_v.py
#!/usr/bin/env python3 # coding: utf-8 # 驗證k8s版本 import json import paramiko def ssh2(ip, username, passwd, cmd): """ 使用ssh鏈接遠程服務器執行命令 :param username: 用戶名 :param passwd: 密碼 :param cmd: 執行的命令 :return: """ try: ssh = paramiko.SSHClient() # 建立一個新的SSHClient實例 # 設置host key,若是在"known_hosts"中沒有保存相關的信息,SSHClient 默認行爲是拒絕鏈接,會提示yes/no ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(ip, 22, username, passwd, timeout=1) # 鏈接遠程服務器,超時時間1秒 stdin, stdout, stderr = ssh.exec_command(cmd) # 執行命令 out = stdout.readlines() # 執行結果,readlines會返回列表 ssh.close() # 關閉ssh鏈接 return out except Exception as e: print(e) return False # 服務器列表 ip_list = ["192.168.0.172","192.168.0.173"] username = "root" passwd = "root" cmd = "kubectl version | awk '{print $5}' | cut -d ':' -f 2 | head -1 | cut -d ',' -f 1" # 獲取時間戳命令 result_dic = {} # 執行結果 for ip in ip_list: # 遍歷ip列表 if not result_dic.get(ip): # 添加鍵值對 res = ssh2(ip,username,passwd,cmd) # print("res",res) if res: # 判斷不爲None res = json.loads(res[0]) # 反序列化第一行結果 result_dic[ip] = res print("添加鍵值", ip,"value",res) print("結果字典",result_dic) # 獲取第一個ip以及時間戳 first_ip = "" first_val = "" for i in result_dic: # 遍歷結果字典 # 獲取第一個ip以及時間戳 first_ip = i first_val = ssh2(i,username,passwd,cmd) if first_val: first_val = json.loads(first_val[0]) print("第一個ip", i, "value", first_val) result_dic.pop(i) # 刪除key,比較下面的for循環重複 break for ip in result_dic: d_value = result_dic[ip] # print("其餘服務器,ip",ip,"value",d_value) # 判斷版本是否一致 if first_val == d_value: print("正常", ip, "版本是", d_value) else: print("錯誤, 版本不一致!!!", "ip:", ip, "value:", d_value)
執行輸出:
添加鍵值 192.168.0.172 value v1.11.2 添加鍵值 192.168.0.173 value v1.11.2 ... 第一個ip 192.168.0.172 value v1.11.2 正常 192.168.0.173 版本是 v1.11.2
查看cluster nodes信息,使用命令
redis-cli -c -h 192.168.0.172 -p 7201 cluster nodes
請確保服務器,已經安裝了redis,能夠執行redis-cli命令
執行輸出:
2a77efc52a1dfec83130b908dbd3809e057956f6 192.168.0.172:7201@17201 slave c036906bf3079fa3da15ef43df545b17c3efa144 0 1547548520000 19 connected db60724fdae507eda5d72fd1181559bc6da33097 192.168.0.169:7201@17201 slave ed5cfdfd021dcc821fc0749d318e8ac420cc5c14 0 1547548519607 18 connected ed5cfdfd021dcc821fc0749d318e8ac420cc5c14 192.168.0.143:7201@17201 myself,master - 0 1547548519000 18 connected 0-5460 d4f928c75a2a17d9c30ec77115628eb1840f6ee4 192.168.0.171:7201@17201 slave b8c692d0eff3a5d1ee059878a4213844c6d82ddf 0 1547548520610 21 connected b8c692d0eff3a5d1ee059878a4213844c6d82ddf 192.168.0.170:7201@17201 master - 0 1547548521614 21 connected 10923-16383 c036906bf3079fa3da15ef43df545b17c3efa144 192.168.0.168:7201@17201 master - 0 1547548518604 19 connected 5461-10922
從上面的輸出信息,能夠看出,有3個master節點,3個slave節點。其中紫色部分,若是id一致,表示這是一組服務器!
請確保這一組服務器不能同時掛掉,不然會形成數據丟失!
默認redis集羣要求至少6個節點,當redis集羣中的master節點,掛掉一半時,集羣不可用。
也就是說,目前有3個master節點,最多容許1臺mater節點掛掉!
還有一點,當有任意一個節點狀態爲fail時,也表示集羣不正常!說明:mater節點已經掛掉了一半!
所以,使用python來判斷集羣是否狀態的關鍵點,就是判斷狀態中是否有fail便可!
在redis.conf配置文件中, 有一個參數 appendonly ,表示是否開啓aof,默認是關閉的。
那麼使用redis-cli能夠獲取它的狀態
redis-cli -c -h 192.168.0.172 -p 7201 config get appendonly
執行輸出:
1) "appendonly" 2) "no"
上面這段輸出,表示 沒有開啓aof
redis.py
#!/usr/bin/env python3 # coding: utf-8 # 檢測redis集羣 import paramiko def ssh2(ip, username, passwd, cmd): """ 使用ssh鏈接遠程服務器執行命令 :param username: 用戶名 :param passwd: 密碼 :param cmd: 執行的命令 :return: """ try: ssh = paramiko.SSHClient() # 建立一個新的SSHClient實例 # 設置host key,若是在"known_hosts"中沒有保存相關的信息,SSHClient 默認行爲是拒絕鏈接,會提示yes/no ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(ip, 22, username, passwd, timeout=1) # 鏈接遠程服務器,超時時間1秒 stdin, stdout, stderr = ssh.exec_command(cmd) # 執行命令 out = stdout.readlines() # 執行結果,readlines會返回列表 ssh.close() # 關閉ssh鏈接 return out except Exception as e: print(e) return False # 服務器列表 redis_apps = ["192.168.0.168","192.168.0.172"] username = "root" passwd = "root" port = "6379" cmd1 = "cluster nodes" # cluster nodes狀態 cmd2 = "config get appendonly" # AOF狀態 result_dic = {} # 執行結果 for ip in redis_apps: # 遍歷ip列表 if not result_dic.get(ip): result_dic[ip] = {} # nodes if not result_dic[ip].get('nodes'): # 添加鍵值對,統一由192.168.0.167這臺服務器執行redis-cli,由於它安裝了redis-cli res = ssh2("192.168.0.167",username,passwd,"redis-cli -c -h {} -p {} {}".format(ip,port,cmd1)) result_dic[ip]['nodes'] = res print("添加鍵值,ip: ", ip,"value: ",res) # aof if not result_dic[ip].get('aof'): # 添加鍵值對 res = ssh2("192.168.0.167",username,passwd,"redis-cli -c -h {} -p {} {}".format(ip,port,cmd2)) # print(res,type(res)) res = res[1].split("\n")[0] # 獲取選項值 result_dic[ip]['aof'] = res print("添加鍵值,ip: ", ip,"value: ",res) print("結果字典",result_dic) # 標誌位 flag_nodes = True for i in result_dic: for j in result_dic[i]['nodes']: # 遍歷結果列表 if "fail" in j: # 判斷是否有fail狀態 print("狀態異常: ",j) flag_nodes = False # 判斷標誌位 if flag_nodes: print("redis cluster nodes 狀態正常") else: print("redis cluster nodes 狀態異常") # 輸出aof狀態 print("redis aof狀態以下: ") for i in result_dic: print("ip: {} 狀態: {}".format(i,result_dic[i]['aof']))
執行程序,輸出:
添加鍵值,ip: 192.168.0.168 value: [...] ... redis cluster nodes 狀態正常 redis aof狀態以下: ip: 192.168.0.168 狀態: no ip: 192.168.0.172 狀態: no
判斷etcd工做是否正常,只須要能訪問到api地址,就說明正常,url以下:
http://192.168.0.169:2380/version
etcd.py
#!/usr/bin/env python3 # coding: utf-8 # 檢測etcd狀態 import json import requests etcd_list = ["192.168.0.169","192.168.0.168","192.168.0.167"] for ip in etcd_list: # 訪問api接口,查看版本 response=requests.get('http://%s:2380/version' %ip) # print(response.content) res = (response.content).decode('utf-8') res_dict = json.loads(res) # print(res_dict,type(res_dict)) print("ip: {} etcd版本爲: {}".format(ip,res_dict['etcdserver']))
執行輸出:
ip: 192.168.0.169 etcd版本爲: 3.3.0 ip: 192.168.0.168 etcd版本爲: 3.3.0 ip: 192.168.0.167 etcd版本爲: 3.3.0
查看mysql的運行統計時間,使用命令
show status like "uptime"
#!/usr/bin/env python3 # coding: utf-8 import pymysql conn = pymysql.connect( host="192.168.0.179", # mysql ip地址 user="root", passwd="root", port=3306 # mysql 端口號,注意:必須是int類型 ) cur = conn.cursor() # 建立遊標 # 查詢當前MySQL本次啓動後的運行統計時間 cur.execute('show status like "uptime"') data_all = cur.fetchall() # 獲取執行的返回結果 print(data_all)
執行輸出:
(('Uptime', '941067'),)
查看節點信息,須要在 主節點操做
ceph osd tree
#!/usr/bin/env python3 # coding: utf-8 import paramiko def ssh2(ip, username, passwd, cmd): """ 使用ssh鏈接遠程服務器執行命令 :param username: 用戶名 :param passwd: 密碼 :param cmd: 執行的命令 :return: """ try: ssh = paramiko.SSHClient() # 建立一個新的SSHClient實例 # 設置host key,若是在"known_hosts"中沒有保存相關的信息,SSHClient 默認行爲是拒絕鏈接,會提示yes/no ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(ip, 22, username, passwd, timeout=1) # 鏈接遠程服務器,超時時間1秒 stdin, stdout, stderr = ssh.exec_command(cmd) # 執行命令 out = stdout.readlines() # 執行結果,readlines會返回列表 ssh.close() # 關閉ssh鏈接 return out except Exception as e: print(e) return False svr_list = ["192.168.0.181"] username = "root" passwd = "root" cmd = "ceph osd tree" # 查看節點信息 for ip in svr_list: res = ssh2(ip,username,passwd,cmd) print(ip,"節點信息以下:") print("".join(res))
執行輸出:
192.168.0.181 節點信息以下: ID WEIGHT TYPE NAME UP/DOWN REWEIGHT PRIMARY-AFFINITY -1 1.02695 root default -2 0.18399 host xx-node13 0 0.18399 osd.0 up 1.00000 1.00000 -3 0.18399 host xx-node15 1 0.18399 osd.1 up 1.00000 1.00000 -4 0.18399 host xx-node17 2 0.18399 osd.2 up 1.00000 1.00000 -5 0.18399 host xx-node18 3 0.18399 osd.3 up 1.00000 1.00000 -6 0.10699 host xx-node19 4 0.10699 osd.4 up 1.00000 1.00000 -7 0.18399 host xx-node14 5 0.18399 osd.5 up 1.00000 1.00000
手動建立名爲test的topic
bin/kafka-topics.sh --create --zookeeper zookeeper-1.default.svc.cluster.local:2181,zookeeper-2.default.svc.cluster.local:2128,zookeeper-3.default.svc.cluster.local:2128 --topic test --partitions 1 --replication-factor 1
由於python中的kakfa包沒法直接建立 topic,因此須要手動建立
因爲線上kafka啓動了ACL,那麼請確保相關用戶設置了ACL規則,關於acl的配置,請參考連接:
http://www.javashuo.com/article/p-kqnuvuyr-hr.html
爲了方便,這裏直接使用超級用戶。注意:超級用戶是不須要設置ACL規則的,擁有全部權限
#!/usr/bin/env python3 # coding: utf-8 # 注意:須要手動建立topic才行執行此腳本 import time from kafka import KafkaProducer from kafka import KafkaConsumer class KafkaClient(object): # kafka客戶端程序 def __init__(self, kafka_server, port, topic,content,username,password): self.kafka_server = kafka_server # kafka服務器ip地址 self.port = port # kafka端口 self.topic = topic # topic名 self.content = content # 發送內容 self.username = username self.password = password def producer(self): """ 生產者模式 :return: object """ # 鏈接kafka服務器,好比['192.138.150.193:9092'] producer = KafkaProducer(bootstrap_servers=['%s:%s' % (self.kafka_server, self.port)], security_protocol="SASL_PLAINTEXT", # 指定SASL安全協議 sasl_mechanism='PLAIN', # 配置SASL機制 sasl_plain_username=self.username, # 認證用戶名 sasl_plain_password=self.password, # 密碼 ) producer.send(self.topic, self.content) # 發送消息,必須是二進制 producer.flush() # flush確保全部meg都傳送給broker producer.close() return producer def consumer(self): """ 消費者模式 :return: object """ # 鏈接kafka,指定組爲test_group consumer = KafkaConsumer(topic, group_id='test_group', bootstrap_servers=['%s:%s' % (kafka_server, port)], sasl_mechanism="PLAIN", security_protocol='SASL_PLAINTEXT', sasl_plain_username=self.username, sasl_plain_password=self.password, ) return consumer # for msg in consumer: # recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) # print(recv) def main(self): startime = time.time() # 開始時間 client = KafkaClient(self.kafka_server, self.port, self.topic, self.content,self.username,self.password) # 實例化客戶端 client.producer() # 執行生產者 print('執行生產者') consumer = client.consumer() # 執行消費者 print('執行消費者') print('等待結果....') flag = False for msg in consumer: # recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) # 判斷生產的消息和消費的消息是否一致 print(msg.value) # print(self.content) if msg.value == self.content: flag = True break consumer.close() # 關閉消費者對象 endtime = time.time() # 結束時間 if flag: # %.2f %(xx) 表示保留小數點2位 return "kafka驗證消息成功,花費時間", '%.2f 秒' % (endtime - startime) else: return "kafka驗證消息失敗,花費時間", '%.2f 秒' % (endtime - startime) if __name__ == '__main__': kafka_server = "kafka-1.default.svc.cluster.local" port = "9092" topic = "test" # 測試消息 content = "hello honey".encode('utf-8') username = "admin" password = "admin" client = KafkaClient(kafka_server,port,topic,content,username,password) # 實例化客戶端 print(client.main())
執行程序,輸出:
執行生產者 執行消費者 等待結果.... b'hello honey' ('kafka驗證消息成功,花費時間', '3.61 秒')
注意:第一次執行時,會卡住1分鐘。多執行幾回就會很快了,至於什麼緣由,不知道!
就是在執行這一行代碼時,會卡住
for msg in consumer: