線程之間的通訊咱們列表行不行呢,固然行,那麼隊列和列表有什麼區別呢?python
queue隊列 :使用import queue,用法與進程Queue同樣nginx
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.git
queue.
Queue
(maxsize=0) #先進先出
import queue #不須要經過threading模塊裏面導入,直接import queue就能夠了,這是python自帶的 #用法基本和咱們進程multiprocess中的queue是同樣的 q=queue.Queue() q.put('first') q.put('second') q.put('third') # q.put_nowait() #沒有數據就報錯,能夠經過try來搞 print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() #沒有數據就報錯,能夠經過try來搞 ''' 結果(先進先出): first second third ''' 先進先出示例代碼
class queue.
LifoQueue
(maxsize=0) #last in fisrt outgithub
import queue q=queue.LifoQueue() #隊列,相似於棧,棧咱們提過嗎,是否是先進後出的順序啊 q.put('first') q.put('second') q.put('third') # q.put_nowait() print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() ''' 結果(後進先出): third second first ''' 先進後出示例代碼
class queue.
PriorityQueue
(maxsize=0) #存儲數據時可設置優先級的隊列編程
import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高 q.put((-10,'a')) q.put((-5,'a')) #負數也能夠 # q.put((20,'ws')) #若是兩個值的優先級同樣,那麼按照後面的值的acsii碼順序來排序,若是字符串第一個數元素相同,比較第二個元素的acsii碼順序 # q.put((20,'wd')) # q.put((20,{'a':11})) #TypeError: unorderable types: dict() < dict() 不能是字典 # q.put((20,('w',1))) #優先級相同的兩個數據,他們後面的值必須是相同的數據類型才能比較,能夠是元祖,也是經過元素的ascii碼順序來排序 q.put((20,'b')) q.put((20,'a')) q.put((0,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先級越高,優先級高的優先出隊): ''' 優先級隊列示例代碼
這三種隊列都是線程安全的,不會出現多個線程搶佔同一個資源或數據的狀況。數組
協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。、安全
須要強調的是:網絡
#1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其餘線程運行) #2. 單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率(!!!非io操做的切換與效率無關)
對比操做系統控制線程的切換,用戶在單線程內控制協程的切換多線程
優勢以下:併發
#1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級 #2. 單線程內就能夠實現併發的效果,最大限度地利用cpu
缺點以下:
#1. 協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程 #2. 協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程
總結協程特色:
若是咱們在單個線程內有20個任務,要想實如今多個任務之間切換,使用yield生成器的方式過於麻煩(須要先獲得初始化一次的生成器,而後再調用send。。。很是麻煩),而使用greenlet模塊能夠很是簡單地實現這20個任務直接的切換
#安裝 pip3 install greenlet
#真正的協程模塊就是使用greenlet完成的切換 from greenlet import greenlet def eat(name): print('%s eat 1' %name) #2 g2.switch('taibai') #3 print('%s eat 2' %name) #6 g2.switch() #7 def play(name): print('%s play 1' %name) #4 g1.switch() #5 print('%s play 2' %name) #8 g1=greenlet(eat) g2=greenlet(play) g1.switch('taibai')#能夠在第一次switch時傳入參數,之後都不須要 1
單純的切換(在沒有io的狀況下或者沒有重複開闢內存空間的操做),反而會下降程序的執行速度
#順序執行 import time def f1(): res=1 for i in range(100000000): res+=i def f2(): res=1 for i in range(100000000): res*=i start=time.time() f1() f2() stop=time.time() print('run time is %s' %(stop-start)) #10.985628366470337 #切換 from greenlet import greenlet import time def f1(): res=1 for i in range(100000000): res+=i g2.switch() def f2(): res=1 for i in range(100000000): res*=i g1.switch() start=time.time() g1=greenlet(f1) g2=greenlet(f2) g1.switch() stop=time.time() print('run time is %s' %(stop-start)) # 52.763017892837524 效率對比
greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時若是遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提高效率的問題。
上面這個圖,是協程真正的意義,雖然沒有規避固有的I/O時間,可是咱們使用這個時間來作別的事情了,通常在工做中咱們都是進程+線程+協程的方式來實現併發,以達到最好的併發效果,若是是4核的cpu,通常起5個進程,每一個進程中20個線程(5倍cpu數量),每一個線程能夠起500個協程,大規模爬取頁面的時候,等待網絡延遲的時間的時候,咱們就能夠用協程去實現併發。 併發數量 = 5 * 20 * 500 = 50000個併發,這是通常一個4cpu的機器最大的併發數。nginx在負載均衡的時候最大承載量就是5w個
單線程裏的這20個任務的代碼一般會既有計算操做又有阻塞操做,咱們徹底能夠在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提升效率,這就用到了Gevent模塊。
#安裝 pip3 install gevent
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。
#用法 g1=gevent.spawn(func,1,2,3,x=4,y=5)建立一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的,spawn是異步提交任務 g2=gevent.spawn(func2) g1.join() #等待g1結束,上面只是建立協程對象,這個join纔是去執行 g2.join() #等待g2結束 有人測試的時候會發現,不寫第二個join也能執行g2,是的,協程幫你切換執行了,可是你會發現,若是g2裏面的任務執行的時間長,可是不寫join的話,就不會執行完等到g2剩下的任務了 #或者上述兩步合做一步:gevent.joinall([g1,g2]) g1.value#拿到func1的返回值
遇到IO阻塞時會自動切換任務
import gevent def eat(name): print('%s eat 1' %name) gevent.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) gevent.sleep(1) print('%s play 2' %name) g1=gevent.spawn(eat,'egon') g2=gevent.spawn(play,name='egon') g1.join() g2.join() #或者gevent.joinall([g1,g2]) print('主') 遇到I/O切換
上例gevent.sleep(2)模擬的是gevent能夠識別的io阻塞,
而time.sleep(2)或其餘的阻塞,gevent是不能直接識別的須要用下面一行代碼,打補丁,就能夠識別了
from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊以前
或者咱們乾脆記憶成:要用gevent,須要將from gevent import monkey;monkey.patch_all()放到文件的開頭
from gevent import monkey;monkey.patch_all() #必須寫在最上面,這句話後面的全部阻塞所有可以識別了 import gevent #直接導入便可 import time def eat(): #print() print('eat food 1') time.sleep(2) #加上mokey就可以識別到time模塊的sleep了 print('eat food 2') def play(): print('play 1') time.sleep(1) #來回切換,直到一個I/O的時間結束,這裏都是咱們個gevent作得,再也不是控制不了的操做系統了。 print('play 2') g1=gevent.spawn(eat) g2=gevent.spawn(play_phone) gevent.joinall([g1,g2]) print('主')
咱們能夠用threading.current_thread().getName()來查看每一個g1和g2,查看的結果爲DummyThread-n,即假線程,虛擬線程,其實都在一個線程裏面
進程線程的任務切換是由操做系統自行切換的,你本身不能控制
協程是經過本身的程序(代碼)來進行切換的,本身可以控制,只有遇到協程模塊可以識別的IO操做的時候,程序纔會進行任務切換,實現併發效果,若是全部程序都沒有IO操做,那麼就基本屬於串行執行了。
from gevent import spawn,joinall,monkey;monkey.patch_all() import time def task(pid): """ Some non-deterministic task """ time.sleep(0.5) print('Task %s done' % pid) def synchronous(): for i in range(10): task(i) def asynchronous(): g_l=[spawn(task,i) for i in range(10)] joinall(g_l) if __name__ == '__main__': print('Synchronous:') synchronous() print('Asynchronous:') asynchronous() #上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。 協程:同步異步對比
from gevent import monkey;monkey.patch_all() import gevent import requests import time def get_page(url): print('GET: %s' %url) response=requests.get(url) if response.status_code == 200: print('%d bytes received from %s' %(len(response.text),url)) start_time=time.time() gevent.joinall([ gevent.spawn(get_page,'https://www.python.org/'), gevent.spawn(get_page,'https://www.yahoo.com/'), gevent.spawn(get_page,'https://github.com/'), ]) stop_time=time.time() print('run time is %s' %(stop_time-start_time)) 協程應用:爬蟲
將上面的程序最後加上一段串行的代碼看看效率:若是你的程序不須要過高的效率,那就不用什麼併發啊協程啊之類的東西。
print('--------------------------------') s = time.time() requests.get('https://www.python.org/') requests.get('https://www.yahoo.com/') requests.get('https://github.com/') t = time.time() print('串行時間>>',t-s)
經過gevent實現單線程下的socket併發(from gevent import monkey;monkey.patch_all()必定要放到導入socket模塊以前,不然gevent沒法識別socket的阻塞)
一個網絡請求裏面通過多個時間延遲time
from gevent import monkey;monkey.patch_all() from socket import * import gevent #若是不想用money.patch_all()打補丁,能夠用gevent自帶的socket # from gevent import socket # s=socket.socket() def server(server_ip,port): s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind((server_ip,port)) s.listen(5) while True: conn,addr=s.accept() gevent.spawn(talk,conn,addr) def talk(conn,addr): try: while True: res=conn.recv(1024) print('client %s:%s msg: %s' %(addr[0],addr[1],res)) conn.send(res.upper()) except Exception as e: print(e) finally: conn.close() if __name__ == '__main__': server('127.0.0.1',8080) 服務端
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) 客戶端
from threading import Thread from socket import * import threading def client(server_ip,port): c=socket(AF_INET,SOCK_STREAM) #套接字對象必定要加到函數內,即局部名稱空間內,放在函數外則被全部線程共享,則你們公用一個套接字對象,那麼客戶端端口永遠同樣了 c.connect((server_ip,port)) count=0 while True: c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8')) msg=c.recv(1024) print(msg.decode('utf-8')) count+=1 if __name__ == '__main__': for i in range(500): t=Thread(target=client,args=('127.0.0.1',8080)) t.start() 多線程併發多個客戶端,去請求上面的服務端是沒問題的
上面代碼的服務端用gevent的時候爲何沒有用join就執行了。