Python 檢測系統時間,k8s版本,redis集羣,etcd,mysql,ceph,kafka

1、概述

線上有一套k8s集羣,部署了不少應用。如今須要對一些基礎服務作一些常規檢測,好比:html

  • 系統時間,要求:k8s的每個節點的時間,差值上下不超過2秒
  • k8s版本,要求:k8s的每個節點的版本必須一致
  • redis集羣,要求:1. 查看cluster nodes狀態 2. AOF狀態
  • etcd,要求:訪問etcd的api,能獲取到版本信息, 說明etcd服務正常
  • mysql,要求:獲取mysql運行統計時間,能獲取說明mysql服務正常
  • ceph,要求:使用ceph osd tree命令查看ceph節點信息
  • kafka,要求:使用生產者模式寫入一個消息,消費者模式能獲得一樣的消息,則kafka服務正常

 

python遠程執行命令

上面這麼多要求,有一大部分,都須要遠程執行命令。那麼如何使用python來執行遠程命令呢?node

使用paramiko模塊便可!python

paramiko

安裝paramiko模塊mysql

pip3 install paramiko

 

使用paramiko

這裏,我封裝了一個函數ssh2,代碼以下:linux

import paramiko
def ssh2(ip, username, passwd, cmd):
    """
    使用ssh鏈接遠程服務器執行命令
    :param username: 用戶名
    :param passwd: 密碼
    :param cmd: 執行的命令
    :return:
    """
    try:
        ssh = paramiko.SSHClient()  # 建立一個新的SSHClient實例
        # 設置host key,若是在"known_hosts"中沒有保存相關的信息,SSHClient 默認行爲是拒絕鏈接,會提示yes/no
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(ip, 22, username, passwd, timeout=1)  # 鏈接遠程服務器,超時時間1秒
        stdin, stdout, stderr = ssh.exec_command(cmd)  # 執行命令
        out = stdout.readlines()    # 執行結果,readlines會返回列表
        ssh.close()  # 關閉ssh鏈接
        return out
    except Exception as e:
        print(e)
        return False

 

執行此函數,會返回一個列表。由於是使用readlines,將結果轉換爲列表了!redis

 

若是執行時,輸出:sql

Please use EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed point encoding.

 

緣由

paramiko 2.4.2 依賴 cryptography,而最新的cryptography==2.5裏有一些棄用的API。json

解決

刪掉cryptography,安裝2.4.2,就不會報錯了。bootstrap

pip uninstall cryptography
pip install cryptography==2.4.2

 

本文參考連接:api

https://blog.51cto.com/wangfeng7399/2376115

 

2、系統時間

獲取時間戳

咱們須要獲取多臺服務器的時間,而且還須要對比時間差。那麼最簡單辦法,就是獲取時間戳,它是一段數字,那麼數字之間,就能夠作減法了!

使用 date +%s 命令,就能夠獲取時間戳了

root@localhost:~# date +%s
1547546824

 

對比思路

怎麼去對比,每一天服務器的時間戳呢?上面已經獲取到時間戳了,關鍵問題是,如何對比?

構造字典

這裏須要構造一個數據字典,將每一臺服務器的ip以及時間戳存儲一下,數據格式以下:

{
    'ip地址': 時間戳,
    ...
}

 

對比數據

首先從字典裏面取出第一個值,因爲python 3.5是無需的,因此取出的數據,每次可能不同。不過沒有關係,取出以後,再刪除便可!

而後將取出的第一個值,和字典中的其餘值,作對比便可!注意:時間戳要轉換爲int類型才行!

 

完整代碼

ntp.py

