39 - 同步-異步-IO多路複用

1 同步與異步

同步和異步關注的是程序在執行時的狀態:linux

  • 同步,能夠理解爲在執行完一個函數或方法以後,一直等待系統返回值或消息,這時程序是出於阻塞的,只有接收到返回的值或消息後才往下執行其餘的命令。
  • 異步,執行完函數或方法後,沒必要阻塞性地等待返回值或消息,只須要向系統委託一個異步過程,那麼當系統接收到返回值或消息時,系統會自動觸發委託的異步過程,從而完成一個完整的流程。
    windows

    例如:

  • 同步如打電話,通訊雙方不能斷(咱們是同時進行,同步),你一句我一句,這樣的好處是,對方想表達的信息我立刻能收到,可是,我在打着電話,我沒法作別的事情。
  • 異步如收發收短信,對比打電話,打電話我必定要在電話的旁邊聽着,保證雙方都在線,而收發短信,對方不用保證此刻我必定在手機旁,同時,我也不用時刻留意手機有沒有來短信。這樣的話,我看着視頻,而後來了短信,我就處理短信(也能夠不處理),接着再看視頻。網絡

        對於寫程序,同步每每會阻塞,沒有數據過來,我就等着,異步則不會阻塞,沒數據來我幹別的事,有數據來去處理這些數據。一句話總結一下就是:函數或方法被調用時,調用者是否獲得最終結果的數據結構

  • 直接獲得最終結果的就是同步調用;
  • 獲得中間結果而非最終結果的,就是異步調用

2 阻塞與非阻塞

阻塞和非阻塞關注的是程序在等待調用結果(消息,返回值)時的狀態.架構

  • 阻塞調用是指調用結果返回以前,當前線程會被掛起。調用線程只有在獲得結果以後纔會返回。
  • 非阻塞調用指在不能馬上獲得結果以前,該調用不會阻塞當前線程。

一句話總結一下就是:函數或方法被調用時,是否馬上返回cors

  • 當即返回就是非阻塞調用
  • 不當即返回就是阻塞調用

3 什麼是IO

linux系統中,全部的設備讀寫均可以看作文件的讀寫來操做,對文件的讀寫通常要通過內核態和用戶態的切換,正由於有切換才致使了IO有同步和異步的說法。異步

3.1 內核態用戶態

在i386以前,CPU工做在實模式下,以後開始支持保護模式,一般用保護環(ring)來描述特權級,分爲四個運行級別:Ring0 ~ Ring3.socket

實模下,軟件能夠直接訪問BIOS例程以及周邊硬件,沒有任何硬件等級的存儲器保護觀念或多任務。函數

CPU 在某個時刻運行在特定的特權級,等級約束了CPU了能夠作什麼,不能夠作什麼。x86(如今最流行的PC/Server CPU架構) CPU 只用了兩個特權級:0 和 3:

  • Ring 0: 能夠執行特權指令,能夠訪問說有級別數據,能夠訪問IO設備等(級別最高)
  • Ring 3:只能訪問本級別數據(級別最低)

針對於Linux來講,

  • ring0 就是內核態,運行內核代碼
  • ring3 就是用戶態,運行用戶代碼

當用戶的應用程序想訪問某些硬件資源時,就須要經過操做系統提供的 系統調用 ,系統調用可使用特權指令運行在內核空間,此時進程陷入內核態運行。系統調用完成,進程將回到用戶態繼續執行用戶空間代碼。

現代操做系統採用虛擬存儲器,對於32位操做系統來講,進程對虛擬內存地址的內存尋址空間爲4G(2^32)。操做系統中,內核程序獨立且運行在較高的特權級別上,它們駐留在被保護的內存空間上,擁有訪問硬件設備的權限,這部份內存稱爲內核空間(內核態,最高1G)。

3.2 IO兩個階段

一般來說IO能夠分紅兩種類型:

  • 來自網絡的IO
  • 來自文件或者設備的IO

IO過程能夠分爲兩個階段:

  • 數據準備階段(內核從IO設備讀寫數據)
  • 內核空間複製回用戶空間進程緩衝區階段(進程從內核複製數據)

3.3 IO模型

主要分爲同步IO和異步IO,而同步IO又能夠分爲:同步阻塞IO、同步非阻塞IO、IO多路複用。

3.3.1 同步阻塞IO

進程等待(阻塞),直到讀寫完成。(全程等待)
zuse

3.3.2 同步非阻塞IO

進程調用read操做,若是IO設備沒準備好,當即返回ERROR,進程不阻塞。用戶能夠再次發起系統調用,若是內核已經準備好,就阻塞,而後複製數據到用戶空間

  1. 第一階段數據沒有準備好,就先忙別的,等會再來看看。檢查數據是否準備好了的過程是非阻塞的。
  2. 第二階段是阻塞的,即內核空間和用戶空間之間複製數據是阻塞的。

feizuse

3.3.3 IO多路複用

