事前最好了解一下什麼是進程,什麼是線程,什麼是GIL,本文再也不贅述,直接介紹模塊的使用:html
普通的python爬蟲是單進程單線程的,這樣在遇到大量重複的操做時就只能逐個進行,咱們就很難過了。舉個栗子:你有1000個美圖的連接,逐個餵給下載器(函數),看着圖片只能一個個蹦出來,你不心急嗎?因而咱們想,能不能同時跑多個下載器,實現多圖同時下載?——答案是能夠的,使用多進程/多線程,把每一個帶着不一樣參數下載器分給每一個進程/線程就,而後同時跑多個進程/線程就好了。python
本文就介紹如何用多線程和多進程給爬蟲加速算法
補充主線程與子線程(進程同理):json
Python標準庫本來有threading和multiprocessing模塊編寫相應的多線程/多進程代碼。但從Python3.2開始,標準庫爲咱們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的更高級的抽象,對編寫線程池/進程池提供了直接的支持。多進程咱們介紹futures的ProcessPoolExecutor
注:python 2.7 請安裝future模塊,pip install future
segmentfault
ProcessPoolExecutor類是Executor類的子類,實例化ProcessPoolExecutor類以建立進程池,在實例化的過程當中應指定同時運行的最大進程數api
from concurrent.futures import ProcessPoolExecutor pool = ProcessPoolExecutor(max_workers=4) # 運行最大進程數4 #進程池的操做... pool.shutdown(wait=True) # 關閉進程池,默認等待全部進程的完成。 print('Deep') # 有shutdown的狀況下全部進程完成後纔會運行下面的print,沒有的話會立刻運行 '建立進程也可用with,這時會自帶shutdown功能 with ProcessPoolExecutor(4) as pool: #進程池的操做... '
該類有兩種方法對進程池提交任務創建進程(函數及一組參數構成一個任務),分別是submit()
和map()
,若是單純想多開進程別無他想,用哪一個都行,但submit()會有更靈活的用法網絡
能夠理解這是python自帶map()的多進程版,他返回的是一個迭代器,包含每一個任務對應的返回值(有序的),下面用例子來分析多線程
from concurrent.futures import ProcessPoolExecutor import time def test(x): time.sleep(x) # 時間阻塞 print(str(x)+'s') return x if __name__ == '__main__': with ProcessPoolExecutor(4) as pool: p = pool.map(test,[2,3,10,5,6]) for i in p: print(i)
輸出併發
2s 2 3s 3 5s 6s 10s 10 5 6
分析(下面以參數代替某個進程):app
2s,3s,5s,6s,10s
2,3,10,5,6
,是有序的,對應各任務的返回值在爬蟲中,上面代碼中的時間阻塞會對應着網絡I/O阻塞,任務中每每包含着網絡請求。好比你有不少個圖片連接,就寫一個下載圖片的函數(接收一個圖片連接的參數),把函數和圖片連接的集合餵給map()就實現多進程了加速了。
該方法是往進程池中提交可回調的任務,並返回一個future實例。提交多個任務可用循環實現,返回的future實例用列表存起來,每一個future表明一個進程。關於future對象有許多方法:
from concurrent.futures import ProcessPoolExecutor,as_completed import time def test(x): time.sleep(x) print(str(x)+'s') return x if __name__ == '__main__': with ProcessPoolExecutor(4) as pool: futures = [pool.submit(test,i) for i in [2,3,10,5,6]] '''for j in futures: print(j.result()) # 對應接收參數有序輸出,輸出2,3,10,5,6 ''' for j in as_completed(futures): print(j.result()) # 對應進程完成順序輸出,輸出2,3,5,6,10
建議當心使用,雖然多線程能實現高併發,但因爲線程資源共享的特性,某個線程操做這些共享的資源時可能操到一半就中止讓給另外一個線程操做,致使錯亂的發生。爲避免此狀況發生對某些操做須要加鎖,因此這裏介紹對鎖有支持的threading模塊,python自帶直接導入。
若是你確信這些操做不會發生錯亂,能夠直接使用concurrent.future 的 ThreadPoolExecutor,方法什麼的和ProcessPoolExecutor的同樣
建立線程有兩種方法:
實例化 threading.Thread 類,target接收函數,arg以可迭代形式接收參數。這種方法最簡單
import threading import time def test(x): time.sleep(x) print(str(x)+'s') return x t1 = threading.Thread(target=test, args=(1,)) # 建立線程 t2 = threading.Thread(target=test, args=(3,)) t1.start() # 啓動線程 t2.start()
繼承threading.Thread 類,重寫run方法,把函數及參數接收寫進本身寫的多線程類中。這種方法更靈活,threading.Thread 類並無供獲取線程調用函數返回值的方法,若是須要函數返回值就須要繼承該類本身實現
import threading import time class TestThread(threading.Thread): def __init__(self,x): threading.Thread.__init__(self) self.x = x # 參數接收 def run(self): time.sleep(self.x) # 原來的函數寫到run中 print(str(self.x)+'s') def result(self): # 實現獲取調用函數的返回值的方法 return self.x t1 = TestThread(1) #建立線程 t2 = TestThread(3) t1.start() # 啓動線程 t2.start() t1.join() # 等待線程結束 t2.join() print(t1.result(),t2.result())
線程相關方法和屬性:
線程間資源共享,若是多個線程共同對某個數據修改,可能會出現錯誤,爲了保證數據的正確性,須要對多個線程進行同步。這時就須要引入鎖了(利用GIL),鎖只有一個,一個線程在持有鎖的狀態下對某些數據進行操做,其餘線程就沒法對該數據進行操做,直至該線程釋放鎖讓其餘線程搶,誰搶到誰就有權修改。
threading提供Lock和RLock兩類鎖,前者一個線程只能獲取獲取一次鎖,後者容許一個線程能重複獲取鎖。若是某個線程對全局數據的操做是割裂的(分塊的),那就使用RLock。
一個錯亂的例子及鎖的使用:
import time, threading lock = threading.Lock() # rlock = threading.RLock() balance = [0] def test(n): for i in range(100000): # 理想的狀況是執行了+n,-n操做後才讓另外一個線程處理,結果永0 #lock.acquire() balance[0] = balance[0] + n # 某個線程可能處理到這裏就終止讓給另外一個線程處理了,循環一大,結果可能錯亂不爲0 balance[0] = balance[0] - n #lock.release() t1 = threading.Thread(target=test, args=(5,)) t2 = threading.Thread(target=test, args=(8.0,)) t1.start() t2.start() t1.join() t2.join() print(balance[0])
在不加鎖的狀況下多跑幾回,你會的到不一樣的結果。可是加了鎖以後,+n,-n兩個操做完整執行,不會中途中斷,結果永0。
使用 threading.Semaphore 類就行,Semaphore 在內部管理着一個計數器。調用 acquire() 會使這個計數器減1,release() 則是加1。計數器的值永遠不會小於 0。當計數器到 0 時,再調用 acquire() 就會阻塞,直到其餘線程來調用release(),這樣就限制了同時運行線程的數量。
使用上很是簡單,實例化Semaphore並指定線程數後,給函數的頭加個acquire(),尾加個release()就行。
import threading, time def test(x): semaphore.acquire() time.sleep(x) print(x) semaphore.release() semaphore = threading.Semaphore(4) # 最大4個線程同時進行 ts = [threading.Thread(target=test,args=(i,)) for i in [2,3,5,10,6]] [t.start() for t in ts] '輸出:2,3,5,6,10 (原理和上面多進程的那個差很少)'
關於threading的其餘高級用法本文並未說起,以上都是些經常使用的用法,若是有更高級的須要,能夠參考這文章
講了這麼多,都是模塊的用法,沒怎麼提到爬蟲。那麼最後大概的講下如何把多進程/多線程運用到爬蟲中,並給個代碼實例用做參考。
下面給個多進程/多線程結合的網易雲音樂評論下載器(下載某首音樂的多頁評論),包含加密算法,如不清楚可看以前的文章,咱們用多進程加速加密過程,用多線程加速爬取過程。
本代碼較長,長到高亮效果都沒有了,所以該長代碼分爲兩部分,前半部分是以前文章提到的加密方法,後半部分是本文的多進程多線程重點代碼:
import json, re, base64, random, requests, binascii, threading from Crypto.Cipher import AES#新的加密模塊只接受bytes數據,否者報錯,密匙明文什麼的要先轉碼 from concurrent.futures import ProcessPoolExecutor from math import ceil secret_key = b'0CoJUm6Qyw8W8jud'#第四參數,aes密匙 pub_key ="010001"#第二參數,rsa公匙組成 modulus = "00e0b509f6259df8642dbc35662901477df22677ec152b5ff68ace615bb7b725152b3ab17a876aea8a5aa76d2e417629ec4ee341f56135fccf695280104e0312ecbda92557c93870114af6c9d05c4f7f0c3685b7a46bee255932575cce10b424d813cfe4875d3e82047b97ddef52741d546b8e289dc6935b3ece0462db0a22b8e7"#第三參數,rsa公匙組成 headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.146 Safari/537.36'} def random_16(): return bytes(''.join(random.sample('1234567890DeepDarkFantasy',16)),'utf-8') #aes加密 def aes_encrypt(text,key): pad = 16 - len(text)%16#對長度不是16倍數的字符串進行補全,而後在轉爲bytes數據 try: #若是接到bytes數據(如第一次aes加密獲得的密文)要解碼再進行補全 text = text.decode() except: pass text = text + pad * chr(pad) try: text = text.encode() except: pass encryptor = AES.new(key,AES.MODE_CBC,b'0102030405060708') ciphertext = encryptor.encrypt(text) ciphertext = base64.b64encode(ciphertext)#獲得的密文還要進行base64編碼 return ciphertext #rsa加密 def rsa_encrypt(ran_16,pub_key,modulus): text = ran_16[::-1]#明文處理,反序並hex編碼 rsa = int(binascii.hexlify(text), 16) ** int(pub_key, 16) % int(modulus, 16) return format(rsa, 'x').zfill(256) #返回加密後內容 def encrypt_data(data): ran_16 = random_16() text = json.dumps(data) params = aes_encrypt(text,secret_key) params = aes_encrypt(params,ran_16) encSecKey = rsa_encrypt(ran_16,pub_key,modulus) return {'params':params.decode(), 'encSecKey':encSecKey }
class OnePageComment(threading.Thread): # 下載一頁評論的線程類 def __init__(self,post_url, enc_data): threading.Thread.__init__(self) self.post_url = post_url self.enc_data = enc_data self.comment = '' # 建立一個comment變量儲存爬到的數據 def run(self): semaphore.acquire() content = requests.post(self.post_url, headers = headers, data = self.enc_data ).json() if 'hotComments' in content: if content['hotComments']: self.comment += '*************精彩評論\n\n' self.common(content, 'hotComments') self.comment += '\n\n*************最新評論\n\n' self.common(content, 'comments') else: self.common(content, 'comments') semaphore.release() def common(self, content,c_type): for each in content[c_type]: if each ['beReplied']: if each['beReplied'][0]['content']: self.comment += each['content'] + '\n\t回覆:\n\t' + each['beReplied'][0]['content'] + '\n' + '-' * 60 + '\n' else: self.comment += each['content'] + '\n' + '-' * 60 + '\n' def get_comment(self): # 選擇返回評論而不是直接寫入文件,由於多個線程同時操做一個文件有風險,應先返回,後統一寫入 return self.comment def get_enc_datas(pages, max_workers=4): # 多進程加密 raw_datas = [] for i in range(pages): if i == 0: raw_datas.append({'rid':"", 'offset':'0', 'total':"true", 'limit':"20", 'csrf_token':""}) else: raw_datas.append({'rid':"", 'offset':str(i*20), 'total':"false", 'limit':"20", 'csrf_token':""}) with ProcessPoolExecutor(max_workers) as pool: # 多進程適合計算密集型任務,如加密 result = pool.map(encrypt_data,raw_datas) return list(result) def one_song_comment(id_): # 爬取一首歌的評論並寫入txt,網絡I/O密集使用多線程 post_url = 'http://music.163.com/weapi/v1/resource/comments/R_SO_4_' + str(id_) + '?csrf_token=' ts = [OnePageComment(post_url,i) for i in enc_datas] [i.start() for i in ts] [i.join() for i in ts] comments = [i.get_comment() for i in ts] with open(id_ + '.txt', 'w', encoding='utf-8') as f: f.writelines(comments) if __name__ == '__main__': semaphore = threading.Semaphore(4) # 最大線程4 enc_datas = get_enc_datas(10) # 獲取加密後的數據,對全部歌曲都是通用的,這裏有十頁的加密數據,對應爬十頁評論 one_song_comment('29498682')
效果提高驚人!!不信你跑一下上面的程序,而後和本身寫的單線程/單進程比較
cpu和網絡都跑到了峯值,網絡峯值在cpu峯值以後,由於是先多進程加密數據,後多線程爬取