一、單進程多線程模式(Python)
# #!/usr/bin/env python
# # -*- coding:utf-8 -*-
#
# Single-process, multi-threaded HTTP load tester.
#
# `workers` threads share one requests.Session; each thread fetches
# `download_url` in an endless loop and folds the observed latency into the
# shared `contain` statistics (min / max / running average / hit count).
# The script never terminates on its own; stop it with Ctrl-C / kill.
import time
import logging
import requests
import threading
from concurrent import futures

# Alternative targets kept from the original script:
# download_url = 'http://192.168.188.110:8081//workspace/record_download/polls/82003533467_18b305da-e313-11e8-aa39-00163e0a6bde.mp3'
# download_url = 'http://192.168.188.110:8081//workspace/record_download/polls/test.log'
download_url = 'http://192.168.188.110:8081//workspace/record_download/polls/9921_057128214999_18210532807_20181113110420_00163e104dbfbb8b11e8e6f0d0990876(3).wav'

workers = 1000
mutex = threading.Lock()
session = requests.Session()

# The default HTTPAdapter keeps only 10 pooled connections per host. With
# `workers` threads sharing the session, the surplus connections would be
# discarded and re-opened on every request, which distorts the measured
# latency. Size the pool to the number of concurrent threads instead.
_adapter = requests.adapters.HTTPAdapter(pool_maxsize=workers)
session.mount('http://', _adapter)
session.mount('https://', _adapter)

# Shared latency statistics, guarded by `mutex`. A `min_cost` of 0 means
# "no sample recorded yet".
contain = {'average_cost': 0, 'min_cost': 0, 'max_cost': 0, 'hit_count': 0}


def handle(cost):
    """Fold one request latency into the shared ``contain`` statistics.

    Args:
        cost: Elapsed time of one request, in seconds.

    The update and the log of the resulting snapshot happen under ``mutex``
    so concurrent threads cannot interleave a read-modify-write.
    """
    with mutex:
        hit_count = contain['hit_count']
        min_cost = contain['min_cost']

        # 0 is the "unset" sentinel, so the first sample always becomes the min.
        if min_cost == 0 or cost < min_cost:
            contain['min_cost'] = cost
        if cost > contain['max_cost']:
            contain['max_cost'] = cost

        # Incremental mean: avoids keeping every sample around.
        contain['average_cost'] = (
            (contain['average_cost'] * hit_count + cost) / (hit_count + 1)
        )
        contain['hit_count'] = hit_count + 1
        logging.info(contain)


def download_one():
    """Fetch ``download_url`` forever, recording the latency of each request.

    Any exception (timeout, connection error, ...) is logged and printed, and
    the loop continues with the next request; the function never returns.
    """
    while True:
        try:
            # perf_counter is monotonic, so the measured duration cannot be
            # skewed by wall-clock adjustments the way time.time() can.
            stime = time.perf_counter()
            request = requests.Request(method='GET', url=download_url)
            prep = session.prepare_request(request)
            response = session.send(prep, timeout=100)
            cost = time.perf_counter() - stime

            logging.info(
                'thread[%s] status[%s] cost[%s]',
                threading.current_thread().ident,
                response.status_code,
                cost,
            )
            handle(cost)
        except Exception as e:
            # %r keeps the exception type, which str(e) alone may not show.
            logging.error('request failed: %r', e)
            print(e)


def main():
    """Start ``workers`` threads, each running :func:`download_one`.

    Blocks indefinitely because the worker loops never finish.
    """
    with futures.ThreadPoolExecutor(workers) as executor:
        for _ in range(workers):
            executor.submit(download_one)


if __name__ == '__main__':
    logging.basicConfig(
        filename="client.log",
        level=logging.INFO,
        format="%(asctime)s [%(filename)s:%(lineno)d] %(message)s",
        datefmt="%m/%d/%Y %H:%M:%S [%A]",
    )
    main()
二、多進程多線程模式(session)
# #!/usr/bin/env python
# # -*- coding:utf-8 -*-
#
# Multi-process, multi-threaded HTTP load tester.
#
# `cpu_count` worker processes are started; each one runs a pool of `workers`
# threads that fetch `download_url` in an endless loop. Latency statistics
# are aggregated across all processes in a Manager-backed dict guarded by a
# Manager lock. The script never terminates on its own.
import os
import time
import logging
import requests
import threading
from multiprocessing import Lock, Manager
from concurrent import futures

