一句話歸納本文:python
本節對queue.py模塊進行了詳細的講解,寫了一個實戰例子: 多線程抓取半次元Cos頻道的全部今日熱門圖片,最後分析了 一波模塊的源碼,瞭解他的實現套路。git
大蕾姆鎮樓:github
引言:ajax
原本是準備寫multiprocessing進程模塊的,而後呢,白天的時候隨手 想寫一個爬半次元COS頻道小姐姐的腳本,接着呢,就遇到了一個使人 很是困擾的問題:國內免費的高匿代理ip都被玩壞了(不少站點都鎖了), 幾千個裏可能就十個不到能用的,對於這種狀況,有一種應付的策略 就是:寫While True死循環,一直換代理ip直到能拿到數據爲止。 可是,假如是咱們以前的那種單線程的話,須要等待很是久的時間, 想一想一個個代理去試,而後哪怕你設置了5s的超時,也得花上很多 時間,而你抓取的網頁不止一個的話,這個時間就不是通常的長了, 這個時候不用多線程還等什麼?咱們能夠把要請求的頁面都丟到 一個容器裏,而後加鎖,而後新建頁面數量 x 訪問線程,而後每一個 線程領取一個訪問任務,而後各自執行任訪問,直到所有訪問完畢, 最後反饋完成信息。在學完threading模塊後,相信你第一個想到的 會是條件變量Contition,acquire對集合加鎖,取出一枚頁面連接, notify喚醒一枚線程,而後release鎖,接着重複這個操做,直到集合 裏的再也不有元素爲止,大概套路就是這樣,若是你有興趣能夠本身 試着去寫下,在Python的**queue模塊
**裏已經實現了一個線程安全的 多生產者,多消費者隊列,自帶鎖,多線程併發數據交換必備。數據庫
內置三種類型的隊列數組
Queue
:FIFO(先進先出);LifoQueue
:LIFO(後進先出);PriorityQueue
:優先級最小的先出;構造函數的話,都是(maxsize=0),設置隊列的容量,若是 設置的maxsize小於1,則表示隊列的長度無限長安全
兩個異常:bash
Queue.Empty
:當調用非堵塞的get()獲取空隊列元素時會引起; Queue.Full
:當調用非堵塞的put()滿隊列裏添加元素時會引起;數據結構
相關函數:多線程
qsize
():返回隊列的近似大小,注意:qsize()> 0不保證隨後的get()不會 阻塞也不保證qsize() < maxsize後的put()不會堵塞;empty
():判斷隊列是否爲空,返回布爾值,若是返回True,不保證後續 調用put()不會阻塞,同理,返回False也不保證get()調用不會被阻塞;full
():判斷隊列是否滿,返回布爾值若是返回True,不保證後續 調用get()不會阻塞,同理,返回False也不保證put()調用不會被阻塞;put
(item, block=True, timeout=None):往隊列中放入元素,若是block 爲True且timeout參數爲None(默認),爲堵塞型put(),若是timeout是 正數,會堵塞timeout時間並引起Queue.Full異常,若是block爲False則 爲非堵塞put()put_nowait
(item):等價於put(item, False),非堵塞put()get
(block=True, timeout=None):移除一個隊列元素,並返回該元素, 若是block爲True表示堵塞函數,block = False爲非堵塞函數,若是設置 了timeout,堵塞時最多堵塞超過多少秒,若是這段時間內沒有可用的 項,會引起Queue.Empty異常,若是爲非堵塞狀態,有數據可用返回數據 無數據當即拋出Queue.Empty異常;官方給出的多線程例子:
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for item in source():
q.put(item)
# block until all tasks are done
q.join()
# stop workers
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
複製代碼
關於文檔的解讀大概就這些了,仍是比較簡單的,接下來實戰 寫個用到Queue隊列的多線程爬蟲例子~
拉到底部(中途加載了更多圖片,猜想又是ajax):
嗯,直接是日期耶,應該是請求參數裏的一個,F12打開開發者模式,Network 抓包開起來,隨手點開個02月08日,看下打開新連接的相關信息:
打開目錄結構看看,要找的元素都在這裏,數了下30個:
否則得出這樣的抓包信息:
抓取地址:https://bcy.net/coser/toppost100 請求方式:Get 請求參數: type(固定):lastday date:20180208
清理一波,而後滾動下,抓下加載更多的那個接口:
一樣是Ajax加載技術,不過數據不是Json,直接就是XML,點擊Preview看下:
好傢伙,果真是XML,而後不難看出**<li class="_box">
**包着的就是 一個元素,搜了下有20個,就是每次加載20個咯,算一算每日最熱 天天的圖片就是30+20 = 50個咯,整理下抓包信息:
抓取地址:https://bcy.net/coser/index/ajaxloadtoppost 請求方式:Post 請求參數: p(固定):1 type(固定):lastday date:20180207
嗯,兩個要抓的接口都一清二楚了,而後就是得到日期的範圍了, 這個就要本身慢慢試了,二分查找套路,慢慢縮減範圍,知道得 出日期的前一天和日期內容相同,日期的後一天與內容不一樣爲止, 這裏直接給出起始時間:20150918,開始抓的時間就是這個, 截止時間就是今天,好比:2018.02.09。
分析完畢,接下來就一步步寫代碼了~
比較簡單,利用datetime模塊格式化下日期,弄個循環,輕鬆完成;
簡單介紹下,cpn是我本身寫的一個模塊,**get_dx_proxy_ip()隨機獲取 一個大象代理的代理ip,接着的get_bs()**則是獲取一個BeautifulSoup對象, write_str_data()是往文件裏追加一串字符串的函數。最後還把異常給打印 出來了,運行下就知道了,這個是很是頻繁的,threading.current_thread() 得到當前線程,只是方便排查,若是不想打印任何東西,這裏直接改爲pass就 能夠了。另外,使用Θ分隔圖片名與下載連接(由於還沒學到數據庫那裏,暫時 就先寫txt裏...)
和2相似...
繼承threading.Thread類,__init__構造函數傳入一個執行函數, 重寫run函數,在此處調用傳入的執行函數。
循環,若是隊列不爲空,從裏面取出一枚數據,執行兩個抓數據 的函數,執行完畢後,調用queue對象的task_done()通知數目-1;
這裏就是建立了和任務隊列同樣數目的線程,調用daemon=True是爲了 避免由於線程死鎖或者堵塞,而後程序沒法中止的狀況,保證當程序只 剩下主線程時可以正常退出。
運行截圖:
是的,這種HTTPSConnectionPool的異常就是那麼頻發,代理ip問題,不是 你程序的緣由,打開bcycos_url.xml,驗證下數據有沒有問題:
(PS:這裏有些重複是網站原本就重複,一開始還覺得是我程序出錯... 還有,這裏沒有抓取全部的,只抓了:20150918到20150930的,數據多得一批...)
就是處理字符串,得到下載連接,還有圖片名的拼接而已~
運行截圖:
能夠打開輸入目錄驗證下:
使用Queue編寫一個多線程爬蟲就是那麼簡單~ 接下來會摳下Queue的源碼,有興趣的能夠繼續看,沒興趣的話直接跳過便可~
直接點進去queue.py,源碼只有249行,還好,看下源碼結構
點開兩個異常,很是簡單,繼承Exception而已,咱們更關注**__all__
**
all:在模塊級別暴露公共接口,好比在導庫的時候不建議寫 *from xxx import ,由於會把xxx模塊裏全部非下劃線開頭的成員都 引入到當前命名空間中,可能會污染當前命名空間。若是顯式聲明瞭 all,import * 就只會導入 all 列出的成員。 (不建議使用:**from xxx import *** 這種語法!!!)
接着看下Queue類結構,老規矩,先擼下**init**方法
文檔註釋裏寫了:建立一個maxsize大小的隊列,若是<=0,隊列大小是無窮的。 設置了maxsize,而後調用self._init(maxsize),點進去看下:
這個deque是什麼?
實際上是collections模塊提供的雙端隊列,能夠從隊列頭部快速 增長和取出對象,對應兩個方法:popleft()與appendleft(), 時間複雜度只有O(1),相比起**list對象的insert(0,v)和pop(0)**的 時間複雜度爲O(N),列表元素越多,元素進出耗時會越長!
回到源碼,接着還定義了: mutex:threading.Lock(),定義一個互斥鎖 not_empty = threading.Condition(self.mutex):定義一個非空的條件變量 not_full = threading.Condition(self.mutex):定義一個非滿的條件變量 all_tasks_done = threading.Condition(self.mutex):定義一個任務都完成的條件變量 unfinished_tasks = 0:初始化未完成的任務數量爲0
接着到**task_done()**方法:
with加鎖,未完成任務數量-1,判斷未完成的任務數量, 小於0,拋出異常:task_done調用次數過多,等於0則喚醒 全部等待線程,修改未完成任務數量;
再接着到**join()**方法:
with加鎖,若是還有未完成的任務,wait堵塞調用者進程; 接下來是qsize,empty和full函數,with加鎖返回大小而已:
接着是**put()**函數:
with加鎖,判斷maxsize是否大於0,上面也講了maxsize<=0表明 隊列是能夠無限擴展的,那就不存在隊列滿的狀況,maxsize<=0 的話直接就往隊列裏放元素就能夠了,同時未完成任務數+1,隨機 喚醒等待線程。
若是maxsize大於0表明有固定容量,就會出現隊列滿的狀況,就須要 進行細分了:
再接着是get()函數,和put()相似,只是拋出的異常爲:Empty
這兩個就不用說了,非堵塞put()和get(),最後就是操做雙端隊列的方法而已;
另外兩種類型的隊列也很是簡單,繼承Queue類,而後重寫對應的四個 方法而已~
PriorityQueue優先級隊裏的heappush()和heappop()是heapq模塊 提供的兩個方法,heap隊列,q隊列,堆通常可看作是一棵樹的 數組對象(二叉樹堆),規則以下: 某個節點的值老是不大於或不小於其孩子節點的值 而後又分最大堆和最小堆:
(這裏大概知道是二叉樹就行了,筆者數據結構也學的比較爛...)
利用:heappush()能夠把數據放到堆裏,會自動按照二叉樹的結構進行存儲; 利用:heappop(heap):從heap堆中刪除最小元素,並返回,heap再按徹底二叉樹規範重排;
queue.py模塊大概的流程就是這個樣子咯,總結下套路把:
關鍵點核心:三個條件變量,
not_empty:get的時候,隊列空或在超時時間內,堵塞讀取線程,非空喚醒讀取線程; not_full:put的時候,隊列滿或在超時時間內,堵塞寫入線程,非滿喚醒寫入線程; all_tasks_done:未完成任務unfinished_tasks不爲0的時候堵塞調用隊列的線程, 未完成任務不爲0時喚醒全部調用隊列的線程;
大概就這樣~
本節把queue模塊個擼了一遍,不止是熟悉API,還把源碼給擼了, 擼源碼感受就是在一件件脫妹子的衣服同樣,每次總能發現新大陸~ 嘿嘿,挺好玩的,就說那麼多吧~
(PS:Coser的質量真是良莠不齊,大部分是靠的化妝和濾鏡,我仍是喜歡素顏 小姐姐還有萌大奶~,最後來個辣眼睛的Coser給你洗洗眼。O(∩_∩)O)
本節源碼下載
來啊,Py交易啊
想加羣一塊兒學習Py的能夠加下,智障機器人小Pig,驗證信息裏包含: Python,python,py,Py,加羣,交易,屁眼 中的一個關鍵詞便可經過;
驗證經過後回覆 加羣 便可得到加羣連接(不要把機器人玩壞了!!!)~~~ 歡迎各類像我同樣的Py初學者,Py大神加入,一塊兒愉快地交流學♂習,van♂轉py。