那些年,我爬過的北科(四)——爬蟲進階之極簡併行爬蟲框架開發

寫在前面

在看過目錄以後,讀者可能會問爲何這個教程沒有講一個框架,好比說scrapy或者pyspider。在這裏,我認爲理解爬蟲的原理更加劇要,而不是學習一個框架。爬蟲說到底就是HTTP請求,與語言無關,與框架也無關。html

在本節,咱們將用26行代碼開發一個簡單的併發的(甚至分佈式的)爬蟲框架。python

爬蟲的模塊

首先,咱們先來講一下爬蟲的幾個模塊。git

任務產生器——Producer

定義任務,如:爬取什麼頁面?怎麼解析github

下載器——Downloader

下載器,接受任務產生器的任務,下載完成後給解析器進行解析。主要是I/O操做,受限於網速。sql

解析器——Parser

解析器,將下載器下載的內容進行解析,傳給輸出管道。主要是CPU操做,受限於下載器的下載速度。數據庫

輸出管道——Pipeline

如何展現爬取的數據,如以前咱們一直都在用print,其實也就是一個ConsolePipeline。固然你也能夠定義FilePipeline、MysqlPipeline、Sqlite3Pipeline等等。json

  • ConsolePipeline: 把想要的內容直接輸出到控制檯。
  • FilePipeline: 把想要的內容輸出到文件裏保存,好比保存一個json文件。
  • MongoDBPipeline: 把想要的內容存入MongoDB數據庫中。
  • 等等......

爬蟲框架的結構

上面的四個模塊也就構成了四個部分。安全

  • 1 . 首先,會有個初始的任務產生器產生下載任務。
  • 2 . 下載器不斷從任務隊列中取出任務,下載完任務後,放到網頁池中。
  • 3 . 不一樣的解析器取出網頁進行解析,傳給對應的輸出管道。期間,解析器也會產生新的下載任務,放入到任務隊列中。
  • 4 . 輸出管道對解析的結果進行存儲、顯示。

簡易的爬蟲框架的架構

其實,咱們也能夠把爬蟲不要分的那麼細,下載+解析+輸出其實均可以歸類爲一個Worker。架構

就像下面同樣,首先初始的任務產生器會產生一個下載任務,而後系統爲下載任務建立幾個Worker,Worker對任務進行下載解析輸出,同時根據解析的一些連接產生新下載的任務放入任務隊列。如此循環,直到沒有任務。 併發

進程間通訊

下面,咱們說一下進程間通訊。

這裏咱們舉一個生產者消費者的例子。假設有兩個進程,一個叫生產者,一個叫作消費者。生產者只負責生產一些任務,並把任務放到一個池子裏面(任務隊列),消費者從任務隊列中拿到任務,並對完成任務(把任務消費掉)。

咱們這裏的任務隊列使用multiprocessing的Queue,它能夠保證多進程間操做的安全。

from multiprocessing import Process, Queue
import time


def produce(q):  # 生產
    for i in range(10):
        print('Put %d to queue...' % value)
        q.put(i)
        time.sleep(1)


def consume(q):  # 消費
    while True:
        if not q.empty():
            value = q.get(True)
            print('Consumer 1, Get %s from queue.' % value)


if __name__ == '__main__':
    q = Queue()
    producer = Process(target=produce, args=(q,))
    consumer = Process(target=consume, args=(q,))
    producer.start()
    consumer.start()

    producer.join()  # 等待結束, 死循環使用Ctrl+C退出
    consumer.join()
複製代碼

固然,也能夠嘗試有多個生產者,多個消費者。下面建立了兩個生產者和消費者。

from multiprocessing import Process, Queue
import time


def produce(q):  # 生產
    for i in range(10000):
        if i % 2 == 0:
            print("Produce ", i)
            q.put(i)
            time.sleep(1)


def produce2(q):  # 生產
    for i in range(10000):
        if i % 2 == 1:
            print "Produce ", i
            q.put(i)
            time.sleep(1)


def consume(q):  # 消費
    while True:
        if not q.empty():
            value = q.get(True)
            print 'Consumer 1, Get %s from queue.' % value


def consume2(q):  # 消費
    while True:
        if not q.empty():
            value = q.get(True)
            print 'Consumer 2, Get %s from queue.' % value


