python -- 異步編程 - 協程

咱們在生產中,經常使用的處理任務模型有三種:
  單線程
  多線程
  異步(單線程內,串行,特色是遇到阻塞(或IO之類的)就切換到其餘任務)

其中通常若是都符合要求,那麼異步是最好的選擇。
  單線程:遇到阻塞整個程序都等待
  多線程:以空間換取時間,且有時候伴隨着數據安全問題(一般加鎖來處理)
  異步:在單個線程內,且是串行執行,可是一旦遇到阻塞(IO之類的),就會切換到線程內的其餘任務(把IO操做交給操做系統處理)

當咱們面對以下的環境時,事件驅動模型(異步模型)一般是一個好的選擇(and):
  一、程序中有許多任務
  二、任務之間高度獨立(所以它們不須要互相通訊,或者等待彼此)
  三、在等待事件到來時,某些任務會阻塞。

經常使用的異步IO模型:select、poll、Epoll    (windows下只支持select)
nginx就是Epoll模型實現的。單線程,多進程(爲了利用多核,單線程只能跑在單個cup核心上)python

前面咱們說了多線程與多進程,總結其特色就是:nginx

  單線程串行執行,遇到IO就阻塞,效率低。編程

  多線程併發執行,遇到IO就切換(用空間換取執行時間),效率上去了,可是耗費資源,操做複雜。windows

針對以上問題,出現了一種新的替代品,協程。安全

協程  

  協程又名微線程,纖程。協程是一種用戶態的輕量級線程(協程由用戶切換,線程由cpu時間片控制切換)協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。跟yield過程差很少服務器


協程的定義標準:
  一、必須在一個單線程裏面實現併發
  二、修改共享數據不須要加鎖(由於協程是串行的)
  三、用戶程序裏本身保存多個控制流的上下文棧
  四、一個協程遇到IO操做(阻塞也同樣)就自動切換到其餘協程

協程的好處:
  一、無需線程上下文切換的開銷
  二、無需原子操做鎖定及同步的開銷
       "原子操做(atomic operation)是不須要synchronized",所謂原子操做是指不會被線程調度機制打斷的操做;這種操做一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另外一個線程)。原子操做能夠是一個步驟,也能夠是多個操做步驟,可是其順序是不能夠被打亂,或者切割掉只執行部分。視做總體是原子性的核心。多線程

  三、方便切換控制流,簡化編程模型
  四、高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。

缺點:
  一、沒法利用多核資源:協程的本質是個單線程,它不能同時將單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
  二、進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序
併發

 

協程例子

  一、用yield實現協程操做的例子:app

 1 import time
 2 
 3 def consumer(name):
 4     print("\033[32;1m ---> starting eating baozi... \033[0m")
 5     while True:
 6         new_baozi = yield
 7         print("[%s] is eating baozi %s " %(name,new_baozi))
 8         # time.sleep(1)  #在yield裏面並無實現阻塞切換
 9 
10 def producer():
11     c = con1.__next__()
12     c = con2.__next__()
13     n = 0
14     while n < 5:
15         n += 1
16         print("\033[32;1m [producer]\033[0m is making baozi %s " % n)
17         con1.send(n)
18         con2.send(n)     #用send能夠想yield發送數據
19 
20 if __name__ == '__main__':
21     con1 = consumer('c1')
22     con2 = consumer('c2')
23     p = producer()

  執行結果異步

 ---> starting eating baozi... 
 ---> starting eating baozi... 
 [producer] is making baozi 1 
[c1] is eating baozi 1 
[c2] is eating baozi 1 
 [producer] is making baozi 2 
[c1] is eating baozi 2 
[c2] is eating baozi 2 
 [producer] is making baozi 3 
[c1] is eating baozi 3 
[c2] is eating baozi 3 
 [producer] is making baozi 4 
[c1] is eating baozi 4 
[c2] is eating baozi 4 
 [producer] is making baozi 5 
[c1] is eating baozi 5 
[c2] is eating baozi 5 

 

二、使用gevent實現協程

 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 
 4 import gevent
 5 import time
 6 
 7 def func1():
 8     print("\033[32;1m func1函數的第1部分... \033[0m")
 9     gevent.sleep(2)
10     # time.sleep(2)   #用time.sleep(2)是不行的
11     print("\033[32;1m func1函數的第2部分... \033[0m")
12 
13 def func2():
14     print("\033[31;1m func2函數的第1部分... \033[0m")
15     gevent.sleep(1)
16     # time.sleep(1)
17     print("\033[31;1m func2函數的第2部分... \033[0m")
18 
19 def func3():
20     print("\033[34;1m func3函數的第1部分... \033[0m")
21     gevent.sleep(3)
22     # time.sleep(3)
23     print("\033[34;1m func3函數的第2部分... \033[0m")
24 
25 
26 if __name__ == '__main__':
27     gevent.joinall([
28         gevent.spawn(func1),
29         gevent.spawn(func2),
30         gevent.spawn(func3),
31     ])

  執行結果

 func1函數的第1部分... 
 func2函數的第1部分... 
 func3函數的第1部分... 
 func2函數的第2部分... 
 func1函數的第2部分... 
 func3函數的第2部分... 

 

三、協程結合urllib模塊爬網站

 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 
 4 from gevent import monkey
 5 monkey.patch_all()  #遇到阻塞就切換全靠它(做用:把要用到的接口所有變爲非阻塞模式)
 6 
 7 import gevent
 8 from urllib.request import urlopen
 9 
10 def f(url):
11     print("GET: %s " %url)
12     resp = urlopen(url)
13     data = resp.read()
14     print("%s bytes received from %s " %(len(data),url))
15 
16 if __name__ == '__main__':
17     gevent.joinall([
18         gevent.spawn(f, 'http://www.cdu.edu.cn/'),
19         gevent.spawn(f,'https://www.python.org/'),
20         gevent.spawn(f,'https://www.jd.com/'),
21         gevent.spawn(f,'https://www.vip.com/')
22     ])

  執行結果

GET: http://www.cdu.edu.cn/ 
GET: https://www.python.org/ 
GET: https://www.jd.com/ 
GET: https://www.vip.com/ 
137584 bytes received from https://www.jd.com/ 
69500 bytes received from https://www.vip.com/ 
47984 bytes received from http://www.cdu.edu.cn/ 
47695 bytes received from https://www.python.org/ 

Process finished with exit code 0

 

四、利用協程實現高併發服務器

 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 
 4 import gevent
 5 import socket
 6 from gevent import monkey; monkey.patch_all()
 7 
 8 def server(port):
 9     s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
10     s.bind(('0.0.0.0',port))
11     s.listen(5000)
12 
13     while True:
14         # print("\033[32;1m server is waiting... \033[0m")
15         print(" server is waiting... ")
16         conn, addr = s.accept()
17         gevent.spawn(handle_request,conn)
18 
19 def handle_request(conn):
20     try:
21         while True:
22             data = conn.recv(1024)
23             print("recvied:",data.decode('utf8'))
24             conn.send(data)
25             if not data:
26                 conn.shutdown(socket.SHUT_WR)
27 
28     except Exception as ex:
29         print(ex)
30 
31     finally:
32         conn.close()
33 
34 
35 if __name__ == '__main__':
36     server(9999)
server
 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 
 4 import socket
 5 import threading
 6 
 7 def run(n):
 8     '''這裏是啓動多線程,而後每一個線程死循環發包給服務器,測試協程服務器的高併發和穩定性'''
 9     while True:
10         # msg = input(">>:").strip()
11         # if len(msg) == 0:continue
12         # if msg == 'q':break
13         msg = 'hello %s' %n
14         sk.send(bytes(msg,'utf8'))
15         data = sk.recv(1024)
16         print("Received:",data.decode('utf8'))
17 
18     sk.close()
19 
20 if __name__ == '__main__':
21     IP_Port = ('127.0.0.1', 9999)
22     sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
23     sk.connect(IP_Port)
24 
25     thread_list = []
26     for i in range(2000):
27         t = threading.Thread(target=run,args=[i,])
28         t.start()
29         thread_list.append(t)
30 
31     for thread in thread_list:
32         thread.join()
client
相關文章
相關標籤/搜索