協程 和 io模型

# ==========       協程# 協程介紹# 協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。# 一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。、#強調的#1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,#切換其餘線程運行)#2. 單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率# (!!!非io操做的切換與效率無關)#對比操做系統控制線程的切換,用戶在單線程內控制協程的切換# 優勢#1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級#2. 單線程內就能夠實現併發的效果,最大限度地利用cpu#缺點#1. 協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程#2. 協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程#總結協程特色:# 1 必須在只有一個單線程裏實現併發# 2 修改共享數據不需加鎖# 3 用戶程序裏本身保存多個控制流的上下文棧# 4 附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))## 進程 啓動多個進程 進程之間是由操做系統負責調用# 線程 啓動多個線程 真正被CPU執行的最小單位實際是線程    # 開啓一個線程 建立一個線程 寄存器 堆棧    # 關閉一個線程# 協程    # 本質上是一個線程    # 可以在多個任務之間切換來節省一些IO時間    # 協程中任務之間的切換也消耗時間,可是開銷要遠遠小於進程線程之間的切換# 實現併發的手段# import time# def consumer():#     while True:#         x = yield#         time.sleep(1)#         print('處理了數據 :',x)## def producer():#     c = consumer()#     next(c)#     for i in range(10):#         time.sleep(1)#         print('生產了數據 :',i)#         c.send(i)## producer()#                 greenlet  模塊# greenlet只是提供了一種比generator更加便捷的切換方式,# 當切到一個任務執行時若是遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提高效率的問題。# 真正的協程模塊就是使用greenlet完成的切換#g1.switch('egon')#能夠在第一次switch時傳入參數,之後都不須要# from greenlet import greenlet# def eat():#     print('eating start')#     g2.switch()#     print('eating end')#     g2.switch()## def play():#     print('playing start')#     g1.switch()#     print('playing end')# g1 = greenlet(eat)# g2 = greenlet(play)# g1.switch()# from gevent import monkey;monkey.patch_all()# import time# import gevent# import threading# def eat():#     print(threading.current_thread().getName())#     print(threading.current_thread())#     print('eating start')#     time.sleep(1)#     print('eating end')## def play():#     print(threading.current_thread().getName())#     print(threading.current_thread())#     print('playing start')#     time.sleep(1)#     print('playing end')## g1 = gevent.spawn(eat)# g2 = gevent.spawn(play)# g1.join()# g2.join()#                      gevent   模塊# 進程和線程的任務切換右操做系統完成# 協程任務之間的切換由程序(代碼)完成,只有遇到協程模塊能識別的IO操做的時候,程序纔會進行任務切換,實現併發的效果# 上例gevent.sleep(2)模擬的是gevent能夠識別的io阻塞,而time.sleep(2)或其餘的阻塞,\#                                                    gevent是不能直接識別的須要用下面一行代碼,打補丁,就能夠識別了# from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊以前# 或者咱們乾脆記憶成:要用gevent,須要將from gevent import monkey;monkey.patch_all()放到文件的開頭# gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet,# 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。# 方法# g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一個協程對象g1,spawn括號內第一個參數是函數名,如eat,# 後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的# g2=gevent.spawn(func2)#g1.join() #等待g1結束#g2.join() #等待g2結束#或者上述兩步合做一步:gevent.joinall([g1,g2])#g1.value#拿到func1的返回值# 同步 和 異步# from gevent import monkey;monkey.patch_all()# import time# import gevent## def task(n):#     time.sleep(1)#     print(n)## def sync():#     for i in range(10):#         task(i)## def async():#     g_lst = []#     for i in range(10):#         g = gevent.spawn(task,i)#         g_lst.append(g)#     gevent.joinall(g_lst)  # for g in g_lst:g.join()# sync()# async()# 協程 : 可以在一個線程中實現併發效果的概念    #    可以規避一些任務中的IO操做    #    在任務的執行過程當中,檢測到IO就切換到其餘任務# 多線程 被弱化了# 協程 在一個線程上 提升CPU 的利用率# 協程相比於多線程的優點 切換的效率更快# 爬蟲的例子# 請求過程當中的IO等待# from gevent import monkey;monkey.patch_all()# import gevent# from urllib.request import urlopen    # 內置的模塊# def get_url(url):#     response = urlopen(url)#     content = response.read().decode('utf-8')#     return len(content)## g1 = gevent.spawn(get_url,'http://www.baidu.com')# g2 = gevent.spawn(get_url,'http://www.sogou.com')# g3 = gevent.spawn(get_url,'http://www.taobao.com')# g4 = gevent.spawn(get_url,'http://www.hao123.com')# g5 = gevent.spawn(get_url,'http://www.cnblogs.com')# gevent.joinall([g1,g2,g3,g4,g5])# print(g1.value)# print(g2.value)# print(g3.value)# print(g4.value)# print(g5.value)# ret = get_url('http://www.baidu.com')# print(ret)# socket server#經過gevent實現單線程下的socket併發#注意 :from gevent import monkey;monkey.patch_all()必定要放到導入socket模塊以前,不然gevent沒法識別socket的阻塞#server 端# from gevent import monkey;monkey.patch_all()# import socket# import gevent# def talk(conn):#     conn.send(b'hello')#     print(conn.recv(1024).decode('utf-8'))#     conn.close()## sk = socket.socket()# sk.bind(('127.0.0.1',8080))# sk.listen()# while True:#     conn,addr = sk.accept()#     gevent.spawn(talk,conn)# sk.close()#client 端# import socket# sk = socket.socket()# sk.connect(('127.0.0.1',8080))# print(sk.recv(1024))# msg = input('>>>').encode('utf-8')# sk.send(msg)# sk.close()#         =====================      IO 模型#  IO模型    #阻塞    非阻塞   io多路複用#阻塞# 幾乎全部的程序員第一次接觸到的網絡編程都是從listen()、send()、recv() 等接口開始的,# 使用這些接口能夠很方便的構建服務器/客戶機的模型。然而大部分的socket接口都是阻塞型的# 實際上,除非特別指定,幾乎全部的IO接口 ( 包括socket接口 ) 都是阻塞型的。這給網絡編程帶來了一個很大的問題,# 如在調用recv(1024)的同時,線程將被阻塞,在此期間,線程將沒法執行任何運算或響應任何的網絡請求。# 非阻塞#在非阻塞式IO中,用戶進程實際上是須要不斷的主動詢問kernel數據準備好了沒有#  因此,在非阻塞式IO中,用戶進程實際上是須要不斷的主動詢問kernel數據準備好了沒有。# 可是非阻塞IO模型毫不被推薦。#     咱們不可否則其優勢:可以在等待任務完成的時間裏幹其餘活了#     (包括提交其餘任務,也就是 「後臺」 能夠有多個任務在「」同時「」執行)。#可是也難掩其缺點:    #1. 循環調用recv()將大幅度推高CPU佔用率;這也是咱們在代碼中留一句time.sleep(2)的緣由,不然在低配主機下極容易出現卡機狀況    #2. 任務完成的響應延遲增大了,由於每過一段時間纔去輪詢一次read操做,    # 而任務可能在兩次輪詢之間的任意時間完成。這會致使總體數據吞吐量的下降。#      非阻塞 代碼     server 端# import socket# sk = socket.socket()# sk.bind(('127.0.0.1',9000))# sk.setblocking(False)# sk.listen()# conn_l = []# del_conn = []# while True:#     try:#         conn,addr = sk.accept()  #不阻塞,可是沒人連我會報錯#         print('創建鏈接了:',addr)#         conn_l.append(conn)#     except BlockingIOError:#         for con in conn_l:#             try:#                 msg = con.recv(1024)  # 非阻塞,若是沒有數據就報錯#                 if msg == b'':#                     del_conn.append(con)#                     continue#                 print(msg)#                 con.send(b'byebye')#             except BlockingIOError:pass#         for con in del_conn:#             con.close()#             conn_l.remove(con)#         del_conn.clear()# while True : 10000   500  501#        非阻塞 client 端# import time# import socket# import threading# def func():#     sk = socket.socket()#     sk.connect(('127.0.0.1',9000))#     sk.send(b'hello')#     time.sleep(1)#     print(sk.recv(1024))#     sk.close()# for i in range(2):#     threading.Thread(target=func).start()#    IO 模型多慮複用# 當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,# 當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。# 強調:#  1 。若是處理的鏈接數不是很高的話,使用select / epoll的web# server不必定比使用multi - threading + blockingIO的web# server性能更好,可能延遲還更大。select / epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。# 2 。 在多路複用模型中,對於每個socket,通常都設置成爲non - blocking,可是,如上圖所示,# 整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socketIO給block。# 結論: select的優點在於能夠處理多個鏈接,不適用於單個鏈接#該模型的優勢:#相比其餘模型,使用select() 的事件驅動模型只用單線程(進程)執行,佔用資源少,# 不消耗太多 CPU,同時可以爲多客戶端提供服務。若是試圖創建一個簡單的事件驅動的服務器程序,這個模型有必定的參考價值。#該模型的缺點:#首先select()接口並非實現「事件驅動」的最好選擇。由於當須要探測的句柄值較大時,select()接口自己須要消耗大量時間去輪詢各個句柄。#不少操做系統提供了更爲高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。#若是須要實現更高效的服務器程序,相似epoll這樣的接口更被推薦。遺憾的是不一樣的操做系統特供的epoll接口有很大差別,#因此使用相似於epoll的接口實現具備較好跨平臺能力的服務器會比較困難。#其次,該模型將事件探測和事件響應夾雜在一塊兒,一旦事件響應的執行體龐大,則對整個模型是災難性的。#         代碼    server  端# import select# import socket## sk = socket.socket()# sk.bind(('127.0.0.1',8000))# sk.setblocking(False)# sk.listen()## read_lst = [sk]# while True:   # [sk,conn]#     r_lst,w_lst,x_lst = select.select(read_lst,[],[])#     for i in r_lst:#         if i is sk:#             conn,addr = i.accept()#             read_lst.append(conn)#         else:#             ret = i.recv(1024)#             if ret == b'':#                 i.close()#                 read_lst.remove(i)#                 continue#             print(ret)#             i.send(b'goodbye!')#               client 端# import time# import socket# import threading# def func():#     sk = socket.socket()#     sk.connect(('127.0.0.1',8000))#     sk.send(b'hello')#     time.sleep(3)#     print(sk.recv(1024))#     sk.close()## for i in range(20):#     threading.Thread(target=func).start()# 異步IO(Asynchronous I/O)#Linux下的asynchronous IO其實用得很少#  用戶進程發起read操做以後,馬上就能夠開始去作其它的事。而另外一方面,從kernel的角度,# 當它受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,# kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,# 告訴它read操做完成了#  代碼# import selectors# from socket import *## def accept(sk,mask):#     conn,addr=sk.accept()#     sel.register(conn,selectors.EVENT_READ,read)## def read(conn,mask):#     try:#         data=conn.recv(1024)#         if not data:#             print('closing',conn)#             sel.unregister(conn)#             conn.close()#             return#         conn.send(data.upper()+b'_SB')#     except Exception:#         print('closing', conn)#         sel.unregister(conn)#         conn.close()## sk=socket()# sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)# sk.bind(('127.0.0.1',8088))# sk.listen(5)# sk.setblocking(False) #設置socket的接口爲非阻塞# sel=selectors.DefaultSelector()   # 選擇一個適合個人IO多路複用的機制# sel.register(sk,selectors.EVENT_READ,accept)#至關於網select的讀列表裏append了一個sk對象,而且綁定了一個回調函數accept# 說白了就是 若是有人請求鏈接sk,就調用accrpt方法# while True:#     events=sel.select() #檢測全部的sk,conn,是否有完成wait data階段#     for sel_obj,mask in events:  # [conn]#         callback=sel_obj.data #callback=read#         callback(sel_obj.fileobj,mask) #read(conn,1)
相關文章
相關標籤/搜索