#!/usr/bin/env python3
# coding: utf-8
import json
import paramiko
def ssh2(ip, username, passwd, cmd):
    """
    使用ssh鏈接遠程服務器執行命令
    :param username: 用戶名
    :param passwd: 密碼
    :param cmd: 執行的命令
    :return:
    """
    try:
        ssh = paramiko.SSHClient()  # 建立一個新的SSHClient實例
        # 設置host key,若是在"known_hosts"中沒有保存相關的信息,SSHClient 默認行爲是拒絕鏈接,會提示yes/no
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(ip, 22, username, passwd, timeout=1)  # 鏈接遠程服務器,超時時間1秒
        stdin, stdout, stderr = ssh.exec_command(cmd)  # 執行命令
        out = stdout.readlines()    # 執行結果,readlines會返回列表
        ssh.close()  # 關閉ssh鏈接
        return out
    except Exception as e:
        print(e)
        return False

# 服務器列表
ip_list = ["192.168.0.172","192.168.0.173"]
username = "root"
passwd = "root"
cmd = "date +%s"  # 獲取時間戳命令

result_dic = {}  # 執行結果

for ip in ip_list:  # 遍歷ip列表
    if not result_dic.get(ip):
        # 添加鍵值對
        res = ssh2(ip,username,passwd,cmd)
        if res:
            res = json.loads(res[0])
            result_dic[ip] = res
            print("添加鍵值,ip: ", ip,"value: ",res)
        else:
            print("ssh鏈接服務器失敗,ip: %s" %ip)

print("結果字典",result_dic)

# 獲取第一個ip以及時間戳
first_ip = ""
first_val = ""

for i in result_dic:  # 遍歷結果字典
    # 獲取第一個ip以及時間戳
    first_ip = i
    res = ssh2(i,username,passwd,cmd)
    first_val = json.loads(res[0])
    print("第一個ip: ", i,"value: ",first_val)

    result_dic.pop(i)  # 刪除key,避免下面的for循環重複
    break

for ip in result_dic:
    d_value = int(first_val) - int(result_dic[ip])
    # 判斷差值上下不超過2
    if d_value <= -2 or d_value >= 2:
        print("錯誤, 上下差值超過2 !!!","ip:",ip,"value:",d_value)
    else:
        print("正常",ip,"差值爲: ",d_value)
View Code

 

執行腳本,輸出以下:

添加鍵值,ip:  192.168.0.172 value:  1547540144
添加鍵值,ip:  192.168.0.173 value:  1547540145
...
第一個ip:  192.168.0.172 value:  1547540144
正常 192.168.0.173 差值爲:  1

 

3、k8s版本

獲取版本

查看k8s版本,使用命令 kubectl version 

root@localhost:~# kubectl version
Client Version: version.Info{Major:"1", Minor:"11", GitVersion:"v1.11.2", GitCommit:"bb9ffb1654d4a729bb4cec18ff088eacc153c239", GitTreeState:"clean", BuildDate:"2018-08-07T23:17:28Z", GoVersion:"go1.10.3", Compiler:"gc", Platform:"linux/amd64"}

 

那麼要獲取到 "v1.11.2" ,還須要進一步過濾

root@localhost:~# kubectl version | awk '{print $5}' | cut -d ':' -f 2 | head -1 | cut -d ',' -f 1
"v1.11.2"

 

對比版本

使用上面的ssh2函數以後,輸出的值,是這樣的

["v1.11.2\n"]

 

這裏會有一個換行符,爲了不這種問題,使用 json.loads() 反序列一下,就能夠還原爲 v1.11.2,連雙引號也沒有了!

 

完整代碼

kube_v.py

#!/usr/bin/env python3
# coding: utf-8
# 驗證k8s版本

import json
import paramiko
def ssh2(ip, username, passwd, cmd):
    """
    使用ssh鏈接遠程服務器執行命令
    :param username: 用戶名
    :param passwd: 密碼
    :param cmd: 執行的命令
    :return:
    """
    try:
        ssh = paramiko.SSHClient()  # 建立一個新的SSHClient實例
        # 設置host key,若是在"known_hosts"中沒有保存相關的信息,SSHClient 默認行爲是拒絕鏈接,會提示yes/no
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(ip, 22, username, passwd, timeout=1)  # 鏈接遠程服務器,超時時間1秒
        stdin, stdout, stderr = ssh.exec_command(cmd)  # 執行命令
        out = stdout.readlines()    # 執行結果,readlines會返回列表
        ssh.close()  # 關閉ssh鏈接
        return out
    except Exception as e:
        print(e)
        return False

