【暫時Over】Python 從零開始爬蟲(十)給爬蟲加速:多線程,多進程

事前最好了解一下什麼是進程,什麼是線程,什麼是GIL,本文再也不贅述,直接介紹模塊的使用:html


普通的python爬蟲是單進程單線程的,這樣在遇到大量重複的操做時就只能逐個進行,咱們就很難過了。舉個栗子:你有1000個美圖的連接,逐個餵給下載器(函數),看着圖片只能一個個蹦出來,你不心急嗎?因而咱們想,能不能同時跑多個下載器,實現多圖同時下載?——答案是能夠的,使用多進程/多線程,把每一個帶着不一樣參數下載器分給每一個進程/線程就,而後同時跑多個進程/線程就好了。python

本文就介紹如何用多線程和多進程給爬蟲加速算法


補充主線程與子線程(進程同理):json

  • 一個py程序就有一個主線程,主線程負責整個py程序的代碼,當主線程處理到啓用多線程的代碼時,就會建立若干個子線程,這裏就有選擇了,主線程是等待子線程的結束再繼續處理仍是直接繼續處理讓子線程在外頭跑

多進程

Python標準庫本來有threading和multiprocessing模塊編寫相應的多線程/多進程代碼。但從Python3.2開始,標準庫爲咱們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的更高級的抽象,對編寫線程池/進程池提供了直接的支持。多進程咱們介紹futures的ProcessPoolExecutor
注:python 2.7 請安裝future模塊,pip install futuresegmentfault


ProcessPoolExecutor類是Executor類的子類,實例化ProcessPoolExecutor類以建立進程池,在實例化的過程當中應指定同時運行的最大進程數api

from concurrent.futures import  ProcessPoolExecutor
pool = ProcessPoolExecutor(max_workers=4) # 運行最大進程數4
#進程池的操做...
pool.shutdown(wait=True) # 關閉進程池,默認等待全部進程的完成。
print('Deep') # 有shutdown的狀況下全部進程完成後纔會運行下面的print,沒有的話會立刻運行

'建立進程也可用with,這時會自帶shutdown功能
with ProcessPoolExecutor(4) as pool:
    #進程池的操做...
'

該類有兩種方法對進程池提交任務創建進程(函數及一組參數構成一個任務),分別是submit()map(),若是單純想多開進程別無他想,用哪一個都行,但submit()會有更靈活的用法網絡

map(fn,*iterables)

  • fn:函數
  • *iterables:函數每一個參數的集合,N個參數就接N個集合

能夠理解這是python自帶map()的多進程版,他返回的是一個迭代器,包含每一個任務對應的返回值(有序的),下面用例子來分析多線程

from concurrent.futures import  ProcessPoolExecutor
import time


def test(x):
    time.sleep(x) # 時間阻塞
    print(str(x)+'s')
    return x

if __name__ == '__main__':
    with ProcessPoolExecutor(4) as pool:
        p = pool.map(test,[2,3,10,5,6])
        for i in p:
            print(i)

輸出併發

2s
2
3s
3
5s
6s
10s
10
5
6

分析(下面以參數代替某個進程):app

  • 帶s的是函數輸出的,進程池最大容許4個進程同時運行,因此參數 2,3,10,5 首先一塊兒進去。2最快完成,立刻讓給6進去,2+6<10 ,因此後進6完成得比10快,最後輸出順序就是 2s,3s,5s,6s,10s
  • 不帶s的是for循環打印迭代器中的結果,由輸出可見,i的值分配是會等待進程完成返回值的,等2的完成返回2,等3的完成返回3,等10的完成返回10,因爲10完成前5和6早就完成了,因此返回10後緊接着返回5和6,最後輸出順序爲2,3,10,5,6,是有序的,對應各任務的返回值

在爬蟲中,上面代碼中的時間阻塞會對應着網絡I/O阻塞,任務中每每包含着網絡請求。好比你有不少個圖片連接,就寫一個下載圖片的函數(接收一個圖片連接的參數),把函數和圖片連接的集合餵給map()就實現多進程了加速了。

submit(fn, *arg)

  • fn:函數
  • *arg:函數的參數

