python線程、協程、I/O多路複用

目錄:

  • 併發多線程
  • 協程
  • I/O多路複用(未完成,待續)

1、併發多線程

一、線程簡述:python

一條流水線的執行過程是一個線程,一條流水線必須屬於一個車間,一個車間的運行過程就是一個進程(一個進程內至少一個線程)mysql

進程是資源單位git

而線程纔是cpu上的執行單位github

二、線程的優勢:共享資源、建立開銷小web

三、線程的模塊開啓方式之threading模塊redis

multiprocess模塊的徹底模仿了threading模塊的接口。執行以下圖:sql

 

方式一: 函數式調用編程

 1 from threading import Thread
 2 import time
 3 def sayhi(name): # 定義函數
 4     time.sleep(2)
 5     print('%s say hello' %name)
 6 
 7 if __name__ == '__main__':
 8     t=Thread(target=sayhi,args=('wangshuyang',)) #定義線程類並傳參
 9     t.start()  # 調用
10     print('主線程')
View Code

方式二:類式調用服務器

 1 #方式二
 2 from threading import Thread
 3 import time
 4 class Sayhi(Thread):  # 引用類
 5     def __init__(self,name):
 6         super().__init__()
 7         self.name=name
 8     def run(self):
 9         time.sleep(2)
10         print('%s say hello' % self.name)
11 
12 
13 if __name__ == '__main__':
14     t = Sayhi('wangshuyang') #定義類
15     t.start() #調用
16     print('主線程')
View Code

 四、多進程與多線程對比:多線程

多線程比多進程更快,開銷更小

 1 from threading import Thread
 2 from multiprocessing import Process
 3 import os
 4 
 5 def work():
 6     print('hello')
 7 
 8 if __name__ == '__main__':
 9     #在主進程下開啓線程
10     t=Thread(target=work)
11     t.start()
12     print('主線程/主進程')
13     '''
14     打印結果:
15     hello
16     主線程/主進程
17     '''
18 
19     #在主進程下開啓子進程
20     t=Process(target=work)
21     t.start()
22     print('主線程/主進程')
23     '''
24     打印結果:
25     主線程/主進程
26     hello
27     '''
View Code

pid進程號對比

 1 from threading import Thread
 2 from multiprocessing import Process
 3 import os
 4 
 5 def work():
 6     print('hello',os.getpid())
 7 
 8 if __name__ == '__main__':
 9     #part1:在主進程下開啓多個線程,每一個線程都跟主進程的pid同樣
10     t1=Thread(target=work)
11     t2=Thread(target=work)
12     t1.start()
13     t2.start()
14     print('主線程/主進程pid',os.getpid())
15 
16     #part2:開多個進程,每一個進程都有不一樣的pid
17     p1=Process(target=work)
18     p2=Process(target=work)
19     p1.start()
20     p2.start()
21     print('主線程/主進程pid',os.getpid())
View Code

五、多線程併發socket示例

 1 #_*_coding:utf-8_*_
 2 #!/usr/bin/env python
 3 import multiprocessing
 4 import threading
 5 
 6 import socket
 7 s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 8 s.bind(('127.0.0.1',8080))
 9 s.listen(5)
10 
11 def action(conn):
12     while True:
13         data=conn.recv(1024)
14         print(data)
15         conn.send(data.upper())
16 
17 if __name__ == '__main__':
18 
19     while True:
20         conn,addr=s.accept()
21 
22 
23         p=threading.Thread(target=action,args=(conn,))
24         p.start()
服務端
 1 #_*_coding:utf-8_*_
 2 #!/usr/bin/env python
 3 
 4 
 5 import socket
 6 
 7 s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 8 s.connect(('127.0.0.1',8080))
 9 
10 while True:
11     msg=input('>>: ').strip()
12     if not msg:continue
13 
14     s.send(msg.encode('utf-8'))
15     data=s.recv(1024)
16     print(data)
客戶端

六、多線程模擬文件操做示例

 1 from threading import Thread
 2 msg_l=[]
 3 format_l=[]
 4 def talk():
 5     while True:
 6         msg=input('>>: ').strip()
 7         if not msg:continue
 8         msg_l.append(msg)
 9 
10 def format_msg():
11     while True:
12         if msg_l:
13             res=msg_l.pop()
14             format_l.append(res.upper())
15 
16 def save():
17     while True:
18         if format_l:
19             with open('db.txt','a',encoding='utf-8') as f:
20                 res=format_l.pop()
21                 f.write('%s\n' %res)
22 
23 if __name__ == '__main__':
24     t1=Thread(target=talk)
25     t2=Thread(target=format_msg)
26     t3=Thread(target=save)
27     t1.start()
28     t2.start()
29     t3.start()
View Code

七、threading模塊之調用方法

1)join與setdaemon

與進程的方法都是相似的,實際上是multiprocessing模仿threading的接口

join 等待線程執行完成,執行主進程

setdaemon 守護線程,主進程關閉,線程關閉

 1 from threading import Thread
 2 import time
 3 def sayhi(name):
 4     time.sleep(2)
 5     print('%s say hello' %name)
 6 
 7 if __name__ == '__main__':
 8     t=Thread(target=sayhi,args=('egon',))
 9     t.setDaemon(True)
10     t.start()
11     t.join()
12     print('主線程')
13     print(t.is_alive())
View Code

2)Thread實例對象的方法

isAlive(): 返回線程是否活動的。
getName(): 返回線程名。
setName(): 設置線程名。

3)其餘方法

threading.currentThread(): 返回當前的線程變量。
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

 1 from threading import Thread
 2 import threading
 3 from multiprocessing import Process
 4 import os
 5 
 6 def work():
 7     import time
 8     time.sleep(3)
 9     print(threading.current_thread().getName())
10 
11 
12 if __name__ == '__main__':
13     #在主進程下開啓線程
14     t=Thread(target=work)
15     t.start()
16 
17     print(threading.current_thread().getName())
18     print(threading.current_thread()) #主線程
19     print(threading.enumerate()) #連同主線程在內有兩個運行的線程
20     print(threading.active_count())
21     print('主線程/主進程')
22 
23     '''
24     打印結果:
25     MainThread
26     <_MainThread(MainThread, started 140735268892672)>
27     [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
28     2
29     主線程/主進程
30     Thread-1
31     '''
View Code

八、Python GIL(Global Interpreter Lock)全局解釋器鎖

在Cpython解釋器中,同一個進程下開啓的多線程,同一時刻只能有一個線程執行,沒法利用多核優點。只有cpython中必定會用GIL,Python徹底能夠不依賴於GIL。
GIL鎖原理

ps.額外應該知道的知識

對計算來講,cpu越多越好,可是對於I/O來講,再多的cpu也沒用,由於對應的硬盤,在操做系統來看,只有一塊。

如今的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提高,甚至不如串行(沒有大量切換),可是,對於IO密集型的任務效率仍是有顯著提高的。

九、python 多線程和多進程的應用場景

計算密集型

 1 #計算密集型
 2 from threading import Thread
 3 from multiprocessing import Process
 4 import os
 5 import time
 6 def work():
 7     res=0
 8     for i in range(1000000):
 9         res+=i
10 
11 if __name__ == '__main__':
12     t_l=[]
13     start_time=time.time()
14     # for i in range(300): #串行
15     #     work()
16 
17     for i in range(300):
18         t=Thread(target=work) #在個人機器上,4核cpu,多線程大概15秒
19         # t=Process(target=work) #在個人機器上,4核cpu,多進程大概10秒
20         t_l.append(t)
21         t.start()
22 
23     for i in t_l:
24         i.join()
25 
26     stop_time=time.time()
27     print('run time is %s' %(stop_time-start_time))
28 
29     print('主線程')
View Code

I/O密集型

 1 #I/O密集型
 2 from threading import Thread
 3 from multiprocessing import Process
 4 import time
 5 import os
 6 def work():
 7     time.sleep(2) #模擬I/O操做,能夠打開一個文件來測試I/O,與sleep是一個效果
 8     print(os.getpid())
 9 