# 服務器列表
ip_list = ["192.168.0.172","192.168.0.173"]
username = "root"
passwd = "root"

cmd = "kubectl version | awk '{print $5}' | cut -d ':' -f 2 | head -1 | cut -d ',' -f 1"  # 獲取時間戳命令
result_dic = {}  # 執行結果

for ip in ip_list:  # 遍歷ip列表
    if not result_dic.get(ip):
        # 添加鍵值對
        res = ssh2(ip,username,passwd,cmd)
        # print("res",res)
        if res:  # 判斷不爲None
            res = json.loads(res[0])  # 反序列化第一行結果

        result_dic[ip] = res
        print("添加鍵值", ip,"value",res)

print("結果字典",result_dic)
# 獲取第一個ip以及時間戳
first_ip = ""
first_val = ""

for i in result_dic:  # 遍歷結果字典
    # 獲取第一個ip以及時間戳
    first_ip = i
    first_val = ssh2(i,username,passwd,cmd)

    if first_val:
        first_val = json.loads(first_val[0])
    print("第一個ip", i, "value", first_val)

    result_dic.pop(i)  # 刪除key,比較下面的for循環重複
    break

for ip in result_dic:
    d_value = result_dic[ip]
    # print("其餘服務器,ip",ip,"value",d_value)
    # 判斷版本是否一致
    if first_val == d_value:
        print("正常", ip, "版本是", d_value)
    else:
        print("錯誤, 版本不一致!!!", "ip:", ip, "value:", d_value)
View Code

 

執行輸出:

添加鍵值 192.168.0.172 value v1.11.2
添加鍵值 192.168.0.173 value v1.11.2
...
第一個ip 192.168.0.172 value v1.11.2
正常 192.168.0.173 版本是 v1.11.2

 

4、redis集羣

cluster nodes

查看cluster nodes信息,使用命令

redis-cli -c -h 192.168.0.172 -p 7201 cluster nodes

 

請確保服務器,已經安裝了redis,能夠執行redis-cli命令

 

執行輸出:

2a77efc52a1dfec83130b908dbd3809e057956f6 192.168.0.172:7201@17201 slave c036906bf3079fa3da15ef43df545b17c3efa144 0 1547548520000 19 connected
db60724fdae507eda5d72fd1181559bc6da33097 192.168.0.169:7201@17201 slave ed5cfdfd021dcc821fc0749d318e8ac420cc5c14 0 1547548519607 18 connected
ed5cfdfd021dcc821fc0749d318e8ac420cc5c14 192.168.0.143:7201@17201 myself,master - 0 1547548519000 18 connected 0-5460
d4f928c75a2a17d9c30ec77115628eb1840f6ee4 192.168.0.171:7201@17201 slave b8c692d0eff3a5d1ee059878a4213844c6d82ddf 0 1547548520610 21 connected
b8c692d0eff3a5d1ee059878a4213844c6d82ddf 192.168.0.170:7201@17201 master - 0 1547548521614 21 connected 10923-16383
c036906bf3079fa3da15ef43df545b17c3efa144 192.168.0.168:7201@17201 master - 0 1547548518604 19 connected 5461-10922

 

從上面的輸出信息,能夠看出,有3個master節點,3個slave節點。其中紫色部分,若是id一致,表示這是一組服務器!

請確保這一組服務器不能同時掛掉,不然會形成數據丟失!

默認redis集羣要求至少6個節點,當redis集羣中的master節點,掛掉一半時,集羣不可用。

也就是說,目前有3個master節點,最多容許1臺mater節點掛掉!

判斷依據

