協程看上去也是子程序,但執行過程當中,在子程序內部可中斷,而後轉而執行別的子程序,在適當的時候再返回來接着執行。css
注意,在一個子程序中中斷,去執行其餘子程序,不是函數調用,有點相似CPU的中斷。好比子程序A、B:html
def A(): print '1' print '2' print '3' def B(): print 'x' print 'y' print 'z'
假設由協程執行,在執行A的過程當中,能夠隨時中斷,去執行B,B也可能在執行過程當中中斷再去執行A,結果多是:python
1 2 x y 3 z
可是在A中是沒有調用B的,因此協程的調用比函數調用理解起來要難一些。linux
看起來A、B的執行有點像多線程,但協程的特色在因而一個線程執行,那和多線程比,協程有何優點?nginx
最大的優點就是協程極高的執行效率。由於子程序切換不是線程切換,而是由程序自身控制,所以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優點就越明顯。git
第二大優點就是不須要多線程的鎖機制,由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只須要判斷狀態就行了,因此執行效率比多線程高不少。github
協程缺點:編程
由於協程是一個線程執行,那怎麼利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可得到極高的性能。緩存
Python對協程的支持還很是有限,用在generator中的yield能夠必定程度上實現協程。雖然支持不徹底,但已經能夠發揮至關大的威力了。服務器
=========================================================================================================================
from greenlet import greenlet #greenlet手動切換 gevent(自動切換) 封裝了 greenlet
def test1():
print(12)
gr2.switch()
print(34)
gr2.switch()
def test2():
print(56)
gr1.switch()
print(78)
gr1=greenlet(test1) #起了兩個協程gr1 gr2
gr2=greenlet(test2)
gr1.switch()
執行結果:兩個函數執行過程當中切換
12
56
34
78
Gevent實現協程
import gevent #gevent 經過模擬io操做 def func1(): print('執行func1') gevent.sleep(2) print('繼續執行func1...') def func2(): print('執行func2') gevent.sleep(1) print('繼續執行func2...') def func3(): print("執行func3") gevent.sleep(0.5) print("繼續執行func3") gevent.joinall([ gevent.spawn(func1), gevent.spawn(func2), gevent.spawn(func3), ])
執行結果
執行func1
執行func2
執行func3
繼續執行func3
繼續執行func2...
繼續執行func1...
在遇到真實IO操做時,須要將IO操做和gevent關聯起來,讓gevent知道產生的IO操做,這時候gevent纔會切換。
from urllib import request import gevent,time from gevent import monkey monkey.patch_all()#吧當前程序全部的有可能io操做的單獨坐上標記 須要將IO操做和gevent關聯起來,讓gevent知道產生的IO操做,這時候gevent纔會切換。 def f(url): print('get:%s'%url) resp=request.urlopen(url) data=resp.read() print("%d bytes receve from %s"%(len(data)),url) urls=[ 'https://www.python.org/', 'https://www.yahoo.com', 'https://github.com' ] time_start=time.time() for url in urls: f(url) print("同步cost",time.time()-time_start) async_time=time.time() gevent.joinall( [ gevent.spawn(f,urls[0]), gevent.spawn(f,urls[1]), gevent.spawn(f,urls[2]), ] ) print("異步cost",time.time()-async_time)
執行結果:
同步6秒多
異步2秒多
經過gevent實現單線程下的多socket併發
server side
import sys import socket import time import gevent from gevent import socket,monkey monkey.patch_all() def server(port): s = socket.socket() s.bind(('0.0.0.0', port)) s.listen(500) while True: cli, addr = s.accept() gevent.spawn(handle_request, cli) def handle_request(conn): try: while True: data = conn.recv(1024) print("recv:", data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: conn.close() if __name__ == '__main__': server(8001)
client side
import socket HOST = 'localhost' # The remote host PORT = 8001 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True: msg = bytes(input(">>:"),encoding="utf8") s.sendall(msg) data = s.recv(1024) #print(data) print('Received', repr(data)) s.close()
IO(Input / Output,輸入 / 輸出)即數據的讀取(接收)或寫入(發送)操做,一般用戶進程中的一個完整IO分爲兩階段:用戶進程空間 < -->內核空間、內核空間 < -->設備空間(磁盤、網絡等)。IO有內存IO、網絡IO和磁盤IO三種,一般咱們說的IO指的是後二者。
LINUX中進程沒法直接操做I / O設備,其必須經過系統調用請求kernel來協助完成I / O動做;內核會爲每一個I / O設備維護一個緩衝區。
對於一個輸入操做來講,進程IO系統調用後,內核會先看緩衝區中有沒有相應的緩存數據,沒有的話再到設備中讀取,由於設備IO通常速度較慢,須要等待;內核緩衝區有數據則直接複製到進程空間。
因此,對於一個網絡輸入操做一般包括兩個不一樣階段:
(1)等待網絡數據到達網卡→讀取到內核緩衝區,數據準備好;
(2)從內核緩衝區複製數據到進程空間。
《UNIX網絡編程》說得很清楚,5種IO模型分別是阻塞IO模型、非阻塞IO模型、IO複用模型、信號驅動的IO模型、異步IO模型;前4種爲同步IO操做,只有異步IO模型是異步IO操做。下面這樣些圖,是它裏面給出的例子:接收網絡UDP數據的流程在IO模型下的分析,在它的基礎上再加以簡單描述,以區分這些IO模型。
進程發起IO系統調用後,進程被阻塞,轉到內核空間處理,整個IO處理完畢後返回進程。操做成功則進程獲取到數據。
一、典型應用:阻塞socket。
二、特色:
進程阻塞掛起不消耗CPU資源,及時響應每一個操做;
實現難度低、開發應用較容易;
適用併發量小的網絡應用開發;
不適用併發量大的應用:由於一個請求IO會阻塞進程,因此,得爲每請求分配一個處理進程(線程)以及時響應,系統開銷大。
進程發起IO系統調用後,若是內核緩衝區沒有數據,須要到IO設備中讀取,進程返回一個錯誤而不會被阻塞;進程發起IO系統調用後,若是內核緩衝區有數據,內核就會把數據返回進程。
對於上面的阻塞IO模型來講,內核數據沒準備好須要進程阻塞的時候,就返回一個錯誤,以使得進程不被阻塞。
一、典型應用:socket是非阻塞的方式(設置爲NONBLOCK)
二、特色:
進程輪詢(重複)調用,消耗CPU的資源;
實現難度低、開發應用相對阻塞IO模式較難;
適用併發量較小、且不須要及時響應的網絡應用開發;
多個的進程的IO能夠註冊到一個複用器(select)上,而後用一個進程調用該select, select會監聽全部註冊進來的IO;
若是select沒有監聽的IO在內核緩衝區都沒有可讀數據,select調用進程會被阻塞;而當任一IO在內核緩衝區中有可數據時,select調用就會返回;
然後select調用進程能夠本身或通知另外的進程(註冊進程)來再次發起讀取IO,讀取內核中準備好的數據。
能夠看到,多個進程註冊IO後,只有另外一個select調用進程被阻塞。
一、典型應用:select、poll、epoll三種方案,nginx均可以選擇使用這三個方案;
二、特色:
專注進程解決多個進程IO的阻塞問題,性能好;Reactor模式;
實現、開發應用難度較大;
適用高併發服務應用開發:一個進程(線程)響應多個請求;
三、select、poll、epoll
Linux中IO複用的實現方式主要有select、poll和epoll:
Select:註冊IO、阻塞掃描,監聽的IO最大鏈接數不能多於FD_SIZE;
Poll:原理和Select類似,沒有數量限制,但IO數量大掃描線性性能降低;
Epoll :事件驅動不阻塞,mmap實現內核與用戶空間的消息傳遞,數量很大,Linux2.6後內核支持;
當進程發起一個IO操做,會向內核註冊一個信號處理函數,而後進程返回不阻塞;當內核數據就緒時會發送一個信號給進程,進程便在信號處理函數中調用IO讀取數據。
特色:回調機制,實現、開發應用難度大;
當進程發起一個IO操做,進程返回(不阻塞),但也不能返回果結;內核把整個IO處理完後,會通知進程結果。若是IO操做成功則進程直接獲取到數據。
一、典型應用:JAVA7AIO、高性能服務器應用
二、特色:不阻塞,數據一步到位;Proactor模式;
須要操做系統的底層支持,LINUX2.5版本內核首現,2.6版本產品的內核標準特性;實現、開發應用難度大;很是適合高性能高併發應用;
阻塞IO調用和非阻塞IO調用、阻塞IO模型和非阻塞IO模型
注意這裏的阻塞IO調用和非阻塞IO調用不是指阻塞IO模型和非阻塞IO模型:
阻塞IO調用 :在用戶進程(線程)中調用執行的時候,進程會等待該IO操做,而使得其餘操做沒法執行。
非阻塞IO調用:在用戶進程中調用執行的時候,不管成功與否,該IO操做會當即返回,以後進程能夠進行其餘操做(固然若是是讀取到數據,通常就接着進行數據處理)。
這個直接理解就好,進程(線程)IO調用會不會阻塞進程本身。因此這裏兩個概念是相對調用進程自己狀態來說的。
從上面對比圖片來講,阻塞IO模型是一個阻塞IO調用,而非阻塞IO模型是多個非阻塞IO調用+一個阻塞IO調用,由於多個IO檢查會當即返回錯誤,不會阻塞進程。
而上面也說過了,非阻塞IO模型對於阻塞IO模型來講區別就是,內核數據沒準備好須要進程阻塞的時候,就返回一個錯誤,以使得進程不被阻塞。
同步IO和異步IO
同步IO:致使請求進程阻塞,直到I/O操做完成。
異步IO:不致使請求進程阻塞。
上面兩個定義是《UNIX網絡編程 卷1:套接字聯網API》給出的。這不是很好理解,咱們來擴展一下,先說說同步和異步,同步和異步關注的是雙方的消息通訊機制:
同步:雙方的動做是通過雙方協調的,步調一致的。
異步:雙方並不須要協調,均可以隨意進行各自的操做。
這裏咱們的雙方是指,用戶進程和IO設備;明確同步和異步以後,咱們在上面網絡輸入操做例子的基礎上,進行擴展定義:
同步IO:用戶進程發出IO調用,去獲取IO設備數據,雙方的數據要通過內核緩衝區同步,徹底準備好後,再複製返回到用戶進程。而複製返回到用戶進程會致使請求進程阻塞,直到I/O操做完成。
異步IO:用戶進程發出IO調用,去獲取IO設備數據,並不須要同步,內核直接複製到進程,整個過程不致使請求進程阻塞。
因此, 阻塞IO模型、非阻塞IO模型、IO複用模型、信號驅動的IO模型者爲同步IO模型,只有異步IO模型是異步IO。
Select io多路複用的代碼實現
server 端
import select import socket import queue server=socket.socket() server.bind(('localhost',9001)) server.listen(1000) server.setblocking(False) msg_dict={} inputs=[server,] #給select傳遞監控的io連接server 表明本身 #inputs=[server,conn] 若是創建新連接,就把conn放進去 若是返回的是server則新的連接,若是是conn則是有數據來了 outputs=[] # while True: readable,writeable,exceptional=select.select(inputs,outputs,inputs) #outputs 往裏放甚麼就出來甚麼 print(readable,writeable,exceptional) for r in readable: if r is server: conn,addr=server.accept() print("來了新連接",addr) inputs.append(conn)#是由於這個新創建的鏈接還沒發數據過來,如今就接收的話程序就報錯了, #因此要想實現這個客戶端發數據來時server端能知道,就須要讓select再監測這個conn msg_dict[conn]=queue.Queue() #初始化一個隊列存儲給客戶端返回的消息 else: data=r.recv(1024) #這裏不用conn.recv緣由是,conn指的當前剛創建上的連接,若是是以前創建的連接發過來數據,con只認當前 #新創建的沒法以前創建的連接發過來數據 print("來的是數據",data) msg_dict[r].put(data) outputs.append(r) #放入返回的連接隊列 # r.send(data) # print("send down") for w in writeable:#要返回給客戶端的連接 列表 data_to_client=msg_dict[w].get() w.send(data_to_client) #返回給客戶端數據 outputs.remove(w)#確保下次循環的時候writeable不返回已經處理完的鏈接了 for e in exceptional: if e in outputs: outputs.remove(e) inputs.remove(e) del msg_dict[e]
client端
import socket HOST = 'localhost' # The remote host PORT = 9001 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True: msg = bytes(input(">>:"), encoding="utf8") s.sendall(msg) data = s.recv(1024) # print(data) print('Received', repr(data)) s.close()
執行結果
/usr/bin/python3.5 /home/cui0x01/PycharmProjects/oldboy/進程_線程/select_server.py [<socket.socket fd=3, family=AddressFamily.AF_INET, type=2049, proto=0, laddr=('127.0.0.1', 9002)>] [] [] 來了新連接 ('127.0.0.1', 57076) [<socket.socket fd=4, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9002), raddr=('127.0.0.1', 57076)>] [] [] 來的是數據 b'1' [] [<socket.socket fd=4, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9002), raddr=('127.0.0.1', 57076)>] [] [<socket.socket fd=3, family=AddressFamily.AF_INET, type=2049, proto=0, laddr=('127.0.0.1', 9002)>] [] [] 來了新連接 ('127.0.0.1', 57078) [<socket.socket fd=5, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9002), raddr=('127.0.0.1', 57078)>] [] [] 來的是數據 b'2' [] [<socket.socket fd=5, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9002), raddr=('127.0.0.1', 57078)>] [] [<socket.socket fd=3, family=AddressFamily.AF_INET, type=2049, proto=0, laddr=('127.0.0.1', 9002)>] [] [] 來了新連接 ('127.0.0.1', 57080) [<socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9002), raddr=('127.0.0.1', 57080)>] [] [] 來的是數據 b'3' [] [<socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9002), raddr=('127.0.0.1', 57080)>] []
epoll是在2.6內核中提出的,是以前的select和poll的加強版本。相對於select和poll來講,epoll更加靈活,沒有描述符限制。epoll使用一個文件描述符管理多個描述符,將用戶關係的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy只需一次。
一 epoll操做過程
epoll操做過程須要三個接口,分別以下
int
epoll_create(
int
size);
//建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大
int
epoll_ctl(
int
epfd,
int
op,
int
fd,
struct
epoll_event *event);
int
epoll_wait(
int
epfd,
struct
epoll_event * events,
int
maxevents,
int
timeout);
1. int epoll_create(int size);
建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大,這個參數不一樣於select()中的第一個參數,給出最大監聽的fd+1的值,參數size並非限制了epoll所能監聽的描述符最大個數,只是對內核初始分配內部數據結構的一個建議
。
當建立好epoll句柄後,它就會佔用一個fd值,在linux下若是查看/proc/進程id/fd/,是可以看到這個fd的,因此在使用完epoll後,必須調用close()關閉,不然可能致使fd被耗盡。
2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函數是對指定描述符fd執行op操做。
- epfd:是epoll_create()的返回值。
- op:表示op操做,用三個宏來表示:添加EPOLL_CTL_ADD,刪除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分別添加、刪除和修改對fd的監聽事件。
- fd:是須要監聽的fd(文件描述符)
- epoll_event:是告訴內核須要監聽什麼事
3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents個事件。
參數events用來從內核獲得事件的集合,maxevents告以內核這個events有多大,這個maxevents的值不能大於建立epoll_create()時的size,參數timeout是超時時間(毫秒,0會當即返回,-1將不肯定,也有說法說是永久阻塞)。該函數返回須要處理的事件數目,如返回0表示已超時。
複製代碼 #_*_coding:utf-8_*_ __author__ = 'Alex Li' import socket, logging import select, errno logger = logging.getLogger("network-server") def InitLog(): logger.setLevel(logging.DEBUG) fh = logging.FileHandler("network-server.log") fh.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.ERROR) formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ch.setFormatter(formatter) fh.setFormatter(formatter) logger.addHandler(fh) logger.addHandler(ch) if __name__ == "__main__": InitLog() try: # 建立 TCP socket 做爲監聽 socket listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) except socket.error as msg: logger.error("create socket failed") try: # 設置 SO_REUSEADDR 選項 listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) except socket.error as msg: logger.error("setsocketopt SO_REUSEADDR failed") try: # 進行 bind -- 此處未指定 ip 地址,即 bind 了所有網卡 ip 上 listen_fd.bind(('', 2003)) except socket.error as msg: logger.error("bind failed") try: # 設置 listen 的 backlog 數 listen_fd.listen(10) except socket.error as msg: logger.error(msg) try: # 建立 epoll 句柄 epoll_fd = select.epoll() # 向 epoll 句柄中註冊 監聽 socket 的 可讀 事件 epoll_fd.register(listen_fd.fileno(), select.EPOLLIN) except select.error as msg: logger.error(msg) connections = {} addresses = {} datalist = {} while True: # epoll 進行 fd 掃描的地方 -- 未指定超時時間則爲阻塞等待 epoll_list = epoll_fd.poll() for fd, events in epoll_list: # 若爲監聽 fd 被激活 if fd == listen_fd.fileno(): # 進行 accept -- 得到鏈接上來 client 的 ip 和 port,以及 socket 句柄 conn, addr = listen_fd.accept() logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno())) # 將鏈接 socket 設置爲 非阻塞 conn.setblocking(0) # 向 epoll 句柄中註冊 鏈接 socket 的 可讀 事件 epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET) # 將 conn 和 addr 信息分別保存起來 connections[conn.fileno()] = conn addresses[conn.fileno()] = addr elif select.EPOLLIN & events: # 有 可讀 事件激活 datas = '' while True: try: # 從激活 fd 上 recv 10 字節數據 data = connections[fd].recv(10) # 若當前沒有接收到數據,而且以前的累計數據也沒有 if not data and not datas: # 從 epoll 句柄中移除該 鏈接 fd epoll_fd.unregister(fd) # server 側主動關閉該 鏈接 fd connections[fd].close() logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) break else: # 將接收到的數據拼接保存在 datas 中 datas += data except socket.error as msg: # 在 非阻塞 socket 上進行 recv 須要處理 讀穿 的狀況 # 這裏其實是利用 讀穿 出 異常 的方式跳到這裏進行後續處理 if msg.errno == errno.EAGAIN: logger.debug("%s receive %s" % (fd, datas)) # 將已接收數據保存起來 datalist[fd] = datas # 更新 epoll 句柄中鏈接d 註冊事件爲 可寫 epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT) break else: # 出錯處理 epoll_fd.unregister(fd) connections[fd].close() logger.error(msg) break elif select.EPOLLHUP & events: # 有 HUP 事件激活 epoll_fd.unregister(fd) connections[fd].close() logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) elif select.EPOLLOUT & events: # 有 可寫 事件激活 sendLen = 0 # 經過 while 循環確保將 buf 中的數據所有發送出去 while True: # 將以前收到的數據發回 client -- 經過 sendLen 來控制發送位置 sendLen += connections[fd].send(datalist[fd][sendLen:]) # 在所有發送完畢後退出 while 循環 if sendLen == len(datalist[fd]): break # 更新 epoll 句柄中鏈接 fd 註冊事件爲 可讀 epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET) else: # 其餘 epoll 事件不進行處理 continue 複製代碼
selectors模塊
This module allows high-level and efficient I/O multiplexing, built upon the select
module primitives. Users are encouraged to use this module instead, unless they want precise control over the OS-level primitives used.
import selectors import socket sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() # Should be ready print('accepted', conn, 'from', addr,mask) conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read) #新鏈接註冊read回調函數 def read(conn, mask): data = conn.recv(1024) # Should be ready if data: print('echoing', repr(data), 'to', conn) conn.send(data) # Hope it won't block else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind(('localhost', 9999)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept) while True: events = sel.select() #默認阻塞,有活動鏈接就返回活動的鏈接列表 for key, mask in events: callback = key.data #accept callback(key.fileobj, mask) #key.fileobj= 文件句柄
本文轉載http://www.cnblogs.com/alex3714/articles/5248247.html