該方法是往進程池中提交可回調的任務,並返回一個future實例。提交多個任務可用循環實現,返回的future實例用列表存起來,每一個future表明一個進程。關於future對象有許多方法:

  • future.running():判斷某個future(進程)是否運行中
  • future.done():判斷某個future(進程)是否正常結束
  • future.cancel():終止某個future(進程),終止失敗返回False,成功返回True
  • future.result():獲取future對應任務返回的結果。若是future還沒完成就會去等待
  • future.add_done_callback(fn):接收函數fn,將fn綁定到future對象上。當future對象被終止或完成時,fn將會被調用並接受該future對象
  • as_completed(fs):接收futures列表,futures列表中一旦有某個future(進程)完成就將該future對象yield回來,是個迭代器
from concurrent.futures import ProcessPoolExecutor,as_completed
import time

def test(x):
    time.sleep(x)
    print(str(x)+'s')
    return x

if __name__ == '__main__':
    with ProcessPoolExecutor(4) as pool:
        futures = [pool.submit(test,i) for i in [2,3,10,5,6]]

        '''for j in futures:
            print(j.result()) # 對應接收參數有序輸出,輸出2,3,10,5,6
        '''
        for j in as_completed(futures):
            print(j.result()) # 對應進程完成順序輸出,輸出2,3,5,6,10

多線程

建議當心使用,雖然多線程能實現高併發,但因爲線程資源共享的特性,某個線程操做這些共享的資源時可能操到一半就中止讓給另外一個線程操做,致使錯亂的發生。爲避免此狀況發生對某些操做須要加鎖,因此這裏介紹對鎖有支持的threading模塊,python自帶直接導入。
若是你確信這些操做不會發生錯亂,能夠直接使用concurrent.future 的 ThreadPoolExecutor,方法什麼的和ProcessPoolExecutor的同樣

線程

建立線程有兩種方法:

  1. 實例化 threading.Thread 類,target接收函數,arg以可迭代形式接收參數。這種方法最簡單

    import threading
         import time
         
         def test(x):
            time.sleep(x)
            print(str(x)+'s')
            return x
            
         t1 = threading.Thread(target=test, args=(1,)) # 建立線程
         t2 = threading.Thread(target=test, args=(3,))
         t1.start() # 啓動線程
         t2.start()
  2. 繼承threading.Thread 類,重寫run方法,把函數及參數接收寫進本身寫的多線程類中。這種方法更靈活,threading.Thread 類並無供獲取線程調用函數返回值的方法,若是須要函數返回值就須要繼承該類本身實現

    import threading
    import time
    
    class TestThread(threading.Thread):
        def __init__(self,x):
            threading.Thread.__init__(self)
            self.x = x  # 參數接收
    
        def run(self):
            time.sleep(self.x)  # 原來的函數寫到run中
            print(str(self.x)+'s')
    
        def result(self):  # 實現獲取調用函數的返回值的方法
            return self.x
    
    t1 = TestThread(1)  #建立線程
    t2 = TestThread(3)
    t1.start()  # 啓動線程
    t2.start()
    t1.join()  # 等待線程結束
    t2.join()
    print(t1.result(),t2.result())

線程相關方法和屬性:

  • Thread.start():啓動線程
  • Thread.join():等待線程的結束,沒有join的話會接着運行join下面的代碼
  • Thread.is_alive():判斷線程是否在運行,線程未開啓/結束時返回 False
  • Thread.name:返回線程的名字,默認線程名是Thread-N,N指第N個開啓的線程
  • Thread.setName(str):給線程命名
  • Thread.setDaemon(True/False):設置子線程是否會隨主線程結束而結束,本來全部子線程默認是不會隨主線程結束而結束的

線程間資源共享,若是多個線程共同對某個數據修改,可能會出現錯誤,爲了保證數據的正確性,須要對多個線程進行同步。這時就須要引入鎖了(利用GIL),鎖只有一個,一個線程在持有鎖的狀態下對某些數據進行操做,其餘線程就沒法對該數據進行操做,直至該線程釋放鎖讓其餘線程搶,誰搶到誰就有權修改。

