異常處理html
name = "aa" try: # 這一段是要執行的主體 print(name) # except Exception as msg: # 若是沒有正常執行,就執行下面的這一段。Exception 是錯誤類型。 except : # 若是沒有正常執行,就執行下面的這一段。Exception 是錯誤類型(有好多類型,能夠百度查下,錯誤類型 ,儘可能寫的準確些)。 print("name is err...") else: # 只有當try的那段 正常執行了後,纔會執行下面的這一段。 print("welcome: {}.".format(name)) finally: # 不管有沒有正常執行 都會執行這一段的。 print("this is test.")
斷言 assert python
格式: assert 表達式, '異常錯誤信息' # 當表達式不成立時,就會拋後面的錯誤信息。 assert name == "aa","name not wf"
raise 異常類型("錯誤提示") ;和try結合使用mysql
做用: 1.捕獲異常爲了讓程序不至於中斷,在邏輯控制以內。 2.拋出異常是爲了,保證程序在遇到這種問題的時候必須停止。 v1 = "wf" li = [1,23] try: # print(v1) # print(li[4]) raise IndexError("索引不存在") # 通常不用 # 爲啥寫這麼多except呢?方便記錄日誌. except IndexError as msg: # 錯誤類型是索引 print(msg) except Exception as msg: # 全部的錯誤類型,你能夠這麼些 print(msg) else: print("good") finally: print("test")
def send_mail(msg,mail): print("msg:{} to mail:{}".format(msg,mail)) send_mail("hellow","wangfei1000@yeah.net") class mail(): def __init__(self,msg,mail): #構造方法,無須調用,自動執行 self.msg = msg self.mail = mail def send_mail(self): print("send msg:{} to mail:{}".format(self.msg, self.mail)) # 建立一個對象,obj 就是對象。obj是類對象的指針。 obj = mail("hello","wf@qq.com") 他指向mail這個類。 obj.send_mail()
何時能用到類呢? 我我的理解:redis
例如我寫一個zabbix 增刪改查腳本,每次操做都須要使用zabbix建立好的這個鏈接。 我固然能夠寫成面向過程或面向函數的這種腳本。可是它的可讀性和代碼的簡潔度很差。 我寫成一個類,不管是使用仍是可讀性都會變的很好。
class c1(): # 父類;基類 def f1(self): v1 = 1 print(v1) def f2(self): v2 = 2 print(v2) class c2(c1): # 子類;派生類,如何繼承多個父類呢? c2(c1,c4) name = "wangfei" # 靜態字段 def __init__(self,age): self.age = age def f1(self): v1 = "c2_2" # 普通字段 print(v1) def f3(self): v3 = 3 print(v3) # obj = c2("18") # # # obj.f1() # 子類的方法是優先父類的方法 # # obj.f2() # 子類沒有這個方法就到父類去找 # re = c2.name # 靜態字段經過類來訪問的 # print(re) # re2 = obj # 普通字段經過對象來訪問 # print(re2)
啥叫多態呢? 能夠往類裏傳入任何數據類型的參數。 sql
多繼承注意事項。mongodb
1. 當只有一條道的時候,就是一條路走到底。子類繼承父類,父類繼承父父類。 2. 當有2條道的時候,左邊的的優先於右邊的。當左邊的道走到第三步尚未走到時,就要返回同級走第二條道到底了。 3. 這裏須要注意的是,self是表明是第一層的obj 就是最小的那代類。
類的方法編程
class c1(): v2 = 20 def __init__(self,v1): # 構造方法 self.v1 = v1 def f1(self): # 普通方法 print(self.v1) print(c1.v2) @staticmethod def f2(): # 靜態方法,經過類來調用;爲了節省內存 print(c1.v2) obj1 = c1(120) c1.f2() # 經過類來訪問靜態方法 obj1.f2() # 經過對象來訪問
類的屬性json
class c1(): v2 = 20 def __init__(self,v1): self.v1 = v1 def f1(self): print(self.v1) print(c1.v2) @property # 這個就叫作屬性. def f2(self): print(c1.v2) def f3(self): re = obj1.f2 # print(re) obj1 = c1(120) obj1.f2 # 奇怪吧? 經過訪問字段的方式進行使用的
獲取訪問url的狀態碼多線程
import requests res = requests.get("http://www.baidu.com/") print(res.status_code)
案列1:讀取url列表,進行測試,若是狀態碼是40x 50x 的就打印出來。生產環境就是調用接口進行報警。
import requests url_list = ['http://www.baidu.com', 'http://www.sina.com.cn', 'http://adminset.cn'] err_http_code = [401,402,403,404,500,501,502,503,504,505] for url in url_list: res = requests.get(url) if res.status_code in err_htttp_code: print("access {} http code failed".format(url))
案例2:上方案例從文件中讀取url列表,進行判斷。
import requests error_code_list = [401,402,403,404,500,501,502,503,504,505] with open("url_list.txt","rb") as url_file: while True: url_line = url_file.readline() if url_line: str_url_line = str(url_line,encoding="utf-8") str_url = str_url_line.split("\n")[0] # 從文件裏面讀取的行,後面是有換行符的。\n的 不能直接使用,須要去掉哦。 res = requests.get(str_url) if res.status_code in error_code_list: print("url: {}, status code: {}".format(str_url,res.status_code)) else: break
若是url是包含https,怎麼處理呢 ?
res = requests.get("https://www.12306.cn",verify = False) print(res)
請求api接口,獲取添加的主機信息。因爲請求的時候,須要token信息,url須要拼接下。
# url後面的」「 表示後面要接參數 ,由於在字典裏面定義了(para變量),requests模塊加了params後會自動進行 拼接得。 # http://115.28.147.154/cmdb/get/host/?token="token信息"&name=「user」 para = {'token': 'PnSlpwJDQE3L', 'name': 'adminset'} res = requests.get("http://115.28.147.154/cmdb/get/host/", params = para) # 查看拼接後的url print(res.url) # 獲取結果 data = r.text import json # 進行反序列化 d1 = json.loads(data) print(d1['data']['ip'])
向redis裏面寫入數據,讀取數據。
import redis redis_conn = redis.StrictRedis(host="127.0.0.1",port=6379,db=0) redis_conn.set("name","wangfei") # 寫入數據 re = redis_conn.get("name") # 讀取數據數據 print(re) redis_conn.lpush("list2","python class") # push的是一個列表
例子:獲取主機名和ip地址 ,而後寫到redis裏面去。
import redis import socket def get_sys_info(): try: hostname = socket.gethostname() ip = socket.gethostbyname(hostname) except Exception as msg: print(msg) hostname = "" ip = "" finally: data = {"hostname": hostname,"ip":ip} return data # 連接redis redis_conn = redis.StrictRedis("127.0.0.1",6379) try: # 獲取系統信息 re = get_sys_info() # 將系統信息寫到redis,做爲列表裏的一個元素。 redis_conn.lpush("host",re) except Exception as msg: print(msg) # 上面這一段或者是這麼寫。 # re = get_sys_info() # assert redis_conn.lpush("host",re),"lpush error"
Level | Numeric value |
---|---|
CRITICAL | 50 |
ERROR | 40 |
WARNING | 30 |
INFO | 20 |
DEBUG | 10 |
import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y%m%d %H:%M:%S', filename="./agent.log", filemode='a') log = "redis connecting+++++++++++++" logging.critical(log)
psutil 模塊獲取系統信息
import psutil # 獲取cpu的數量信息 print(psutil.cpu_count()) # 邏輯核心數 print(psutil.cpu_count(logical=False)) # 物理核心數 # 獲取全部磁盤分區信息 # print(psutil.disk_partitions()) res = psutil.disk_partitions() for mount in res: print(mount[1]) # 獲取磁盤分區的使用信息 # print(psutil.disk_usage("/")) res = psutil.disk_partitions() for mount in res: print(psutil.disk_usage(mount[1])) # 獲取內存信息 print(sutil.virtual_memory()) print(sutil.virtual_memory().total)
configparser 模塊讀取配置文件。
import configparser # 如何從一個文件裏面讀取參數? # 讀取文件流程 # 第1步 初始化模塊(我本身想的,幫助記憶理解) config = configparser.RawConfigParser() # 第2步 打開配置參數文件 redis_conf_file = open("redis.conf","r") # 第3步 讀取全部參數內容 config.read_file(redis_conf_file) # 第4步 獲取對應key下面的參數。 print(config.get("redis","host")) print(config.get("redis","port")) ----------------------- 備註:redis.conf內容 [redis] host = 127.0.0.1 port = 6379
例子: 從配置文件裏面讀取redis 地址和端口信息,而後寫一個信息到redis裏面去。
import redis # python2 python3 導入configparser模塊名不同。 try: import configparser except Exception as msg: print(msg) import ConfigParser config = configparser.RawConfigParser() # 打開配置文件,讀取內容 try: with open("redis.conf","r") as redis_conf_file: config.read_file(redis_conf_file) host = config.get("redis","host") port = config.get("redis","port") except Exception as msg: print(msg) # 寫信息到redis裏面去 try: redis_conn = redis.StrictRedis(host,port,0) redis_conn.set("test_key","test_values") except Exception as msg: print(msg)
# 定時任務模塊 # time.sleep 是整個腳本程序都中斷了,任務是串行的。使用schedule 是異步執行的。 import schedule import time import threading def hello(): 60s print("2 hello") def morning(): 3600s print("1 morning") def job_thread(func): # 多線程 t = threading.Thread(target=func) t.start() schedule.every(2).seconds.do(job_thread, hello) # 每隔2秒執行一次任務。 schedule.every(1).seconds.do(job_thread, morning) # 每隔1秒 執行一次任務。 while True: # 這個是固定的寫法,每隔1s 檢查查定時任務 schedule.run_pending() time.sleep(1)
import xlrd # 讀取excel文件內容 #### 從Excel文件讀取內容 zabbix_file = xlrd.open_workbook("./zabbix-host.xls") zabbix_file_table = zabbix_file.sheet_by_name("sheet01") print(zabbix_file_table.nrows) # 獲取行數 print(zabbix_file_table.row_values(0)) # 獲取指定行的內容 print( zabbix_file_table.ncols) # 獲取列數 print(zabbix_file_table.col_values(2)) # 獲取指定列的內容 print(zabbix_file_table.cell(1,2)) # 獲取1行2列內容 #### 向Excel文件寫入內容 import xlwt # 向Excel文件裏寫入內容 zabbix_file = xlwt.Workbook(encoding="ascii") # 指定編碼 file_sheet = zabbix_file.add_sheet("sheet02") # 新建工做簿 file_sheet.write(0,1,label = "hehe") # 寫入內容 file_sheet.write(0,2,label = "hehe") file_sheet.write(0,4,label = "hehe") file_sheet.write(0,3,label = "hehe") zabbix_file.save("test.xls") # 保存
from pyzabbix import ZabbixAPI import xlrd import configparser import logging import sys import os class zabbix(): def __init__(self,host_file): self.host_file = host_file # 日誌模塊 logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y%m%d %H:%M:%S', filename="./zabbix.log", filemode='a') # 讀取zabbix 配置文件 try: config = configparser.RawConfigParser() with open("/Users/feiwang/PycharmProjects/python-study2/work/zabbix.conf","r") as zabbix_conf_file: config.read_file(zabbix_conf_file) self.zabbix_user = config.get("zabbix","user") self.zabbix_passwd = config.get("zabbix","passwd") except Exception as msg: logging.error(msg) raise msg # 登陸zabbix try: self.zapi = ZabbixAPI("http://192.168.4.135/zabbix") self.zapi.login(self.zabbix_user,self.zabbix_passwd) logging.info("connecting zabbix successful.") except Exception as msg: logging.error("connecting zabbix failed" + "detail:{}".format(msg)) raise msg def load_zabbix(self): # 讀取excel文件 try: zabbix_file = xlrd.open_workbook(self.host_file) zabbix_file_table = zabbix_file.sheet_by_name("sheet01") logging.info("open host.xls ok.") except Exception as msg: logging.error(msg) zabbix_file = '' zabbix_file_table = '' # 讀取zabbix 信息 host_list = [] for row in range(1, zabbix_file_table.nrows): row_values = zabbix_file_table.row_values(row) visible_name = row_values[1] hostname = row_values[0] port = row_values[-1] ip = row_values[-2] group_id = "2" create_host = { "host": hostname, "description": visible_name, "name": visible_name, "interfaces": [ { "type": 1, "main": 1, "useip": 1, "ip": ip, "dns": "", "port": port }, ], "groups": [{ "groupid": group_id }, ], } host_list.append(create_host) return host_list def add_host(self): for host in self.load_zabbix(): try: self.zapi.host.create(host) logging.info("create status: ok, create host info:{}".format(host)) except Exception as msg: print(msg) logging.error("zabbix create hosts failed. {}".format(msg)) # # del host def del_host(self): ''' 循環zabbix 上獲取到的主機,而後和本地excel文件裏要刪除的主機進行對比,若是是的,那麼就把他刪掉。 刪除用hostid :return: ''' for host in obj.zapi.host.get(): for del_host in self.load_zabbix(): if host["host"] == del_host["host"]: try: self.zapi.host.delete(host["hostid"]) print("delete host{} is ok.".format(host["host"])) logging.info("del host:{} ok".format(host["hostid"])) except Exception as msg: print(msg) logging.error("del host:{} failed,".format(host["host"])) if __name__ == "__main__": action,file = sys.argv[1:] assert os.path.exists(file),"file:'{}' doesn't exists".format(file) obj = zabbix(file) if action == "add": obj.add_host() elif action == "del": obj.del_host() elif action == "update": pass elif action == "get": pass else: print("action is err.") raise Exception
要求:
Agent腳本內容
import json import platform import schedule import psutil import configparser import redis import logging import threading import socket import time import math def log(): logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s', # 日誌格式 datefmt='%Y%m%d %H:%M:%S', # 時間格式 filename="./agent.log", # 日誌文件的路徑 filemode='a') # 文件的打開模式 return logging.basicConfig log() def get_sys(): try: hostname = socket.gethostname() ip = socket.gethostbyname(hostname) except Exception as msg: print(msg) hostname = "" ip = "" data = {"hostname":hostname,"ip":ip} return data def get_cpu(): cpu_count_logical = psutil.cpu_count(logical=False) cpu_count_phycial = psutil.cpu_count(logical=True) cpu_percent = psutil.cpu_percent(interval=2) data = {"phycial_count":cpu_count_phycial,"logical":cpu_count_logical,"percent":cpu_percent} return data def get_mem(): # math.ceil(小數) 取整數 mem_total = math.ceil(psutil.virtual_memory().total /1024/1024/1024) mem_percent = psutil.virtual_memory().percent data = {"mem_total":mem_total,"mem_percent":mem_percent} return data def get_disk(): disk_list = [] all_disk_part = psutil.disk_partitions() for partition in all_disk_part: mount_point = partition[1] mount_total = math.ceil(psutil.disk_usage(mount_point).total/1024/1024/1024) mount_percent = psutil.disk_usage(mount_point).percent data = {"mount": mount_point,"total":mount_total,"percent":mount_percent} disk_list.append(data) return disk_list def thread_run(): pass def agent_info(): sys_data = {} sys_data["host"] = get_sys() sys_data["disk"] = get_disk() sys_data["mem"] = get_mem() sys_data["cpu"] = get_cpu() return json.dumps(sys_data) # 進行反序列化 def put_redis(): config = configparser.RawConfigParser() try: with open("redis.conf","r") as redis_conf_file: config.read_file(redis_conf_file) redis_host = config.get("redis","host") redis_port = config.get("redis","port") except Exception as msg: redis_host = "127.0.0.1" redis_port = 6379 print(msg) try: logging.info("connect redis server.") redis_conn = redis.StrictRedis(redis_host,redis_port,0) logging.info("put systeminfo to redis server.") redis_conn.lpush("host",xagent_info()) logging.info("data:{}".format(agent_info())) logging.info("put successful....") def thread_job(func): target =threading.Thread(target=func) target.start() if __name__ == "__main__": schedule.every(2).seconds.do(thread_job,put_redis) while True: schedule.run_pending() time.sleep(1)