# ========== 協程# 協程介紹# 協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。# 一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。、#強調的#1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,#切換其餘線程運行)#2. 單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率# (!!!非io操做的切換與效率無關)#對比操做系統控制線程的切換,用戶在單線程內控制協程的切換# 優勢#1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級#2. 單線程內就能夠實現併發的效果,最大限度地利用cpu#缺點#1. 協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程#2. 協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程#總結協程特色:# 1 必須在只有一個單線程裏實現併發# 2 修改共享數據不需加鎖# 3 用戶程序裏本身保存多個控制流的上下文棧# 4 附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))## 進程 啓動多個進程 進程之間是由操做系統負責調用# 線程 啓動多個線程 真正被CPU執行的最小單位實際是線程 # 開啓一個線程 建立一個線程 寄存器 堆棧 # 關閉一個線程# 協程 # 本質上是一個線程 # 可以在多個任務之間切換來節省一些IO時間 # 協程中任務之間的切換也消耗時間,可是開銷要遠遠小於進程線程之間的切換# 實現併發的手段# import time# def consumer():# while True:# x = yield# time.sleep(1)# print('處理了數據 :',x)## def producer():# c = consumer()# next(c)# for i in range(10):# time.sleep(1)# print('生產了數據 :',i)# c.send(i)## producer()# greenlet 模塊# greenlet只是提供了一種比generator更加便捷的切換方式,# 當切到一個任務執行時若是遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提高效率的問題。# 真正的協程模塊就是使用greenlet完成的切換#g1.switch('egon')#能夠在第一次switch時傳入參數,之後都不須要# from greenlet import greenlet# def eat():# print('eating start')# g2.switch()# print('eating end')# g2.switch()## def play():# print('playing start')# g1.switch()# print('playing end')# g1 = greenlet(eat)# g2 = greenlet(play)# g1.switch()# from gevent import monkey;monkey.patch_all()# import time# import gevent# import threading# def eat():# print(threading.current_thread().getName())# print(threading.current_thread())# print('eating start')# time.sleep(1)# print('eating end')## def play():# print(threading.current_thread().getName())# print(threading.current_thread())# print('playing start')# time.sleep(1)# print('playing end')## g1 = gevent.spawn(eat)# g2 = gevent.spawn(play)# g1.join()# g2.join()# gevent 模塊# 進程和線程的任務切換右操做系統完成# 協程任務之間的切換由程序(代碼)完成,只有遇到協程模塊能識別的IO操做的時候,程序纔會進行任務切換,實現併發的效果# 上例gevent.sleep(2)模擬的是gevent能夠識別的io阻塞,而time.sleep(2)或其餘的阻塞,\# gevent是不能直接識別的須要用下面一行代碼,打補丁,就能夠識別了# from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊以前# 或者咱們乾脆記憶成:要用gevent,須要將from gevent import monkey;monkey.patch_all()放到文件的開頭# gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet,# 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。# 方法# g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一個協程對象g1,spawn括號內第一個參數是函數名,如eat,# 後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的# g2=gevent.spawn(func2)#g1.join() #等待g1結束#g2.join() #等待g2結束#或者上述兩步合做一步:gevent.joinall([g1,g2])#g1.value#拿到func1的返回值# 同步 和 異步# from gevent import monkey;monkey.patch_all()# import time# import gevent## def task(n):# time.sleep(1)# print(n)## def sync():# for i in range(10):# task(i)## def async():# g_lst = []# for i in range(10):# g = gevent.spawn(task,i)# g_lst.append(g)# gevent.joinall(g_lst) # for g in g_lst:g.join()# sync()# async()# 協程 : 可以在一個線程中實現併發效果的概念 # 可以規避一些任務中的IO操做 # 在任務的執行過程當中,檢測到IO就切換到其餘任務# 多線程 被弱化了# 協程 在一個線程上 提升CPU 的利用率# 協程相比於多線程的優點 切換的效率更快# 爬蟲的例子# 請求過程當中的IO等待# from gevent import monkey;monkey.patch_all()# import gevent# from urllib.request import urlopen # 內置的模塊# def get_url(url):# response = urlopen(url)# content = response.read().decode('utf-8')# return len(content)## g1 = gevent.spawn(get_url,'http://www.baidu.com')# g2 = gevent.spawn(get_url,'http://www.sogou.com')# g3 = gevent.spawn(get_url,'http://www.taobao.com')# g4 = gevent.spawn(get_url,'http://www.hao123.com')# g5 = gevent.spawn(get_url,'http://www.cnblogs.com')# gevent.joinall([g1,g2,g3,g4,g5])# print(g1.value)# print(g2.value)# print(g3.value)# print(g4.value)# print(g5.value)# ret = get_url('http://www.baidu.com')# print(ret)# socket server#經過gevent實現單線程下的socket併發#注意 :from gevent import monkey;monkey.patch_all()必定要放到導入socket模塊以前,不然gevent沒法識別socket的阻塞#server 端# from gevent import monkey;monkey.patch_all()# import socket# import gevent# def talk(conn):# conn.send(b'hello')# print(conn.recv(1024).decode('utf-8'))# conn.close()## sk = socket.socket()# sk.bind(('127.0.0.1',8080))# sk.listen()# while True:# conn,addr = sk.accept()# gevent.spawn(talk,conn)# sk.close()#client 端# import socket# sk = socket.socket()# sk.connect(('127.0.0.1',8080))# print(sk.recv(1024))# msg = input('>>>').encode('utf-8')# sk.send(msg)# sk.close()# ===================== IO 模型# IO模型 #阻塞 非阻塞 io多路複用#阻塞# 幾乎全部的程序員第一次接觸到的網絡編程都是從listen()、send()、recv() 等接口開始的,# 使用這些接口能夠很方便的構建服務器/客戶機的模型。然而大部分的socket接口都是阻塞型的# 實際上,除非特別指定,幾乎全部的IO接口 ( 包括socket接口 ) 都是阻塞型的。這給網絡編程帶來了一個很大的問題,# 如在調用recv(1024)的同時,線程將被阻塞,在此期間,線程將沒法執行任何運算或響應任何的網絡請求。# 非阻塞#在非阻塞式IO中,用戶進程實際上是須要不斷的主動詢問kernel數據準備好了沒有# 因此,在非阻塞式IO中,用戶進程實際上是須要不斷的主動詢問kernel數據準備好了沒有。# 可是非阻塞IO模型毫不被推薦。# 咱們不可否則其優勢:可以在等待任務完成的時間裏幹其餘活了# (包括提交其餘任務,也就是 「後臺」 能夠有多個任務在「」同時「」執行)。#可是也難掩其缺點: #1. 循環調用recv()將大幅度推高CPU佔用率;這也是咱們在代碼中留一句time.sleep(2)的緣由,不然在低配主機下極容易出現卡機狀況 #2. 任務完成的響應延遲增大了,由於每過一段時間纔去輪詢一次read操做, # 而任務可能在兩次輪詢之間的任意時間完成。這會致使總體數據吞吐量的下降。# 非阻塞 代碼 server 端# import socket# sk = socket.socket()# sk.bind(('127.0.0.1',9000))# sk.setblocking(False)# sk.listen()# conn_l = []# del_conn = []# while True:# try:# conn,addr = sk.accept() #不阻塞,可是沒人連我會報錯# print('創建鏈接了:',addr)# conn_l.append(conn)# except BlockingIOError:# for con in conn_l:# try:# msg = con.recv(1024) # 非阻塞,若是沒有數據就報錯# if msg == b'':# del_conn.append(con)# continue# print(msg)# con.send(b'byebye')# except BlockingIOError:pass# for con in del_conn:# con.close()# conn_l.remove(con)# del_conn.clear()# while True : 10000 500 501# 非阻塞 client 端# import time# import socket# import threading# def func():# sk = socket.socket()# sk.connect(('127.0.0.1',9000))# sk.send(b'hello')# time.sleep(1)# print(sk.recv(1024))# sk.close()# for i in range(2):# threading.Thread(target=func).start()# IO 模型多慮複用# 當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,# 當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。# 強調:# 1 。若是處理的鏈接數不是很高的話,使用select / epoll的web# server不必定比使用multi - threading + blockingIO的web# server性能更好,可能延遲還更大。select / epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。# 2 。 在多路複用模型中,對於每個socket,通常都設置成爲non - blocking,可是,如上圖所示,# 整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socketIO給block。# 結論: select的優點在於能夠處理多個鏈接,不適用於單個鏈接#該模型的優勢:#相比其餘模型,使用select() 的事件驅動模型只用單線程(進程)執行,佔用資源少,# 不消耗太多 CPU,同時可以爲多客戶端提供服務。若是試圖創建一個簡單的事件驅動的服務器程序,這個模型有必定的參考價值。#該模型的缺點:#首先select()接口並非實現「事件驅動」的最好選擇。由於當須要探測的句柄值較大時,select()接口自己須要消耗大量時間去輪詢各個句柄。#不少操做系統提供了更爲高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。#若是須要實現更高效的服務器程序,相似epoll這樣的接口更被推薦。遺憾的是不一樣的操做系統特供的epoll接口有很大差別,#因此使用相似於epoll的接口實現具備較好跨平臺能力的服務器會比較困難。#其次,該模型將事件探測和事件響應夾雜在一塊兒,一旦事件響應的執行體龐大,則對整個模型是災難性的。# 代碼 server 端# import select# import socket## sk = socket.socket()# sk.bind(('127.0.0.1',8000))# sk.setblocking(False)# sk.listen()## read_lst = [sk]# while True: # [sk,conn]# r_lst,w_lst,x_lst = select.select(read_lst,[],[])# for i in r_lst:# if i is sk:# conn,addr = i.accept()# read_lst.append(conn)# else:# ret = i.recv(1024)# if ret == b'':# i.close()# read_lst.remove(i)# continue# print(ret)# i.send(b'goodbye!')# client 端# import time# import socket# import threading# def func():# sk = socket.socket()# sk.connect(('127.0.0.1',8000))# sk.send(b'hello')# time.sleep(3)# print(sk.recv(1024))# sk.close()## for i in range(20):# threading.Thread(target=func).start()# 異步IO(Asynchronous I/O)#Linux下的asynchronous IO其實用得很少# 用戶進程發起read操做以後,馬上就能夠開始去作其它的事。而另外一方面,從kernel的角度,# 當它受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,# kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,# 告訴它read操做完成了# 代碼# import selectors# from socket import *## def accept(sk,mask):# conn,addr=sk.accept()# sel.register(conn,selectors.EVENT_READ,read)## def read(conn,mask):# try:# data=conn.recv(1024)# if not data:# print('closing',conn)# sel.unregister(conn)# conn.close()# return# conn.send(data.upper()+b'_SB')# except Exception:# print('closing', conn)# sel.unregister(conn)# conn.close()## sk=socket()# sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)# sk.bind(('127.0.0.1',8088))# sk.listen(5)# sk.setblocking(False) #設置socket的接口爲非阻塞# sel=selectors.DefaultSelector() # 選擇一個適合個人IO多路複用的機制# sel.register(sk,selectors.EVENT_READ,accept)#至關於網select的讀列表裏append了一個sk對象,而且綁定了一個回調函數accept# 說白了就是 若是有人請求鏈接sk,就調用accrpt方法# while True:# events=sel.select() #檢測全部的sk,conn,是否有完成wait data階段# for sel_obj,mask in events: # [conn]# callback=sel_obj.data #callback=read# callback(sel_obj.fileobj,mask) #read(conn,1)