download_url = 'http://192.168.188.105:8888'
workers = 250
cpu_count = 4
session = requests.Session()

# The default HTTPAdapter keeps only 10 pooled connections per host. With
# `workers` threads per process sharing the session, the surplus connections
# would be discarded and re-opened on every request, which distorts the
# measured latency. Size the pool to the per-process thread count instead.
_adapter = requests.adapters.HTTPAdapter(pool_maxsize=workers)
session.mount('http://', _adapter)
session.mount('https://', _adapter)


def _configure_logging():
    """Send INFO-and-above records to ``client.log``.

    ``logging.basicConfig`` does nothing when the root logger already has a
    handler, so this is safe to call both in the parent and in every worker
    process.
    """
    logging.basicConfig(
        filename="client.log",
        level=logging.INFO,
        format="%(asctime)s [%(filename)s:%(lineno)d] %(message)s",
        datefmt="%m/%d/%Y %H:%M:%S [%A]",
    )


def handle(cost, mutex, contain):
    """Fold one request latency into the shared ``contain`` statistics.

    Args:
        cost: Elapsed time of one request, in seconds.
        mutex: Lock guarding ``contain`` (a Manager lock shared by all
            worker processes when called from this script).
        contain: Mapping with the keys ``average_cost``, ``min_cost``,
            ``max_cost`` and ``hit_count``; a ``min_cost`` of 0 means
            "no sample recorded yet".
    """
    with mutex:
        hit_count = contain['hit_count']
        min_cost = contain['min_cost']

        # 0 is the "unset" sentinel, so the first sample always becomes the min.
        if min_cost == 0 or cost < min_cost:
            contain['min_cost'] = cost
        if cost > contain['max_cost']:
            contain['max_cost'] = cost

        # Incremental mean: avoids keeping every sample around.
        contain['average_cost'] = (
            (contain['average_cost'] * hit_count + cost) / (hit_count + 1)
        )
        contain['hit_count'] = hit_count + 1
        logging.info(contain)


def download_one(mutex, contain):
    """Fetch ``download_url`` forever, recording the latency of each request.

    Args:
        mutex: Lock passed through to :func:`handle`.
        contain: Shared statistics mapping passed through to :func:`handle`.

    Any exception (timeout, connection error, ...) is logged and printed, and
    the loop continues with the next request; the function never returns.
    """
    while True:
        try:
            # perf_counter is monotonic, so the measured duration cannot be
            # skewed by wall-clock adjustments the way time.time() can.
            stime = time.perf_counter()
            request = requests.Request(method='GET', url=download_url)
            prep = session.prepare_request(request)
            response = session.send(prep, timeout=50)
            cost = time.perf_counter() - stime

            print(response.status_code)
            logging.info(
                'process[%s] thread[%s] status[%s] cost[%s]',
                os.getpid(),
                threading.current_thread().ident,
                response.status_code,
                cost,
            )
            handle(cost, mutex, contain)
            # time.sleep(1)  # optional throttle between requests
        except Exception as e:
            # %r keeps the exception type, which str(e) alone may not show.
            logging.error('request failed: %r', e)
            print(e)


def new_thread_pool(mutex, contain):
    """Entry point of one worker process: run ``workers`` download threads.

    Args:
        mutex: Lock shared with the other worker processes.
        contain: Statistics mapping shared with the other worker processes.
    """
    # With the "spawn" start method the child does not inherit the parent's
    # logging setup (it is configured under the __main__ guard only), so
    # every record below WARNING would be dropped. Configure it here too.
    _configure_logging()
    with futures.ThreadPoolExecutor(workers) as executor:
        for _ in range(workers):
            executor.submit(download_one, mutex, contain)


def subprocess():
    """Start ``cpu_count`` worker processes sharing one statistics dict.

    Blocks indefinitely because the worker loops never finish.
    NOTE: the function name shadows the stdlib ``subprocess`` module; it is
    kept unchanged so existing callers keep working.
    """
    manager = Manager()
    mutex = manager.Lock()
    contain = manager.dict({'average_cost': 0, 'min_cost': 0, 'max_cost': 0, 'hit_count': 0})
    with futures.ProcessPoolExecutor(cpu_count) as executor:
        for _ in range(cpu_count):
            executor.submit(new_thread_pool, mutex, contain)


if __name__ == '__main__':
    _configure_logging()
    subprocess()