if __name__ == '__main__':
    q = Queue(5)   # 隊列最多放5個任務, 超過5個則會阻塞住
    producer = Process(target=produce, args=(q,))
    producer2 = Process(target=produce2, args=(q,))
    consumer = Process(target=consume, args=(q,))
    consumer2 = Process(target=consume2, args=(q,))

    producer.start()
    producer2.start()
    consumer.start()
    consumer2.start()

    producer.join()  # 等待結束, 死循環使用Ctrl+C退出
    producer2.join()
    consumer.join()
    consumer2.join()
複製代碼

這裏生產者生產的時間是每秒鐘兩個,消費者消費時間幾乎能夠忽略不計,屬於「狼多肉少」系列。運行後,能夠看到控制檯每秒都輸出兩行。Consumer1和Consumer2的爭搶十分激烈。

考慮一下「肉多狼少」的情形,代碼以下:

from multiprocessing import Process, Queue
import time


def produce(q):  # 生產
    for i in range(10000):
        print("Produce ", i)
        q.put(i)


def consume(q):  # 消費
    while True:
        if not q.empty():
            value = q.get(True)
            print('Consumer 1, Get %s from queue.' % value)
            time.sleep(1)


def consume2(q):  # 消費
    while True:
        if not q.empty():
            value = q.get(True)
            print('Consumer 2, Get %s from queue.' % value)
            time.sleep(1)


if __name__ == '__main__':
    q = Queue(5)    # 隊列最多放5個數據, 超過5個則會阻塞住
    producer = Process(target=produce, args=(q,))
    consumer = Process(target=consume, args=(q,))
    consumer2 = Process(target=consume2, args=(q,))

    producer.start()
    consumer.start()
    consumer2.start()

    producer.join()  # 等待結束, 死循環使用Ctrl+C退出
    consumer.join()
    consumer2.join()
複製代碼

這裏生產者不停的生產,直到把任務隊列塞滿。而兩個消費者每秒鐘消費一個,每當有任務被消費掉,生產者又會立馬生產出新的任務,把任務隊列塞滿。

上面的說明,系統總體的運行速度其實受限於速度最慢的那個。像咱們爬蟲,最耗時的操做就是下載,總體的爬取速度也就受限於網速。

以上的生產和消費者相似爬蟲中的Producer和Worker。Producer扮演生產者,生成下載任務,放入任務隊列中;Worker扮演消費者,拿到下載任務後,對某個網頁進行下載、解析、數據;在此同時,Worker也會扮演生產者,根據解析到的連接生成新的下載任務,並放到任務隊列中交給其餘的Worker執行。

DIY併發框架

下面咱們來看看咱們本身的併發爬蟲框架,這個爬蟲框架的代碼很短,只有26行,除去空行的話只有21行代碼。

from multiprocessing import Manager, Pool


class SimpleCrawler:
    def __init__(self, c_num):
        self.task_queue = Manager().Queue()  # 任務隊列
        self.workers = {}                    # Worker, 字典類型, 存放不一樣的Worker
        self.c_num = c_num                   # 併發數,開幾個進程

    def add_task(self, task):
        self.task_queue.put(task)

    def add_worker(self, identifier, worker):
        self.workers[identifier] = worker

    def start(self):
        pool = Pool(self.c_num)
        while True:
            task = self.task_queue.get(True)
            if task['id'] == "NO":  # 結束爬蟲
                pool.close()
                pool.join()
                exit(0)
            else:  # 給worker完成任務
                worker = self.workers[task['id']]
                pool.apply_async(worker, args=(self.task_queue, task))
複製代碼

這個類中一共就有四個方法:構造方法、添加初始任務方法、設置worker方法、開始爬取方法。

__init__方法:

在構造方法中,咱們建立了一個任務隊列,(這裏注意使用了Manager.Queue(),由於後面咱們要用到進程池,因此要用Manager類),workers字典,以及併發數配置。

crawler = SimpleCrawler(5)  # 併發數爲5
複製代碼

add_task方法:

負責添加初始任務方法,task的形式爲一個字典。有id、url等字段。id負責分配給不一樣的worker。以下:

crawler.add_task({
    "id": "worker",
    "url": "http://nladuo.cn/scce_site/",
    "page": 1
})
複製代碼

add_worker方法:

負責配置worker,以id做爲鍵存放在workers變量中,其中worker能夠定義爲一個抽象類或者一個函數。這裏爲了簡單起見,咱們直接弄一個函數。

