腳本檢測CDN節點資源是否與源站資源一致

需求:

  一、全部要檢測的資源url放到一個單獨文件中

  二、檢測cdn節點資源大小與源站文件大小是否一致

  三、隨機抽查幾個資源,檢查md5sum是否一致

  四、使用多線程,可配置線程數

 

代碼目錄:

hexm:Hexm hexm$ tree ./checkcdn
./checkcdn
├── README.TXT
├── check.py   # 主程序
├── conf
│   └── url.txt  # 配置文件
├── lib
│   ├── __init__.py
│   ├── common.py 
│   └── threadpool.py # 線程池
└── tmp
    ├── cdn  # 存放從CDN節點下載的資源
    └── origin # 存放從源站下載的資源

 

README.TXT

依賴:
    requests
兼容性:
    兼容Python3以及Python2.7

使用方法:
    usage: check.py [-h] [-t THREADS] [-c COUNTS]

    optional arguments:
      -h, --help            show this help message and exit
      -t THREADS, --threads THREADS
                            開啓多少線程,默認5個
      -c COUNTS, --counts COUNTS
                            檢測多少個包的md5值,默認3個

conf/url.txt

http://xxx_1020101.apk
http://xxx_1020102.apk
http://xxx_1020103.apk
http://xxx_1020104.apk

check.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# File Name    : check.py
# Author       : hexm
# Mail         : xiaoming.unix@gmail.com
# Created Time : 2017-03-24 10:03


import os
import sys
import random
import argparse
import requests


# Absolute directory that contains this script.  It is appended to the
# module search path before the ``lib`` imports further down.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(BASE_DIR)

# Proxy mapping handed to requests.  Responses fetched through it are the
# ones reported as the "origin site" side of every comparison.
PROXIES = dict(http="http://183.136.135.191:80")
# File listing the URLs to check
CONFIG = '%s/conf/url.txt' % BASE_DIR
# Scratch directory for files downloaded from the CDN node
# (trailing slash required: file names are appended directly)
CDNTEMPDIR = '%s/tmp/cdn/' % BASE_DIR
# Scratch directory for files downloaded from the origin site
# (trailing slash required: file names are appended directly)
ORIGINTEMPDIR = '%s/tmp/origin/' % BASE_DIR

from lib.threadpool import ThreadPool
from lib.common import isdir, download, getfilemd5

def callback(status, result):
    """
    Placeholder completion hook for thread-pool tasks; it does nothing.

    Kept as a template for callers that want to consume a task's outcome.

    :param status: execution status flag of the task
    :param result: value returned by the task function
    :return: None
    """
    return None

def checkstatus(url):
    """
    Compare the Content-Length reported by the origin site and the CDN node.

    Two HEAD requests are sent for ``url``: one through ``PROXIES``
    (reported as the origin site) and one directly (reported as the CDN
    node).  The verdict is printed; nothing is returned.

    :param url: URL of the resource to check
    :return: None
    """

    try:
        # requests waits forever by default; the timeout keeps one dead host
        # from blocking a worker thread indefinitely.
        r1 = requests.head(url, proxies=PROXIES, timeout=60)
        r2 = requests.head(url, timeout=60)
    except requests.RequestException as e:
        # Report the failure here: an exception escaping a pool task is
        # swallowed, so the URL would otherwise vanish from the report.
        print("%s \033[0;31m請求失敗\033[0m: %s" % (url, e))
        return

    if r1.status_code != 200 or r2.status_code != 200:
        print("%s 源站和CDN節點狀態碼\033[0;31m異常\033[0m,源站狀態碼爲%s,CDN節點狀態碼爲%s"
              % (url,r1.status_code,r2.status_code))
        return

    # The header may be absent (e.g. chunked responses); .get() avoids a
    # KeyError and a missing size is reported as a mismatch.
    origin_size = r1.headers.get('Content-Length')
    cdn_size = r2.headers.get('Content-Length')

    if origin_size is not None and origin_size == cdn_size:
        print("%s 源站和CDN節點資源\033[0;32m一致\033[0m, 源站文件大小爲%s,CDN節點文件大小爲%s"
              % (url,origin_size,cdn_size))
    else:
        print("%s 源站和CDN節點資源\033[0;31m不一致\033[0m, 源站文件大小爲%s,CDN節點文件大小爲%s"
              % (url,origin_size,cdn_size))