所謂IO多路複用,就是同時監控多個IO,有一個準備好了,就不須要等了當即開始處理,提升了同時處理IO的能力。主要的IO多路複用有:

  • select: 幾乎全部操做系統平臺都支持,poll是對select的升級
  • epoll:Linux系統內核2.5+開始支持,對select和poll的加強,在監視的基礎上,增長回調機制。(BSD、Mac平臺有kqueue,Windows有iocp)

ioduolu

例如:

  • select:食堂供應不少菜(衆多IO),你須要吃某三菜一湯,大師傅(操做系統)說要現作,須要等。你只好等待大師傅叫,其中同樣菜好了,大師傅叫你,說你點的菜有的好了,你得本身遍歷找找看哪同樣好了。請服務員打給你。
  • epool:是有菜準備好了,大師傅喊你去幾號窗口直接打菜,不用本身找菜了。

通常狀況下,select最多能監聽1024個fd(能夠修改,但不建議),可是因爲select採用輪詢的方式,當管理的IO多了,每次都要遍歷所有fd,效率低下。epoll沒有管理的fd的上限,且是回調機制,不須要遍歷,效果很高。

3.3.4 異步IO

進程發起異步IO請求,當即返回。內核完成IO的兩個階段,後給進程發信號
yibu

Linux的aio的系統調用,內核從2.6版本開始支持。

4 Python中的IO多路複用

Python的select庫實現了select、poll系統調用,這個基本上操做系統都支持,部分實現了epoll,它是底層的IO多路複用模塊。

  • select維護一個文件描述符數據結構,單個進程使用於上限,一般是1024,線性掃描這個數據結構,效率低。
  • pool和select的區別是內部數據結構使用鏈表,沒有這個最大限制,可是依然是線性遍歷才知道哪一個設備就緒了。
  • epoll使用事件通知機制,使用回調機制提升效率

    select/pool 還有從內核空間複製消息到用戶空間,而epoll經過內核空間和用戶空間共享一塊內存來減小複製。

4.1 selectors庫

Python 3.4 提供 selectors庫,高級IO複用庫。它的類層次結構以下

BaseSelector
+-- SelectSelector
+-- PollSelector
+-- EpollSelector
+-- DevpollSelector
+-- KqueueSelector

觀察模塊原碼倒數幾行咱們知道,selecors.DefaultSelector會返回當前平臺最有效、性能最高的實現,可是因爲沒有實現 Windows下的IOCP,因此windows下只能退化爲select。

4.2 register方法

selectorsobj.register爲當前selectors實例註冊一個文件對象,監視它的IO實現,返回一個selectKey對象。它的參數以下:

register(self, fileobj, events, data=None):
  • fileobj: 被監視的文件對象,好比socket對象
  • events: 事件,該文件對象必須等待的事件
  • data:可選的與此文件對象關聯的不透明的屬性,例如:關聯用來存儲每一個客戶端的會話ID,關聯方法。經過這個參數在關注的事件產生後讓selector幹什麼事。

經常使用的Event事件

  • EVENT_READ: 可讀0b01,內核已經準備好輸入輸出設備,能夠開始讀了。
  • EVENT_WRITE: 可寫0b10,內核準備好了,能夠往裏寫了。

返回的selectKey對象對象具備如下方法:

  • fileobj: 註冊的文件對象(socket)
  • fd:文件描述符
  • events:等待fd標識的文件對象觸發的事件類型
  • data:註冊時關聯的數據(標識回調函數)

4.3 利用selectors完成IO多路複用版本的EchoServer

下面是代碼:

import socket
import selectors
import logging
import threading
import time

FORMAT = '%(asctime)s %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

class EchoServer:

    def __init__(self, ip, port):
        self.ip = ip
        self.port = port
        self.sock = socket.socket()
        self.selector = selectors.DefaultSelector()
        self.evnet = threading.Event()

    def start(self):
        self.sock.bind((self.ip, self.port))
        self.sock.listen()
        self.sock.setblocking(False)   # 事件被觸發,說明有連接進來,那麼不須要阻塞等待
        self.selector.register(self.sock, selectors.EVENT_READ, data=self.accept)  # 註冊accept函數
        threading.Thread(target=self.select, name='select', daemon=True).start()   # 啓動監視進程


    # 啓動selector,用於監視事件的發生
    def select(self):
        while not self.evnet.is_set():
            events = self.selector.select()
            for key, event in events:
                key.data(key.fileobj)


    def accept(self, sock: socket.socket):
        sock, client = sock.accept()
        # 將client的讀寫也加入到監事列表中
        self.selector.register(sock, selectors.EVENT_READ, self.recv)

    # 當對應的socket有寫操做,會直接直接觸發執行,因此這裏根本不須要死循環
    def recv(self, sock: socket.socket):
        client_ip = sock.getpeername()
        data = sock.recv(1024)
        if data == b'quit' or data == b'':
            self.evnet.set()
            sock.close()
            return
        msg = '{}:{} {}'.format(*client_ip, data.decode()).encode()
        sock.send(msg)

    # 關閉進程時,將selectors中註冊的事件取消掉,再關閉監視器
    def stop(self):
        self.evnet.set()
        event_list = set()
        for event in self.selector.get_map():
            event_list.add(event)
        for event in event_list:
            self.selector.unregister(event)
        self.selector.close()


