python 面向對象&經常使用模塊(二)

1.面向對象

1.1 異常處理(異常、斷言、異常類型、拋出異常、Python異常捕捉)

異常處理html

name = "aa"
try:   # 這一段是要執行的主體
    print(name)
# except Exception as msg: # 若是沒有正常執行,就執行下面的這一段。Exception 是錯誤類型。
except : # 若是沒有正常執行,就執行下面的這一段。Exception 是錯誤類型(有好多類型,能夠百度查下,錯誤類型 ,儘可能寫的準確些)。
    print("name is err...")

else: # 只有當try的那段 正常執行了後,纔會執行下面的這一段。
    print("welcome: {}.".format(name))

finally: # 不管有沒有正常執行 都會執行這一段的。
    print("this is test.")

斷言 assert python

格式: assert 表達式, '異常錯誤信息'

# 當表達式不成立時,就會拋後面的錯誤信息。
assert  name == "aa","name not wf"

raise 異常類型("錯誤提示") ;和try結合使用mysql

做用:
1.捕獲異常爲了讓程序不至於中斷,在邏輯控制以內。
2.拋出異常是爲了,保證程序在遇到這種問題的時候必須停止。

v1 = "wf"
li = [1,23]
try:
    # print(v1)
    # print(li[4])
    raise  IndexError("索引不存在") # 通常不用
# 爲啥寫這麼多except呢?方便記錄日誌.
except IndexError as msg: # 錯誤類型是索引
    print(msg)
except Exception as msg: # 全部的錯誤類型,你能夠這麼些
    print(msg)
else:
    print("good")
finally:
    print("test")

1.2 面向對象編程(面向對象基礎、封裝和調用、間接調用、定義類、類成員)

def send_mail(msg,mail):
    print("msg:{} to mail:{}".format(msg,mail))

send_mail("hellow","wangfei1000@yeah.net")

class mail():

    def __init__(self,msg,mail):
        #構造方法,無須調用,自動執行
        self.msg = msg
        self.mail = mail

    def send_mail(self):
        print("send msg:{} to mail:{}".format(self.msg, self.mail))

# 建立一個對象,obj 就是對象。obj是類對象的指針。
obj = mail("hello","wf@qq.com") 他指向mail這個類。
obj.send_mail()

何時能用到類呢? 我我的理解:redis

例如我寫一個zabbix 增刪改查腳本,每次操做都須要使用zabbix建立好的這個鏈接。
我固然能夠寫成面向過程或面向函數的這種腳本。可是它的可讀性和代碼的簡潔度很差。
我寫成一個類,不管是使用仍是可讀性都會變的很好。

1.3 繼承與多態(類的繼承、繼承順序、方法重寫)

class c1(): # 父類;基類

    def f1(self):
        v1 = 1
        print(v1)

    def f2(self):
        v2 = 2
        print(v2)

class c2(c1): # 子類;派生類,如何繼承多個父類呢? c2(c1,c4)
    name = "wangfei"   # 靜態字段

    def __init__(self,age):
        self.age = age

    def f1(self):
        v1 = "c2_2"    # 普通字段
        print(v1)

    def f3(self):
        v3 = 3
        print(v3)

# obj = c2("18") #
# # obj.f1() # 子類的方法是優先父類的方法
# # obj.f2() # 子類沒有這個方法就到父類去找
# re = c2.name # 靜態字段經過類來訪問的
# print(re)
# re2 = obj # 普通字段經過對象來訪問
# print(re2)

啥叫多態呢? 能夠往類裏傳入任何數據類型的參數。 sql

多繼承注意事項。mongodb

1. 當只有一條道的時候,就是一條路走到底。子類繼承父類,父類繼承父父類。
2. 當有2條道的時候,左邊的的優先於右邊的。當左邊的道走到第三步尚未走到時,就要返回同級走第二條道到底了。
3. 這裏須要注意的是,self是表明是第一層的obj 就是最小的那代類。

