如今有一段代碼,須要掃描一個網段內的ip地址,是否能夠ping通。html
執行起來效率太慢,須要使用協程。python
#!/usr/bin/env python # -*- coding: utf-8 -*- import os import time import signal import subprocess import gevent import gevent.pool from gevent import monkey;monkey.patch_all() def custom_print(content,colour='white'): """ 寫入日誌文件 :param content: 內容 :param colour: 顏色 :return: None """ # 顏色代碼 colour_dict = { 'red': 31, # 紅色 'green': 32, # 綠色 'yellow': 33, # 黃色 'blue': 34, # 藍色 'purple_red': 35, # 紫紅色 'bluish_blue': 36, # 淺藍色 'white': 37, # 白色 } choice = colour_dict.get(colour) # 選擇顏色 info = "\033[1;{};1m{}\033[0m".format(choice, content) print(info) def execute_linux2(cmd, timeout=10, skip=False): """ 執行linux命令,返回list :param cmd: linux命令 :param timeout: 超時時間,生產環境, 特別卡, 所以要3秒 :param skip: 是否跳過超時限制 :return: list """ p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid) t_beginning = time.time() # 開始時間 while True: if p.poll() is not None: break seconds_passed = time.time() - t_beginning if not skip: if seconds_passed > timeout: # p.terminate() # p.kill() # raise TimeoutError(cmd, timeout) custom_print('錯誤, 命令: {},本地執行超時!'.format(cmd),"red") # 當shell=True時,只有os.killpg才能kill子進程 try: # time.sleep(1) os.killpg(p.pid, signal.SIGUSR1) except Exception as e: pass return False result = p.stdout.readlines() # 結果輸出列表 return result class NetworkTest(object): def __init__(self): self.flag_list = [] def check_ping(self,ip): """ 檢查ping :param ip: ip地址 :return: none """ cmd = "ping %s -c 2" % ip # print(cmd) # 本機執行命令 res = execute_linux2(cmd,2) # print(res) if not res: custom_print("錯誤, 執行命令: {} 失敗".format(cmd), "red") self.flag_list.append(False) return False res.pop() # 刪除最後一個元素 last_row = res.pop().decode('utf-8').strip() # 再次獲取最後一行結果 if not last_row: custom_print("錯誤,執行命令: {} 異常","red") self.flag_list.append(False) return False res = last_row.split() # 切割結果 # print(res,type(res),len(res)) if len(res) <10: custom_print("錯誤,切割 ping 結果異常","red") self.flag_list.append(False) return False if res[5] == "0%": # 判斷丟包率 custom_print("正常, ip: {} ping正常 丟包率0%".format(ip), "green") else: self.flag_list.append(False) custom_print("錯誤, ip: {} ping異常 丟包率100%".format(ip), "red") def main(self): """ 主程序 :return: """ for num in range(1, 256): ip = '192.168.10.{}'.format(num) self.check_ping(ip) if __name__ == '__main__': startime = time.time() # 開始時間 NetworkTest().main() endtime = time.time() take_time = endtime - startime if take_time < 1: # 判斷不足1秒時 take_time = 1 # 設置爲1秒 # 計算花費時間 m, s = divmod(take_time, 60) h, m = divmod(m, 60) custom_print("本次花費時間 %02d:%02d:%02d" % (h, m, s),"green")
改形成,協程執行。linux
#!/usr/bin/env python # -*- coding: utf-8 -*- import os import time import signal import subprocess import gevent import gevent.pool from gevent import monkey;monkey.patch_all() def custom_print(content,colour='white'): """ 寫入日誌文件 :param content: 內容 :param colour: 顏色 :return: None """ # 顏色代碼 colour_dict = { 'red': 31, # 紅色 'green': 32, # 綠色 'yellow': 33, # 黃色 'blue': 34, # 藍色 'purple_red': 35, # 紫紅色 'bluish_blue': 36, # 淺藍色 'white': 37, # 白色 } choice = colour_dict.get(colour) # 選擇顏色 info = "\033[1;{};1m{}\033[0m".format(choice, content) print(info) def execute_linux2(cmd, timeout=10, skip=False): """ 執行linux命令,返回list :param cmd: linux命令 :param timeout: 超時時間,生產環境, 特別卡, 所以要3秒 :param skip: 是否跳過超時限制 :return: list """ p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid) t_beginning = time.time() # 開始時間 while True: if p.poll() is not None: break seconds_passed = time.time() - t_beginning if not skip: if seconds_passed > timeout: # p.terminate() # p.kill() # raise TimeoutError(cmd, timeout) custom_print('錯誤, 命令: {},本地執行超時!'.format(cmd),"red") # 當shell=True時,只有os.killpg才能kill子進程 try: # time.sleep(1) os.killpg(p.pid, signal.SIGUSR1) except Exception as e: pass return False result = p.stdout.readlines() # 結果輸出列表 return result class NetworkTest(object): def __init__(self): self.flag_list = [] def check_ping(self,ip): """ 檢查ping :param ip: ip地址 :return: none """ cmd = "ping %s -c 2" % ip # print(cmd) # 本機執行命令 res = execute_linux2(cmd,2) # print(res) if not res: custom_print("錯誤, 執行命令: {} 失敗".format(cmd), "red") self.flag_list.append(False) return False res.pop() # 刪除最後一個元素 last_row = res.pop().decode('utf-8').strip() # 再次獲取最後一行結果 if not last_row: custom_print("錯誤,執行命令: {} 異常","red") self.flag_list.append(False) return False res = last_row.split() # 切割結果 # print(res,type(res),len(res)) if len(res) <10: custom_print("錯誤,切割 ping 結果異常","red") self.flag_list.append(False) return False if res[5] == "0%": # 判斷丟包率 custom_print("正常, ip: {} ping正常 丟包率0%".format(ip), "green") else: self.flag_list.append(False) custom_print("錯誤, ip: {} ping異常 丟包率100%".format(ip), "red") def main(self): """ 主程序 :return: """ process_list = [] for num in range(1, 256): ip = '192.168.10.{}'.format(num) # self.check_ping(ip) # 將任務加到列表中 process_list.append(gevent.spawn(self.check_ping, ip)) gevent.joinall(process_list) # 等待全部協程結束 if __name__ == '__main__': startime = time.time() # 開始時間 NetworkTest().main() endtime = time.time() take_time = endtime - startime if take_time < 1: # 判斷不足1秒時 take_time = 1 # 設置爲1秒 # 計算花費時間 m, s = divmod(take_time, 60) h, m = divmod(m, 60) custom_print("本次花費時間 %02d:%02d:%02d" % (h, m, s),"green")
執行輸出:shell
... 錯誤, 命令: ping 192.168.10.250 -c 2,本地執行超時! 錯誤, 執行命令: ping 192.168.10.250 -c 2 失敗 錯誤, 命令: ping 192.168.10.255 -c 2,本地執行超時! 錯誤, 執行命令: ping 192.168.10.255 -c 2 失敗 本次花費時間 00:00:07
注意:切勿在windows系統中運行,不然會報錯windows
AttributeError: module 'os' has no attribute 'setsid'
上面直接將全部任務加到列表中,而後一次性,所有異步執行。那麼同一時刻,最多有多少任務執行呢?服務器
不知道,可能有256個吧?併發
注意:若是這個一個很耗CPU的程序,可能會致使服務器,直接卡死。app
那麼,咱們應該要限制它的併發數。這個時候,須要使用協程池,固定併發數。異步
好比:固定爲100個ide
#!/usr/bin/env python # -*- coding: utf-8 -*- import os import time import signal import subprocess import gevent import gevent.pool from gevent import monkey;monkey.patch_all() def custom_print(content,colour='white'): """ 寫入日誌文件 :param content: 內容 :param colour: 顏色 :return: None """ # 顏色代碼 colour_dict = { 'red': 31, # 紅色 'green': 32, # 綠色 'yellow': 33, # 黃色 'blue': 34, # 藍色 'purple_red': 35, # 紫紅色 'bluish_blue': 36, # 淺藍色 'white': 37, # 白色 } choice = colour_dict.get(colour) # 選擇顏色 info = "\033[1;{};1m{}\033[0m".format(choice, content) print(info) def execute_linux2(cmd, timeout=10, skip=False): """ 執行linux命令,返回list :param cmd: linux命令 :param timeout: 超時時間,生產環境, 特別卡, 所以要3秒 :param skip: 是否跳過超時限制 :return: list """ p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid) t_beginning = time.time() # 開始時間 while True: if p.poll() is not None: break seconds_passed = time.time() - t_beginning if not skip: if seconds_passed > timeout: # p.terminate() # p.kill() # raise TimeoutError(cmd, timeout) custom_print('錯誤, 命令: {},本地執行超時!'.format(cmd),"red") # 當shell=True時,只有os.killpg才能kill子進程 try: # time.sleep(1) os.killpg(p.pid, signal.SIGUSR1) except Exception as e: pass return False result = p.stdout.readlines() # 結果輸出列表 return result class NetworkTest(object): def __init__(self): self.flag_list = [] def check_ping(self,ip): """ 檢查ping :param ip: ip地址 :return: none """ cmd = "ping %s -c 2" % ip # print(cmd) # 本機執行命令 res = execute_linux2(cmd,2) # print(res) if not res: custom_print("錯誤, 執行命令: {} 失敗".format(cmd), "red") self.flag_list.append(False) return False res.pop() # 刪除最後一個元素 last_row = res.pop().decode('utf-8').strip() # 再次獲取最後一行結果 if not last_row: custom_print("錯誤,執行命令: {} 異常","red") self.flag_list.append(False) return False res = last_row.split() # 切割結果 # print(res,type(res),len(res)) if len(res) <10: custom_print("錯誤,切割 ping 結果異常","red") self.flag_list.append(False) return False if res[5] == "0%": # 判斷丟包率 custom_print("正常, ip: {} ping正常 丟包率0%".format(ip), "green") else: self.flag_list.append(False) custom_print("錯誤, ip: {} ping異常 丟包率100%".format(ip), "red") def main(self): """ 主程序 :return: """ process_list = [] pool= gevent.pool.Pool(100) # 協程池固定爲100個 for num in range(1, 256): ip = '192.168.10.{}'.format(num) # self.check_ping(ip) # 將任務加到列表中 process_list.append(pool.spawn(self.check_ping, ip)) gevent.joinall(process_list) # 等待全部協程結束 if __name__ == '__main__': startime = time.time() # 開始時間 NetworkTest().main() endtime = time.time() take_time = endtime - startime if take_time < 1: # 判斷不足1秒時 take_time = 1 # 設置爲1秒 # 計算花費時間 m, s = divmod(take_time, 60) h, m = divmod(m, 60) custom_print("本次花費時間 %02d:%02d:%02d" % (h, m, s),"green")
再次執行,效果以下:
... 錯誤, 執行命令: ping 192.168.10.254 -c 2 失敗 錯誤, 命令: ping 192.168.10.255 -c 2,本地執行超時! 錯誤, 執行命令: ping 192.168.10.255 -c 2 失敗 本次花費時間 00:00:15
能夠,發現花費的時間,明顯要比上面慢了!
其實,還有一種寫法,使用pool.map,語法以下:
pool.map(func,iterator)
好比:
pool.map(self.get_kernel, NODE_LIST)
注意:func是一個方法,iterator是一個迭代器。好比:list就是一個迭代器
使用map時,func只能接收一個參數。這個參數就是,遍歷迭代器的每個值。
使用map,完整代碼以下:
#!/usr/bin/env python # -*- coding: utf-8 -*- import os import time import signal import subprocess import gevent import gevent.pool from gevent import monkey;monkey.patch_all() def custom_print(content,colour='white'): """ 寫入日誌文件 :param content: 內容 :param colour: 顏色 :return: None """ # 顏色代碼 colour_dict = { 'red': 31, # 紅色 'green': 32, # 綠色 'yellow': 33, # 黃色 'blue': 34, # 藍色 'purple_red': 35, # 紫紅色 'bluish_blue': 36, # 淺藍色 'white': 37, # 白色 } choice = colour_dict.get(colour) # 選擇顏色 info = "\033[1;{};1m{}\033[0m".format(choice, content) print(info) def execute_linux2(cmd, timeout=10, skip=False): """ 執行linux命令,返回list :param cmd: linux命令 :param timeout: 超時時間,生產環境, 特別卡, 所以要3秒 :param skip: 是否跳過超時限制 :return: list """ p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid) t_beginning = time.time() # 開始時間 while True: if p.poll() is not None: break seconds_passed = time.time() - t_beginning if not skip: if seconds_passed > timeout: # p.terminate() # p.kill() # raise TimeoutError(cmd, timeout) custom_print('錯誤, 命令: {},本地執行超時!'.format(cmd),"red") # 當shell=True時,只有os.killpg才能kill子進程 try: # time.sleep(1) os.killpg(p.pid, signal.SIGUSR1) except Exception as e: pass return False result = p.stdout.readlines() # 結果輸出列表 return result class NetworkTest(object): def __init__(self): self.flag_list = [] def check_ping(self,ip): """ 檢查ping :param ip: ip地址 :return: none """ cmd = "ping %s -c 2" % ip # print(cmd) # 本機執行命令 res = execute_linux2(cmd,2) # print(res) if not res: custom_print("錯誤, 執行命令: {} 失敗".format(cmd), "red") self.flag_list.append(False) return False res.pop() # 刪除最後一個元素 last_row = res.pop().decode('utf-8').strip() # 再次獲取最後一行結果 if not last_row: custom_print("錯誤,執行命令: {} 異常","red") self.flag_list.append(False) return False res = last_row.split() # 切割結果 # print(res,type(res),len(res)) if len(res) <10: custom_print("錯誤,切割 ping 結果異常","red") self.flag_list.append(False) return False if res[5] == "0%": # 判斷丟包率 custom_print("正常, ip: {} ping正常 丟包率0%".format(ip), "green") else: self.flag_list.append(False) custom_print("錯誤, ip: {} ping異常 丟包率100%".format(ip), "red") def main(self): """ 主程序 :return: """ pool= gevent.pool.Pool(100) # 協程池固定爲100個 ip_list = ["192.168.10.{}".format(i) for i in range(1, 256)] # 使用pool.map,語法:pool.map(func,iterator) pool.map(self.check_ping, ip_list) if __name__ == '__main__': startime = time.time() # 開始時間 NetworkTest().main() endtime = time.time() take_time = endtime - startime if take_time < 1: # 判斷不足1秒時 take_time = 1 # 設置爲1秒 # 計算花費時間 m, s = divmod(take_time, 60) h, m = divmod(m, 60) custom_print("本次花費時間 %02d:%02d:%02d" % (h, m, s),"green")
注意:方法只有一個參數的狀況下,使用pool.map,一行就能夠搞定。這樣看起來,比較精簡!
若是方法,有多個參數,須要借用偏函數實現。
完整代碼以下:
#!/usr/bin/env python3 # coding: utf-8 #!/usr/bin/env python # -*- coding: utf-8 -*- import os import time import signal import subprocess import gevent import gevent.pool from gevent import monkey;monkey.patch_all() from functools import partial def custom_print(content,colour='white'): """ 寫入日誌文件 :param content: 內容 :param colour: 顏色 :return: None """ # 顏色代碼 colour_dict = { 'red': 31, # 紅色 'green': 32, # 綠色 'yellow': 33, # 黃色 'blue': 34, # 藍色 'purple_red': 35, # 紫紅色 'bluish_blue': 36, # 淺藍色 'white': 37, # 白色 } choice = colour_dict.get(colour) # 選擇顏色 info = "\033[1;{};1m{}\033[0m".format(choice, content) print(info) def execute_linux2(cmd, timeout=10, skip=False): """ 執行linux命令,返回list :param cmd: linux命令 :param timeout: 超時時間,生產環境, 特別卡, 所以要3秒 :param skip: 是否跳過超時限制 :return: list """ p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid) t_beginning = time.time() # 開始時間 while True: if p.poll() is not None: break seconds_passed = time.time() - t_beginning if not skip: if seconds_passed > timeout: # p.terminate() # p.kill() # raise TimeoutError(cmd, timeout) custom_print('錯誤, 命令: {},本地執行超時!'.format(cmd),"red") # 當shell=True時,只有os.killpg才能kill子進程 try: # time.sleep(1) os.killpg(p.pid, signal.SIGUSR1) except Exception as e: pass return False result = p.stdout.readlines() # 結果輸出列表 return result class NetworkTest(object): def __init__(self): self.flag_list = [] def check_ping(self,ip,timeout): """ 檢查ping :param ip: ip地址 :param ip: 超時時間 :return: none """ cmd = "ping %s -c 2 -W %s" %(ip,timeout) # print(cmd) # 本機執行命令 res = execute_linux2(cmd,2) # print("res",res,"ip",ip,"len",len(res)) if not res: custom_print("錯誤, 執行命令: {} 失敗".format(cmd), "red") self.flag_list.append(False) return False if len(res) != 7: custom_print("錯誤,執行命令: {} 異常".format(cmd), "red") self.flag_list.append(False) return False res.pop() # 刪除最後一個元素 last_row = res.pop().decode('utf-8').strip() # 再次獲取最後一行結果 if not last_row: custom_print("錯誤,執行命令: {} 獲取結果異常","red") self.flag_list.append(False) return False res = last_row.split() # 切割結果 # print(res,type(res),len(res)) if len(res) <10: custom_print("錯誤,切割 ping 結果異常","red") self.flag_list.append(False) return False if res[5] == "0%": # 判斷丟包率 custom_print("正常, ip: {} ping正常 丟包率0%".format(ip), "green") else: self.flag_list.append(False) custom_print("錯誤, ip: {} ping異常 丟包率100%".format(ip), "red") def main(self): """ 主程序 :return: """ pool= gevent.pool.Pool(100) # 協程池固定爲100個 ip_list = ["192.168.0.{}".format(i) for i in range(1, 256)] # 使用協程池,執行任務。語法: pool.map(func,iterator) # partial使用偏函數傳遞參數 # 注意:has_null第一個參數,必須是迭代器遍歷的值 pool.map(partial(self.check_ping, timeout=1), ip_list) if __name__ == '__main__': startime = time.time() # 開始時間 NetworkTest().main() endtime = time.time() take_time = endtime - startime if take_time < 1: # 判斷不足1秒時 take_time = 1 # 設置爲1秒 # 計算花費時間 m, s = divmod(take_time, 60) h, m = divmod(m, 60) custom_print("本次花費時間 %02d:%02d:%02d" % (h, m, s),"green")
執行腳本,效果同上
本文參考連接: