Python之路,Day9 , IO多路複用(番外篇)

同步IO和異步IO,阻塞IO和非阻塞IO分別是什麼,到底有什麼區別?不一樣的人在不一樣的上下文下給出的答案是不一樣的。因此先限定一下本文的上下文。html

本文討論的背景是Linux環境下的network IO。

一 概念說明

在進行解釋以前,首先要說明幾個概念:
- 用戶空間和內核空間
- 進程切換
- 進程的阻塞
- 文件描述符
- 緩存 I/Opython

 

 

用戶空間與內核空間

如今操做系統都是採用虛擬存儲器,那麼對32位操做系統而言,它的尋址空間(虛擬存儲空間)爲4G(2的32次方)。操做系統的核心是內核,獨立於普通的應用程序,能夠訪問受保護的內存空間,也有訪問底層硬件設備的全部權限。爲了保證用戶進程不能直接操做內核(kernel),保證內核的安全,操心繫統將虛擬空間劃分爲兩部分,一部分爲內核空間,一部分爲用戶空間。針對linux操做系統而言,將最高的1G字節(從虛擬地址0xC0000000到0xFFFFFFFF),供內核使用,稱爲內核空間,而將較低的3G字節(從虛擬地址0x00000000到0xBFFFFFFF),供各個進程使用,稱爲用戶空間。linux

 

進程切換

爲了控制進程的執行,內核必須有能力掛起正在CPU上運行的進程,並恢復之前掛起的某個進程的執行。這種行爲被稱爲進程切換。所以能夠說,任何進程都是在操做系統內核的支持下運行的,是與內核緊密相關的。web

從一個進程的運行轉到另外一個進程上運行,這個過程當中通過下面這些變化:
1. 保存處理機上下文,包括程序計數器和其餘寄存器。
2. 更新PCB信息。segmentfault

3. 把進程的PCB移入相應的隊列,如就緒、在某事件阻塞等隊列。
4. 選擇另外一個進程執行,並更新其PCB。
5. 更新內存管理的數據結構。
6. 恢復處理機上下文。緩存

總而言之就是很耗資源,具體的能夠參考這篇文章:進程切換安全

注:進程控制塊(Processing Control Block),是操做系統核心中一種數據結構,主要表示進程狀態。其做用是使一個在多道程序環境下不能獨立運行的程序(含數據),成爲一個能獨立運行的基本單位或與其它進程併發執行的進程。或者說,OS是根據PCB來對併發執行的進程進行控制和管理的。 PCB一般是系統內存佔用區中的一個連續存區,它存放着操做系統用於描述進程狀況及控制進程運行所需的所有信息 網絡

進程的阻塞

正在執行的進程,因爲期待的某些事件未發生,如請求系統資源失敗、等待某種操做的完成、新數據還沒有到達或無新工做作等,則由系統自動執行阻塞原語(Block),使本身由運行狀態變爲阻塞狀態。可見,進程的阻塞是進程自身的一種主動行爲,也所以只有處於運行態的進程(得到CPU),纔可能將其轉爲阻塞狀態。當進程進入阻塞狀態,是不佔用CPU資源的數據結構

文件描述符fd

文件描述符(File descriptor)是計算機科學中的一個術語,是一個用於表述指向文件的引用的抽象化概念。併發

文件描述符在形式上是一個非負整數。實際上,它是一個索引值,指向內核爲每個進程所維護的該進程打開文件的記錄表。當程序打開一個現有文件或者建立一個新文件時,內核向進程返回一個文件描述符。在程序設計中,一些涉及底層的程序編寫每每會圍繞着文件描述符展開。可是文件描述符這一律念每每只適用於UNIX、Linux這樣的操做系統。

 

緩存 I/O

緩存 I/O 又被稱做標準 I/O,大多數文件系統的默認 I/O 操做都是緩存 I/O。在 Linux 的緩存 I/O 機制中,操做系統會將 I/O 的數據緩存在文件系統的頁緩存( page cache )中,也就是說,數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。

緩存 I/O 的缺點:
數據在傳輸過程當中須要在應用程序地址空間和內核進行屢次數據拷貝操做,這些數據拷貝操做所帶來的 CPU 以及內存開銷是很是大的。

 

二 IO模式

剛纔說了,對於一次IO訪問(以read舉例),數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。因此說,當一個read操做發生時,它會經歷兩個階段:
1. 等待數據準備 (Waiting for the data to be ready)
2. 將數據從內核拷貝到進程中 (Copying the data from the kernel to the process)

正式由於這兩個階段,linux系統產生了下面五種網絡模式的方案。
- 阻塞 I/O(blocking IO)
- 非阻塞 I/O(nonblocking IO)
- I/O 多路複用( IO multiplexing)
- 信號驅動 I/O( signal driven IO)
- 異步 I/O(asynchronous IO)

注:因爲signal driven IO在實際中並不經常使用,因此我這隻說起剩下的四種IO Model。

 

阻塞 I/O(blocking IO)

在linux中,默認狀況下全部的socket都是blocking,一個典型的讀操做流程大概是這樣:

 

 

當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據(對於網絡IO來講,不少時候數據在一開始尚未到達。好比,尚未收到一個完整的UDP包。這個時候kernel就要等待足夠的數據到來)。這個過程須要等待,也就是說數據被拷貝到操做系統內核的緩衝區中是須要一個過程的。而在用戶進程這邊,整個進程會被阻塞(固然,是進程本身選擇的阻塞)。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,而後kernel返回結果,用戶進程才解除block的狀態,從新運行起來。

因此,blocking IO的特色就是在IO執行的兩個階段都被block了。

非阻塞 I/O(nonblocking IO)

linux下,能夠經過設置socket使其變爲non-blocking。當對一個non-blocking socket執行讀操做時,流程是這個樣子:

 

當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error。從用戶進程角度講 ,它發起一個read操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而它能夠再次發送read操做。一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據拷貝到了用戶內存,而後返回。

因此,nonblocking IO的特色是用戶進程須要不斷的主動詢問kernel數據好了沒有。

I/O 多路複用( IO multiplexing)

IO multiplexing就是咱們說的select,poll,epoll,有些地方也稱這種IO方式爲event driven IO。select/epoll的好處就在於單個process就能夠同時處理多個網絡鏈接的IO。它的基本原理就是select,poll,epoll這個function會不斷的輪詢所負責的全部socket,當某個socket有數據到達了,就通知用戶進程。

 

當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。

因此,I/O 多路複用的特色是經過一種機制一個進程能同時等待多個文件描述符,而這些文件描述符(套接字描述符)其中的任意一個進入讀就緒狀態,select()函數就能夠返回。

這個圖和blocking IO的圖其實並無太大的不一樣,事實上,還更差一些。由於這裏須要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。可是,用select的優點在於它能夠同時處理多個connection。

因此,若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。)

在IO multiplexing Model中,實際中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。

 

異步 I/O(asynchronous IO)

inux下的asynchronous IO其實用得不多。先看一下它的流程:

用戶進程發起read操做以後,馬上就能夠開始去作其它的事。而另外一方面,從kernel的角度,當它受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了。

 

總結

blocking和non-blocking的區別

調用blocking IO會一直block住對應的進程直到操做完成,而non-blocking IO在kernel還準備數據的狀況下會馬上返回。

synchronous IO和asynchronous IO的區別

在說明synchronous IO和asynchronous IO的區別以前,須要先給出二者的定義。POSIX的定義是這樣子的:
- A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
- An asynchronous I/O operation does not cause the requesting process to be blocked;

二者的區別就在於synchronous IO作」IO operation」的時候會將process阻塞。按照這個定義,以前所述的blocking IO,non-blocking IO,IO multiplexing都屬於synchronous IO。

有人會說,non-blocking IO並無被block啊。這裏有個很是「狡猾」的地方,定義中所指的」IO operation」是指真實的IO操做,就是例子中的recvfrom這個system call。non-blocking IO在執行recvfrom這個system call的時候,若是kernel的數據沒有準備好,這時候不會block進程。可是,當kernel中數據準備好的時候,recvfrom會將數據從kernel拷貝到用戶內存中,這個時候進程是被block了,在這段時間內,進程是被block的。

而asynchronous IO則不同,當進程發起IO 操做以後,就直接返回不再理睬了,直到kernel發送一個信號,告訴進程說IO完成。在這整個過程當中,進程徹底沒有被block。

 