1.4 類方法與類屬性(類方法定義、方法調用、類內置屬性、運算符重載)

類的方法編程

class c1():
    v2 = 20

    def __init__(self,v1):  # 構造方法
        self.v1 = v1

    def f1(self):  # 普通方法
        print(self.v1)
        print(c1.v2)

    @staticmethod
    def f2():   # 靜態方法,經過類來調用;爲了節省內存
        print(c1.v2)

obj1 = c1(120)
c1.f2() # 經過類來訪問靜態方法
obj1.f2() # 經過對象來訪問

類的屬性json

class c1():
    v2 = 20

    def __init__(self,v1):
        self.v1 = v1

    def f1(self):
        print(self.v1)
        print(c1.v2)

    @property # 這個就叫作屬性.
    def f2(self):
        print(c1.v2)

    def f3(self):
        re = obj1.f2
        # print(re)

obj1 = c1(120)
obj1.f2 # 奇怪吧? 經過訪問字段的方式進行使用的

2.實戰應用

2.1 模塊應用Requests(RESTAPI標準操做、HTTP請求與返回操做、API接口調用實戰)

requests模塊 參考連接api

獲取訪問url的狀態碼多線程

import  requests
res = requests.get("http://www.baidu.com/")
print(res.status_code)

案列1:讀取url列表,進行測試,若是狀態碼是40x 50x 的就打印出來。生產環境就是調用接口進行報警。

import requests
url_list = ['http://www.baidu.com', 'http://www.sina.com.cn', 'http://adminset.cn']
err_http_code = [401,402,403,404,500,501,502,503,504,505]

for url in url_list:
    res = requests.get(url)
    if res.status_code in err_htttp_code:
        print("access {} http code failed".format(url))

案例2:上方案例從文件中讀取url列表,進行判斷。

import requests
error_code_list = [401,402,403,404,500,501,502,503,504,505]
with open("url_list.txt","rb") as url_file: 
    while True:
        url_line = url_file.readline()
        if url_line:
            str_url_line = str(url_line,encoding="utf-8")
            str_url = str_url_line.split("\n")[0] # 從文件裏面讀取的行,後面是有換行符的。\n的 不能直接使用,須要去掉哦。
            res = requests.get(str_url)
            if res.status_code in error_code_list:
                print("url: {}, status code: {}".format(str_url,res.status_code))
        else:
            break

若是url是包含https,怎麼處理呢 ?

res = requests.get("https://www.12306.cn",verify = False)
print(res)

請求api接口,獲取添加的主機信息。因爲請求的時候,須要token信息,url須要拼接下。

# url後面的」「 表示後面要接參數 ,由於在字典裏面定義了(para變量),requests模塊加了params後會自動進行 拼接得。
# http://115.28.147.154/cmdb/get/host/?token="token信息"&name=「user」

para = {'token': 'PnSlpwJDQE3L', 'name': 'adminset'}
res = requests.get("http://115.28.147.154/cmdb/get/host/", params = para)
# 查看拼接後的url
print(res.url)

# 獲取結果
data = r.text

import json
# 進行反序列化
d1 = json.loads(data)
print(d1['data']['ip'])

2.2 模塊應用Redis、MySQL、MongoDB

redis參考連接

redis客戶端使用幫助

mysql參考連接

mongodb參考連接

向redis裏面寫入數據,讀取數據。

import  redis
redis_conn = redis.StrictRedis(host="127.0.0.1",port=6379,db=0)
redis_conn.set("name","wangfei") # 寫入數據
re = redis_conn.get("name") # 讀取數據數據
print(re)

redis_conn.lpush("list2","python class") # push的是一個列表

例子:獲取主機名和ip地址 ,而後寫到redis裏面去。

import  redis
import  socket
def get_sys_info():
    try:
        hostname = socket.gethostname()
        ip = socket.gethostbyname(hostname)
    except Exception as msg:
        print(msg)
        hostname = ""
        ip = ""
    finally:
        data = {"hostname": hostname,"ip":ip}
    return  data