10 if __name__ == '__main__':
11     t_l=[]
12     start_time=time.time()
13     for i in range(1000):
14         t=Thread(target=work) #耗時大概爲2秒
15         # t=Process(target=work) #耗時大概爲25秒,建立進程的開銷遠高於線程,並且對於I/O密集型,多cpu根本無論用
16         t_l.append(t)
17         t.start()
18 
19     for t in t_l:
20         t.join()
21     stop_time=time.time()
22     print('run time is %s' %(stop_time-start_time))
View Code

多線程用於IO密集型,如socket,爬蟲,web
多進程用於計算密集型,如金融分析

十、同步鎖

多線程或多進程程序訪問同一份資源操做時,須要加鎖。以下:

 1 import time
 2 import threading
 3 
 4 def addNum():
 5     global num #在每一個線程中都獲取這個全局變量
 6     #num-=1
 7 
 8     temp=num
 9     time.sleep(0.1)
10     num =temp-1  # 對此公共變量進行-1操做
11 
12 num = 100  #設定一個共享變量
13 
14 thread_list = []
15 
16 for i in range(100):
17     t = threading.Thread(target=addNum)
18     t.start()
19     thread_list.append(t)
20 
21 for t in thread_list: #等待全部線程執行完畢
22     t.join()
23 
24 print('Result: ', num)
View Code

鎖一般被用來實現對共享資源的同步訪問。爲每個共享資源建立一個Lock對象,當你須要訪問該資源時,調用acquire方法來獲取鎖對象(若是其它線程已經得到了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖:

1 import threading
2 
3 R=threading.Lock()
4 
5 R.acquire()
6 '''
7 對公共數據的操做
8 '''
9 R.release()
View Code

GIL VS Lock
機智的同窗可能會問到這個問題,就是既然你以前說過了,Python已經有一個GIL來保證同一時間只能有一個線程來執行了,爲何這裏還須要lock?
首先咱們須要達成共識:鎖的目的是爲了保護共享的數據,同一時間只能有一個線程來修改共享的數據,而後,咱們能夠得出結論:保護不一樣的數據就應該加不一樣的鎖
最後,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不同,前者是解釋器級別的(固然保護的就是解釋器級別的數據,好比垃圾回收的數據),後者是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock

詳細的:
由於Python解釋器幫你自動按期進行內存回收,你能夠理解爲python解釋器裏有一個獨立的線程,每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的,此時你本身的程序 裏的線程和 py解釋器本身的線程是併發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻,可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,爲了解決相似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題, 這能夠說是Python早期版本的遺留問題。

十一、死鎖與遞歸鎖

進程也有死鎖與遞歸鎖,在進程那裏忘記說了,放到這裏一切說了額
所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖

 1 from threading import Thread,Lock
 2 import time
 3 mutexA=Lock()
 4 mutexB=Lock()
 5 
 6 class MyThread(Thread):
 7     def run(self):
 8         self.func1()
 9         self.func2()
10     def func1(self):
11         mutexA.acquire()
12         print('\033[41m%s 拿到A鎖\033[0m' %self.name)
13 
14         mutexB.acquire()
15         print('\033[42m%s 拿到B鎖\033[0m' %self.name)
16         mutexB.release()
17 
18         mutexA.release()
19 
20     def func2(self):
21         mutexB.acquire()
22         print('\033[43m%s 拿到B鎖\033[0m' %self.name)
23         time.sleep(2)
24 
25         mutexA.acquire()
26         print('\033[44m%s 拿到A鎖\033[0m' %self.name)
27         mutexA.release()
28 
29         mutexB.release()
30 
31 if __name__ == '__main__':
32     for i in range(10):
33         t=MyThread()
34         t.start()
35 
36 '''
37 Thread-1 拿到A鎖
38 Thread-1 拿到B鎖
39 Thread-1 拿到B鎖
40 Thread-2 拿到A鎖
41 而後就卡住,死鎖了
42 '''
View Code

解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。
這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:

mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止

十二、信號量Semahpore

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。

示例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5):

 1 import threading
 2 import time
 3 
 4 semaphore = threading.Semaphore(5)
 5 
 6 def func():
 7     if semaphore.acquire():
 8         print (threading.currentThread().getName() + ' get semaphore')
 9         time.sleep(2)