threading提供Lock和RLock兩類鎖,前者一個線程只能獲取獲取一次鎖,後者容許一個線程能重複獲取鎖。若是某個線程對全局數據的操做是割裂的(分塊的),那就使用RLock。

  • acquire():獲取鎖
  • release():釋放鎖
  • 有數據操做放在acquire 和 release 之間,就不會出現多個線程修改同一個數據的風險了
  • acquire 和 release 必須成對存在,若是一個線程只拿不放,其餘線程沒有鎖能搶就只能永遠阻塞(中止)

一個錯亂的例子及鎖的使用:

import time, threading

lock = threading.Lock() # rlock = threading.RLock()
balance = [0]

def test(n):
    for i in range(100000): # 理想的狀況是執行了+n,-n操做後才讓另外一個線程處理,結果永0
        #lock.acquire()
        balance[0] = balance[0] + n  # 某個線程可能處理到這裏就終止讓給另外一個線程處理了,循環一大,結果可能錯亂不爲0
        balance[0] = balance[0] - n
        #lock.release()
t1 = threading.Thread(target=test, args=(5,))
t2 = threading.Thread(target=test, args=(8.0,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance[0])

在不加鎖的狀況下多跑幾回,你會的到不一樣的結果。可是加了鎖以後,+n,-n兩個操做完整執行,不會中途中斷,結果永0。

限制同時運行線程數

使用 threading.Semaphore 類就行,Semaphore 在內部管理着一個計數器。調用 acquire() 會使這個計數器減1,release() 則是加1。計數器的值永遠不會小於 0。當計數器到 0 時,再調用 acquire() 就會阻塞,直到其餘線程來調用release(),這樣就限制了同時運行線程的數量。

使用上很是簡單,實例化Semaphore並指定線程數後,給函數的頭加個acquire(),尾加個release()就行。

import threading, time

def test(x):
    semaphore.acquire()
    time.sleep(x)
    print(x)
    semaphore.release()
    
semaphore = threading.Semaphore(4) # 最大4個線程同時進行
ts = [threading.Thread(target=test,args=(i,)) for i in [2,3,5,10,6]]
[t.start() for t in ts]

'輸出:2,3,5,6,10 
(原理和上面多進程的那個差很少)'

關於threading的其餘高級用法本文並未說起,以上都是些經常使用的用法,若是有更高級的須要,能夠參考這文章

應用在爬蟲上

講了這麼多,都是模塊的用法,沒怎麼提到爬蟲。那麼最後大概的講下如何把多進程/多線程運用到爬蟲中,並給個代碼實例用做參考。

  • 若是爬蟲須要重複進行某個操做(以下載一張圖片,爬取一張網頁的源碼,破解一次加密【加密耗cpu最好多進程】),那把這個操做抽象成一個接收相應參數的函數,把函數餵給進程/線程便可。
  • 沒了,大概就這麼用Ծ‸ Ծ

下面給個多進程/多線程結合的網易雲音樂評論下載器(下載某首音樂的多頁評論),包含加密算法,如不清楚可看以前的文章,咱們用多進程加速加密過程,用多線程加速爬取過程。
本代碼較長,長到高亮效果都沒有了,所以該長代碼分爲兩部分,前半部分是以前文章提到的加密方法,後半部分是本文的多進程多線程重點代碼:

import json, re, base64, random, requests, binascii, threading
from Crypto.Cipher import AES#新的加密模塊只接受bytes數據,否者報錯,密匙明文什麼的要先轉碼
from concurrent.futures import ProcessPoolExecutor
from math import ceil

secret_key = b'0CoJUm6Qyw8W8jud'#第四參數,aes密匙
pub_key ="010001"#第二參數,rsa公匙組成
modulus = "00e0b509f6259df8642dbc35662901477df22677ec152b5ff68ace615bb7b725152b3ab17a876aea8a5aa76d2e417629ec4ee341f56135fccf695280104e0312ecbda92557c93870114af6c9d05c4f7f0c3685b7a46bee255932575cce10b424d813cfe4875d3e82047b97ddef52741d546b8e289dc6935b3ece0462db0a22b8e7"#第三參數,rsa公匙組成
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.146 Safari/537.36'}

def random_16():
    return bytes(''.join(random.sample('1234567890DeepDarkFantasy',16)),'utf-8')

#aes加密
def aes_encrypt(text,key):
    pad = 16 - len(text)%16#對長度不是16倍數的字符串進行補全,而後在轉爲bytes數據
    try:                    #若是接到bytes數據(如第一次aes加密獲得的密文)要解碼再進行補全
        text = text.decode()
    except:
        pass
    text = text + pad * chr(pad)
    try:
        text = text.encode()
    except:
        pass
    encryptor = AES.new(key,AES.MODE_CBC,b'0102030405060708')
    ciphertext = encryptor.encrypt(text)
    ciphertext = base64.b64encode(ciphertext)#獲得的密文還要進行base64編碼
    return ciphertext

#rsa加密
def rsa_encrypt(ran_16,pub_key,modulus):
    text = ran_16[::-1]#明文處理,反序並hex編碼
    rsa = int(binascii.hexlify(text), 16) ** int(pub_key, 16) % int(modulus, 16)
    return format(rsa, 'x').zfill(256)

#返回加密後內容
def encrypt_data(data):
    ran_16 = random_16()
    text = json.dumps(data)
    params = aes_encrypt(text,secret_key)
    params = aes_encrypt(params,ran_16)
    encSecKey = rsa_encrypt(ran_16,pub_key,modulus)
    return  {'params':params.decode(),
             'encSecKey':encSecKey  }
class OnePageComment(threading.Thread):  # 下載一頁評論的線程類
    def __init__(self,post_url, enc_data):
        threading.Thread.__init__(self)
        self.post_url = post_url
        self.enc_data = enc_data
        self.comment = '' # 建立一個comment變量儲存爬到的數據

    def run(self):
        semaphore.acquire()
        content = requests.post(self.post_url, headers = headers, data = self.enc_data ).json()
        if 'hotComments' in content:
            if content['hotComments']:
                self.comment +=  '*************精彩評論\n\n'
                self.common(content, 'hotComments')

            self.comment += '\n\n*************最新評論\n\n'
            self.common(content, 'comments')
        else:
            self.common(content, 'comments')
        semaphore.release()

    def common(self, content,c_type):
        for each in content[c_type]:
            if each ['beReplied']:
                if each['beReplied'][0]['content']:
                    self.comment += each['content'] + '\n\t回覆:\n\t' + each['beReplied'][0]['content'] + '\n' + '-' * 60 + '\n'
            else:
                self.comment += each['content'] + '\n' + '-' * 60 + '\n'

    def get_comment(self):  # 選擇返回評論而不是直接寫入文件,由於多個線程同時操做一個文件有風險,應先返回,後統一寫入
        return self.comment


def get_enc_datas(pages, max_workers=4): # 多進程加密
    raw_datas = []
    for i in range(pages):
        if i == 0:
            raw_datas.append({'rid':"", 'offset':'0', 'total':"true", 'limit':"20", 'csrf_token':""})
        else:
            raw_datas.append({'rid':"", 'offset':str(i*20), 'total':"false", 'limit':"20", 'csrf_token':""})
    with ProcessPoolExecutor(max_workers) as pool:  # 多進程適合計算密集型任務,如加密
        result = pool.map(encrypt_data,raw_datas)
    return list(result)

def one_song_comment(id_): # 爬取一首歌的評論並寫入txt,網絡I/O密集使用多線程
    post_url = 'http://music.163.com/weapi/v1/resource/comments/R_SO_4_' + str(id_) + '?csrf_token='
    ts = [OnePageComment(post_url,i) for i in enc_datas]
    [i.start() for i in ts]
    [i.join() for i in ts]
    comments = [i.get_comment() for i in ts]
    with open(id_ + '.txt', 'w', encoding='utf-8') as f:
        f.writelines(comments)


if __name__ == '__main__':
    semaphore = threading.Semaphore(4) # 最大線程4
    enc_datas = get_enc_datas(10)  # 獲取加密後的數據,對全部歌曲都是通用的,這裏有十頁的加密數據,對應爬十頁評論
    one_song_comment('29498682')

效果提高驚人!!不信你跑一下上面的程序,而後和本身寫的單線程/單進程比較

clipboard.pngcpu和網絡都跑到了峯值,網絡峯值在cpu峯值以後,由於是先多進程加密數據,後多線程爬取

相關文章
相關標籤/搜索