需求:
一、全部要檢測的資源url放到一個單獨文件中
二、檢測cdn節點資源大小與源站文件大小是否一致
三、隨機抽查幾個資源,檢查md5sum是否一致
四、使用多線程,可配置線程數
代碼目錄:
hexm:Hexm hexm$ tree ./checkcdn
./checkcdn
├── README.TXT
├── check.py            # 主程序
├── conf
│   └── url.txt         # 配置文件
├── lib
│   ├── __init__.py
│   ├── common.py
│   └── threadpool.py   # 線程池
└── tmp
    ├── cdn             # 存放從CDN節點下載的資源
    └── origin          # 存放從源站下載的資源
README.TXT
依賴: requests
兼容性: 兼容Python3以及Python2.7
使用方法:
usage: check.py [-h] [-t THREADS] [-c COUNTS]

optional arguments:
  -h, --help            show this help message and exit
  -t THREADS, --threads THREADS
                        開啓多少線程,默認5個
  -c COUNTS, --counts COUNTS
                        檢測多少個包的md5值,默認3個
conf/url.txt
http://xxx_1020101.apk
http://xxx_1020102.apk
http://xxx_1020103.apk
http://xxx_1020104.apk
check.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# File Name    : check.py
# Author       : hexm
# Mail         : xiaoming.unix@gmail.com
# Created Time : 2017-03-24 10:03
"""Compare resources as served directly with the same resources fetched via PROXIES.

Every URL listed in ``conf/url.txt`` is checked with a HEAD request on both
sides (Content-Length comparison); a random sample of URLs is additionally
downloaded from both sides and compared by MD5 checksum.  All checks are
dispatched through the project's thread pool.
"""

import os
import sys
import random
import argparse

import requests

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(BASE_DIR)

# Proxy IP.  Requests sent through this proxy are the ones reported as the
# "origin" (源站) side; requests sent without it are reported as the CDN side.
PROXIES = {
    "http": "http://183.136.135.191:80",
}

# File listing the URLs to check, one per line
CONFIG = BASE_DIR + '/conf/url.txt'

# Temporary directory for files downloaded from the CDN node
CDNTEMPDIR = BASE_DIR + '/tmp/cdn/'

# Temporary directory for files downloaded from the origin site
ORIGINTEMPDIR = BASE_DIR + '/tmp/origin/'

# Timeout (seconds) for HEAD requests, so a stalled server cannot block a
# worker thread forever; same value lib.common.download uses for GET.
REQUEST_TIMEOUT = 60

# These imports must come after BASE_DIR has been appended to sys.path.
from lib.threadpool import ThreadPool
from lib.common import isdir, download, getfilemd5


def callback(status, result):
    """
    Thread-pool callback placeholder; useful when the task returns a value.
    :param status: task status, True or None
    :param result: return value of the task function
    """
    pass


def checkstatus(url):
    """
    Compare the Content-Length reported by HEAD requests on the origin side
    (via PROXIES) and the CDN side (direct) and print the verdict.
    :param url: url
    :return: None
    """
    try:
        r1 = requests.head(url, proxies=PROXIES, timeout=REQUEST_TIMEOUT)
        r2 = requests.head(url, timeout=REQUEST_TIMEOUT)
    except requests.RequestException as e:
        # Without this the pool would swallow the error and the URL would
        # silently disappear from the report.
        print("%s \033[0;31m請求失敗\033[0m: %s" % (url, e))
        return

    if r1.status_code == 200 and r2.status_code == 200:
        size1 = r1.headers.get('Content-Length')
        size2 = r2.headers.get('Content-Length')
        # A missing header on either side cannot prove the sizes match,
        # so it is reported as a mismatch rather than raising KeyError.
        if size1 is not None and size1 == size2:
            print("%s 源站和CDN節點資源\033[0;32m一致\033[0m, 源站文件大小爲%s,CDN節點文件大小爲%s"
                  % (url, size1, size2))
        else:
            print("%s 源站和CDN節點資源\033[0;31m不一致\033[0m, 源站文件大小爲%s,CDN節點文件大小爲%s"
                  % (url, size1, size2))
    else:
        print("%s 源站和CDN節點狀態碼\033[0;31m異常\033[0m,源站狀態碼爲%s,CDN節點狀態碼爲%s"
              % (url, r1.status_code, r2.status_code))