# 連接redis
redis_conn = redis.StrictRedis("127.0.0.1",6379)

try:
    # 獲取系統信息
    re = get_sys_info()
    # 將系統信息寫到redis,做爲列表裏的一個元素。
    redis_conn.lpush("host",re)
except Exception as msg:
    print(msg)

# 上面這一段或者是這麼寫。
# re = get_sys_info()
# assert redis_conn.lpush("host",re),"lpush error"

2.3 模塊應用logging(日誌記錄,爲本身的程序添加日誌)

logging模塊官方參考連接

Level Numeric value
CRITICAL 50
ERROR 40
WARNING 30
INFO 20
DEBUG 10
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s %(message)s',
                    datefmt='%Y%m%d %H:%M:%S',
                    filename="./agent.log",
                    filemode='a')

log = "redis connecting+++++++++++++"
logging.critical(log)

2.4 模塊應用psutil(操做系統信息採集實戰)

psutil官方參考連接

psutil 模塊獲取系統信息

import  psutil

# 獲取cpu的數量信息
print(psutil.cpu_count()) # 邏輯核心數
print(psutil.cpu_count(logical=False)) # 物理核心數

# 獲取全部磁盤分區信息
# print(psutil.disk_partitions())
res = psutil.disk_partitions()
for mount in res:
    print(mount[1])

# 獲取磁盤分區的使用信息
# print(psutil.disk_usage("/"))
res = psutil.disk_partitions()
for mount in res:
    print(psutil.disk_usage(mount[1]))

# 獲取內存信息
print(sutil.virtual_memory())
print(sutil.virtual_memory().total)

2.5 模塊應用configparser(配置文件解析,爲本身的程序增長配置文件管理)

configparser 模塊讀取配置文件。

import  configparser
# 如何從一個文件裏面讀取參數?
# 讀取文件流程

# 第1步 初始化模塊(我本身想的,幫助記憶理解)
config = configparser.RawConfigParser()
# 第2步 打開配置參數文件
redis_conf_file = open("redis.conf","r")
# 第3步 讀取全部參數內容
config.read_file(redis_conf_file)
# 第4步 獲取對應key下面的參數。
print(config.get("redis","host"))
print(config.get("redis","port"))

-----------------------
備註:redis.conf內容
    [redis]
    host = 127.0.0.1
    port = 6379

例子: 從配置文件裏面讀取redis 地址和端口信息,而後寫一個信息到redis裏面去。

import  redis

# python2 python3 導入configparser模塊名不同。 
try:
    import configparser
except Exception as msg:
    print(msg)
    import  ConfigParser

config = configparser.RawConfigParser()

# 打開配置文件,讀取內容
try:
    with open("redis.conf","r") as redis_conf_file:
        config.read_file(redis_conf_file)
        host = config.get("redis","host")
        port = config.get("redis","port")
except Exception as msg:
    print(msg)

# 寫信息到redis裏面去
try:
    redis_conn = redis.StrictRedis(host,port,0)
    redis_conn.set("test_key","test_values")
except Exception as msg:
    print(msg)

2.6 模塊應用schedule(Python計劃任務實戰)

# 定時任務模塊
# time.sleep 是整個腳本程序都中斷了,任務是串行的。使用schedule 是異步執行的。

import  schedule
import  time
import  threading

def hello(): 60s
    print("2 hello")

def morning(): 3600s
    print("1 morning")

def job_thread(func): # 多線程
    t = threading.Thread(target=func)
    t.start()

schedule.every(2).seconds.do(job_thread, hello) # 每隔2秒執行一次任務。
schedule.every(1).seconds.do(job_thread, morning) # 每隔1秒 執行一次任務。

while True: # 這個是固定的寫法,每隔1s 檢查查定時任務
    schedule.run_pending()
    time.sleep(1)

2.7 xlrd/xlwt (實現對excel文件的讀寫)

import  xlrd # 讀取excel文件內容

#### 從Excel文件讀取內容

