io多路複用

 

 

上篇回顧:靜態服務器+壓測linux

3.2.概念篇

1.同步與異步

同步是指一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成。web

異步是指不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做。而後繼續執行下面代碼邏輯,只要本身完成了整個任務就算完成了(異步通常使用狀態、通知和回調)數據庫

PS:項目裏面通常是這樣的:(我的經驗)編程

  1. 同步架構:通常都是和錢相關的需求,須要實時返回的業務
  2. 異步架構:更可能是對寫要求比較高時的場景(同步變異步)
    • 讀通常都是實時返回,代碼通常都是await xxx()
  3. 想象個情景就清楚了:
    • 異步:如今用戶寫了篇文章,能夠異步操做,就算沒真正寫到數據庫也能夠返回:發表成功(大不了失敗提示一下)
    • 同步:用戶獲取訂單信息,你若是異步就會這樣了:提示下獲取成功,而後一片空白...用戶不卸載就怪了...

2.阻塞與非阻塞

阻塞是指調用結果返回以前,當前線程會被掛起,一直處於等待消息通知,不可以執行其餘業務(大部分代碼都是這樣的)緩存

非阻塞是指在不能馬上獲得結果以前,該函數不會阻塞當前線程,而會馬上返回(繼續執行下面代碼,或者重試機制走起)服務器

PS:項目裏面重試機制爲啥通常都是3次?markdown

  1. 第一次重試,兩臺PC掛了也是有可能的
  2. 第二次重試,負載均衡分配的三臺機器同時掛的可能性不是很大,這時候就有多是網絡有點擁堵了
  3. 最後一次重試,再失敗就沒意義了,日記寫起來,再重試網絡負擔就加大了,得不償失了

3.五種IO模型

對於一次IO訪問,數據會先被拷貝到內核的緩衝區中,而後纔會從內核的緩衝區拷貝到應用程序的地址空間。須要經歷兩個階段:網絡

  1. 準備數據
  2. 將數據從內核緩衝區拷貝到進程地址空間

因爲存在這兩個階段,Linux產生了下面五種IO模型(以socket爲例

  1. 阻塞式IO:
    • 當用戶進程調用了recvfrom等阻塞方法時,內核進入IO的第1個階段:準備數據(內核須要等待足夠的數據再拷貝)這個過程須要等待,用戶進程會被阻塞,等內核將數據準備好,而後拷貝到用戶地址空間,內核返回結果,用戶進程才從阻塞態進入就緒態
    • Linux中默認狀況下全部的socket都是阻塞的
  2. 非阻塞式IO:
    • 當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error
    • 用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而它能夠再次發送read操做
    • 一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據拷貝到了用戶內存,而後返回
    • 非阻塞IO模式下用戶進程須要不斷地詢問內核的數據準備好了沒有
  3. IO多路複用
    • 經過一種機制,一個進程能夠監視多個文件描述符(套接字描述符)一旦某個文件描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做(這樣就不須要每一個用戶進程不斷的詢問內核數據準備好了沒)
    • 經常使用的IO多路複用方式有selectpollepoll
  4. 信號驅動IO:
    • 內核文件描述符就緒後,經過信號通知用戶進程,用戶進程再經過系統調用讀取數據。
    • 此方式屬於同步IO(實際讀取數據到用戶進程緩存的工做仍然是由用戶進程本身負責的)
  5. 異步IOPOSIXaio_系列函數)
    • 用戶進程發起read操做以後,馬上就能夠開始去作其它的事。內核收到一個異步IO read以後,會馬上返回,不會阻塞用戶進程。
    • 內核會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,內核會給用戶進程發送一個signal告訴它read操做完成了

4.Unix圖示

貼一下Unix編程裏面的圖:

**非阻塞IO**

2.非阻塞IO

**IO複用**

3.IO複用

**信號IO**

4.信號IO

**異步AIO**

5.異步AIO

3.3.IO多路複用

開始以前我們經過非阻塞IO引入一下:(來個簡單例子socket.setblocking(False))

import time
import socket

def select(socket_addr_list):
    for client_socket, client_addr in socket_addr_list:
        try:
            data = client_socket.recv(2048)
            if data:
                print(f"[來自{client_addr}的消息:]\n")
                print(data.decode("utf-8"))
                client_socket.send(
                    b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                )
            else:
                # 沒有消息是觸發異常,空消息是斷開鏈接
                client_socket.close()  # 關閉客戶端鏈接
                socket_addr_list.remove((client_socket, client_addr))
                print(f"[客戶端{client_addr}已斷開鏈接,當前鏈接數:{len(socket_addr_list)}]")
        except Exception:
            pass

def main():
    # 存放客戶端集合
    socket_addr_list = list()

    with socket.socket() as tcp_server:
        # 防止端口綁定的設置
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()
        tcp_server.setblocking(False)  # 服務端非阻塞
        while True:
            try:
                client_socket, client_addr = tcp_server.accept()
                client_socket.setblocking(False)  # 客戶端非阻塞
                socket_addr_list.append((client_socket, client_addr))
            except Exception:
                pass
            else:
                print(f"[來自{client_addr}的鏈接,當前鏈接數:{len(socket_addr_list)}]")
            # 防止客戶端斷開後出錯
            if socket_addr_list:
                # 輪詢查看客戶端有沒有消息
                select(socket_addr_list)  # 引用傳參
                time.sleep(0.01)

if __name__ == "__main__":
    main()

輸出:
3.nowait.gif

能夠思考下:

  1. 爲何Server也要設置爲非阻塞?
    • PS:一個線程裏面只能有一個死循環,如今程序須要兩個死循環,so ==> 放一塊兒咯
  2. 斷開鏈接怎麼判斷?
    • PS:沒有消息是觸發異常,空消息是斷開鏈接
  3. client_socket爲何不用dict存放?
    • PS:dict在循環的過程當中,del會引起異常

1.Select

select和上面的有點相似,就是輪詢的過程交給了操做系統:

kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程

來個和上面等同的案例:

import select
import socket

def main():
    with socket.socket() as tcp_server:
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()
        socket_info_dict = dict()
        socket_list = [tcp_server]  # 監測列表
        while True:
            # 劣勢:select列表數量有限制
            read_list, write_list, error_list = select.select(
                socket_list, [], [])
            for item in read_list:
                # 服務端迎接新的鏈接
                if item == tcp_server:
                    client_socket, client_address = item.accept()
                    socket_list.append(client_socket)
                    socket_info_dict[client_socket] = client_address
                    print(f"[{client_address}已鏈接,當前鏈接數:{len(socket_list)-1}]")
                # 客戶端發來
                else:
                    data = item.recv(2048)
                    if data:
                        print(data.decode("utf-8"))
                        item.send(
                            b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                        )
                    else:
                        item.close()
                        socket_list.remove(item)
                        info = socket_info_dict[item]
                        print(f"[{info}已斷開,當前鏈接數:{len(socket_list)-1}]")

if __name__ == "__main__":
    main()

輸出和上面同樣

擴展說明:

select 函數監視的文件描述符分3類,分別是writefdsreadfds、和exceptfds。調用後select函數會阻塞,直到有描述符就緒函數返回(有數據可讀、可寫、或者有except)或者超時(timeout指定等待時間,若是當即返回設爲null便可)

select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024(64位=>2048)

而後Poll就出現了,就是把上限給去掉了,本質並沒變,仍是使用的輪詢

2.EPoll

epoll在內核2.6中提出(Linux獨有),使用一個文件描述符管理多個描述符,將用戶關心的文件描述符的事件存放到內核的一個事件表中,採用監聽回調的機制,這樣在用戶空間和內核空間的copy只需一次,避免再次遍歷就緒的文件描述符列表

先來看個案例吧:(輸出和上面同樣)

import socket
import select

def main():
    with socket.socket() as tcp_server:
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()

        # epoll是linux獨有的
        epoll = select.epoll()
        # tcp_server註冊到epoll中
        epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)

        # key-value
        fd_socket_dict = dict()

        # 回調須要本身處理
        while True:
            # 返回可讀寫的socket fd 集合
            poll_list = epoll.poll()
            for fd, event in poll_list:
                # 服務器的socket
                if fd == tcp_server.fileno():
                    client_socket, client_addr = tcp_server.accept()
                    fd = client_socket.fileno()
                    fd_socket_dict[fd] = (client_socket, client_addr)
                    # 把客戶端註冊進epoll中
                    epoll.register(fd, select.EPOLLIN | select.EPOLLET)
                else:  # 客戶端
                    client_socket, client_addr = fd_socket_dict[fd]
                    data = client_socket.recv(2048)
                    print(
                        f"[來自{client_addr}的消息,當前鏈接數:{len(fd_socket_dict)}]\n")
                    if data:
                        print(data.decode("utf-8"))
                        client_socket.send(
                            b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                        )
                    else:
                        del fd_socket_dict[fd]
                        print(
                            f"[{client_addr}已離線,當前鏈接數:{len(fd_socket_dict)}]\n"
                        )
                        # 從epoll中註銷
                        epoll.unregister(fd)
                        client_socket.close()

if __name__ == "__main__":
    main()

擴展:epoll的兩種工做模式

LT(level trigger,水平觸發)模式:當epoll_wait檢測到描述符就緒,將此事件通知應用程序,應用程序能夠不當即處理該事件。下次調用epoll_wait時,會再次響應應用程序並通知此事件。LT模式是默認的工做模式。
LT模式同時支持阻塞和非阻塞socket。

ET(edge trigger,邊緣觸發)模式:當epoll_wait檢測到描述符就緒,將此事件通知應用程序,應用程序必須當即處理該事件。若是不處理,下次調用epoll_wait時,不會再次響應應用程序並通知此事件。
ET是高速工做方式,只支持非阻塞socket(ET模式減小了epoll事件被重複觸發的次數,所以效率要比LT模式高)

Code提煉一下

  1. 實例化對象:epoll = select.epoll()
  2. 註冊對象:epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
  3. 註銷對象:epoll.unregister(fd)

PS:epoll不必定比Select性能高,通常都是分場景的:

  1. 高併發下,鏈接活躍度不高時:epoll比Select性能高(eg:web請求,頁面隨時關閉)
  2. 併發不高,鏈接活躍度比較高:Select更合適(eg:小遊戲)
  3. Select是win和linux通用的,而epoll只有linux有

其實IO多路複用還有一個kqueue,和epoll相似,下面的通用寫法中有包含


3.通用寫法(Selector

通常來講:Linux下使用epoll,Win下使用select(IO多路複用會這個通用的便可)

先看看Python源代碼:

# 選擇級別:epoll|kqueue|devpoll > poll > select
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector

實戰案例:(可讀和可寫能夠不分開)

import socket
import selectors

# Linux下使用epoll,Win下使用select
Selector = selectors.DefaultSelector()

class Task(object):
    def __init__(self):
        # 存放客戶端fd和socket鍵值對
        self.fd_socket_dict = dict()

    def run(self):
        self.server = socket.socket()
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind(('', 8080))
        self.server.listen()
        # 把Server註冊到epoll
        Selector.register(self.server.fileno(), selectors.EVENT_READ,
                          self.connected)

    def connected(self, key):
        """客戶端鏈接時處理"""
        client_socket, client_address = self.server.accept()
        fd = client_socket.fileno()
        self.fd_socket_dict[fd] = (client_socket, client_address)
        # 註冊一個客戶端讀的事件(服務端去讀消息)
        Selector.register(fd, selectors.EVENT_READ, self.call_back_reads)
        print(f"{client_address}已鏈接,當前鏈接數:{len(self.fd_socket_dict)}")

    def call_back_reads(self, key):
        """客戶端可讀時處理"""
        # 一個fd只能註冊一次,監測可寫的時候須要把可讀給註銷
        Selector.unregister(key.fd)
        client_socket, client_address = self.fd_socket_dict[key.fd]
        print(f"[來自{client_address}的消息:]\n")
        data = client_socket.recv(2048)
        if data:
            print(data.decode("utf-8"))
            # 註冊一個客戶端寫的事件(服務端去發消息)
            Selector.register(key.fd, selectors.EVENT_WRITE,
                              self.call_back_writes)
        else:
            client_socket.close()
            del self.fd_socket_dict[key.fd]
            print(f"{client_address}已斷開,當前鏈接數:{len(self.fd_socket_dict)}")

    def call_back_writes(self, key):
        """客戶端可寫時處理"""
        Selector.unregister(key.fd)
        client_socket, client_address = self.fd_socket_dict[key.fd]
        client_socket.send(b"ok")
        Selector.register(key.fd, selectors.EVENT_READ, self.call_back_reads)

def main():
    t = Task()
    t.run()
    while True:
        ready = Selector.select()
        for key, obj in ready:
            # 須要本身回調
            call_back = key.data
            call_back(key)

if __name__ == "__main__":
    main()

Code提煉一下

  1. 實例化對象:Selector = selectors.DefaultSelector()
  2. 註冊對象:
    • Selector.register(server.fileno(), selectors.EVENT_READ, call_back)
    • Selector.register(server.fileno(), selectors.EVENT_WRITE, call_back)
  3. 註銷對象:Selector.unregister(key.fd)
  4. 注意一下:一個fd只能註冊一次,監測可寫的時候須要把可讀給註銷(反之同樣)

業餘拓展:

select, iocp, epoll,kqueue及各類I/O複用機制
https://blog.csdn.net/shallwake/article/details/5265287

kqueue用法簡介
http://www.cnblogs.com/luminocean/p/5631336.html
相關文章
相關標籤/搜索