def worker(queue, task):
    url = task["url"]
    resp = requests.get(url)
    # ......,爬取解析網頁
    queue.put(new_task) # 可能還會添加新的task
    # ......

crawler.add_worker("worker", worker)
複製代碼

start方法:

start方法就是啓動爬蟲,這裏看上面的代碼,建立了一個進程池用來實現併發。而後不斷的從queue中取出任務,根據任務的id分配給對應id的worker。咱們這裏規定當id爲「NO」時,咱們則中止爬蟲。

crawler.start()
複製代碼

爬取兩級頁面

下面,咱們來使用這個簡單的爬蟲框架,來實現一個兩級頁面的爬蟲。

首先看第一級頁面:nladuo.cn/scce_site/。其實就是以前的新聞列表頁。咱們能夠爬到新聞的標題,以及該標題對應的網頁連接。

第二級頁面是:nladuo.cn/scce_site/a…,也就是新聞的詳情頁,這裏能夠獲取到新聞的內容以及點擊數目等。

下面咱們建立兩個worker,一個負責爬取列表頁面,一個負責爬取新聞詳情頁。

def worker(queue, task):
    """ 爬取新聞列表頁 """
    pass


def detail_worker(queue, task):
    """ 爬取新聞詳情頁 """
    pass
複製代碼

主代碼

對於main代碼,這裏首先須要建立一個crawler。而後添加兩個worker,id分別爲「worker」和「detail_worker」。而後添加一個初始的任務,也就是爬取新聞列表頁的首頁。

if __name__ == '__main__':
    crawler = SimpleCrawler(5)
    crawler.add_worker("worker", worker)
    crawler.add_worker("detail_worker", detail_worker)
    crawler.add_task({
        "id": "worker",
        "url": "http://nladuo.cn/scce_site/",
        "page": 1
    })
    crawler.start()
複製代碼

worker代碼編寫

接下來,完成咱們的worker代碼,worker接受兩個參數:queue和task。

  • queue: 用於解析網頁後,添加新的任務
  • task: 要完成的任務

而後worker①首先下載網頁,②其次解析網頁,③再根據解析的列表進一步須要爬取詳情頁,因此要添加爬取詳情頁的任務;④最後判斷當前是否是最後一頁,若是是就發送退出信號,不然添加下一頁的新聞列表爬取任務。

def worker(queue, task):
    """ 爬取新聞列表頁 """
    # 下載任務
    url = task["url"] + "%d.html" % task["page"]
    print("downloading:", url)
    resp = requests.get(url)

    # 解析網頁
    soup = BeautifulSoup(resp.content, "html.parser")
    items = soup.find_all("div", {"class", "list_title"})

    for index, item in enumerate(items):
        detail_url = "http://nladuo.cn/scce_site/" + item.a['href']
        print("adding:", detail_url)
        # 添加新任務: 爬取詳情頁
        queue.put({
            "id": "detail_worker",
            "url": detail_url,
            "page": task["page"],
            "index": index,
            "title": item.get_text().replace("\n", "")
        })

    if task["page"] == 10:  # 添加結束信號
        queue.put({"id": "NO"})
    else:
        # 添加新任務: 爬取下一頁
        queue.put({
            "id": "worker",
            "url": "http://nladuo.cn/scce_site/",
            "page": task["page"]+1
        })
複製代碼

detail_worker代碼編寫

detail_worker的任務比較簡單,只要下載任務,而後解析網頁並打印便可。這裏爲了讓屏幕輸出沒那麼亂,咱們只獲取點擊數。

def detail_worker(queue, task):
    """ 爬取新聞詳情頁 """
    # 下載任務
    print("downloading:", task['url'])
    resp = requests.get(task['url'])
    # 解析網頁
    soup = BeautifulSoup(resp.content, "html.parser")
    click_num = soup.find("div", {"class", "artNum"}).get_text()
    print(task["page"], task["index"], task['title'], click_num)
複製代碼

思考

到這裏,咱們就用咱們本身開發的框架實現了一個多級頁面的爬蟲。讀者能夠考慮一下如下的問題。

  • 如何實現爬蟲的自動結束?考慮監控隊列的狀況和worker的狀態。
  • 如何實現一個分佈式爬蟲?考慮使用分佈式隊列:celery
相關文章
相關標籤/搜索