所謂同步就是一個任務須要依賴另外一個任務時,只有被依賴任務執行完畢以後,依賴的任務纔會完成.這是可靠的任務序列.要麼都成功,要麼失敗,兩個任務的狀態能夠保持一致.html
所謂異步不須要等待被依賴的任務完成,只是通知依賴的任務要完成什麼工做.依賴的任務也當即執行,只要本身完成了整個任務就算完成了. 至於被依賴的任務是否完成,依賴它的任務沒法肯定,是不可靠的任務序列python
### 同步和異步 ## 好比我去銀行辦理業務,可能會有兩種方式: # 第一種 :選擇排隊等候; # 第二種 :選擇取一個小紙條上面有個人號碼,等到排到我這一號時由櫃檯的人通知我輪到我去辦理業務了; # 第一種:前者(排隊等候)就是同步等待消息通知,也就是我要一直在等待銀行辦理業務狀況; # 第二種:後者(等待別人通知)就是異步等待消息通知。在異步消息處理中,等待消息通知者(在這個例子中就是等待辦理業務的人)每每註冊一個回調機制,在所等待的事件被觸發時由觸發機制(在這裏是櫃檯的人)經過某種機制(在這裏是寫在小紙條上的號碼,喊號)找到等待該事件的人。
阻塞和非阻塞兩個概念與程序(也就是執行程序的'線程')等待消息通知時的狀態相關git
在程序中,阻塞表明程序'卡'在某處,必須等待這處執行完畢才能繼續執行.一般的阻塞大多數是IO阻塞github
好比:銀行排隊取錢是一條流水線,如今負責取錢的服務人員餓了,他必須吃飯(阻塞). 只有吃完飯才能繼續回來服務你.此時你就必須等待他,不然你將沒法取錢.對於程序而言,就卡在了此處.編程
非阻塞就是沒有IO阻塞,線程在執行任務時沒有遇到IO阻塞.服務器
好比:你去銀行取錢,在排隊'等候'時什麼事情都沒有發生. 強調在執行的過程網絡
效率最低.你排着隊取錢,服務人員吃飯去了(阻塞了),此時你只能等待,不然不能取錢.這就是同步+阻塞數據結構
在銀行等待辦理業務的人,採用異步方式. 可是他不能離開銀行併發
異步操做是能夠被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知時被阻塞。app
其實是效率低下的。
想象一下你一邊打着電話一邊還須要擡頭看到底隊伍排到你了沒有,若是把打電話和觀察排隊的位置當作是程序的兩個操做的話,這個程序須要在這兩種不一樣的行爲之間來回的切換,效率可想而知是低下的。
效率更高
由於打電話是你(等待者)的事情,而通知你則是櫃檯(消息觸發機制)的事情,程序沒有在兩種不一樣的操做中來回切換。
好比說,這我的忽然發覺本身煙癮犯了,須要出去抽根菸,因而他告訴大堂經理說,排到我這個號碼的時候麻煩到外面通知我一下,那麼他就沒有被阻塞在這個等待的操做上面,天然這個就是異步+非阻塞的方式了。
不少人會把同步和阻塞混淆,是由於不少時候同步操做會以阻塞的形式表現出來
,一樣的,不少人也會把異步和非阻塞混淆,由於異步操做通常都不會在真正的IO操做處被阻塞
。
####### 併發爬取 , 併發處理爬取結果 # 缺點: 1.加強了耦合性, # 2.開啓進程耗費資源 # 優勢: 1. 提升處理效率 from concurrent.futures import ProcessPoolExecutor import time import random import os import requests def get_html(url): response=requests.get(url) print(f'{os.getpid()} 正在爬取網頁~~~') if response.status_code==200: parser_html(response.text) def parser_html(obj): print(f'總字符長度:{len(obj.result()) }') if __name__ == '__main__': url_list = [ 'http://www.taobao.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.baidu.com', 'https://www.cnblogs.com/jin-xin/articles/11232151.html', 'https://www.cnblogs.com/jin-xin/articles/10078845.html', 'http://www.sina.com.cn', 'https://www.sohu.com', 'https://www.youku.com', ] pool=ProcessPoolExecutor(4) # 開啓了一個進程池 有4個進程資源 for url in url_list: obj=pool.submit(get_html,url) # 異步的開啓了 10個任務,4個進程並行(併發)執行. pool.shutdown(wait=True) # 必須等待全部的子進程任務執行完畢
# 併發爬取, 串行解析結果 ########### 回調函數 + 異步 # 1. 下降了耦合性, 由回調函數 去通知執行下一個任務(形成這個任務會經歷串行) # 2. 處理爬取結果時是串行處理,影響效率 import requests from concurrent.futures import ProcessPoolExecutor from multiprocessing import Process import time import random import os def get(url): response = requests.get(url) print(f'{os.getpid()} 正在爬取:{url}') # time.sleep(random.randint(1,3)) if response.status_code == 200: return response.text def parse(obj): ''' 對爬取回來的字符串的分析 簡單用len模擬一下. :param text: :return: ''' time.sleep(1) ### obj.result() 取得結果 print(f'{os.getpid()} 分析結果:{len(obj.result())}') if __name__ == '__main__': url_list = [ 'http://www.taobao.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.baidu.com', 'https://www.cnblogs.com/jin-xin/articles/11232151.html', 'https://www.cnblogs.com/jin-xin/articles/10078845.html', 'http://www.sina.com.cn', 'https://www.sohu.com', 'https://www.youku.com', ] start_time = time.time() pool = ProcessPoolExecutor(4) for url in url_list: obj = pool.submit(get, url) obj.add_done_callback(parse) # 增長一個回調函數 # 如今的進程完成的仍是網絡爬取的任務,拿到了返回值以後,結果丟給回調函數add_done_callback, # 回調函數幫助你分析結果 # 進程繼續完成下一個任務. pool.shutdown(wait=True) print(f'主: {time.time() - start_time}')
使用 queue 模塊
Queue
# -*-coding:utf-8-*- # Author:Ds import queue q = queue.Queue(3) # 先進先出隊列 q.put(1) q.put(2) q.put('123') # q.put(666) # 阻塞 卡住了 # q.put(timeout=1) # 超時1秒報錯 queue.Full # q.put(1,block=False) # 非阻塞,直接報錯 queue.Full print(q.get()) print(q.get()) print(q.get()) # print(q.get()) #阻塞 卡住 # print(q.get(timeout=1)) #超時1秒報錯 queue.Empty print(q.get(block=False)) #非阻塞,直接報錯queue.Empty
LifoQueue
# -*-coding:utf-8-*- # Author:Ds import queue q = queue.LifoQueue(3) #後進先出隊列 (棧) q.put(1) q.put(2) q.put('123') # q.put(666) # 阻塞 卡住了 ## q.put(timeout=1) # 超時1秒報錯 queue.Full ### q.put(1,block=False) # 非阻塞,直接報錯 queue.Full print(q.get()) print(q.get()) print(q.get()) # print(q.get()) #阻塞 卡住 ## print(q.get(timeout=1)) #超時1秒報錯 queue.Empty ### print(q.get(block=False)) #非阻塞,直接報錯queue.Empty
# 使用列表數據結構模擬棧 li=[] li.append(1) # 後進 添加元素到列表末尾 li.pop() # 先出 移除列表末尾元素
PriorityQueue
# -*-coding:utf-8-*- # Author:Ds import queue q = queue.PriorityQueue(3) # 優先級隊列 # 放入元組類型()數據, 第一個參數表示優先級別,第二個參數是真實數據 # 數字越低表示優先級越高 q.put((10, '垃圾消息')) q.put((-9, '緊急消息')) q.put((3, '通常消息')) # q.put((3, '我被卡主了 ')) # 卡主了 # q.put((3, '我被卡主了 '),timeout=1) # 超時報錯: queue.Full q.put((3, '我被卡主了 '),block=False) # 不阻塞: queue.Full print(q.get()) print(q.get()) print(q.get()) print(q.get()) #阻塞 卡住 print(q.get(timeout=1)) #超時1秒報錯 queue.Empty print(q.get(block=False)) #非阻塞,直接報錯queue.Empty
線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行
event.isSet():返回event的狀態值;
event.wait():若是 event.isSet()==False將阻塞線程;
event.set():設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態,等待操做系統調度;
event.clear():恢復event的狀態值爲False
import time from threading import Thread from threading import current_thread from threading import Event event = Event() # 默認是False def task(): print(f'{current_thread().name} 檢測服務器是否正常開啓....') time.sleep(3) event.set() # 改爲了True def task1(): print(f'{current_thread().name} 正在嘗試鏈接服務器') # event.wait() # 輪詢檢測event是否爲True,當其爲True,繼續下一行代碼. 阻塞. event.wait(1) # 設置超時時間,若是1s中之內,event改爲True,代碼繼續執行. # 設置超時時間,若是超過1s中,event沒作改變,代碼繼續執行. print(f'{current_thread().name} 鏈接成功') if __name__ == '__main__': t1 = Thread(target=task1,) t2 = Thread(target=task1,) t3 = Thread(target=task1,) t = Thread(target=task) t.start() t1.start() t2.start() t3.start()
# _*_coding:utf-8_*_ # Author :Ds # CreateTime 2019/5/30 17:54 import threading ,time event=threading.Event() # 聲明一個event全局變量 def lighter(): count=0 #計數 event.set() #設置有標誌 while True: #循環 if count > 5 and count<10:# 紅燈5秒 event.clear()# 清空標誌位 print("\033[41;1mred light is on...\033[0m") elif count>10: # 綠燈5秒 event.set()#變綠燈 count=0 #清空count else: print('\033[42;1mgreen light is on...\033[0m') time.sleep(1) count+=1 def car(name): while True: if event.is_set(): #is_set 判斷設置了標誌位沒有 print('[%s] running ...'%name) time.sleep(1) else: print(' [%s] see red light waiting '%name) event.wait() print('\033[43;lm [%s] green light is on ,start going ..\033[0m'%name) light=threading.Thread(target=lighter,) car1=threading.Thread(target=car,args=('特斯拉',)) car2=threading.Thread(target=car,args=('奔馳',)) light.start() car1.start() car2.start()
進程是資源分配的最小單位,線程是CPU調度的最小單
併發的本質:切換+保存狀態
cpu正在運行一個任務,會在兩種狀況下切走去執行其餘的任務(切換由操做系統強制控制),一種狀況是該任務發生了阻塞,另一種狀況是該任務計算的時間過長
ps:在介紹進程理論時,說起進程的三種執行狀態,而線程纔是執行單位,因此也能夠將上圖理解爲線程的三種狀態
1. yiled能夠保存狀態,yield的狀態保存與操做系統的保存線程狀態很像,可是yield是代碼級別控制的,更輕量級
2. send能夠把一個函數的結果傳給另一個函數,以此實現單線程內程序之間的切換
一:其中第二種狀況並不能提高效率,只是爲了讓cpu可以雨露均沾,實現看起來全部任務都被「同時」執行的效果,若是多個任務都是純計算的,這種切換反而會下降效率。爲此咱們能夠基於yield來驗證。yield自己就是一種在單線程下能夠保存任務運行狀態的方法,咱們來簡單複習一下:
''' 一、協程: 單線程實現併發 在應用程序裏控制多個任務的切換+保存狀態 優勢: 應用程序級別速度要遠遠高於操做系統的切換 缺點: 多個任務一旦有一個阻塞沒有切,整個線程都阻塞在原地 該線程內的其餘的任務都不能執行了 一旦引入協程,就須要檢測單線程下全部的IO行爲, 實現遇到IO就切換,少一個都不行,覺得一旦一個任務阻塞了,整個線程就阻塞了, 其餘的任務即使是能夠計算,可是也沒法運行了 二、協程序的目的: 想要在單線程下實現併發 併發指的是多個任務看起來是同時運行的 併發=切換+保存狀態 ''' #串行執行 import time def func1(): for i in range(10000000): i+1 def func2(): for i in range(10000000): i+1 start = time.time() func1() func2() stop = time.time() print(stop - start) #基於yield併發執行 import time def func1(): while True: yield def func2(): g=func1() for i in range(10000000): i+1 next(g) start=time.time() func2() stop=time.time() print(stop-start) # 單純地切換反而會下降運行效率
二:第一種狀況的切換。在任務一遇到io狀況下,切到任務二去執行,這樣就能夠利用任務一阻塞的時間完成任務二的計算,效率的提高就在於此。
import time def func1(): while True: print('func1') yield def func2(): g=func1() for i in range(10000000): i+1 next(g) time.sleep(3) print('func2') start=time.time() func2() stop=time.time() print(stop-start) yield不能檢測IO,實現遇到IO自動切換
協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。
一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。
須要強調的是:
#1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其餘線程運行) #2. 單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率(!!!非io操做的切換與效率無關)
1.協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級
2.單線程內就能夠實現併發的效果,最大限度地利用cpu
1.協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程
2.協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程
1.必須在只有一個單線程裏實現併發
2.修改共享數據不需加鎖
3.用戶程序裏本身保存多個控制流的上下文棧
4.附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))
安裝 :pip3 install greenlet
手動實現切換
from greenlet import greenlet def eat(name): print('%s eat 1' %name) g2.switch('egon') print('%s eat 2' %name) g2.switch() def play(name): print('%s play 1' %name) g1.switch() print('%s play 2' %name) g1=greenlet(eat) g2=greenlet(play) g1.switch('egon') #能夠在第一次switch時傳入參數,之後都不須要
效率對比:
greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時若是遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提高效率的問題。
### 串行執行計算密集型~~ 11.37856674194336 import time def f1(): res=1 for i in range(100000000): res+=i def f2(): res=1 for i in range(100000000): res*=i start_time=time.time() f1() f2() print(f'runing time {time.time()-start_time}') # runing time 11.37856674194336 ### 切換執行計算密集型~~ runing time 60.24287223815918 from greenlet import greenlet import time def f1(): res=1 for i in range(100000000): res+=i g2.switch() def f2(): res=1 for i in range(100000000): res*=i g1.switch() start_time=time.time() g1=greenlet(f1) g2=greenlet(f2) g1.switch() print(f'runing time {time.time()-start_time}') # runing time 60.24287223815918
安裝:pip3 install gevent
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度
### 用法 g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的 g2=gevent.spawn(func2) g1.join() #等待g1結束 g2.join() #等待g2結束 #或者上述兩步合做一步:gevent.joinall([g1,g2]) g1.value#拿到func1的返回值
遇到IO阻塞時會自動切換任務
import gevent def eat(name): print('%s eat 1' %name) gevent.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) gevent.sleep(1) print('%s play 2' %name) g1=gevent.spawn(eat,'egon') g2=gevent.spawn(play,name='egon') g1.join() g2.join() #或者gevent.joinall([g1,g2]) print('主')
'打補丁:monkey'
# from gevent import monkey # monkey.patch_all() 必須放到被打補丁者的前面, import threading from gevent import monkey monkey.patch_all() # 打補丁,自動切換 import gevent import time def eat(): print(threading.current_thread().getName()) # 虛擬線程 DummyThread-n print('eat food 1') time.sleep(2) print('eat food 2') def play(): print(threading.current_thread().getName()) # 虛擬線程 DummyThread-n print('play 1') time.sleep(1) print('play 2') g1=gevent.spawn(eat) g2=gevent.spawn(play) gevent.joinall([g1,g2]) # 執行g1 g2 print(threading.current_thread().getName()) # MainThread 主線程 print('主')
協程應用:爬蟲:
from gevent import monkey;monkey.patch_all() import gevent import requests import time def get_page(url): print('GET: %s' %url) response=requests.get(url) if response.status_code == 200: print('%d bytes received from %s' %(len(response.text),url)) start_time=time.time() gevent.joinall([ gevent.spawn(get_page,'https://www.python.org/'), gevent.spawn(get_page,'https://www.yahoo.com/'), gevent.spawn(get_page,'https://github.com/'), ]) stop_time=time.time() print('run time is %s' %(stop_time-start_time)) # 使用協程爬取,計算爬取的時間