本系列文章計劃分三個章節進行講述,分別是理論篇、基礎篇和實戰篇。理論篇主要爲構建分佈式爬蟲而儲備的理論知識,基礎篇會基於理論篇的知識寫一個簡易的分佈式爬蟲,實戰篇則會以微博爲例,教你們作一個比較完整且足夠健壯的分佈式微博爬蟲。經過這三篇文章,但願你們能掌握如何構建一個分佈式爬蟲的方法;能觸類旁通,將celery
用於除爬蟲外的其它場景。目前基本上的博客都是教你們使用scrapyd或者scrapy-redis構建分佈式爬蟲,本系列文章會從另一個角度講述如何用requests+celery構建一個健壯的、可伸縮而且可擴展的分佈式爬蟲。html
本系列文章屬於爬蟲進階文章,指望受衆是具備必定Python基礎知識和編程能力、有爬蟲經驗而且但願提高本身的同窗。小白要是感興趣,也能夠看看,看不懂的話,能夠等有了必定基礎和經驗後回過頭來再看。python
另一點說明,本系列文章不是旨在構建一個分佈式爬蟲框架或者分佈式任務調度框架,而是利用現有的分佈式任務調度工具來實現分佈式爬蟲,因此請輕噴。linux
何謂分佈式爬蟲?
通俗的講,分佈式爬蟲就是多臺機器多個 spider 對多個 url 的同時處理問題,分佈式的方式能夠極大提升程序的抓取效率。git
構建分佈式爬蟲通暢須要考慮的問題
(1)如何能保證多臺機器同時抓取同一個URL?github
(2)若是某個節點掛掉,會不會影響其它節點,任務如何繼續?
(3)既然是分佈式,如何保證架構的可伸縮性和可擴展性?不一樣優先級的抓取任務如何進行資源分配和調度?redis
基於上述問題,我選擇使用celery做爲分佈式任務調度工具,是分佈式爬蟲中任務和資源調度的核心模塊。它會把全部任務都經過消息隊列發送給各個分佈式節點進行執行,因此能夠很好的保證url不會被重複抓取;它在檢測到worker掛掉的狀況下,會嘗試向其餘的worker從新發送這個任務信息,這樣第二個問題也能夠獲得解決;celery自帶任務路由,咱們能夠根據實際狀況在不一樣的節點上運行不一樣的抓取任務(在實戰篇我會講到)。本文主要就是帶你們瞭解一下celery的方方面面(有celery相關經驗的同窗和大牛能夠直接跳過了)mongodb
按celery官網的介紹來講編程
Celery 是一個簡單、靈活且可靠的,處理大量消息的分佈式系統,而且提供維護這樣一個系統的必需工具。它是一個專一於實時處理的任務隊列,同時也支持任務調度。json
下面幾個關於celery的核心知識點windows
broker:翻譯過來叫作中間人。它是一個消息傳輸的中間件,能夠理解爲一個郵箱。每當應用程序調用celery的異步任務的時候,會向broker傳遞消息,然後celery的worker將會取到消息,執行相應程序。這其實就是消費者和生產者之間的橋樑。
backend: 一般程序發送的消息,發完就完了,可能都不知道對方時候接受了。爲此,celery實現了一個backend,用於存儲這些消息以及celery執行的一些消息和結果。
worker: Celery類的實例,做用就是執行各類任務。注意在celery3.1.25後windows是不支持celery worker的!
producer: 發送任務,將其傳遞給broker
beat: celery實現的定時任務。能夠將其理解爲一個producer,由於它也是經過網絡調用定時將任務發送給worker執行。注意在windows上celery是不支持定時任務的!
下面是關於celery的架構示意圖,結合上面文字的話應該會更好理解
因爲celery只是任務隊列,而不是真正意義上的消息隊列,它自身不具備存儲數據的功能,因此broker和backend須要經過第三方工具來存儲信息,celery官方推薦的是 RabbitMQ和Redis,另外mongodb等也能夠做爲broker或者backend,可能不會很穩定,咱們這裏選擇Redis做爲broker兼backend。
關於redis的安裝和配置能夠查看這裏
先安裝celery
pip install celery
咱們以官網給出的例子來作說明,並對其進行擴展。首先在項目根目錄下,這裏我新建一個項目叫作celerystudy
,而後切換到該項目目錄下,新建文件tasks.py
,而後在其中輸入下面代碼
from celery import Celery app = Celery('tasks', broker='redis://:''@223.129.0.190:6379/2', backend='redis://:''@223.129.0.190:6379/3') @app.task def add(x, y): return x + y
這裏我詳細講一下代碼:咱們先經過app=Celery()
來實例化一個celery對象,在這個過程當中,咱們指定了它的broker,是redis的db 2,也指定了它的backend,是redis的db3, broker和backend的鏈接形式大概是這樣
redis://:password@hostname:port/db_number
而後定義了一個add
函數,重點是@app.task
,它的做用在我看來就是**將add()
註冊爲一個相似服務的東西,原本只能經過本地調用的函數被它裝飾後,就能夠經過網絡來調用。這個tasks.py
中的app就是一個worker。它能夠有不少任務,好比這裏的任務函數add
。咱們再經過在命令行切換到項目根目錄**,執行
celery -A tasks worker -l info
啓動成功後就是下圖所示的樣子
這裏我說一下各個參數的意思,-A
指定的是app(即Celery實例)所在的文件模塊,咱們的app是放在tasks.py
中,因此這裏是 tasks
;worker表示當前以worke
r的方式運行,難道還有別的方式?對的,好比運行定時任務就不用指定worker
這個關鍵字; -l info
表示該worker節點的日誌等級是info
,更多關於啓動worker的參數(好比-c
、-Q
等經常使用的)請使用
celery worker --help
進行查看
將worker啓動起來後,咱們就能夠經過網絡來調用add
函數了。咱們在後面的分佈式爬蟲構建中也是採用這種方式分發和消費url的。在命令行先切換到項目根目錄,而後打開python交互端
from tasks import add
rs = add.delay(2, 2) # 這裏的add.delay就是經過網絡調用將任務發送給add
所在的worker執行
這個時候咱們能夠在worker的界面看到接收的任務和計算的結果。
[2017-05-19 14:22:43,038: INFO/MainProcess] Received task: tasks.add[c0dfcd0b-d05f-4285-b944-0a8aba3e7e61] # worker接收的任務
[2017-05-19 14:22:43,065: INFO/MainProcess] Task tasks.add[c0dfcd0b-d05f-4285-b944-0a8aba3e7e61] succeeded in 0.025274309000451467s: 4 # 執行結果
這裏是異步調用,若是咱們須要返回的結果,那麼要等rs
的ready
狀態true
才行。這裏add
看不出效果,不過試想一下,若是咱們是調用的比較佔時間的io任務,那麼異步任務就比較有價值了
rs # <AsyncResult: c0dfcd0b-d05f-4285-b944-0a8aba3e7e61>
rs.ready() # true 表示已經返回結果了
rs.status # 'SUCCESS' 任務執行狀態,失敗仍是成功
rs.successful() # True 表示執行成功
rs.result # 4 返回的結果
rs.get() # 4 返回的結果
<celery.backends.redis.RedisBackend object at 0x30033ec> #這裏咱們backend 結果存儲在redis裏
上面講的是從Python交互終端中調用add
函數,若是咱們要從另一個py文件調用呢?除了經過import
而後add.delay()
這種方式,咱們還能夠經過send_task()
這種方式,咱們在項目根目錄另外新建一個py文件叫作 excute_tasks.py
,在其中寫下以下的代碼
from tasks import add if __name__ == '__main__': add.delay(5, 10)
這時候能夠在celery的worker界面看到執行的結果
[2017-05-19 14:25:48,039: INFO/MainProcess] Received task: tasks.add[f5ed0d5e-a337-45a2-a6b3-38a58efd9760] [2017-05-19 14:25:48,074: INFO/MainProcess] Task tasks.add[f5ed0d5e-a337-45a2-a6b3-38a58efd9760] succeeded in 0.03369094600020617s: 15
此外,咱們還能夠經過send_task()
來調用,將excute_tasks.py
改爲這樣
from tasks import app if __name__ == '__main__': app.send_task('tasks.add', args=(10, 15),)
這種方式也是能夠的。send_task()
還可能接收到爲註冊(即經過@app.task
裝飾)的任務,這個時候worker會忽略這個消息
[2017-05-19 14:34:15,352: ERROR/MainProcess] Received unregistered task of type 'tasks.adds'.
The message has been ignored and discarded.
上面部分講了怎麼啓動worker和調用worker的相關函數,這裏再講一下celery的定時任務。
爬蟲因爲其特殊性,可能須要定時作增量抓取,也可能須要定時作模擬登錄,以防止cookie過時,而celery偏偏就實現了定時任務的功能。在上述基礎上,咱們將tasks.py
文件改爲以下內容
from celery import Celery app = Celery('add_tasks', broker='redis:''//223.129.0.190:6379/2', backend='redis:''//223.129.0.190:6379/3') app.conf.update( # 配置所在時區 CELERY_TIMEZONE='Asia/Shanghai', CELERY_ENABLE_UTC=True, # 官網推薦消息序列化方式爲json CELERY_ACCEPT_CONTENT=['json'], CELERY_TASK_SERIALIZER='json', CELERY_RESULT_SERIALIZER='json', # 配置定時任務 CELERYBEAT_SCHEDULE={ 'my_task': { 'task': 'tasks.add', # tasks.py模塊下的add方法 'schedule': 60, # 每隔60運行一次 'args': (23, 12), } } ) @app.task def add(x, y): return x + y
而後先經過ctrl+c
停掉前一個worker,由於咱們代碼改了,須要重啓worker纔會生效。咱們再次以celery -A tasks worker -l info
這個命令開啓worker。
這個時候咱們只是開啓了worker,若是要讓worker執行任務,那麼還須要經過beat給它定時發送,咱們再開一個命令行,切換到項目根目錄,經過
celery beat -A tasks -l info
celery beat v3.1.25 (Cipater) is starting. __ - ... __ - _ Configuration -> . broker -> redis://223.129.0.190:6379/2 . loader -> celery.loaders.app.AppLoader . scheduler -> celery.beat.PersistentScheduler . db -> celerybeat-schedule . logfile -> [stderr]@%INFO . maxinterval -> now (0s) [2017-05-19 15:56:57,125: INFO/MainProcess] beat: Starting...
這樣就表示定時任務已經開始運行了。
眼尖的同窗可能看到我這裏celery的版本是3.1.25
,這是由於celery支持的windows
最高版本是3.1.25。因爲個人分佈式微博爬蟲的worker也同時部署在了windows上,因此我選擇了使用 3.1.25
。若是全是linux系統,建議使用celery4。
此外,還有一點須要注意,在celery4後,定時任務(經過schedule調度的會這樣,經過crontab調度的會立刻執行)會在當前時間再過定時間隔執行第一次任務,好比我這裏設置的是60秒的間隔,那麼第一次執行add
會在咱們經過celery beat -A tasks -l info
啓動定時任務後60秒才執行;celery3.1.25則會立刻執行該任務。
關於定時任務更詳細的請看官方文檔celery定時任務
至此,咱們把構建一個分佈式爬蟲的理論知識都講了一遍,主要就是對於celery
的瞭解和使用,這裏並未涉及到celery的一些高級特性,實戰篇可能會講解一些我本身使用的特性。
下一篇我將介紹如何使用celery寫一個簡單的分佈式爬蟲,但願你們能有所收穫。
若是本文對你們有幫助,不妨點個贊鼓勵一下我,繼續創做。
此外,打一個廣告,我寫了一個分佈式的微博爬蟲,主要就是利用celery作的分佈式實戰篇我也將會以該項目其中一個模塊進行講解,有興趣的能夠點擊看看,也歡迎有需求的朋友試用。