10         semaphore.release()
11 
12 for i in range(20):
13   t1 = threading.Thread(target=func)
14   t1.start()
View Code

與進程池是徹底不一樣的概念,進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程

1三、事件Event

線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就 會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值;
event.wait():若是 event.isSet()==False將阻塞線程;
event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
event.clear():恢復event的狀態值爲False。

 

能夠考慮一種應用場景(僅僅做爲說明),例如,咱們有多個線程從Redis隊列中讀取數據來處理,這些線程都要嘗試去鏈接Redis的服務,通常狀況下,若是Redis鏈接不成功,在各個線程的代碼中,都會去嘗試從新鏈接。若是咱們想要在啓動時確保Redis服務正常,才讓那些工做線程去鏈接Redis服務器,那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做:主線程中會去嘗試鏈接Redis服務,若是正常的話,觸發事件,各工做線程會嘗試鏈接Redis服務。

 1 import threading
 2 import time
 3 import logging
 4 
 5 logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)
 6 
 7 def worker(event):
 8     logging.debug('Waiting for redis ready...')
 9     event.wait()
10     logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
11     time.sleep(1)
12 
13 def main():
14     readis_ready = threading.Event()
15     t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
16     t1.start()
17 
18     t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
19     t2.start()
20 
21     logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
22     time.sleep(3) # simulate the check progress
23     readis_ready.set()
24 
25 if __name__=="__main__":
26     main()
redis
 1 from threading import Thread,Event
 2 import threading
 3 import time,random
 4 def conn_mysql():
 5     print('\033[42m%s 等待鏈接mysql。。。\033[0m' %threading.current_thread().getName())
 6     event.wait()
 7     print('\033[42mMysql初始化成功,%s開始鏈接。。。\033[0m' %threading.current_thread().getName())
 8 
 9 
10 def check_mysql():
11     print('\033[41m正在檢查mysql。。。\033[0m')
12     time.sleep(random.randint(1,3))
13     event.set()
14     time.sleep(random.randint(1,3))
15 
16 if __name__ == '__main__':
17     event=Event()
18     t1=Thread(target=conn_mysql) #等待鏈接mysql
19     t2=Thread(target=conn_mysql) #等待鏈接myqsl
20     t3=Thread(target=check_mysql) #檢查mysql
21 
22     t1.start()
23     t2.start()
24     t3.start()
mysql

threading.Event的wait方法還接受一個超時參數,默認狀況下若是事件一致沒有發生,wait方法會一直阻塞下去,而加入這個超時參數以後,若是阻塞時間超過這個參數設定的值以後,wait方法會返回。對應於上面的應用場景,若是Redis服務器一致沒有啓動,咱們但願子線程可以打印一些日誌來不斷地提醒咱們當前沒有一個能夠鏈接的Redis服務,咱們就能夠經過設置這個超時參數來達成這樣的目的:

1 def conn_mysql():
2     count=0
3     while not e.is_set():
4         print('%s 第 <%s> 次嘗試' %(threading.current_thread().getName(),count))
5         count+=1
6         e.wait(0.5)
7     print('%s ready to conn mysql' %threading.current_thread().getName())
8     time.sleep(1)
mysql
 1 from threading import Thread,Event
 2 import threading
 3 import time,random
 4 def conn_mysql():
 5     while not event.is_set():
 6         print('\033[42m%s 等待鏈接mysql。。。\033[0m' %threading.current_thread().getName())
 7         event.wait(0.1)
 8     print('\033[42mMysql初始化成功,%s開始鏈接。。。\033[0m' %threading.current_thread().getName())
 9 
10 
11 def check_mysql():
12     print('\033[41m正在檢查mysql。。。\033[0m')
13     time.sleep(random.randint(1,3))
14     event.set()
15     time.sleep(random.randint(1,3))
16 
17 if __name__ == '__main__':
18     event=Event()
19     t1=Thread(target=conn_mysql)
20     t2=Thread(target=conn_mysql)
21     t3=Thread(target=check_mysql)
22 
23     t1.start()
24     t2.start()
25     t3.start()
View Code