zabbix_file = xlrd.open_workbook("./zabbix-host.xls")
zabbix_file_table = zabbix_file.sheet_by_name("sheet01")

print(zabbix_file_table.nrows) # 獲取行數
print(zabbix_file_table.row_values(0)) # 獲取指定行的內容
print( zabbix_file_table.ncols) # 獲取列數
print(zabbix_file_table.col_values(2)) # 獲取指定列的內容
print(zabbix_file_table.cell(1,2)) # 獲取1行2列內容

#### 向Excel文件寫入內容

import xlwt # 向Excel文件裏寫入內容

zabbix_file = xlwt.Workbook(encoding="ascii") # 指定編碼
file_sheet = zabbix_file.add_sheet("sheet02") # 新建工做簿

file_sheet.write(0,1,label = "hehe") # 寫入內容
file_sheet.write(0,2,label = "hehe")
file_sheet.write(0,4,label = "hehe")
file_sheet.write(0,3,label = "hehe")
zabbix_file.save("test.xls") # 保存

3.實戰案例

3.1 調用Zabbix API ,增刪改主機。

from  pyzabbix import ZabbixAPI
import xlrd
import configparser
import logging
import sys
import os

class zabbix():

    def __init__(self,host_file):

        self.host_file = host_file
        # 日誌模塊
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s %(levelname)s %(message)s',
                            datefmt='%Y%m%d %H:%M:%S',
                            filename="./zabbix.log",
                            filemode='a')

        # 讀取zabbix 配置文件
        try:
            config = configparser.RawConfigParser()
            with open("/Users/feiwang/PycharmProjects/python-study2/work/zabbix.conf","r") as zabbix_conf_file:
                config.read_file(zabbix_conf_file)
                self.zabbix_user = config.get("zabbix","user")
                self.zabbix_passwd = config.get("zabbix","passwd")

        except Exception as msg:
            logging.error(msg)
            raise msg

        # 登陸zabbix
        try:
            self.zapi = ZabbixAPI("http://192.168.4.135/zabbix")
            self.zapi.login(self.zabbix_user,self.zabbix_passwd)
            logging.info("connecting zabbix successful.")

        except Exception as msg:
            logging.error("connecting zabbix failed" + "detail:{}".format(msg))
            raise msg

    def load_zabbix(self):

        # 讀取excel文件
        try:
            zabbix_file = xlrd.open_workbook(self.host_file)
            zabbix_file_table = zabbix_file.sheet_by_name("sheet01")
            logging.info("open host.xls ok.")
        except Exception as msg:
            logging.error(msg)
            zabbix_file = ''
            zabbix_file_table = ''

        # 讀取zabbix 信息
        host_list = []
        for row in range(1, zabbix_file_table.nrows):
            row_values = zabbix_file_table.row_values(row)
            visible_name = row_values[1]
            hostname = row_values[0]
            port = row_values[-1]
            ip = row_values[-2]
            group_id = "2"
            create_host = {
                "host": hostname,
                "description": visible_name,
                "name": visible_name,
                "interfaces": [
                    {
                        "type": 1,
                        "main": 1,
                        "useip": 1,
                        "ip": ip,
                        "dns": "",
                        "port": port
                    }, ],
                "groups": [{
                    "groupid": group_id
                }, ], }
            host_list.append(create_host)

        return host_list

    def add_host(self):
        for host in self.load_zabbix():
            try:
                self.zapi.host.create(host)
                logging.info("create status: ok, create host info:{}".format(host))
            except Exception as msg:
                print(msg)
                logging.error("zabbix create hosts failed. {}".format(msg))

    # # del host
    def del_host(self):
        '''
        循環zabbix 上獲取到的主機,而後和本地excel文件裏要刪除的主機進行對比,若是是的,那麼就把他刪掉。
        刪除用hostid
        :return:
        '''

        for host in obj.zapi.host.get():
            for del_host in self.load_zabbix():
                if host["host"] == del_host["host"]:
                    try:
                        self.zapi.host.delete(host["hostid"])
                        print("delete host{} is ok.".format(host["host"]))
                        logging.info("del host:{} ok".format(host["hostid"]))
                    except Exception as msg:
                        print(msg)
                        logging.error("del host:{} failed,".format(host["host"]))