各個IO Model的比較如圖所示:

 

經過上面的圖片,能夠發現non-blocking IO和asynchronous IO的區別仍是很明顯的。在non-blocking IO中,雖然進程大部分時間都不會被block,可是它仍然要求進程去主動的check,而且當數據準備完成之後,也須要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。而asynchronous IO則徹底不一樣。它就像是用戶進程將整個IO操做交給了他人(kernel)完成,而後他人作完後發信號通知。在此期間,用戶進程不須要去檢查IO操做的狀態,也不須要主動的去拷貝數據。

 

三 I/O 多路複用之select、poll、epoll詳解

select,poll,epoll都是IO多路複用的機制。I/O多路複用就是經過一種機制,一個進程能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。但select,poll,epoll本質上都是同步I/O,由於他們都須要在讀寫事件就緒後本身負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步I/O則無需本身負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。(這裏囉嗦下)

select

?
1
select(rlist, wlist, xlist, timeout = None )

select 函數監視的文件描述符分3類,分別是writefds、readfds、和exceptfds。調用後select函數會阻塞,直到有描述副就緒(有數據 可讀、可寫、或者有except),或者超時(timeout指定等待時間,若是當即返回設爲null便可),函數返回。當select函數返回後,能夠 經過遍歷fdset,來找到就緒的描述符。

select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢。select的一 個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制,但 是這樣也會形成效率的下降。

poll

?
1
int poll (struct pollfd * fds, unsigned int nfds, int timeout);

不一樣與select使用三個位圖來表示三個fdset的方式,poll使用一個 pollfd的指針實現。

struct pollfd { int fd; /* file descriptor */ short events; /* requested events to watch */ short revents; /* returned events witnessed */ }; 

pollfd結構包含了要監視的event和發生的event,再也不使用select「參數-值」傳遞的方式。同時,pollfd並無最大數量限制(可是數量過大後性能也是會降低)。 和select函數同樣,poll返回後,須要輪詢pollfd來獲取就緒的描述符。

從上面看,select和poll都須要在返回後,經過遍歷文件描述符來獲取已經就緒的socket。事實上,同時鏈接的大量客戶端在一時刻可能只有不多的處於就緒狀態,所以隨着監視的描述符數量的增加,其效率也會線性降低。

  

epoll

epoll是在2.6內核中提出的,是以前的select和poll的加強版本。相對於select和poll來講,epoll更加靈活,沒有描述符限制。epoll使用一個文件描述符管理多個描述符,將用戶關係的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy只需一次。

一 epoll操做過程

epoll操做過程須要三個接口,分別以下:

 

?
1
2
3
int epoll_create( int size); //建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大
int epoll_ctl( int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait( int epfd, struct epoll_event * events, int maxevents, int timeout);

1. int epoll_create(int size);
建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大,這個參數不一樣於select()中的第一個參數,給出最大監聽的fd+1的值,參數size並非限制了epoll所能監聽的描述符最大個數,只是對內核初始分配內部數據結構的一個建議
當建立好epoll句柄後,它就會佔用一個fd值,在linux下若是查看/proc/進程id/fd/,是可以看到這個fd的,因此在使用完epoll後,必須調用close()關閉,不然可能致使fd被耗盡。

2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函數是對指定描述符fd執行op操做。
- epfd:是epoll_create()的返回值。
- op:表示op操做,用三個宏來表示:添加EPOLL_CTL_ADD,刪除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分別添加、刪除和修改對fd的監聽事件。
- fd:是須要監聽的fd(文件描述符)
- epoll_event:是告訴內核須要監聽什麼事

3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents個事件。
參數events用來從內核獲得事件的集合,maxevents告以內核這個events有多大,這個maxevents的值不能大於建立epoll_create()時的size,參數timeout是超時時間(毫秒,0會當即返回,-1將不肯定,也有說法說是永久阻塞)。該函數返回須要處理的事件數目,如返回0表示已超時。

 

  

#_*_coding:utf-8_*_
__author__ = 'Alex Li'

import socket, logging
import select, errno

logger = logging.getLogger("network-server")

def InitLog():
    logger.setLevel(logging.DEBUG)

    fh = logging.FileHandler("network-server.log")
    fh.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.ERROR)

    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    fh.setFormatter(formatter)

    logger.addHandler(fh)
    logger.addHandler(ch)