還有一點,當有任意一個節點狀態爲fail時,也表示集羣不正常!說明:mater節點已經掛掉了一半!

所以,使用python來判斷集羣是否狀態的關鍵點,就是判斷狀態中是否有fail便可!

 

aof狀態

在redis.conf配置文件中, 有一個參數 appendonly ,表示是否開啓aof,默認是關閉的。

那麼使用redis-cli能夠獲取它的狀態

redis-cli -c -h 192.168.0.172 -p 7201 config get appendonly

 

執行輸出:

1) "appendonly"
2) "no"

 

上面這段輸出,表示 沒有開啓aof

 

完整代碼

redis.py

#!/usr/bin/env python3
# coding: utf-8
# 檢測redis集羣

import paramiko

def ssh2(ip, username, passwd, cmd):
    """
    使用ssh鏈接遠程服務器執行命令
    :param username: 用戶名
    :param passwd: 密碼
    :param cmd: 執行的命令
    :return:
    """
    try:
        ssh = paramiko.SSHClient()  # 建立一個新的SSHClient實例
        # 設置host key,若是在"known_hosts"中沒有保存相關的信息,SSHClient 默認行爲是拒絕鏈接,會提示yes/no
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(ip, 22, username, passwd, timeout=1)  # 鏈接遠程服務器,超時時間1秒
        stdin, stdout, stderr = ssh.exec_command(cmd)  # 執行命令
        out = stdout.readlines()    # 執行結果,readlines會返回列表
        ssh.close()  # 關閉ssh鏈接
        return out
    except Exception as e:
        print(e)
        return False

# 服務器列表
redis_apps = ["192.168.0.168","192.168.0.172"]

username = "root"
passwd = "root"
port = "6379"
cmd1 = "cluster nodes"  # cluster nodes狀態
cmd2 = "config get appendonly"  # AOF狀態

result_dic = {}  # 執行結果

for ip in redis_apps:  # 遍歷ip列表
    if not result_dic.get(ip):
        result_dic[ip] = {}
        # nodes
        if not result_dic[ip].get('nodes'):
            # 添加鍵值對,統一由192.168.0.167這臺服務器執行redis-cli,由於它安裝了redis-cli
            res = ssh2("192.168.0.167",username,passwd,"redis-cli -c -h {} -p {} {}".format(ip,port,cmd1))
            result_dic[ip]['nodes'] = res
            print("添加鍵值,ip: ", ip,"value: ",res)

        # aof
        if not result_dic[ip].get('aof'):
            # 添加鍵值對
            res = ssh2("192.168.0.167",username,passwd,"redis-cli -c -h {} -p {} {}".format(ip,port,cmd2))
            # print(res,type(res))
            res = res[1].split("\n")[0]  # 獲取選項值
            result_dic[ip]['aof'] = res
            print("添加鍵值,ip: ", ip,"value: ",res)

print("結果字典",result_dic)


# 標誌位
flag_nodes = True
for i in result_dic:
    for j in result_dic[i]['nodes']:  # 遍歷結果列表
        if "fail" in j:  # 判斷是否有fail狀態
            print("狀態異常: ",j)
            flag_nodes = False

# 判斷標誌位
if flag_nodes:
    print("redis cluster nodes 狀態正常")
else:
    print("redis cluster nodes 狀態異常")

# 輸出aof狀態
print("redis aof狀態以下: ")
for i in result_dic:
    print("ip: {} 狀態: {}".format(i,result_dic[i]['aof']))
View Code

 

執行程序,輸出:

添加鍵值,ip:  192.168.0.168 value: [...]
...
redis cluster nodes 狀態正常
redis aof狀態以下: 
ip: 192.168.0.168 狀態: no
ip: 192.168.0.172 狀態: no

 

5、etcd

etcd api

判斷etcd工做是否正常,只須要能訪問到api地址,就說明正常,url以下:

http://192.168.0.169:2380/version

 

完整代碼

etcd.py