if __name__ == '__main__':
    es = EchoServer('127.0.0.1', 9999)
    es.start()

    while True:
        cmd = input('>>>:').strip()
        if cmd == 'quit':
            es.stop()
            break

4.4 聊天室

這裏是每次client進來時,經過記錄client的socket來完成的。

import socket
import selectors
import logging
import threading

FORMAT = '%(asctime)s %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


class ChatSocketServer:

    def __init__(self, ip, port):
        self.ip = ip
        self.port = port
        self.sock = socket.socket()
        self.selector = selectors.DefaultSelector()
        self.event = threading.Event()
        self.clients = {}   # 用於記錄鏈接的client

    def start(self):
        self.sock.bind((self.ip, self.port))
        self.sock.listen()
        self.sock.setblocking(False)  # 事件被觸發,說明有連接進來,那麼不須要阻塞等待
        self.selector.register(self.sock, selectors.EVENT_READ, data=self.accept)  # 註冊accept函數
        threading.Thread(target=self.select, name='select', daemon=True).start()  # 啓動監視進程

    # 啓動selector,用於監視事件的發生
    def select(self):
        while not self.event.is_set():
            events = self.selector.select()
            for key, event in events:
                key.data(key.fileobj)

    def accept(self, sock: socket.socket):
        sock, client = sock.accept()

        # 添加已鏈接客戶端列表
        self.clients[client] = sock

        # 將client的讀寫也加入到監事列表中
        self.selector.register(sock, selectors.EVENT_READ, self.recv)

    # 當對應的socket有寫操做,會直接直接觸發執行,因此這裏根本不須要死循環
    def recv(self, sock: socket.socket):
        client_ip = sock.getpeername()
        data = sock.recv(1024)
        if data == b'quit' or data == b'':
            self.clients.pop(client_ip)   # 退出後彈出client地址
            self.event.set()
            sock.close()
            return
        msg = '{}:{} {}'.format(*client_ip, data.decode())
        logging.info(msg)
        for clients in self.clients.values():
            clients.send(msg.encode())

    # 關閉進程時,將selectors中註冊的事件取消掉,再關閉監視器
    def stop(self):
        self.event.set()
        event_list = set()
        for event in self.selector.get_map():
            event_list.add(event)
        for event in event_list:
            self.selector.unregister(event)
        for client in self.clients.values():    # 關閉因此已鏈接的client的socket
            client.close()
        self.selector.close()


if __name__ == '__main__':
    es = ChatSocketServer('127.0.0.1', 9999)
    es.start()

    while True:
        cmd = input('>>>:').strip()
        if cmd == 'quit':
            es.stop()
            break

固然還能夠經過selector來處理,爲何呢?由於每當有請求進來,selector都會監視當前鏈接的recv,那麼咱們只須要在selector中的recv拿出來,就知道到底有多少鏈接了。

import socket
import selectors
import logging
import threading

FORMAT = '%(asctime)s %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


class ChatSocketServer:

    def __init__(self, ip, port):
        self.ip = ip
        self.port = port
        self.sock = socket.socket()
        self.selector = selectors.DefaultSelector()
        self.event = threading.Event()

    def start(self):
        self.sock.bind((self.ip, self.port))
        self.sock.listen()
        self.sock.setblocking(False)  # 事件被觸發,說明有連接進來,那麼不須要阻塞等待
        self.selector.register(self.sock, selectors.EVENT_READ, data=self.accept)  # 註冊accept函數
        threading.Thread(target=self.select, name='select', daemon=True).start()  # 啓動監視進程

    # 啓動selector,用於監視事件的發生
    def select(self):
        while not self.event.is_set():
            events = self.selector.select()
            for key, event in events:
                key.data(key.fileobj)

    def accept(self, sock: socket.socket):
        sock, client = sock.accept()

        # 將client的讀寫也加入到監事列表中
        self.selector.register(sock, selectors.EVENT_READ, self.recv)

    # 當對應的socket有寫操做,會直接直接觸發執行,因此這裏根本不須要死循環
    def recv(self, sock: socket.socket):
        client_ip = sock.getpeername()
        data = sock.recv(1024)
        if data == b'quit' or data == b'':
            self.selector.unregister(sock)   # 客戶端退出,則取消監控當前 socket 事件
            sock.close()
            return
        msg = '{}:{} {}'.format(*client_ip, data.decode())
        logging.info(msg)

        # 羣發消息,若是data綁定的是recv(排除accept),那麼就經過socket羣發消息
        for sock in self.selector.get_map().values():
            if sock.data == self.recv:
                sock.fileobj.send(msg.encode())

    # 關閉進程時,將selectors中註冊的事件取消掉,再關閉監視器
    def stop(self):
        self.event.set()
        event_list = set()
        for event in self.selector.get_map():
            event_list.add(event)
        for event in event_list:
            self.selector.unregister(event)
        self.selector.close()


if __name__ == '__main__':
    es = ChatSocketServer('127.0.0.1', 9999)
    es.start()

    while True:
        cmd = input('>>>:').strip()
        if cmd == 'quit':
            es.stop()
            break
相關文章
相關標籤/搜索