if __name__ == "__main__":
    action,file = sys.argv[1:]
    assert os.path.exists(file),"file:'{}' doesn't exists".format(file)
    obj = zabbix(file)

    if action == "add":
        obj.add_host()
    elif action == "del":
        obj.del_host()
    elif action == "update":
        pass
    elif action == "get":
        pass
    else:
        print("action is err.")
        raise Exception

3.2 開發一個Agent,採集系統信息。

要求:

  1. 獲取cpu 內存 磁盤(磁盤使用信息) 主機名 主機ip地址 等信息
  2. 組裝成json格式上傳到redis裏。
  3. 要求記錄日誌
  4. 定時執行每10分鐘更新一次。
  5. 用多線程執行

Agent腳本內容

import json
import platform
import schedule
import psutil
import configparser
import redis
import logging
import threading
import socket
import time
import math

def log():
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s',  # 日誌格式
                        datefmt='%Y%m%d %H:%M:%S',  # 時間格式
                        filename="./agent.log",  # 日誌文件的路徑
                        filemode='a')  # 文件的打開模式
    return logging.basicConfig
log()

def get_sys():
    try:
        hostname = socket.gethostname()
        ip = socket.gethostbyname(hostname)
    except Exception as msg:
        print(msg)
        hostname = ""
        ip = ""

    data = {"hostname":hostname,"ip":ip}
    return  data

def get_cpu():
    cpu_count_logical = psutil.cpu_count(logical=False)
    cpu_count_phycial = psutil.cpu_count(logical=True)
    cpu_percent = psutil.cpu_percent(interval=2)

    data = {"phycial_count":cpu_count_phycial,"logical":cpu_count_logical,"percent":cpu_percent}
    return data

def get_mem():
    # math.ceil(小數) 取整數
    mem_total = math.ceil(psutil.virtual_memory().total /1024/1024/1024)
    mem_percent = psutil.virtual_memory().percent

    data = {"mem_total":mem_total,"mem_percent":mem_percent}
    return data

def get_disk():
    disk_list = []
    all_disk_part = psutil.disk_partitions()
    for partition in all_disk_part:
        mount_point = partition[1]
        mount_total = math.ceil(psutil.disk_usage(mount_point).total/1024/1024/1024)
        mount_percent = psutil.disk_usage(mount_point).percent

        data = {"mount": mount_point,"total":mount_total,"percent":mount_percent}
        disk_list.append(data)

    return  disk_list

def thread_run():
    pass

def agent_info():
    sys_data = {}
    sys_data["host"] = get_sys()
    sys_data["disk"] = get_disk()
    sys_data["mem"] = get_mem()
    sys_data["cpu"] = get_cpu()

    return  json.dumps(sys_data) # 進行反序列化

def put_redis():
    config = configparser.RawConfigParser()
    try:
        with open("redis.conf","r") as redis_conf_file:
            config.read_file(redis_conf_file)
            redis_host = config.get("redis","host")
            redis_port = config.get("redis","port")
    except Exception as msg:
        redis_host = "127.0.0.1"
        redis_port = 6379
        print(msg)

    try:
        logging.info("connect redis server.")
        redis_conn = redis.StrictRedis(redis_host,redis_port,0)
        logging.info("put systeminfo to redis server.")
        redis_conn.lpush("host",xagent_info())
        logging.info("data:{}".format(agent_info()))
        logging.info("put successful....")

def thread_job(func):
    target =threading.Thread(target=func)
    target.start()

if __name__ == "__main__":
    schedule.every(2).seconds.do(thread_job,put_redis)
    while True:
        schedule.run_pending()
        time.sleep(1)
相關文章
相關標籤/搜索