這樣,咱們就能夠在等待Redis服務啓動的同時,看到工做線程里正在等待的狀況。

應用:鏈接池

1四、條件Condition

使得線程等待,只有知足某條件時,才釋放n個線程

 1 import threading
 2  
 3 def run(n):
 4     con.acquire()
 5     con.wait()
 6     print("run the thread: %s" %n)
 7     con.release()
 8  
 9 if __name__ == '__main__':
10  
11     con = threading.Condition()
12     for i in range(10):
13         t = threading.Thread(target=run, args=(i,))
14         t.start()
15  
16     while True:
17         inp = input('>>>')
18         if inp == 'q':
19             break
20         con.acquire()
21         con.notify(int(inp))
22         con.release()
View Code

1五、定時器

定時器,指定n秒後執行某操做

1 from threading import Timer
2  
3  
4 def hello():
5     print("hello, world")
6  
7 t = Timer(1, hello)
8 t.start()  # after 1 seconds, "hello, world" will be printed
View Code

1六、線程隊列queue

queue隊列 :使用import queue,用法與進程Queue同樣

 1 import queue
 2 
 3 q=queue.Queue()
 4 q.put('first')
 5 q.put('second')
 6 q.put('third')
 7 
 8 print(q.get())
 9 print(q.get())
10 print(q.get())
11 '''
12 結果(先進先出):
13 first
14 second
15 third
16 '''
View Code

class queue.LifoQueue(maxsize=0) #last in fisrt out

 1 import queue
 2 
 3 q=queue.LifoQueue()
 4 q.put('first')
 5 q.put('second')
 6 q.put('third')
 7 
 8 print(q.get())
 9 print(q.get())
10 print(q.get())
11 '''
12 結果(後進先出):
13 third
14 second
15 first
16 '''
View Code

class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

 1 import queue
 2 
 3 q=queue.PriorityQueue()
 4 #put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高
 5 q.put((20,'a'))
 6 q.put((10,'b'))
 7 q.put((30,'c'))
 8 
 9 print(q.get())
10 print(q.get())
11 print(q.get())
12 '''
13 結果(數字越小優先級越高,優先級高的優先出隊):
14 (10, 'b')
15 (20, 'a')
16 (30, 'c')
17 '''
View Code

生產者、消費者模型

 1 #生產者消費者模型
 2 import queue
 3 import threading
 4 import time
 5 
 6 #建立隊列
 7 q = queue.Queue(50)
 8 
 9 #定義消費者
10 def productor(arg):
11     '''
12     買票
13     :param arg:
14     :return:
15     '''
16     while True:
17         q.put(str(arg) + '號產生訂單')#提交到隊列
18 
19 #建立300個線程發送請求
20 for i in range(300):#300個線程同時提交訂單至關於300我的同時提交訂單
21     t = threading.Thread(target= productor,args= (i,))
22     t.start()
23 
24 #定義生產者
25 def consumer(arg):
26     '''
27     服務器後臺
28     :param arg:
29     :return:
30     '''
31     while True:
32         print(str(arg) + '處理了'+q.get())#進程從隊列中取訂單進行處理
33 
34 #3個線程同時工做
35 for j in range(3):
36     t = threading.Thread(target=consumer,args=(j,))
37     t.start()
View Code

2、協程

 一、協程:

是單線程下的併發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。

協程定義:

1.必須在只有一個單線程裏實現併發
2.修改共享數據不需加鎖
3.用戶程序裏本身保存多個控制流的上下文棧
4.附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))

yield切換在沒有io的狀況下或者沒有重複開闢內存空間的操做,對效率沒有什麼提高,甚至更慢,爲此,能夠用greenlet來爲你們演示這種切換

二、python協程的優勢:

python的線程屬於內核級別的,即由操做系統控制調度(如單線程一旦遇到io就被迫交出cpu執行權限,切換其餘線程運行)

單線程內開啓協程,一旦遇到io,從應用程序級別(而非操做系統)控制切換

優勢:

協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級

單線程內就能夠實現併發的效果,最大限度地利用cpu

要實現協程,關鍵在於用戶程序本身控制程序切換,切換以前必須由用戶程序本身保存協程上一次調用時的狀態,如此,每次從新調用時,可以從上次的位置繼續執行。

協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧

ps.yiled複習:

 1 import time
 2 def consumer(item):
 3     # print('拿到包子%s' %item)
 4     x=11111111111
 5     x1=12111111111
 6     x3=13111111111
 7     x4=14111111111
 8     y=22222222222
 9     z=33333333333
10 
11     pass
12 def producer(target,seq):
13     for item in seq:
14         target(item) #每次調用函數,會臨時產生名稱空間,調用結束則釋放,循環100000000次,則重複這麼屢次的建立和釋放,開銷很是大
15 
16 start_time=time.time()
17 producer(consumer,range(100000000))
18 stop_time=time.time()
19 print('run time is:%s' %(stop_time-start_time)) #30.132838010787964
20 
21 
22 #使用yield:無需重複開闢內存空間,即重複建立名稱空間,於是開銷小
23 import time
24 def init(func):
25     def wrapper(*args,**kwargs):
26         g=func(*args,**kwargs)
27         next(g)
28         return g
29     return wrapper
30 
31 @init
32 def consumer():
33     x=11111111111
34     x1=12111111111
35     x3=13111111111
36     x4=14111111111
37     y=22222222222
38     z=33333333333
39     while True:
40         item=yield
41         # print('拿到包子%s' %item)
42         pass
43 def producer(target,seq):
44     for item in seq:
45         target.send(item) #無需從新建立名稱空間,從上一次暫停的位置繼續,相比上例,開銷小
46 
47 start_time=time.time()
48 producer(consumer(),range(100000000))
49 stop_time=time.time()
50 print('run time is:%s' %(stop_time-start_time)) #21.882073879241943
View Code

缺點:

協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程

協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程

三、協程模塊之Greenlet

greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator

 1 from greenlet import greenlet
 2 
 3 def test1():
 4     print('test1,first')
 5     gr2.switch()
 6     print('test1,sencod')
 7     gr2.switch()
 8 def test2():
 9     print('test2,first')
10     gr1.switch()
11     print('test2,sencod')
12 
13 
14 gr1=greenlet(test1)
15 gr2=greenlet(test2)
16 gr1.switch()
View Code

能夠在第一次switch時傳入參數

 1 #順序執行
 2 import time
 3 def f1():
 4     res=0
 5     for i in range(10000000):
 6         res+=i
 7 
 8 def f2():
 9     res=0
10     for i in range(10000000):
11         res*=i
12 
13 
14 start_time=time.time()
15 f1()
16 f2()
17 stop_time=time.time()
18 print('run time is: %s' %(stop_time-start_time)) #1.7395639419555664
19 
20 
21 #切換
22 from greenlet import greenlet
23 import time
24 def f1():
25     res=0
26     for i in range(10000000):
27         res+=i
28         gr2.switch()
29 
30 
31 def f2():
32     res=0
33     for i in range(10000000):
34         res*=i
35         gr1.switch()
36 
37 gr1=greenlet(f1)
38 gr2=greenlet(f2)
39 
40 start_time=time.time()
41 gr1.switch()
42 stop_time=time.time()
43 print('run time is: %s' %(stop_time-start_time)) #7.789067983627319
View Code

greenlet只是提供了一種比generator更加便捷的切換方式,仍然是沒有解決遇到IO自動切換的問題

四、協程模塊之第三方Gevent

Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。

g1=gevent.spawn()建立一個協程對象g1,

spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的。

a.遇到IO阻塞時會自動切換任務

 1 import gevent
 2 import time
 3 
 4 
 5 def eat():
 6     print('eat food 1')
 7     gevent.sleep(2) #等飯來
 8     print('eat food 2')
 9 
10 def play_phone():
11     print('play phone 1')
12     gevent.sleep(1) #網卡了
13     print('play phone 2')
14 
15 
16 
17 # gevent.spawn(eat)
18 # gevent.spawn(play_phone)
19 # print('主') # 直接結束
20 
21 
22 #於是也須要join方法,進程或現場的jion方法只能join一個,而gevent的join方法能夠join多個
23 
24 g1=gevent.spawn(eat)
25 g2=gevent.spawn(play_phone)
26 gevent.joinall([g1,g2])
27 print('')
View Code

b.同步與異步

 1 import gevent
 2  
 3 def task(pid):
 4     """
 5     Some non-deterministic task
 6     """
 7     gevent.sleep(0.5)
 8     print('Task %s done' % pid)
 9  
10 def synchronous():
11     for i in range(1,10):
12         task(i)
13  
14 def asynchronous():
15     threads = [gevent.spawn(task, i) for i in range(10)]
16     gevent.joinall(threads)
17  
18 print('Synchronous:')
19 synchronous()
20  
21 print('Asynchronous:')
22 asynchronous()
View Code

上例gevent.sleep(2)模擬的是gevent能夠識別的io阻塞,
而time.sleep(2)或其餘的阻塞,gevent是不能直接識別的須要用下面一行代碼,打補丁,就能夠識別了
from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊以前
或者咱們乾脆記憶成:要用gevent,須要將from gevent import monkey;monkey.patch_all()放到文件的開頭

c.gevent線程的一些用法:

g1=gevent.spawn(func,1,,2,3,x=4,y=5)
g2=gevent.spawn(func2)
g1.join() #等待g1結束
g2.join() #等待g2結束
#或者上述兩步合做一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值

五、協程應用:

a. 爬蟲

 1 from gevent import monkey;monkey.patch_all()
 2 import gevent
 3 import requests
 4 import time
 5 
 6 def get_page(url):
 7     print('GET: %s' %url)
 8     response=requests.get(url)
 9     if response.status_code == 200:
10         print('%d bytes received from %s' %(len(response.text),url))
11 
12 
13 start_time=time.time()
14 gevent.joinall([
15     gevent.spawn(get_page,'https://www.python.org/'),
16     gevent.spawn(get_page,'https://www.yahoo.com/'),
17     gevent.spawn(get_page,'https://github.com/'),
18 ])
19 stop_time=time.time()
20 print('run time is %s' %(stop_time-start_time))
View Code

b.socket併發

 1 from gevent import monkey;monkey.patch_all()
 2 from socket import *
 3 import gevent
 4 
 5 #若是不想用money.patch_all()打補丁,能夠用gevent自帶的socket
 6 # from gevent import socket
 7 # s=socket.socket()
 8 
 9 def server(server_ip,port):
10     s=socket(AF_INET,SOCK_STREAM)
11     s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
12     s.bind((server_ip,port))
13     s.listen(5)
14     while True:
15         conn,addr=s.accept()
16         gevent.spawn(talk,conn,addr)
17 
18 def talk(conn,addr):
19     try:
20         while True:
21             res=conn.recv(1024)
22             print('client %s:%s msg: %s' %(addr[0],addr[1],res))
23             conn.send(res.upper())
24     except Exception as e:
25         print(e)
26     finally:
27         conn.close()
28 
29 if __name__ == '__main__':
30     server('127.0.0.1',8080)
服務端
 1 from socket import *
 2 
 3 client=socket(AF_INET,SOCK_STREAM)
 4 client.connect(('127.0.0.1',8080))
 5 
 6 
 7 while True:
 8     msg=input('>>: ').strip()
 9     if not msg:continue
10 
11     client.send(msg.encode('utf-8'))
12     msg=client.recv(1024)
13     print(msg.decode('utf-8'))
客戶端

c.socket客戶端併發

 1 from threading import Thread
 2 from socket import *
 3 import threading
 4 
 5 def client(server_ip,port):
 6     c=socket(AF_INET,SOCK_STREAM)
 7     c.connect((server_ip,port))
 8 
 9     count=0
10     while True:
11         c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
12         msg=c.recv(1024)
13         print(msg.decode('utf-8'))
14         count+=1
15 if __name__ == '__main__':
16     for i in range(500):
17         t=Thread(target=client,args=('127.0.0.1',8080))
18         t.start()
View Code
相關文章
相關標籤/搜索