def checkmd5(url, cdnTempDir, originTempDir):
    """
    Download ``url`` from the origin site and from the CDN node and compare
    the MD5 checksums of the two files.

    The origin copy is fetched through ``PROXIES``; the CDN copy is fetched
    directly, and only when the origin download returned 200.  The verdict
    is printed.  Both temporary files are removed afterwards.  The request
    timeout is whatever ``download`` applies.

    :param url: URL of the resource to check
    :param cdnTempDir: directory prefix for the file fetched from the CDN node
    :param originTempDir: directory prefix for the file fetched from the origin
    :return: None
    """

    filename = url.split('/')[-1]
    tempCdnFile = cdnTempDir + filename
    tempOriginFile = originTempDir + filename

    try:
        status1 = download(url, tempOriginFile, proxies=PROXIES)
        # Always bind status2 so the checks below never hit an unbound name
        # when the origin download failed or returned a non-200 code.
        status2 = download(url, tempCdnFile) if status1 == 200 else None

        if status1 is None or (status1 == 200 and status2 is None):
            print("%s \033[0;31m下載失敗\033[0m" % url)
        elif status1 != 200 or status2 != 200:
            # Covers a non-200 answer from either side.
            print("%s \033[0;31m狀態碼異常\033[0m校驗失敗" % url)
        elif getfilemd5(tempCdnFile) == getfilemd5(tempOriginFile):
            print("%s 源站和cdn節點資源md5值\033[0;32m一致\033[0m," % url)
        else:
            print("%s 源站和cdn節點資源md5值\033[0;31m不一致\033[0m" % url)
    finally:
        # Remove each file independently so one missing file does not leave
        # the other behind.
        for tempFile in (tempOriginFile, tempCdnFile):
            try:
                os.remove(tempFile)
            except OSError:
                pass

def _int_at_least(minimum):
    """
    Build an argparse ``type`` callable that accepts integers >= ``minimum``.

    :param minimum: smallest accepted value
    :return: converter turning the raw option string into an int
    """
    def convert(text):
        try:
            value = int(text)
        except ValueError:
            raise argparse.ArgumentTypeError("invalid int value: %r" % text)
        if value < minimum:
            raise argparse.ArgumentTypeError(
                "must be an integer >= %d, got %r" % (minimum, text))
        return value
    return convert


def parse_args():
    """
    Parse the command-line options.

    ``--threads`` must be at least 1, because a pool with no workers never
    runs a task.  ``--counts`` must be at least 0, because a negative sample
    size is invalid.  argparse reports violations and exits.

    :return: namespace with integer attributes ``threads`` and ``counts``
    """

    parser = argparse.ArgumentParser()
    parser.add_argument('-t', '--threads', type=_int_at_least(1),
                        help='開啓多少線程,默認5個', default=5)
    parser.add_argument('-c', '--counts', type=_int_at_least(0),
                        help='檢測多少個包的md5值,默認3個', default=3)

    return parser.parse_args()

if __name__ == "__main__":

    # Parse the command line first so that ``-h`` and option errors are
    # handled before any file system access.
    args = parse_args()

    for tempdir in (CDNTEMPDIR, ORIGINTEMPDIR):
        if not isdir(tempdir):
            os.makedirs(tempdir)

    # Read every URL from the list file, skipping blank lines; the ``with``
    # block closes the file.
    with open(CONFIG, mode='r') as config_file:
        urls = [line.strip() for line in config_file if line.strip()]

    # Size check for every URL, using at most args.threads worker threads
    pool = ThreadPool(args.threads)
    for url in urls:
        pool.run(checkstatus, (url,), callback=None)

    # MD5 check on a random sample.  random.sample raises ValueError when
    # asked for more items than exist (or a negative count), so clamp it.
    sample_size = max(0, min(args.counts, len(urls)))
    for randurl in random.sample(urls, sample_size):
        pool.run(checkmd5, (randurl, CDNTEMPDIR, ORIGINTEMPDIR,), callback=None)
    pool.close()
