攝影:產品經理
與產品經理環遊世界的瞬間
這篇文章不會涉及到Kafka 的具體操做,而是告訴你 Kafka 是什麼,以及它能在爬蟲開發中扮演什麼重要角色。數據庫
一個簡單的需求後端
假設咱們須要寫一個微博爬蟲,老闆給的需求以下:
服務器
開發爬蟲對你來講很是簡單,因而三下五除二你就把爬蟲開發好了:
架構
接下來開始作報警功能,邏輯也很是簡單:
併發
再來看看統計關鍵詞的功能,這個功能背後有一個網頁,會實時顯示抓取數據量的變化狀況,能夠顯示每分鐘、每小時的某個關鍵詞的抓取量。ide
這個功能對你來講也挺簡單,因而你實現了以下邏輯:性能
最後一個需求,對微博數據進行情感分析。情感分析的模塊由別的部門同事開發,你要作的就是每一個小時拉取一批數據,發送到接口,獲取返回,而後存入後端須要的數據庫:學習
任務完成,因而你高興地回家睡覺了。網站
困難接踵而至線程
爬蟲變慢了
隨着老闆逐漸增長新的關鍵詞,你發現每一次完整抓取的時間愈來愈長,一開始是2分鐘抓取一輪,後來變成10分鐘一輪,而後變成30分鐘一輪,接下來變成1小時才能抓取一輪。隨着延遲愈來愈高,你的報警愈來愈不許確,微博都發出來一小時了,你的報警尚未發出來,由於那一條微博尚未來得及入庫。
你的爬蟲技術很是好,能繞過全部反爬蟲機制,你有無限個代理 IP,因而你輕輕鬆鬆就把爬蟲提升到了每秒一百萬併發。如今只須要1分鐘你就能完成所有數據的抓取。這下沒問題了吧。
但是報警仍是沒有發出來。這是怎麼回事?
數據庫撐不住了
通過排查,你發現了問題。數據抓取量上來了,可是 MongoDB 卻沒法同時接收那麼多的數據寫入。數據寫入速度遠遠小於爬取數據,大量的數據堆積在內存中。因而你的服務器爆炸了。
你緊急搭建了100個數據庫並編號0-99,對於抓取到的微博,先把每一條微博的 ID對100求餘數,而後把數據存入餘數對應的 MongoDB 中。每一臺 MongoDB 的壓力降低到了原來的1%。數據終於能夠即時存進數據庫裏面了。
但是報警仍是沒有發出來,不只如此,如今實時抓取量統計功能也不能用了,還有什麼問題?
查詢來不及了
如今報警程序要遍歷100個數據庫最近5分鐘裏面的每一條數據,確認是否有須要報警的內容。可是這個遍歷過程就遠遠超過5分鐘。
時間錯開了
因爲微博的綜合搜索功能不是按照時間排序的,那麼就會出現這樣一種狀況,早上10:01發的微博,你在12:02的時候才抓到。
不論你是在報警的時候篩選數據,仍是篩選數據推送給 NLP 分析接口,若是你是以微博的發佈時間來搜索,那麼這一條都會被你直接漏掉——當你在10:05的時候檢索10:00-10:05這5分鐘發表的微博,因爲這一條微博沒有抓到,你天然搜索不到。
當你12:05開始檢索12:00-12:05的數據時,你搜索的是發佈時間爲12:00-12:05的數據,因而10:01這條數據雖然是在12:02抓到的,但你也沒法篩選出來。
那麼是否是能夠用抓取時間來搜索呢?例如10:05開始檢索在10:00-10:05抓取到的數據,不管它的發佈時間是多少,都檢索出來。
這樣作確實能夠保證不漏掉數據,但這樣作的代價是你必須保存、檢索很是很是多的數據。例如每次抓取,只要發佈時間是最近10小時的,都要保存下來。因而報警程序在檢索數據時,就須要檢索這5分鐘入庫的,實際上發佈時間在10小時內的所有數據。
什麼,你說每次保存以前檢查一下這條微博是否已經存在,若是存在就不保存?別忘了批量寫入時間都不夠了,你還準備分一些時間去查詢?
髒數據來了
老闆忽然來跟你說,關鍵詞「籃球」裏面有大量的關於 蔡徐坤的內容,因此要你把全部包含蔡徐坤的數據所有刪掉。
那麼,這個過濾邏輯放在哪裏?放在爬蟲的 pipelines.py 裏面嗎?那你要從新部署全部爬蟲。今天是過濾蔡徐坤,明天是過濾範層層,後天是過濾王一博,天天增長關鍵詞,你天天都得從新部署爬蟲?
那你把關鍵詞放在 Redis 或者 MongoDB 裏面,每次插入數據前,讀取全部關鍵詞,看微博裏面不包含再存。
仍是那個問題,插入時間原本就不夠了,你還要查數據庫?
好,關鍵詞過濾不放在爬蟲裏面了。你寫了一個腳本,每分鐘檢查一次MongoDB新增的數據,若是發現包含 不須要的關鍵詞,就把他刪除。
如今問題來了,刪除數據的程序每分鐘檢查一次,報警程序每5分鐘檢查一次。中間一定存在某些數據,尚未來得及刪除,報警程序就報警了,老闆收到報警來看數據,而你的刪除程序又在這時把這個髒數據刪了。
這下好了,每天報假警,狼來了的故事重演了。
5個問題1個救星
若是你在爬蟲開發的過程當中遇到過上面的諸多問題,那麼,你就應該試一試使用 Kafka。一次性解決上面的全部問題。
把 Kafka 加入到你的爬蟲流程中,那麼你的爬蟲架構變成了下面這樣:
這看起來彷佛和數據直接寫進 MongoDB 裏面,而後各個程序讀取 MongoDB 沒什麼區別啊?那 Kafka 能解決什麼問題?
咱們來看看,在這個爬蟲架構裏面,咱們將會用到的 Kafka 的特性:
與其說 Kafka 在這個爬蟲架構中像 MongoDB,不如說更像 Redis 的列表。
如今來簡化一下咱們的模型,若是如今爬蟲只有一個需求,就是搜索,而後報警。那麼咱們能夠這樣設計:
爬蟲爬下來的數據,直接塞進 Redis 的列表右側。報警程序從 Redis 列表左側一條一條讀取。讀取一條檢視一條,若是包含報警關鍵詞,就報警。而後讀取下一條。
這樣作有什麼好處?
由於報警程序直接從 Redis 裏面一條一條讀取,不存在按時間搜索數據的過程,因此不會有數據延遲的問題。因爲 Redis 是單線程數據庫,因此能夠同時啓動不少個報警程序。因爲 lpop 讀取一條就刪除一條,若是報警程序由於某種緣由崩潰了,再把它啓動起來便可,它會接着工做,不會重複報警。
但使用 Redis 列表的優點也是劣勢:列表中的信息只能消費1次,被彈出了就沒有了。
因此若是既須要報警,還須要把數據存入 MongoDB 備份,那麼只有一個辦法,就是報警程序檢查完數據之後,把數據存入 MongoDB。
可我只是一個哨兵,爲何要讓我作後勤兵的工做?」
一個報警程序,讓它作報警的事情就行了,它不該該作儲存數據的事情。
而使用 Kafka,它有 Redis 列表的這些好處,但又沒有 Redis 列表的弊端!
咱們徹底能夠分別實現4個程序,不一樣程序之間消費數據的快慢互不影響。但同一個程序,不管是關閉再打開,仍是同時運行屢次,都不會重複消費。
程序1:報警
從 Kafka 中一條一條讀取數據,作報警相關的工做。程序1能夠同時啓動多個。關了再從新打開也不會重複消費。
程序2:儲存原始數據
這個程序從 Kafka 中一條一條讀取數據,每湊夠1000條就批量寫入到 MongoDB 中。這個程序不要求實時儲存數據,有延遲也不要緊。存入MongoDB中也只是原始數據存檔。通常狀況下不會再從 MongoDB 裏面讀取出來。
程序3:統計
從 Kafka 中讀取數據,記錄關鍵詞、發佈時間。按小時和分鐘分別對每一個關鍵詞的微博計數。最後把計數結果保存下來。
程序4:情感分析
從 Kafka 中讀取每一條數據,湊夠一批發送給 NLP 分析接口。拿到結果存入後端數據庫中。
若是要清洗數據怎麼辦
4個需求都解決了,那麼若是仍是須要你首先移除髒數據,再分析怎麼辦呢?實際上很是簡單,你加一個 Kafka(Topic) 就行了!
大批量通用爬蟲
除了上面的微博例子之外,咱們再來看看在開發通用爬蟲的時候,如何應用 Kafka。
在任什麼時候候,不管是 XPath 提取數據仍是解析網站返回的 JSON,都不是爬蟲開發的主要工做。爬蟲開發的主要工做一直是爬蟲的調度和反爬蟲的開發。
咱們如今寫 Scrapy 的時候,處理反爬蟲的邏輯和提取數據的邏輯都是寫在一個爬蟲項目中的,那麼在開發的時候實際上很難實現多人協做。
如今咱們把網站內容的爬蟲和數據提取分開,實現下面這樣一個爬蟲架構:
爬蟲開發技術好的同窗,負責實現繞過反爬蟲,獲取網站的內容,不管是 HTML 源代碼仍是接口返回的JSON。拿到之後,直接塞進 Kafka。
爬蟲技術相對通常的同窗、實習生,須要作的只是從 Kafka 裏面獲取數據,不須要關心這個數據是來自於 Scrapy 仍是 Selenium。他們要作的只是把這些HTML 或者JSON 按照產品要求解析成格式化的數據,而後塞進 Kafka,供後續數據分析的同窗繼續讀取並使用。
如此一來,一個數據小組的工做就分開了,每一個人作各自負責的事情,約定好格式,同步開發,互不影響。
爲何是 Kafka 而不是其餘
上面描述的功能,實際上有很多 MQ 都能實現。但爲何是 Kafka 而不是其餘呢?由於Kafka 集羣的性能很是高,在垃圾電腦上搭建的集羣能抗住每秒10萬併發的數據寫入量。而若是選擇性能好一些的服務器,每秒100萬的數據寫入也能輕鬆應對。
總結
這篇文章經過兩個例子介紹了 Kafka 在爬蟲開發中的做用。做爲一個爬蟲工程師,做爲個人讀者。請必定要掌握 Kafka。
下一篇文章,咱們來說講如何使用 Kafka。比你在網上看到的教程會更簡單,更容易懂。
關注本公衆號,回覆「爬蟲與Kafka」獲取本文對應的思惟導圖原圖。