需求設定:
家裏的寬帶最近很是不穩定,師傅上門後,總感受仍是有問題。寫了個腳本,每隔N秒連一下網,看看有沒有掉網。python
實現方案:
python3,聯網使用requests庫,
使用自帶的logging模塊,發現鏈接不上,寫入到logging.log文件中
爲了實驗多進程,特地將聯網專門啓動了一個子進程:
父進程:啓動子進程,子進程中得到消息,由於從子進程中獲取消息的方法是阻塞的,得單獨啓動一個線程,對收到的信息進行loging
子進程:每隔N秒聯網,發現鏈接不上,就向主進程發送消息。同時啓動一個線程來接收主進程相關的消息
進程中通訊,使用pipe的方式,pipe()返回一對鏈接對象,表明了pipe的兩端。每一個對象都有send()和recv()方法。網絡
代碼以下:
函數
# -*- encoding:utf-8 -*- import requests import multiprocessing import time import threading import logging import os #初始化Log,將Log記錄到 def iniLog(): logfile = 'logging.log' log_format = '%(filename)s [%(asctime)s] [%(levelname)s] %(message)s' #todox jeig support utf8 encode handler = logging.FileHandler(logfile, "a",encoding = "UTF-8") formatter = logging.Formatter(log_format) handler.setFormatter(formatter) root_logger = logging.getLogger() root_logger.addHandler(handler) root_logger.setLevel(logging.INFO) #子進程接收消息 def recv_message_child(conn): while True: print('子進程正在等待消息') print('得到消息:'+conn.recv()) #父進程接收消息 def recv_message_parent(conn): while True: print('父進程正在等待消息') print('得到消息:'+conn.recv()) logging.info(conn.recv()) #嘗試聯網的時間間隔,以秒爲單位 TRY_TIME_SPACE_SECCOND = 15 #子進程監測網絡,每隔TRY_TIME_SPACE_SECCOND秒來訪問下百度 def checkNet_process(conn): #打印進程號 print("當前進程ID:%s,父進程ID:%s" % (os.getpid(),os.getppid())) # 子進程啓動線程,來接收父進程發送的消息 t = threading.Thread(target=recv_message_child,args=(conn,)) t.setDaemon(True) t.start() #t.join() headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:43.0) Gecko/20100101 Firefox/43.0', } while True: print('child process is running') try: response = requests.get('http://www.baidu.com',headers=headers) print('返回碼:'+str(response.status_code)) print(type(response.status_code)) if response.status_code != 200: #do some task print('+++++++++++++++++++++++++++++++++++++++網絡沒法訪問,狀態碼:{0}'.format(response.status_code)) conn.send('網絡沒法訪問,狀態碼:{0}'.format(response.status_code)) else: print('網絡正常訪問') except Exception as e: print('網絡異常報錯') conn.send('網絡報錯:'+str(e)) finally: time.sleep(TRY_TIME_SPACE_SECCOND) if __name__ == '__main__': iniLog() logging.info('net check start!') #開闢兩個口,都是能進能出,括號中若是False即單向通訊 conn_parent,conn_child=multiprocessing.Pipe() p=multiprocessing.Process(target=checkNet_process,args=(conn_child,)) #子進程使用sock口,調用checkNet_process函數 p.start() #父進程啓動線程,來接收子進程發送的消息 t = threading.Thread(target=recv_message_parent, args=(conn_parent,)) t.setDaemon(True) t.start() t.join()