check.py

lib/common.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# File Name    : common.py
# Author       : hexm
# Mail         : xiaoming.unix@gmail.com
# Created Time : 2017-03-24 10:03

import os
import hashlib
import requests

def getfilesize(path):
    """
    Report the size of a file in bytes.

    :param path: path of the file
    :return: file size in bytes
    """
    file_stat = os.stat(path)
    return file_stat.st_size

def isfile(path):
    """
    Tell whether ``path`` is an existing regular file.

    :param path: path to test
    :return: True when it is a file, otherwise None
    """
    return True if os.path.isfile(path) else None

def isdir(path):
    """
    Tell whether ``path`` is an existing directory.

    :param path: path to test
    :return: True when it is a directory, otherwise None
    """
    return True if os.path.isdir(path) else None

def getstatus(url, proxies=None):
    """
    Send a HEAD request and return the HTTP status code.

    :param url: URL to query
    :param proxies: optional proxy mapping passed to requests
    :return: HTTP status code of the response
    """
    # ``proxies`` must be passed by keyword: requests.head() accepts only
    # the URL positionally.  The timeout matches the one download() uses.
    return requests.head(url, proxies=proxies, timeout=60).status_code

def download(url, path, proxies=None):
    """
    Download ``url`` to ``path`` and return the HTTP status code.

    The body is written only for a 200 response.  When the response carries
    a Content-Length header, the saved file must match it, otherwise the
    download counts as failed.  Without that header the size check is
    skipped.

    :param url: URL to download
    :param path: destination file path
    :param proxies: optional proxy mapping passed to requests
    :return: the status code; None when the request or the write failed or
             the saved size does not match Content-Length
    """
    try:
        response = requests.get(url, proxies=proxies, stream=True, timeout=60)
        try:
            status = response.status_code
            # Non-200: report the status code as-is, write nothing.
            if status != 200:
                return status

            with open(path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

            # The header is optional, so look it up only after a successful
            # transfer and tolerate its absence.
            expected_size = response.headers.get('Content-Length')
            if expected_size is not None and int(expected_size) != getfilesize(path):
                return None
            return status
        finally:
            # With stream=True the connection stays open until closed.
            response.close()
    except Exception:
        # Best-effort boundary: callers treat None as "download failed".
        return None

def getfilemd5(path):
    """
    Compute the MD5 checksum of a file.

    :param path: path of the file
    :return: hexadecimal MD5 digest, or None when ``path`` is not a file
    """
    if not os.path.isfile(path):
        return None

    md5obj = hashlib.md5()
    maxbuf = 8192
    # ``with`` closes the file even if a read fails; reading in fixed-size
    # chunks keeps memory use flat for large files.
    with open(path, 'rb') as f:
        for buf in iter(lambda: f.read(maxbuf), b''):
            md5obj.update(buf)
    return md5obj.hexdigest()

# Running this module directly does nothing.
if __name__ == "__main__":
    pass
View Code

lib/threadpool.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# File Name    : threadpool.py
# Author       : hexm
# Mail         : xiaoming.unix@gmail.com
# Created Time : 2017-03-23 20:03

import sys
# Compare the numeric version tuple; comparing the sys.version *string*
# orders versions lexicographically.
if sys.version_info[0] >= 3:
    import queue
else:
    import Queue as queue
import threading
import contextlib
import time

StopEvent = object()  # sentinel queued to tell a worker thread to exit

class ThreadPool(object):
    """
    Small thread pool that reuses its worker threads.

    1. A worker that finishes a task is not killed; it is put on the idle
       list and goes on to take the next task from the queue.
    2. A new worker is started only when a task is submitted while no
       worker is idle and fewer than ``max_num`` workers exist.
    3. Task exceptions are caught inside the worker and reported through
       the optional callback as a failed status.
    """

    def __init__(self, max_num, max_task_num = None):
        """
        :param max_num: maximum number of worker threads
        :param max_task_num: maximum queue length; falsy means unbounded
        """
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num  # upper bound on worker threads
        self.cancel = False  # set once no further tasks are accepted
        self.terminal = False  # set to stop workers regardless of pending tasks
        self.generate_list = []  # worker threads that currently exist
        self.free_list = []  # workers currently waiting for a task

    def run(self, func, args, callback=None):
        """
        Submit one task to the pool.

        :param func: task function
        :param args: tuple of positional arguments for ``func``
        :param callback: optional function called after the task as
                         ``callback(success, result)``, where ``success`` is
                         True or False and ``result`` is the task's return
                         value (None on failure)
        :return: None; the task is silently dropped once the pool is closed
        """
        if self.cancel:
            return
        # Start a worker only when nobody is idle and the limit allows it.
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        # Function, argument tuple and callback travel as one queue item.
        self.q.put((func, args, callback,))

    def generate_thread(self):
        """
        Create and start one worker thread.
        """
        t = threading.Thread(target=self.call)
        # Register the worker before it starts.  If the worker registered
        # itself, run() could start more than max_num threads and close()
        # could miss workers that have not been scheduled yet.
        self.generate_list.append(t)
        try:
            t.start()
        except Exception:
            self.generate_list.remove(t)
            raise

    def call(self):
        """
        Worker loop: take tasks from the queue and run them until told to stop.
        """
        current_thread = threading.current_thread()

        event = self.q.get()  # blocks until a task (or the sentinel) arrives
        while event is not StopEvent:  # identity check against the sentinel

            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception:
                    # A failing callback must not kill the worker.
                    pass

            # Mark this worker idle while it waits for its next task.
            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()

        # Told to stop: drop this worker from the list of existing workers.
        self.generate_list.remove(current_thread)

    def close(self):
        """
        Stop accepting tasks and let the workers exit once the tasks already
        queued have run.
        """
        self.cancel = True
        # One sentinel per existing worker; the count is exact because
        # workers are registered by the thread that creates them.
        for _ in range(len(self.generate_list)):
            self.q.put(StopEvent)

    def terminate(self):
        """
        Stop the workers whether or not tasks remain, and wait for them.

        Tasks still queued are discarded.  A task that is already running is
        allowed to finish.  The pool accepts no further tasks afterwards.
        """
        self.cancel = True
        self.terminal = True

        workers = list(self.generate_list)
        self._drain()  # discard tasks that have not started
        # Wake every worker blocked on the queue.  A busy worker notices
        # ``terminal`` after its current task and leaves its sentinel unused.
        for _ in workers:
            self.q.put(StopEvent)

        current = threading.current_thread()
        for worker in workers:
            # A thread cannot join itself (terminate() called from a task).
            if worker is not current:
                worker.join()

        self._drain()  # remove unused sentinels

    def _drain(self):
        """
        Remove every item currently in the queue.
        """
        try:
            while True:
                self.q.get_nowait()
        except queue.Empty:
            pass

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        Keep ``worker_thread`` in ``state_list`` for the duration of the
        ``with`` block; used to track which workers are waiting for a task.
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)

# Usage example
if __name__ == "__main__":

    def callback(status, result):
        # Example completion hook; it is not passed to the pool below.
        pass

    def action(i):
        # Simulated one-second task that prints its index.
        time.sleep(1)
        print(i)

    pool = ThreadPool(5)  # pool object limited to 5 worker threads

    # Submit 30 tasks: action, its argument tuple and no callback.
    for i in range(30):
        ret = pool.run(action, (i,), callback=None)
    pool.close()
View Code

 

例子:

相關文章
相關標籤/搜索