if __name__ == "__main__":
    InitLog()

    try:
        # 建立 TCP socket 做爲監聽 socket
        listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    except socket.error as  msg:
        logger.error("create socket failed")

    try:
        # 設置 SO_REUSEADDR 選項
        listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    except socket.error as  msg:
        logger.error("setsocketopt SO_REUSEADDR failed")

    try:
        # 進行 bind -- 此處未指定 ip 地址,即 bind 了所有網卡 ip 上
        listen_fd.bind(('', 2003))
    except socket.error as  msg:
        logger.error("bind failed")

    try:
        # 設置 listen 的 backlog 數
        listen_fd.listen(10)
    except socket.error as  msg:
        logger.error(msg)

    try:
        # 建立 epoll 句柄
        epoll_fd = select.epoll()
        # 向 epoll 句柄中註冊 監聽 socket 的 可讀 事件
        epoll_fd.register(listen_fd.fileno(), select.EPOLLIN)
    except select.error as  msg:
        logger.error(msg)

    connections = {}
    addresses = {}
    datalist = {}
    while True:
        # epoll 進行 fd 掃描的地方 -- 未指定超時時間則爲阻塞等待
        epoll_list = epoll_fd.poll()

        for fd, events in epoll_list:
            # 若爲監聽 fd 被激活
            if fd == listen_fd.fileno():
                # 進行 accept -- 得到鏈接上來 client 的 ip 和 port,以及 socket 句柄
                conn, addr = listen_fd.accept()
                logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno()))
                # 將鏈接 socket 設置爲 非阻塞
                conn.setblocking(0)
                # 向 epoll 句柄中註冊 鏈接 socket 的 可讀 事件
                epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET)
                # 將 conn 和 addr 信息分別保存起來
                connections[conn.fileno()] = conn
                addresses[conn.fileno()] = addr
            elif select.EPOLLIN & events:
                # 有 可讀 事件激活
                datas = ''
                while True:
                    try:
                        # 從激活 fd 上 recv 10 字節數據
                        data = connections[fd].recv(10)
                        # 若當前沒有接收到數據,而且以前的累計數據也沒有
                        if not data and not datas:
                            # 從 epoll 句柄中移除該 鏈接 fd
                            epoll_fd.unregister(fd)
                            # server 側主動關閉該 鏈接 fd
                            connections[fd].close()
                            logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1]))
                            break
                        else:
                            # 將接收到的數據拼接保存在 datas 中
                            datas += data
                    except socket.error as  msg:
                        # 在 非阻塞 socket 上進行 recv 須要處理 讀穿 的狀況
                        # 這裏其實是利用 讀穿 出 異常 的方式跳到這裏進行後續處理
                        if msg.errno == errno.EAGAIN:
                            logger.debug("%s receive %s" % (fd, datas))
                            # 將已接收數據保存起來
                            datalist[fd] = datas
                            # 更新 epoll 句柄中鏈接d 註冊事件爲 可寫
                            epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT)
                            break
                        else:
                            # 出錯處理
                            epoll_fd.unregister(fd)
                            connections[fd].close()
                            logger.error(msg)
                            break
            elif select.EPOLLHUP & events:
                # 有 HUP 事件激活
                epoll_fd.unregister(fd)
                connections[fd].close()
                logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1]))
            elif select.EPOLLOUT & events:
                # 有 可寫 事件激活
                sendLen = 0
                # 經過 while 循環確保將 buf 中的數據所有發送出去
                while True:
                    # 將以前收到的數據發回 client -- 經過 sendLen 來控制發送位置
                    sendLen += connections[fd].send(datalist[fd][sendLen:])
                    # 在所有發送完畢後退出 while 循環
                    if sendLen == len(datalist[fd]):
                        break
                # 更新 epoll 句柄中鏈接 fd 註冊事件爲 可讀
                epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET)
            else:
                # 其餘 epoll 事件不進行處理
                continue
epoll socket echo server

 

 

 

參考:

https://segmentfault.com/a/1190000003063859

https://my.oschina.net/moooofly/blog/147297

 

原文:http://www.cnblogs.com/alex3714/articles/5876749.html

相關文章
相關標籤/搜索