def checkmd5(url, cdnTempDir, originTempDir):
    """
    Download the resource from both sides and compare MD5 checksums.
    The downloaded files are always removed afterwards.
    :param url: url
    :param cdnTempDir: directory for the temporary file fetched from the CDN node
    :param originTempDir: directory for the temporary file fetched from the origin
    :return: None
    """
    filename = url.split('/')[-1]
    tempCdnFile = cdnTempDir + filename
    tempOriginFile = originTempDir + filename

    try:
        status1 = download(url, tempOriginFile, proxies=PROXIES)
        # The CDN copy is only fetched once the origin copy succeeded.
        # Binding status2 unconditionally avoids reading an unbound name
        # when the origin answers with a non-200 status.
        status2 = download(url, tempCdnFile) if status1 == 200 else None

        if status1 is None or (status1 == 200 and status2 is None):
            print("%s \033[0;31m下載失敗\033[0m" % url)
        elif status1 != 200 or status2 != 200:
            print("%s \033[0;31m狀態碼異常\033[0m校驗失敗" % url)
        elif getfilemd5(tempCdnFile) == getfilemd5(tempOriginFile):
            print("%s 源站和cdn節點資源md5值\033[0;32m一致\033[0m," % url)
        else:
            print("%s 源站和cdn節點資源md5值\033[0;31m不一致\033[0m" % url)
    finally:
        # Remove each file independently: a missing origin file must not
        # prevent the CDN file from being deleted.
        for path in (tempOriginFile, tempCdnFile):
            try:
                os.remove(path)
            except OSError:
                pass


def parse_args():
    """
    Parse command-line arguments.
    :return: args (with integer attributes ``threads`` and ``counts``)
    """
    parser = argparse.ArgumentParser()

    help = '開啓多少線程,默認5個'
    parser.add_argument('-t', '--threads', type=int, help=help, default=5)

    help = '檢測多少個包的md5值,默認3個'
    parser.add_argument('-c', '--counts', type=int, help=help, default=3)

    args = parser.parse_args()
    return args


def _load_urls(path):
    """Return the non-blank, stripped lines of the URL list file."""
    with open(path, mode='r') as f:
        return [line.strip() for line in f if line.strip()]


def main():
    """Run the size check on every URL and the MD5 check on a random sample."""
    # Parse first so that ``-h`` works even when the URL file is missing.
    args = parse_args()

    for directory in (CDNTEMPDIR, ORIGINTEMPDIR):
        if not isdir(directory):
            os.makedirs(directory)

    urls = _load_urls(CONFIG)

    # At least one worker is needed, otherwise queued tasks never run.
    pool = ThreadPool(max(1, args.threads))

    # Size check for every URL
    for url in urls:
        pool.run(checkstatus, (url,), callback=None)

    # MD5 check for a random sample; clamp the sample size because
    # random.sample raises ValueError when asked for more items than exist.
    sample_size = min(max(args.counts, 0), len(urls))
    for randurl in random.sample(urls, sample_size):
        pool.run(checkmd5, (randurl, CDNTEMPDIR, ORIGINTEMPDIR,), callback=None)

    pool.close()


if __name__ == "__main__":
    main()
lib/common.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# File Name    : common.py
# Author       : hexm
# Mail         : xiaoming.unix@gmail.com
# Created Time : 2017-03-24 10:03
"""Small file and HTTP helpers: size, type checks, download, MD5."""

import os
import hashlib

import requests


def getfilesize(path):
    """
    Get the size of a file.
    :param path: file path
    :return: file size in bytes, as reported by os.path.getsize
    """
    return os.path.getsize(path)


def isfile(path):
    """
    Tell whether the path is a regular file.
    :param path: file path
    :return: True if it is a file, otherwise None
    """
    if os.path.isfile(path):
        return True
    return None


def isdir(path):
    """
    Tell whether the path is a directory.
    :param path: path
    :return: True or None
    """
    if os.path.isdir(path):
        return True
    return None


def getstatus(url, proxies=None):
    """
    Return the status code of a HEAD request.
    :param url: url
    :param proxies: optional proxies mapping passed to requests
    :return: status code
    """
    # requests.head only accepts keyword arguments after the URL, so
    # proxies has to be passed by name (positionally it raises TypeError).
    return requests.head(url, proxies=proxies).status_code