#!/usr/bin/env python3
# coding: utf-8
# 檢測etcd狀態

import json
import requests


etcd_list = ["192.168.0.169","192.168.0.168","192.168.0.167"]

for ip in etcd_list:
    # 訪問api接口,查看版本
    response=requests.get('http://%s:2380/version' %ip)
    # print(response.content)
    res = (response.content).decode('utf-8')
    res_dict = json.loads(res)
    # print(res_dict,type(res_dict))
    print("ip: {} etcd版本爲: {}".format(ip,res_dict['etcdserver']))
View Code

 

執行輸出:

ip: 192.168.0.169 etcd版本爲: 3.3.0
ip: 192.168.0.168 etcd版本爲: 3.3.0
ip: 192.168.0.167 etcd版本爲: 3.3.0

 

6、mysql

運行統計時間

查看mysql的運行統計時間,使用命令

show status like "uptime"

 

完整代碼

#!/usr/bin/env python3
# coding: utf-8

import pymysql

conn = pymysql.connect(
    host="192.168.0.179",  # mysql ip地址
    user="root",
    passwd="root",
    port=3306  # mysql 端口號,注意:必須是int類型
)

cur = conn.cursor()  # 建立遊標

# 查詢當前MySQL本次啓動後的運行統計時間
cur.execute('show status like "uptime"')

data_all = cur.fetchall()  # 獲取執行的返回結果
print(data_all)
View Code

 

執行輸出:

(('Uptime', '941067'),)

 

7、ceph

節點信息

查看節點信息,須要在 主節點操做

ceph osd tree

 

完整代碼

#!/usr/bin/env python3
# coding: utf-8

import paramiko

def ssh2(ip, username, passwd, cmd):
    """
    使用ssh鏈接遠程服務器執行命令
    :param username: 用戶名
    :param passwd: 密碼
    :param cmd: 執行的命令
    :return:
    """
    try:
        ssh = paramiko.SSHClient()  # 建立一個新的SSHClient實例
        # 設置host key,若是在"known_hosts"中沒有保存相關的信息,SSHClient 默認行爲是拒絕鏈接,會提示yes/no
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(ip, 22, username, passwd, timeout=1)  # 鏈接遠程服務器,超時時間1秒
        stdin, stdout, stderr = ssh.exec_command(cmd)  # 執行命令
        out = stdout.readlines()    # 執行結果,readlines會返回列表
        ssh.close()  # 關閉ssh鏈接
        return out
    except Exception as e:
        print(e)
        return False

svr_list = ["192.168.0.181"]
username = "root"
passwd = "root"
cmd = "ceph osd tree"  # 查看節點信息

for ip in svr_list:
    res = ssh2(ip,username,passwd,cmd)
    print(ip,"節點信息以下:")
    print("".join(res))
View Code

 

執行輸出:

192.168.0.181 節點信息以下:
ID WEIGHT  TYPE NAME           UP/DOWN REWEIGHT PRIMARY-AFFINITY 
-1 1.02695 root default                                          
-2 0.18399     host xx-node13                                   
 0 0.18399         osd.0            up  1.00000          1.00000 
-3 0.18399     host xx-node15                                   
 1 0.18399         osd.1            up  1.00000          1.00000 
-4 0.18399     host xx-node17                                   
 2 0.18399         osd.2            up  1.00000          1.00000 
-5 0.18399     host xx-node18                                   
 3 0.18399         osd.3            up  1.00000          1.00000 
-6 0.10699     host xx-node19                                   
 4 0.10699         osd.4            up  1.00000          1.00000 
-7 0.18399     host xx-node14                                   
 5 0.18399         osd.5            up  1.00000          1.00000 

 

8、kafka

建立topic

手動建立名爲test的topic

bin/kafka-topics.sh --create --zookeeper zookeeper-1.default.svc.cluster.local:2181,zookeeper-2.default.svc.cluster.local:2128,zookeeper-3.default.svc.cluster.local:2128 --topic test --partitions 1 --replication-factor 1

 

由於python中的kakfa包沒法直接建立 topic,因此須要手動建立

 

完整代碼

因爲線上kafka啓動了ACL,那麼請確保相關用戶設置了ACL規則,關於acl的配置,請參考連接:

http://www.javashuo.com/article/p-kqnuvuyr-hr.html

 

爲了方便,這裏直接使用超級用戶。注意:超級用戶是不須要設置ACL規則的,擁有全部權限

#!/usr/bin/env python3
# coding: utf-8
# 注意:須要手動建立topic才行執行此腳本

import time
from kafka import KafkaProducer
from kafka import KafkaConsumer


class KafkaClient(object):  # kafka客戶端程序
    def __init__(self, kafka_server, port, topic,content,username,password):
        self.kafka_server = kafka_server  # kafka服務器ip地址
        self.port = port  # kafka端口
        self.topic = topic  # topic名
        self.content = content  # 發送內容
        self.username = username
        self.password = password

    def producer(self):
        """
        生產者模式
        :return: object
        """

        # 鏈接kafka服務器,好比['192.138.150.193:9092']
        producer = KafkaProducer(bootstrap_servers=['%s:%s' % (self.kafka_server, self.port)],
                                 security_protocol="SASL_PLAINTEXT",  # 指定SASL安全協議
                                 sasl_mechanism='PLAIN',  # 配置SASL機制
                                 sasl_plain_username=self.username,  # 認證用戶名
                                 sasl_plain_password=self.password,  # 密碼
                                 )

        producer.send(self.topic, self.content)  # 發送消息,必須是二進制
        producer.flush()  # flush確保全部meg都傳送給broker
        producer.close()
        return producer

    def consumer(self):
        """
        消費者模式
        :return: object
        """

        # 鏈接kafka,指定組爲test_group
        consumer = KafkaConsumer(topic, group_id='test_group', bootstrap_servers=['%s:%s' % (kafka_server, port)],
                                 sasl_mechanism="PLAIN",
                                 security_protocol='SASL_PLAINTEXT',
                                 sasl_plain_username=self.username,
                                 sasl_plain_password=self.password,
                                 )

        return consumer
        # for msg in consumer:
        #     recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        #     print(recv)

    def main(self):
        startime = time.time()  # 開始時間

        client = KafkaClient(self.kafka_server, self.port, self.topic, self.content,self.username,self.password)  # 實例化客戶端
        client.producer()  # 執行生產者
        print('執行生產者')
        consumer = client.consumer()  # 執行消費者
        print('執行消費者')
        print('等待結果....')
        flag = False

        for msg in consumer:
            # recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
            # 判斷生產的消息和消費的消息是否一致
            print(msg.value)
            # print(self.content)
            if msg.value == self.content:
                flag = True
                break

        consumer.close()  # 關閉消費者對象
        endtime = time.time()  # 結束時間

        if flag:
            # %.2f %(xx) 表示保留小數點2位
            return "kafka驗證消息成功,花費時間", '%.2f 秒' % (endtime - startime)
        else:
            return "kafka驗證消息失敗,花費時間", '%.2f 秒' % (endtime - startime)


if __name__ == '__main__':
    kafka_server = "kafka-1.default.svc.cluster.local"
    port = "9092"
    topic = "test"
    # 測試消息
    content = "hello honey".encode('utf-8')

    username = "admin"
    password = "admin"

    client = KafkaClient(kafka_server,port,topic,content,username,password)  # 實例化客戶端
    print(client.main())
View Code

 

執行程序,輸出:

執行生產者
執行消費者
等待結果....
b'hello honey'
('kafka驗證消息成功,花費時間', '3.61 秒')

 

注意:第一次執行時,會卡住1分鐘。多執行幾回就會很快了,至於什麼緣由,不知道!

就是在執行這一行代碼時,會卡住

for msg in consumer:
相關文章
相關標籤/搜索