最近在作一個爬蟲相關的項目,單線程的整站爬蟲,耗時真的不是通常的巨大,運行一次也是心累,,,因此,要想實現整站爬蟲,多線程是不可避免的,那麼python多線程又應該怎樣實現呢?這裏主要要幾個問題(關於python多線程的GIL問題就再也不說了,網上太多了)。html
1、 既然多線程能夠縮短程序運行時間,那麼,是否是線程數量越多越好呢?python
顯然,並非,每個線程的從生成到消亡也是須要時間和資源的,太多的線程會佔用過多的系統資源(內存開銷,cpu開銷),並且生成太多的線程時間也是可觀的,極可能會得不償失,這裏給出一個最佳線程數量的計算方式:服務器
最佳線程數的獲取:多線程
一、經過用戶慢慢遞增來進行性能壓測,觀察QPS(即每秒的響應請求數,也便是最大吞吐能力。),響應時間app
二、根據公式計算:服務器端最佳線程數量=((線程等待時間+線程cpu時間)/線程cpu時間) * cpu數量python2.7
三、單用戶壓測,查看CPU的消耗,而後直接乘以百分比,再進行壓測,通常這個值的附近應該就是最佳線程數量。函數
2、爲何要使用線程池?性能
對於任務數量不斷增長的程序,每有一個任務就生成一個線程,最終會致使線程數量的失控,例如,整站爬蟲,假設初始只有一個連接a,那麼,這個時候只啓動一個線程,運行以後,獲得這個連接對應頁面上的b,c,d,,,等等新的連接,做爲新任務,這個時候,就要爲這些新的連接生成新的線程,線程數量暴漲。在以後的運行中,線程數量還會不停的增長,徹底沒法控制。因此,對於任務數量不端增長的程序,固定線程數量的線程池是必要的。url
3、如何實現線程池?spa
這裏,我分別介紹三種實現方式:
一、過去:
使用threadpool模塊,這是個python的第三方模塊,支持python2和python3,具體使用方式以下:
#! /usr/bin/env python # -*- coding: utf-8 -*- import threadpool import time def sayhello (a): print("hello: "+a) time.sleep(2) def main(): global result seed=["a","b","c"] start=time.time() task_pool=threadpool.ThreadPool(5) requests=threadpool.makeRequests(sayhello,seed) for req in requests: task_pool.putRequest(req) task_pool.wait() end=time.time() time_m = end-start print("time: "+str(time_m)) start1=time.time() for each in seed: sayhello(each) end1=time.time() print("time1: "+str(end1-start1)) if __name__ == '__main__': main()
運行結果以下:
threadpool是一個比較老的模塊了,如今雖然還有一些人在用,但已經再也不是主流了,關於python多線程,如今已經開始步入將來(future模塊)了
二、將來:
使用concurrent.futures模塊,這個模塊是python3中自帶的模塊,可是,python2.7以上版本也能夠安裝使用,具體使用方式以下:
#! /usr/bin/env python # -*- coding: utf-8 -*- from concurrent.futures import ThreadPoolExecutor import time def sayhello(a): print("hello: "+a) time.sleep(2) def main(): seed=["a","b","c"] start1=time.time() for each in seed: sayhello(each) end1=time.time() print("time1: "+str(end1-start1)) start2=time.time() with ThreadPoolExecutor(3) as executor: for each in seed: executor.submit(sayhello,each) end2=time.time() print("time2: "+str(end2-start2)) start3=time.time() with ThreadPoolExecutor(3) as executor1: executor1.map(sayhello,seed) end3=time.time() print("time3: "+str(end3-start3)) if __name__ == '__main__': main()
運行結果以下:
注意到一點:
concurrent.futures.ThreadPoolExecutor,在提交任務的時候,有兩種方式,一種是submit()函數,另外一種是map()函數,二者的主要區別在於:
2.一、map能夠保證輸出的順序, submit輸出的順序是亂的
2.二、若是你要提交的任務的函數是同樣的,就能夠簡化成map。可是假如提交的任務函數是不同的,或者執行的過程之可能出現異常(使用map執行過程當中發現問題會直接拋出錯誤)就要用到submit()
2.三、submit和map的參數是不一樣的,submit每次都須要提交一個目標函數和對應的參數,map只須要提交一次目標函數,目標函數的參數放在一個迭代器(列表,字典)裏就能夠。
3.如今?
這裏要考慮一個問題,以上兩種線程池的實現都是封裝好的,任務只能在線程池初始化的時候添加一次,那麼,假設我如今有這樣一個需求,須要在線程池運行時,再往裏面添加新的任務(注意,是新任務,不是新線程),那麼要怎麼辦?
其實有兩種方式:
3.一、重寫threadpool或者future的函數:
這個方法須要閱讀源模塊的源碼,必須搞清楚源模塊線程池的實現機制才能正確的根據本身的須要重寫其中的方法。
3.二、本身構建一個線程池:
這個方法就須要對線程池的有一個清晰的瞭解了,附上我本身構建的一個線程池:
#! /usr/bin/env python # -*- coding: utf-8 -*- import threading import Queue import hashlib import logging from utils.progress import PrintProgress from utils.save import SaveToSqlite class ThreadPool(object): def __init__(self, thread_num, args): self.args = args self.work_queue = Queue.Queue() self.save_queue = Queue.Queue() self.threads = [] self.running = 0 self.failure = 0 self.success = 0 self.tasks = {} self.thread_name = threading.current_thread().getName() self.__init_thread_pool(thread_num) # 線程池初始化 def __init_thread_pool(self, thread_num): # 下載線程 for i in range(thread_num): self.threads.append(WorkThread(self)) # 打印進度信息線程 self.threads.append(PrintProgress(self)) # 保存線程 self.threads.append(SaveToSqlite(self, self.args.dbfile)) # 添加下載任務 def add_task(self, func, url, deep): # 記錄任務,判斷是否已經下載過 url_hash = hashlib.new('md5', url.encode("utf8")).hexdigest() if not url_hash in self.tasks: self.tasks[url_hash] = url self.work_queue.put((func, url, deep)) logging.info("{0} add task {1}".format(self.thread_name, url.encode("utf8"))) # 獲取下載任務 def get_task(self): # 從隊列裏取元素,若是block=True,則一直阻塞到有可用元素爲止。 task = self.work_queue.get(block=False) return task def task_done(self): # 表示隊列中的某個元素已經執行完畢。 self.work_queue.task_done() # 開始任務 def start_task(self): for item in self.threads: item.start() logging.debug("Work start") def increase_success(self): self.success += 1 def increase_failure(self): self.failure += 1 def increase_running(self): self.running += 1 def decrease_running(self): self.running -= 1 def get_running(self): return self.running # 打印執行信息 def get_progress_info(self): progress_info = {} progress_info['work_queue_number'] = self.work_queue.qsize() progress_info['tasks_number'] = len(self.tasks) progress_info['save_queue_number'] = self.save_queue.qsize() progress_info['success'] = self.success progress_info['failure'] = self.failure return progress_info def add_save_task(self, url, html): self.save_queue.put((url, html)) def get_save_task(self): save_task = self.save_queue.get(block=False) return save_task def wait_all_complete(self): for item in self.threads: if item.isAlive(): # join函數的意義,只有當前執行join函數的線程結束,程序才能接着執行下去 item.join() # WorkThread 繼承自threading.Thread class WorkThread(threading.Thread): # 這裏的thread_pool就是上面的ThreadPool類 def __init__(self, thread_pool): threading.Thread.__init__(self) self.thread_pool = thread_pool #定義線程功能方法,即,當thread_1,...,thread_n,調用start()以後,執行的操做。 def run(self): print (threading.current_thread().getName()) while True: try: # get_task()獲取從工做隊列裏獲取當前正在下載的線程,格式爲func,url,deep do, url, deep = self.thread_pool.get_task() self.thread_pool.increase_running() # 判斷deep,是否獲取新的連接 flag_get_new_link = True if deep >= self.thread_pool.args.deep: flag_get_new_link = False # 此處do爲工做隊列傳過來的func,返回值爲一個頁面內容和這個頁面上全部的新連接 html, new_link = do(url, self.thread_pool.args, flag_get_new_link) if html == '': self.thread_pool.increase_failure() else: self.thread_pool.increase_success() # html添加到待保存隊列 self.thread_pool.add_save_task(url, html) # 添加新任務,即,將新頁面上的不重複的連接加入工做隊列。 if new_link: for url in new_link: self.thread_pool.add_task(do, url, deep + 1) self.thread_pool.decrease_running() # self.thread_pool.task_done() except Queue.Empty: if self.thread_pool.get_running() <= 0: break except Exception, e: self.thread_pool.decrease_running() # print str(e) break