協程,又稱微線程,纖程。英文名Coroutine。html
線程是系統級別的它們由操做系統調度,而協程則是程序級別的由程序根據須要本身調度。在一個線程中會有不少函數,咱們把這些函數稱爲子程序,在子程序執行過程當中能夠中斷去執行別的子程序,而別的子程序也能夠中斷回來繼續執行以前的子程序,這個過程就稱爲協程。也就是說在同一線程內一段代碼在執行過程當中會中斷而後跳轉執行別的代碼,接着在以前中斷的地方繼續開始執行,相似與yield操做。python
協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。linux
協程的優勢:nginx
(1)無需線程上下文切換的開銷,協程避免了無心義的調度,由此能夠提升性能(但也所以,程序員必須本身承擔調度的責任,同時,協程也失去了標準線程使用多CPU的能力)程序員
(2)無需原子操做鎖定及同步的開銷web
(3)方便切換控制流,簡化編程模型編程
(4)高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。windows
協程的缺點:數組
(1)沒法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。緩存
(2)進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序
(1)yield實現協程效果
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/24 10:46 # @Author : Py.qi # @File : yield_xiecheng.py # @Software: PyCharm def consumer(name): print('開始吃包子...') while True: print('\033[31;1m[consumer]%s須要包子\033[0m'%name) bone = yield #接收send發送的數據 print('\033[31;1m[%s]吃了%s個包子\033[0m'%(name,bone)) def producer(obj1): obj1.send(None) #必須先發送None for i in range(3): print('\033[32;1m[producer]\033[0m正在作%s個包子'%i) obj1.send(i) if __name__ == '__main__': con1 = consumer('消費者A') #建立消費者對象 producer(con1) #output: 開始吃包子... [consumer]消費者A須要包子 [producer]正在作0個包子 [消費者A]吃了0個包子 [consumer]消費者A須要包子 [producer]正在作1個包子 [消費者A]吃了1個包子 [consumer]消費者A須要包子 [producer]正在作2個包子 [消費者A]吃了2個包子 [consumer]消費者A須要包子
(2)greenlet模塊實現程序間切換執行
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/24 15:25 # @Author : Py.qi # @File : greenlet_now.py # @Software: PyCharm import greenlet def A(): print('a.....') g2.switch() #切換至B print('a....2') g2.switch() def B(): print('b.....') g1.switch() #切換至A print('b....2') g1 = greenlet.greenlet(A) #啓動一個線程 g2 = greenlet.greenlet(B) g1.switch()
(3)gevent實現協程
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現協程程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。
gevent會主動識別程序內部的IO操做,當子程序遇到IO後,切換到別的子程序。若是全部的子程序都進入IO,則阻塞。
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/24 15:59 # @Author : Py.qi # @File : gevent_noe.py # @Software: PyCharm import gevent def foo(): print('running in foo') gevent.sleep(2) print('com back from bar in to foo') def bar(): print('running in bar') gevent.sleep(2) print('com back from foo in to bar') gevent.joinall([ #建立線程並行執行程序,碰到IO就切換 gevent.spawn(foo), gevent.spawn(bar), ])
線程函數同步與異步比較:
import gevent def task(pid): gevent.sleep(1) print('task %s done'%pid) def synchronous(): #同步一個線程執行函數 for i in range(1,10): task(i) def asynchronous(): #異步一個線程執行函數 threads = [gevent.spawn(task,i) for i in range(10)] gevent.joinall(threads) print('synchronous:') synchronous() #同步執行時要等待執行完後再執行 print('asynchronous:') asynchronous() #異步時遇到等待則會切換執行
爬蟲異步IO阻塞切換:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/24 17:00 # @Author : Py.qi # @File : gevent_urllib.py # @Software: PyCharm from urllib import request import gevent,time from gevent import monkey monkey.patch_all() #將程序中全部IO操做作上標記使程序非阻塞狀態 def url_request(url): print('get:%s'%url) resp = request.urlopen(url) data = resp.read() print('%s bytes received from %s'%(len(data),url)) async_time_start = time.time() #開始時間 gevent.joinall([ gevent.spawn(url_request,'https://www.python.org/'), gevent.spawn(url_request,'https://www.nginx.org/'), gevent.spawn(url_request,'https://www.ibm.com'), ]) print('haoshi:',time.time()-async_time_start) #總用時
協程實現多併發連接socket通訊:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/24 17:22 # @Author : Py.qi # @File : gevent_sock.py # @Software: PyCharm import socket,gevent from gevent import monkey monkey.patch_all() def server_sock(port): s = socket.socket() s.bind(('',port)) s.listen(10) while True: conn,addr = s.accept() gevent.spawn(handle_request,conn) def handle_request(conn): try: while True: data = conn.recv(1024) if not data: conn.shutdown(socket.SHUT_WR) print('recv:',data.decode()) conn.send(data) except Exception as ex: print(ex) finally: conn.close() if __name__ == '__main__': server_sock(8888) #!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/24 17:35 # @Author : Py.qi # @File : gevent_sockclient.py # @Software: PyCharm import socket HOST = 'localhost' # The remote host PORT = 8888 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True: #msg = bytes(input(">>:"), encoding="utf8") for i in range(50): s.send('dddd'.encode()) data = s.recv(1024) # print(data) print('Received', repr(data)) s.close()
事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理,另外兩種經常使用的編程範式是(單線程)同步以及多線程編程。
服務器處理模型的程序時,有如下幾種模型:
在單線程同步模型中,任務按照順序執行。若是某個任務由於I/O而阻塞,其餘全部的任務都必須等待,直到它完成以後它們才能依次執行。這種明確的執行順序和串行化處理的行爲是很容易推斷得出的。若是任務之間並無互相依賴的關係,但仍然須要互相等待的話這就使得程序沒必要要的下降了運行速度。
在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操做系統來管理,在多處理器系統上能夠並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其餘線程得以繼續執行。與完成相似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,由於這類程序不得不經過線程同步機制如鎖、可重入函數、線程局部存儲或者其餘機制來處理線程安全問題,若是實現不當就會致使出現微妙且使人痛不欲生的bug。
在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其餘昂貴的操做時,註冊一個回調到事件循環中,而後當I/O操做完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢全部的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序儘量的得以執行而不須要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行爲,由於程序員不須要關心線程安全問題。
當程序中有許多任務,且任務之間高度獨立(它們不須要互相通訊,或等待彼此)並且在等待事件到來時,某些任務會阻塞時事件驅動模型時個很好的選擇;當應用程序須要在任務間共享可變的數據時,事件驅動模式能夠更好的在單線程下處理。
網絡應用程序一般都是上述特色,這使得它們可以很好的契合事件驅動編程模型。
此處要提出一個問題,就是,上面的事件驅動模型中,只要一遇到IO就註冊一個事件,而後主程序就能夠繼續幹其它的事情了,只到io處理完畢後,繼續恢復以前中斷的任務,這本質上是怎麼實現的呢?這就涉及到select\poll\epoll異步IO
同步IO和異步IO,阻塞IO和非阻塞IO分別是什麼,到底有什麼區別?
不一樣的人在不一樣的上下文下給出的答案是不一樣的。因此先限定一下本文的上下文。
本文討論的背景是Linux環境下的network IO。
在進行解釋以前,首先要說明幾個概念:
進程切換
進程的阻塞
文件描述符
緩存 I/O
進程切換
爲了控制進程的執行,內核必須有能力掛起正在CPU上運行的進程,並恢復之前掛起的某個進程的執行。這種行爲被稱爲進程切換。
所以能夠說,任何進程都是在操做系統內核的支持下運行的,是與內核緊密相關的。
從一個進程的運行轉到另外一個進程上運行,這個過程當中通過下面這些變化:
1. 保存處理器上下文,包括程序計數器和其餘寄存器
2. 更新PCB信息
3. 把進程的PCB移入相應的隊列,如就緒、在某事件阻塞等隊列
4. 選擇另外一個進程執行,並更新其PCB
5. 更新內存管理的數據結構
6. 恢復處理器上下文
進程控制塊PCB(Processing Control Block),是操做系統核心中一種數據結構,主要表示進程狀態。
PCB的做用是使一個在多道程序環境下不能獨立運行的程序(含數據),成爲一個能獨立運行的基本單位或與其它進程併發執行的進程。或者說,OS是根據PCB來對併發執行的進程進行控制和管理的。 PCB一般是系統內存佔用區中的一個連續存區,它存放着操做系統用於描述進程狀況及控制進程運行所需的所有信息
進程的阻塞
正在執行的進程,因爲期待的某些事件未發生,如請求系統資源失敗、等待某種操做的完成、新數據還沒有到達或無新工做作等,則由系統自動執行阻塞原語(Block),使本身由運行狀態變爲阻塞狀態。可見,進程的阻塞是進程自身的一種主動行爲,也所以只有處於運行態的進程(得到CPU),纔可能將其轉爲阻塞狀態。當進程進入阻塞狀態,是不佔用CPU資源的。
文件描述符fd
文件描述符(File descriptor)是計算機科學中的一個術語,是一個用於表述指向文件的引用的抽象化概念。
文件描述符在形式上是一個非負整數。實際上,它是一個索引值,指向內核爲每個進程所維護的該進程打開文件的記錄表。
當程序打開一個現有文件或者建立一個新文件時,內核向進程返回一個文件描述符。
在程序設計中,一些涉及底層的程序編寫每每會圍繞着文件描述符展開。可是文件描述符這一律念每每只適用於UNIX、Linux這樣的操做系統。
緩存 I/O
緩存 I/O 又被稱做標準 I/O,大多數文件系統的默認 I/O 操做都是緩存 I/O。
在 Linux 的緩存 I/O 機制中,操做系統會將 I/O 的數據緩存在文件系統的頁緩存( page cache )中。
數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。
緩存 I/O 的缺點:
數據在傳輸過程當中須要在應用程序地址空間和內核進行屢次數據拷貝操做,這些數據拷貝操做所帶來的 CPU 以及內存開銷是很是大的。
對於一次IO訪問(以read舉例),數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。
一個IO(如read)操做會經歷如下兩個階段:
1. 等待數據準備 (Waiting for the data to be ready)
2. 將數據從內核拷貝到進程中 (Copying the data from the kernel to the process)
由於有了這兩個階段,linux系統產生了下面五種網絡模式的方案。
1.阻塞 I/O(blocking IO)
2.非阻塞 I/O(nonblocking IO)
3.I/O 多路複用( IO multiplexing)
4.信號驅動 I/O( signal driven IO)
5.異步 I/O(asynchronous IO)
阻塞 I/O(blocking IO)
在linux中,默認狀況下全部的socket都是blocking,一個典型的讀操做流程大概是這樣:
當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據(對於網絡IO來講,不少時候數據在一開始尚未到達。好比,尚未收到一個完整的UDP包。這個時候kernel就要等待足夠的數據到來)。這個過程須要等待,也就是說數據被拷貝到操做系統內核的緩衝區中是須要一個過程的。而在用戶進程這邊,整個進程會被阻塞(固然,是進程本身選擇的阻塞)。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,而後kernel返回結果,用戶進程才解除block的狀態,從新運行起來。
因此,blocking IO的特色就是在IO執行的兩個階段都被block了。
非阻塞 I/O(nonblocking IO)
linux下,可經過設置socket使其變爲非阻塞IO。當對一個non-blocking socket執行讀操做時,流程是這個樣子:
當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error。
從用戶進程角度講 ,它發起一個read操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而它能夠再次發送read操做。一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據拷貝到了用戶內存,而後返回。
因此,nonblocking IO的特色是用戶進程須要不斷的主動詢問kernel數據好了沒有。
I/O 多路複用( IO multiplexing)
IO multiplexing就是咱們說的select,poll,epoll,有些地方也稱這種IO方式爲event driven IO。
select/epoll的好處就在於單個process就能夠同時處理多個網絡鏈接的IO。
它的基本原理就是select,poll,epoll這個function會不斷的輪詢所負責的全部socket
當某個socket有數據到達了,就通知用戶進程。
當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。
因此,I/O 多路複用的特色是經過一種機制使一個進程能同時等待多個文件描述符,而這些文件描述符(套接字描述符)其中的任意一個進入讀就緒狀態,select()函數就能夠返回。
IO多路複用和阻塞IO其實並無太大的不一樣,事實上,還更差一些。由於這裏須要使用兩個system call (select 和 recvfrom),而阻塞IO只調用了一個system call (recvfrom)。可是,用select的優點在於它能夠同時處理多個鏈接。
若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用多線程+阻塞IO的web server性能更好,可能延遲還更大。
select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。
在IO multiplexing Model中,實際中,對於每個socket,通常都設置成爲non-blocking
可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。
異步 I/O(asynchronous IO)
用戶進程發起read操做以後,馬上就能夠開始去作其它的事。而另外一方面,從kernel的角度,當它受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。
而後,kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了。
blocking和non-blocking的區別
調用blocking IO會一直block住對應的進程直到操做完成
調用non-blocking IO在kernel還準備數據的狀況下會馬上返回
synchronous IO和asynchronous IO的區別
synchronous I/O操做會致使請求進程被阻塞,直到I/O操做完成;
asynchronous I/O操做不會致使請求進程被阻塞;
以前所說的blocking IO,non-blocking IO,IO multiplexing都屬於synchronous IO。
有人會說,non-blocking IO並無被block啊。 這裏須要格外注意,定義中所指的」IO operation」是指真實的IO操做,就是例子中的recvfrom這個system call。non-blocking IO在執行recvfrom這個system call的時候,若是kernel的數據沒有準備好,這時候不會block進程。可是,當kernel中數據準備好的時候,recvfrom會將數據從kernel拷貝到用戶內存中,這個時候進程是被block了,在這段時間內,進程是被block的。
而asynchronous IO則不同,當進程發起IO 操做以後,就直接返回不再理睬了,直到kernel發送一個信號,告訴進程說IO完成。在這整個過程當中,進程徹底沒有被block。
各個IO Model的比較如圖所示:
經過上面的圖片,能夠發現non-blocking IO和asynchronous IO的區別仍是很明顯的。
在non-blocking IO中,雖然進程大部分時間都不會被block,可是它仍然要求進程去主動的check,而且當數據準備完成之後,也須要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。
而asynchronous IO則徹底不一樣。它就像是用戶進程將整個IO操做交給了他人(kernel)完成,而後他人作完後發信號通知。在此期間,用戶進程不須要去檢查IO操做的狀態,也不須要主動的去拷貝數據。
select,poll,epoll都是IO多路複用的機制。I/O多路複用就是經過一種機制使一個進程能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。
select,poll,epoll本質上都是同步I/O,由於他們都須要在讀寫事件就緒後本身負責進行讀寫,也就是說這個讀寫過程是阻塞的
異步I/O則無需本身負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。
sellect、poll、epoll三者的區別 :
select:
目前支持幾乎全部的平臺
默認單個進程可以監視的文件描述符的數量存在最大限制,在linux上默認只支持1024個socket
能夠經過修改宏定義或從新編譯內核(修改系統最大支持的端口數)的方式提高這一限制
內核準備好數據後通知用戶有數據了,但不告訴用戶是哪一個鏈接有數據,用戶只能經過輪詢的方式來獲取數據
假定select讓內核監視100個socket鏈接,當有1個鏈接有數據後,內核就通知用戶100個鏈接中有數據了
可是不告訴用戶是哪一個鏈接有數據了,此時用戶只能經過輪詢的方式一個個去檢查而後獲取數據
這裏是假定有100個socket鏈接,那麼若是有上萬個,上十萬個呢?
那你就得輪詢上萬次,上十萬次,而你所取的結果僅僅就那麼1個。這樣就會浪費不少沒用的開銷
只支持水平觸發;每次調用select,都須要把fd集合從用戶態拷貝到內核態,這個開銷在fd不少時會很大
同時每次調用select都須要在內核遍歷傳遞進來的全部fd,這個開銷在fd不少時也會很大
poll:
與select沒有本質上的差異,僅僅是沒有了最大文件描述符數量的限制
只支持水平觸發
只是一個過渡版本,不多用
epoll:
Linux2.6纔出現的epoll,具有了select和poll的一切優勢,公認爲性能最好的多路IO就緒通知方法
沒有最大文件描述符數量的限制
同時支持水平觸發和邊緣觸發
不支持windows平臺
內核準備好數據之後會通知用戶哪一個鏈接有數據了
IO效率不隨fd數目增長而線性降低
使用mmap加速內核與用戶空間的消息傳遞
水平觸發與邊緣觸發:
水平觸發:將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用epoll時將再次報告這些文件描述符,這種方式稱爲水平觸發
邊緣觸發:只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發
理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。
select和epoll的特色:
select:
select經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。
因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。
epoll:
epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。
另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。
select
select(rlist,wlist,xlist,timeout=None)
select函數監視的文件描述符分3類,分別是writefds、readfds、和exceptfds。
調用後select函數會阻塞,直到有描述符就緒(有數據可讀、可寫、或者有except),或者超時(timeout指定等待時間,若是當即返回設爲null便可),函數返回。當select函數返回後,能夠經過遍歷fdset,來找到就緒的描述符。
poll
int poll(struct pollfd *fds,unsigned int nfds,int timeout);
不一樣於select使用三個位圖來表示三個fdset的方式,poll使用一個pollfd的指針實現。
struct pollfd { int fd; /*file descriptor */ short events; /* requested events to watch */ short revents; /* returned events witnessed */ };
pollfd結構包含了要監視的event和發生的event,再也不使用select「參數-值」傳遞的方式。同時,pollfd並無最大數量限制(可是數量過大後性能也是會降低)。 和select函數同樣,poll返回後,須要輪詢pollfd來獲取就緒的描述符。
從上面看,select和poll都須要在返回後,經過遍歷文件描述符來獲取已經就緒的socket。
事實上,同時鏈接的大量客戶端在一時刻可能只有不多的處於就緒狀態,所以隨着監視的描述符數量的增加,其效率也會線性降低。
epool
epoll是在2.6內核中提出的,是以前的select和poll的加強版本。相對於select和poll來講,epoll更加靈活,沒有描述符限制。
epoll使用一個文件描述符管理多個描述符,將用戶關係的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy只需一次。
epoll操做過程須要三個接口,分別以下:
int epoll_create(int size);//建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
(1)int epool_create(int size);
建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大,這個參數不一樣於select()中的第一個參數,給出最大監聽的fd+1的值,參數size並非限制了epoll所能監聽的描述符最大個數,只是對內核初始分配內部數據結構的一個建議。
當建立好epoll句柄後,它就會佔用一個fd值,在linux下若是查看/proc/進程id/fd/,是可以看到這個fd的,因此在使用完epoll後,必須調用close()關閉,不然可能致使fd被耗盡。
(2)int epool_ctl(int epfd,int op,int fd,struct epoll_event *event);
函數是對指定描述符fd執行op操做。
epfd:是epoll_create()的返回值。
op:表示op操做,用三個宏來表示:
添加EPOLL_CTL_ADD,刪除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。
分別添加、刪除和修改對fd的監聽事件。
fd:是須要監聽的fd(文件描述符)
epoll_event:是告訴內核須要監聽什麼事
(3)int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents個事件。
參數events用來從內核獲得事件的集合,maxevents告以內核這個events有多大,這個maxevents的值不能大於建立epoll_create()時的size,參數timeout是超時時間(毫秒,0會當即返回,-1將不肯定,也有說法說是永久阻塞)。該函數返回須要處理的事件數目,如返回0表示已超時。
一個簡單的select多併發socket服務端代碼:
#!/usr/bin/python #Author:sean import select import socket import queue server = socket.socket() HOST = 'localhost' PORT = 8080 print("start up %s on port: %s",% (HOST,PORT)) server.bind((HOST,PORT)) server.listen() server.setblocking(False) #不阻塞 msg_dic_queue = {} #這是一個隊列字典,存放要返回給客戶端的數據 inputs = [server] #inputs裏存放要讓內核監測的鏈接,這裏的server是指監測server自己的鏈接狀態 #inputs = [server,conn] outputs = [] #outputs裏存放要返回給客戶端的數據鏈接對象 while True: print("waiting for next connect...") readable,writeable,exceptional = select.select(inputs,outputs,inputs) #若是沒有任何fd就緒,程序就會一直阻塞在這裏 # print(readable,writeable,exceptional) for r in readable: #處理活躍的鏈接,每一個r就是一個socket鏈接對象 if r is server: #表明來了一個新鏈接 conn,client_addr = server.accept() print("arrived a new connect: ",client_addr) conn.setblocking(False) inputs.append(conn) #由於這個新創建的鏈接還沒發數據來,如今就接收的話,程序就報異常了 #因此要想實現這個客戶端發數據來時server端能知道,就須要讓select再監測這個conn msg_dic_queue[conn] = queue.Queue() #初始化一個隊列,後面存要返回給客戶端的數據 else: #r不是server的話就表明是一個與客戶端創建的文件描述符了 #客戶端的數據過來了,在這裏接收 data = r.recv(1024) if data: print("received data from [%s]: "% r.getpeername()[0],data) msg_dic_queue[r].put(data) #收到的數據先放到隊列字典裏,以後再返回給客戶端 if r not in outputs: outputs.append(r) #放入返回的鏈接隊列裏。爲了避免影響處理與其它客戶端的鏈接,這裏不馬上返回數據給客戶端 else: #若是收不到data就表明客戶端已經斷開了 print("Client is disconnect",r) if r in outputs: outputs.remove(r) #清理已斷開的鏈接 inputs.remove(r) del msg_dic_queue[r] for w in writeable: #處理要返回給客戶端的鏈接列表 try: next_msg = msg_dic_queue[w].get_nowait() except queue.Empty: print("client [%s]"% w.getpeername()[0],"queue is empty...") outputs.remove(w) #確保下次循環時writeable不返回已經處理完的鏈接 else: print("sending message to [%s]"% w.getpeername()[0],next_msg) w.send(next_msg) #返回給客戶端源數據 for e in exceptional: #處理異常鏈接 if e in outputs: outputs.remove(e) inputs.remove(e) del msg_dic_queue[e]
select多併發socket客戶端代碼:
#!/usr/bin/python #Author:sean import socket msgs = [ b'This is the message. ', b'It will be sent ', b'in parts.', ] SERVER_ADDRESS = 'localhost' SERVER_PORT = 8080 # Create a few TCP/IP socket socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(500) ] # Connect the socket to the port where the server is listening print('connecting to %s port %s' % (SERVER_ADDRESS,SERVER_PORT)) for s in socks: s.connect((SERVER_ADDRESS,SERVER_PORT)) for message in msgs: # Send messages on both sockets for s in socks: print('%s: sending "%s"' % (s.getsockname(), message) ) s.send(message) # Read responses on both sockets for s in socks: data = s.recv(1024) print( '%s: received "%s"' % (s.getsockname(), data) ) if not data: print(sys.stderr, 'closing socket', s.getsockname() )
epoll多併發socket服務端代碼以下:
#!/usr/bin/python #Author:sean import socket, logging import select, errno logger = logging.getLogger("network-server") def InitLog(): logger.setLevel(logging.DEBUG) fh = logging.FileHandler("network-server.log") fh.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.ERROR) formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ch.setFormatter(formatter) fh.setFormatter(formatter) logger.addHandler(fh) logger.addHandler(ch) if __name__ == "__main__": InitLog() try: # 建立 TCP socket 做爲監聽 socket listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) except socket.error as msg: logger.error("create socket failed") try: # 設置 SO_REUSEADDR 選項 listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) except socket.error as msg: logger.error("setsocketopt SO_REUSEADDR failed") try: # 進行 bind -- 此處未指定 ip 地址,即 bind 了所有網卡 ip 上 listen_fd.bind(('', 8008)) except socket.error as msg: logger.error("bind failed") try: # 設置 listen 的 backlog 數 listen_fd.listen(10) except socket.error as msg: logger.error(msg) try: # 建立 epoll 句柄 epoll_fd = select.epoll() # 向 epoll 句柄中註冊 監聽 socket 的 可讀 事件 epoll_fd.register(listen_fd.fileno(), select.EPOLLIN) except select.error as msg: logger.error(msg) connections = {} addresses = {} datalist = {} while True: # epoll 進行 fd 掃描的地方 -- 未指定超時時間則爲阻塞等待 epoll_list = epoll_fd.poll() for fd, events in epoll_list: # 若爲監聽 fd 被激活 if fd == listen_fd.fileno(): # 進行 accept -- 得到鏈接上來 client 的 ip 和 port,以及 socket 句柄 conn, addr = listen_fd.accept() logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno())) # 將鏈接 socket 設置爲 非阻塞 conn.setblocking(0) # 向 epoll 句柄中註冊 鏈接 socket 的 可讀 事件 epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET) # 將 conn 和 addr 信息分別保存起來 connections[conn.fileno()] = conn addresses[conn.fileno()] = addr elif select.EPOLLIN & events: # 有 可讀 事件激活 datas = '' while True: try: # 從激活 fd 上 recv 10 字節數據 data = connections[fd].recv(10) # 若當前沒有接收到數據,而且以前的累計數據也沒有 if not data and not datas: # 從 epoll 句柄中移除該 鏈接 fd epoll_fd.unregister(fd) # server 側主動關閉該 鏈接 fd connections[fd].close() logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) break else: # 將接收到的數據拼接保存在 datas 中 datas += data except socket.error as msg: # 在 非阻塞 socket 上進行 recv 須要處理 讀穿 的狀況 # 這裏其實是利用 讀穿 出 異常 的方式跳到這裏進行後續處理 if msg.errno == errno.EAGAIN: logger.debug("%s receive %s" % (fd, datas)) # 將已接收數據保存起來 datalist[fd] = datas # 更新 epoll 句柄中鏈接d 註冊事件爲 可寫 epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT) break else: # 出錯處理 epoll_fd.unregister(fd) connections[fd].close() logger.error(msg) break elif select.EPOLLHUP & events: # 有 HUP 事件激活 epoll_fd.unregister(fd) connections[fd].close() logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) elif select.EPOLLOUT & events: # 有 可寫 事件激活 sendLen = 0 # 經過 while 循環確保將 buf 中的數據所有發送出去 while True: # 將以前收到的數據發回 client -- 經過 sendLen 來控制發送位置 sendLen += connections[fd].send(datalist[fd][sendLen:]) # 在所有發送完畢後退出 while 循環 if sendLen == len(datalist[fd]): break # 更新 epoll 句柄中鏈接 fd 註冊事件爲 可讀 epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET) else: # 其餘 epoll 事件不進行處理 continue
五、python之selectors模塊
selectors模塊是在python3.4版本中引進的,它封裝了IO多路複用中的select和epoll,可以更快,更方便的實現多併發效果。
官方文檔見:https://docs.python.org/3/library/selectors.html
如下是一個selectors模塊的代碼示範:
#!/usr/bin/python #Author:sean import selectors import socket #selectors模塊默認會用epoll,若是你的系統中沒有epoll(好比windows)則會自動使用select sel = selectors.DefaultSelector() #生成一個select對象 def accept(sock, mask): conn, addr = sock.accept() # Should be ready print('accepted', conn, 'from', addr) conn.setblocking(False) #設定非阻塞 sel.register(conn, selectors.EVENT_READ, read) #新鏈接註冊read回調函數 def read(conn, mask): data = conn.recv(1024) # Should be ready if data: print('echoing', repr(data), 'to', conn) conn.send(data) else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen() sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept) #把剛生成的sock鏈接對象註冊到select鏈接列表中,並交給accept函數處理 while True: events = sel.select() #默認是阻塞,有活動鏈接就返回活動的鏈接列表 #這裏看起來是select,其實有可能會使用epoll,若是你的系統支持epoll,那麼默認就是epoll for key, mask in events: callback = key.data #去調accept函數 callback(key.fileobj, mask) #key.fileobj就是readable中的一個socket鏈接對象