def download(url, path, proxies=None):
    """
    Download a file and return the status code.
    :param url: url to download
    :param path: path the file is saved to
    :param proxies: proxies mapping to use, if any
    :return: 200 when the file was saved and its size equals the
             Content-Length header; the status code itself when it is not
             200; None on any error, on a missing Content-Length header,
             or on a size mismatch
    """
    try:
        response = requests.get(url, proxies=proxies, stream=True, timeout=60)
        try:
            status = response.status_code
            # Report non-200 codes before touching headers, so an error
            # response without Content-Length is not masked as None.
            if status != 200:
                return status

            total_size = int(response.headers['Content-Length'])
            with open(path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        finally:
            # A streamed response holds its connection until closed.
            response.close()

        # The download counts as successful only when the size on disk
        # matches the Content-Length header.
        if total_size == getfilesize(path):
            return status
        return None
    except Exception:
        # Deliberate best-effort boundary: callers treat None as
        # "download failed" and report it themselves.
        return None


def getfilemd5(path):
    """
    Return the md5sum of a file.
    :param path: file path
    :return: hex digest string, or None when path is not a file
    """
    if not isfile(path):
        return None

    md5obj = hashlib.md5()
    maxbuf = 8192
    # Read in fixed-size chunks so large files are not loaded into memory.
    with open(path, 'rb') as f:
        for buf in iter(lambda: f.read(maxbuf), b''):
            md5obj.update(buf)
    return md5obj.hexdigest()


if __name__ == "__main__":
    pass
lib/threadpool.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# File Name    : threadpool.py
# Author       : hexm
# Mail         : xiaoming.unix@gmail.com
# Created Time : 2017-03-23 20:03
"""A small thread pool that reuses worker threads and starts them on demand."""

import sys

# Compare the numeric version, not the version string.
if sys.version_info[0] >= 3:
    import queue
else:
    import Queue as queue

import threading
import contextlib
import time

StopEvent = object()  # sentinel placed on the queue to tell a worker to exit


class ThreadPool(object):
    """
    1. Threads are reused: after finishing a task a worker is not killed, it
       is recorded as free and waits for the next task.
    2. Threads are started on demand: a new worker is only created when no
       worker is free and fewer than ``max_num`` workers exist.
    3. If tasks finish quickly enough for the existing workers to keep up,
       no additional workers are started even if ``max_num`` is larger.
    """

    def __init__(self, max_num, max_task_num=None):
        """
        :param max_num: maximum number of worker threads
        :param max_task_num: maximum number of queued tasks; None (default)
                             or 0 means unbounded
        """
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num    # maximum number of threads
        self.cancel = False       # set by close(): accept no more tasks
        self.terminal = False     # set by terminate(): workers stop after the current task
        self.generate_list = []   # threads that have been created and not yet exited
        self.free_list = []       # threads currently waiting for a task

    def run(self, func, args, callback=None):
        """
        Submit one task to the pool.
        :param func: task function
        :param args: tuple of positional arguments for the task function
        :param callback: called after the task finished or failed with two
                         arguments: 1. success flag; 2. the task's return
                         value (None on failure)
        :return: None (the task is silently dropped after close())
        """
        if self.cancel:
            return
        # Start a new worker only when nobody is free and the limit allows.
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        self.q.put((func, args, callback,))

    def generate_thread(self):
        """
        Create and start one worker thread.
        """
        t = threading.Thread(target=self.call)
        # Register before starting: if the worker registered itself inside
        # call(), run() and close() could observe a stale thread count,
        # start more than max_num workers, or send too few stop events.
        self.generate_list.append(t)
        try:
            t.start()
        except Exception:
            self.generate_list.remove(t)
            raise

    def call(self):
        """
        Worker loop: fetch tasks from the queue and execute them until a
        StopEvent is received or the pool is terminated.
        """
        current_thread = threading.current_thread()
        # Normally already registered by generate_thread(); kept for callers
        # that run call() in a thread they created themselves.
        if current_thread not in self.generate_list:
            self.generate_list.append(current_thread)

        event = self.q.get()  # blocks until a task (or StopEvent) arrives
        while event is not StopEvent:
            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception:
                # Task failures are reported through the callback only.
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception:
                    # A failing callback must not kill the worker.
                    pass

            # Mark this worker as free while it waits for the next task.
            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()

        # Stop requested: this worker no longer counts as created.
        self.generate_list.remove(current_thread)

    def close(self):
        """
        Stop accepting tasks; workers exit once all queued tasks have run.
        Does not wait for the workers to finish.
        """
        self.cancel = True
        # Exactly one stop event per worker; the count is reliable because
        # workers are registered before they start.
        for _ in range(len(self.generate_list)):
            self.q.put(StopEvent)

    def terminate(self):
        """
        Stop the workers regardless of pending tasks: each worker exits after
        the task it is currently running, and tasks still queued are dropped.
        Blocks until the workers have exited.
        """
        self.terminal = True
        workers = list(self.generate_list)

        # Wake workers blocked on an empty queue.  If a bounded queue is
        # full, no worker can be blocked waiting for work, so stop.
        for _ in workers:
            try:
                self.q.put_nowait(StopEvent)
            except queue.Full:
                break

        me = threading.current_thread()
        for t in workers:
            if t is not me:  # a worker calling terminate() cannot join itself
                t.join()

        # Drop whatever is left (pending tasks, unused stop events).
        with self.q.mutex:
            self.q.queue.clear()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        Record worker_thread in state_list for the duration of the block;
        used to track which workers are waiting for a task.
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)


# Usage example
if __name__ == "__main__":

    pool = ThreadPool(5)  # at most 5 worker threads

    def callback(status, result):
        pass

    def action(i):
        time.sleep(1)
        print(i)

    for i in range(30):  # 30 tasks in total
        ret = pool.run(action, (i,), callback=None)

    pool.close()
例子: