咱們在生產中,經常使用的處理任務模型有三種:
單線程
多線程
異步(單線程內,串行,特色是遇到阻塞(或IO之類的)就切換到其餘任務)
其中通常若是都符合要求,那麼異步是最好的選擇。
單線程:遇到阻塞整個程序都等待
多線程:以空間換取時間,且有時候伴隨着數據安全問題(一般加鎖來處理)
異步:在單個線程內,且是串行執行,可是一旦遇到阻塞(IO之類的),就會切換到線程內的其餘任務(把IO操做交給操做系統處理)
當咱們面對以下的環境時,事件驅動模型(異步模型)一般是一個好的選擇(and):
一、程序中有許多任務
二、任務之間高度獨立(所以它們不須要互相通訊,或者等待彼此)
三、在等待事件到來時,某些任務會阻塞。
經常使用的異步IO模型:select、poll、Epoll (windows下只支持select)
nginx就是Epoll模型實現的。單線程,多進程(爲了利用多核,單線程只能跑在單個cup核心上)python
前面咱們說了多線程與多進程,總結其特色就是:nginx
單線程串行執行,遇到IO就阻塞,效率低。編程
多線程併發執行,遇到IO就切換(用空間換取執行時間),效率上去了,可是耗費資源,操做複雜。windows
針對以上問題,出現了一種新的替代品,協程。安全
協程又名微線程,纖程。協程是一種用戶態的輕量級線程(協程由用戶切換,線程由cpu時間片控制切換)協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。跟yield過程差很少服務器
協程的定義標準:
一、必須在一個單線程裏面實現併發
二、修改共享數據不須要加鎖(由於協程是串行的)
三、用戶程序裏本身保存多個控制流的上下文棧
四、一個協程遇到IO操做(阻塞也同樣)就自動切換到其餘協程
協程的好處:
一、無需線程上下文切換的開銷
二、無需原子操做鎖定及同步的開銷
"原子操做(atomic operation)是不須要synchronized",所謂原子操做是指不會被線程調度機制打斷的操做;這種操做一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另外一個線程)。原子操做能夠是一個步驟,也能夠是多個操做步驟,可是其順序是不能夠被打亂,或者切割掉只執行部分。視做總體是原子性的核心。多線程
三、方便切換控制流,簡化編程模型
四、高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。
缺點:
一、沒法利用多核資源:協程的本質是個單線程,它不能同時將單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
二、進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序
併發
一、用yield實現協程操做的例子:app
1 import time 2 3 def consumer(name): 4 print("\033[32;1m ---> starting eating baozi... \033[0m") 5 while True: 6 new_baozi = yield 7 print("[%s] is eating baozi %s " %(name,new_baozi)) 8 # time.sleep(1) #在yield裏面並無實現阻塞切換 9 10 def producer(): 11 c = con1.__next__() 12 c = con2.__next__() 13 n = 0 14 while n < 5: 15 n += 1 16 print("\033[32;1m [producer]\033[0m is making baozi %s " % n) 17 con1.send(n) 18 con2.send(n) #用send能夠想yield發送數據 19 20 if __name__ == '__main__': 21 con1 = consumer('c1') 22 con2 = consumer('c2') 23 p = producer()
執行結果異步
---> starting eating baozi... ---> starting eating baozi... [producer] is making baozi 1 [c1] is eating baozi 1 [c2] is eating baozi 1 [producer] is making baozi 2 [c1] is eating baozi 2 [c2] is eating baozi 2 [producer] is making baozi 3 [c1] is eating baozi 3 [c2] is eating baozi 3 [producer] is making baozi 4 [c1] is eating baozi 4 [c2] is eating baozi 4 [producer] is making baozi 5 [c1] is eating baozi 5 [c2] is eating baozi 5
二、使用gevent實現協程
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 import gevent 5 import time 6 7 def func1(): 8 print("\033[32;1m func1函數的第1部分... \033[0m") 9 gevent.sleep(2) 10 # time.sleep(2) #用time.sleep(2)是不行的 11 print("\033[32;1m func1函數的第2部分... \033[0m") 12 13 def func2(): 14 print("\033[31;1m func2函數的第1部分... \033[0m") 15 gevent.sleep(1) 16 # time.sleep(1) 17 print("\033[31;1m func2函數的第2部分... \033[0m") 18 19 def func3(): 20 print("\033[34;1m func3函數的第1部分... \033[0m") 21 gevent.sleep(3) 22 # time.sleep(3) 23 print("\033[34;1m func3函數的第2部分... \033[0m") 24 25 26 if __name__ == '__main__': 27 gevent.joinall([ 28 gevent.spawn(func1), 29 gevent.spawn(func2), 30 gevent.spawn(func3), 31 ])
執行結果
func1函數的第1部分...
func2函數的第1部分...
func3函數的第1部分...
func2函數的第2部分...
func1函數的第2部分...
func3函數的第2部分...
三、協程結合urllib模塊爬網站
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 from gevent import monkey 5 monkey.patch_all() #遇到阻塞就切換全靠它(做用:把要用到的接口所有變爲非阻塞模式) 6 7 import gevent 8 from urllib.request import urlopen 9 10 def f(url): 11 print("GET: %s " %url) 12 resp = urlopen(url) 13 data = resp.read() 14 print("%s bytes received from %s " %(len(data),url)) 15 16 if __name__ == '__main__': 17 gevent.joinall([ 18 gevent.spawn(f, 'http://www.cdu.edu.cn/'), 19 gevent.spawn(f,'https://www.python.org/'), 20 gevent.spawn(f,'https://www.jd.com/'), 21 gevent.spawn(f,'https://www.vip.com/') 22 ])
執行結果
GET: http://www.cdu.edu.cn/ GET: https://www.python.org/ GET: https://www.jd.com/ GET: https://www.vip.com/ 137584 bytes received from https://www.jd.com/ 69500 bytes received from https://www.vip.com/ 47984 bytes received from http://www.cdu.edu.cn/ 47695 bytes received from https://www.python.org/ Process finished with exit code 0
四、利用協程實現高併發服務器
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 import gevent 5 import socket 6 from gevent import monkey; monkey.patch_all() 7 8 def server(port): 9 s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 10 s.bind(('0.0.0.0',port)) 11 s.listen(5000) 12 13 while True: 14 # print("\033[32;1m server is waiting... \033[0m") 15 print(" server is waiting... ") 16 conn, addr = s.accept() 17 gevent.spawn(handle_request,conn) 18 19 def handle_request(conn): 20 try: 21 while True: 22 data = conn.recv(1024) 23 print("recvied:",data.decode('utf8')) 24 conn.send(data) 25 if not data: 26 conn.shutdown(socket.SHUT_WR) 27 28 except Exception as ex: 29 print(ex) 30 31 finally: 32 conn.close() 33 34 35 if __name__ == '__main__': 36 server(9999)
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 import socket 5 import threading 6 7 def run(n): 8 '''這裏是啓動多線程,而後每一個線程死循環發包給服務器,測試協程服務器的高併發和穩定性''' 9 while True: 10 # msg = input(">>:").strip() 11 # if len(msg) == 0:continue 12 # if msg == 'q':break 13 msg = 'hello %s' %n 14 sk.send(bytes(msg,'utf8')) 15 data = sk.recv(1024) 16 print("Received:",data.decode('utf8')) 17 18 sk.close() 19 20 if __name__ == '__main__': 21 IP_Port = ('127.0.0.1', 9999) 22 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 23 sk.connect(IP_Port) 24 25 thread_list = [] 26 for i in range(2000): 27 t = threading.Thread(target=run,args=[i,]) 28 t.start() 29 thread_list.append(t) 30 31 for thread in thread_list: 32 thread.join()