目錄python
同步和異步關注的是程序在執行時的狀態:linux
同步
,能夠理解爲在執行完一個函數或方法以後,一直等待系統返回值或消息,這時程序是出於阻塞的,只有接收到返回的值或消息後才往下執行其餘的命令。異步
,執行完函數或方法後,沒必要阻塞性地等待返回值或消息,只須要向系統委託一個異步過程,那麼當系統接收到返回值或消息時,系統會自動觸發委託的異步過程,從而完成一個完整的流程。
windows
異步如收發收短信,對比打電話,打電話我必定要在電話的旁邊聽着,保證雙方都在線,而收發短信,對方不用保證此刻我必定在手機旁,同時,我也不用時刻留意手機有沒有來短信。這樣的話,我看着視頻,而後來了短信,我就處理短信(也能夠不處理),接着再看視頻。網絡
對於寫程序,同步每每會阻塞,沒有數據過來,我就等着,異步則不會阻塞,沒數據來我幹別的事,有數據來去處理這些數據。一句話總結一下就是:函數或方法被調用時,調用者是否獲得最終結果的
。數據結構
阻塞和非阻塞關注的是程序在等待調用結果(消息,返回值)時的狀態.架構
一句話總結一下就是:函數或方法被調用時,是否馬上返回
cors
linux系統中,全部的設備讀寫均可以看作文件的讀寫來操做,對文件的讀寫通常要通過內核態和用戶態的切換,正由於有切換才致使了IO有同步和異步的說法。異步
在i386以前,CPU工做在實模式下,以後開始支持保護模式,一般用保護環(ring)來描述特權級,分爲四個運行級別:Ring0 ~ Ring3.socket
實模下,軟件能夠直接訪問BIOS例程以及周邊硬件,沒有任何硬件等級的存儲器保護觀念或多任務。函數
CPU 在某個時刻運行在特定的特權級,等級約束了CPU了能夠作什麼,不能夠作什麼。x86(如今最流行的PC/Server CPU架構) CPU 只用了兩個特權級:0 和 3:
Ring 0
: 能夠執行特權指令,能夠訪問說有級別數據,能夠訪問IO設備等(級別最高)Ring 3
:只能訪問本級別數據(級別最低)針對於Linux來講,
內核態
,運行內核代碼用戶態
,運行用戶代碼當用戶的應用程序想訪問某些硬件資源時,就須要經過操做系統提供的 系統調用 ,系統調用可使用特權指令運行在內核空間,此時進程陷入內核態運行。系統調用完成,進程將回到用戶態繼續執行用戶空間代碼。
現代操做系統採用虛擬存儲器,對於32位操做系統來講,進程對虛擬內存地址的內存尋址空間爲4G(2^32)。操做系統中,內核程序獨立且運行在較高的特權級別上,它們駐留在被保護的內存空間上,擁有訪問硬件設備的權限,這部份內存稱爲內核空間(內核態,最高1G)。
一般來說IO能夠分紅兩種類型:
IO過程能夠分爲兩個階段:
主要分爲同步IO和異步IO,而同步IO又能夠分爲:同步阻塞IO、同步非阻塞IO、IO多路複用。
進程等待(阻塞),直到讀寫完成。(全程等待
)
進程調用read操做,若是IO設備沒準備好,當即返回ERROR,進程不阻塞。用戶能夠再次發起系統調用,若是內核已經準備好,就阻塞,而後複製數據到用戶空間
所謂IO多路複用,就是同時監控多個IO,有一個準備好了,就不須要等了當即開始處理,提升了同時處理IO的能力。主要的IO多路複用有:
通常狀況下,select最多能監聽1024個fd(能夠修改,但不建議),可是因爲select採用輪詢的方式,當管理的IO多了,每次都要遍歷所有fd,效率低下。epoll沒有管理的fd的上限,且是回調機制,不須要遍歷,效果很高。
進程發起異步IO請求,當即返回。內核完成IO的兩個階段,後給進程發信號
Linux的aio的系統調用,內核從2.6版本開始支持。
Python的select庫實現了select、poll系統調用,這個基本上操做系統都支持,部分實現了epoll,它是底層的IO多路複用模塊。
epoll使用事件通知機制,使用回調機制提升效率
select/pool 還有從內核空間複製消息到用戶空間,而epoll經過內核空間和用戶空間共享一塊內存來減小複製。
Python 3.4 提供 selectors庫,高級IO複用庫。它的類層次結構以下
BaseSelector +-- SelectSelector +-- PollSelector +-- EpollSelector +-- DevpollSelector +-- KqueueSelector
觀察模塊原碼倒數幾行
咱們知道,selecors.DefaultSelector會返回當前平臺最有效、性能最高的實現,可是因爲沒有實現 Windows下的IOCP,因此windows下只能退化爲select。
selectorsobj.register爲當前selectors實例註冊一個文件對象,監視它的IO實現,返回一個selectKey對象。它的參數以下:
register(self, fileobj, events, data=None):
經常使用的Event事件
返回的selectKey對象對象具備如下方法:
下面是代碼:
import socket import selectors import logging import threading import time FORMAT = '%(asctime)s %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) class EchoServer: def __init__(self, ip, port): self.ip = ip self.port = port self.sock = socket.socket() self.selector = selectors.DefaultSelector() self.evnet = threading.Event() def start(self): self.sock.bind((self.ip, self.port)) self.sock.listen() self.sock.setblocking(False) # 事件被觸發,說明有連接進來,那麼不須要阻塞等待 self.selector.register(self.sock, selectors.EVENT_READ, data=self.accept) # 註冊accept函數 threading.Thread(target=self.select, name='select', daemon=True).start() # 啓動監視進程 # 啓動selector,用於監視事件的發生 def select(self): while not self.evnet.is_set(): events = self.selector.select() for key, event in events: key.data(key.fileobj) def accept(self, sock: socket.socket): sock, client = sock.accept() # 將client的讀寫也加入到監事列表中 self.selector.register(sock, selectors.EVENT_READ, self.recv) # 當對應的socket有寫操做,會直接直接觸發執行,因此這裏根本不須要死循環 def recv(self, sock: socket.socket): client_ip = sock.getpeername() data = sock.recv(1024) if data == b'quit' or data == b'': self.evnet.set() sock.close() return msg = '{}:{} {}'.format(*client_ip, data.decode()).encode() sock.send(msg) # 關閉進程時,將selectors中註冊的事件取消掉,再關閉監視器 def stop(self): self.evnet.set() event_list = set() for event in self.selector.get_map(): event_list.add(event) for event in event_list: self.selector.unregister(event) self.selector.close() if __name__ == '__main__': es = EchoServer('127.0.0.1', 9999) es.start() while True: cmd = input('>>>:').strip() if cmd == 'quit': es.stop() break
這裏是每次client進來時,經過記錄client的socket來完成的。
import socket import selectors import logging import threading FORMAT = '%(asctime)s %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) class ChatSocketServer: def __init__(self, ip, port): self.ip = ip self.port = port self.sock = socket.socket() self.selector = selectors.DefaultSelector() self.event = threading.Event() self.clients = {} # 用於記錄鏈接的client def start(self): self.sock.bind((self.ip, self.port)) self.sock.listen() self.sock.setblocking(False) # 事件被觸發,說明有連接進來,那麼不須要阻塞等待 self.selector.register(self.sock, selectors.EVENT_READ, data=self.accept) # 註冊accept函數 threading.Thread(target=self.select, name='select', daemon=True).start() # 啓動監視進程 # 啓動selector,用於監視事件的發生 def select(self): while not self.event.is_set(): events = self.selector.select() for key, event in events: key.data(key.fileobj) def accept(self, sock: socket.socket): sock, client = sock.accept() # 添加已鏈接客戶端列表 self.clients[client] = sock # 將client的讀寫也加入到監事列表中 self.selector.register(sock, selectors.EVENT_READ, self.recv) # 當對應的socket有寫操做,會直接直接觸發執行,因此這裏根本不須要死循環 def recv(self, sock: socket.socket): client_ip = sock.getpeername() data = sock.recv(1024) if data == b'quit' or data == b'': self.clients.pop(client_ip) # 退出後彈出client地址 self.event.set() sock.close() return msg = '{}:{} {}'.format(*client_ip, data.decode()) logging.info(msg) for clients in self.clients.values(): clients.send(msg.encode()) # 關閉進程時,將selectors中註冊的事件取消掉,再關閉監視器 def stop(self): self.event.set() event_list = set() for event in self.selector.get_map(): event_list.add(event) for event in event_list: self.selector.unregister(event) for client in self.clients.values(): # 關閉因此已鏈接的client的socket client.close() self.selector.close() if __name__ == '__main__': es = ChatSocketServer('127.0.0.1', 9999) es.start() while True: cmd = input('>>>:').strip() if cmd == 'quit': es.stop() break
固然還能夠經過selector來處理,爲何呢?由於每當有請求進來,selector都會監視當前鏈接的recv,那麼咱們只須要在selector中的recv拿出來,就知道到底有多少鏈接了。
import socket import selectors import logging import threading FORMAT = '%(asctime)s %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) class ChatSocketServer: def __init__(self, ip, port): self.ip = ip self.port = port self.sock = socket.socket() self.selector = selectors.DefaultSelector() self.event = threading.Event() def start(self): self.sock.bind((self.ip, self.port)) self.sock.listen() self.sock.setblocking(False) # 事件被觸發,說明有連接進來,那麼不須要阻塞等待 self.selector.register(self.sock, selectors.EVENT_READ, data=self.accept) # 註冊accept函數 threading.Thread(target=self.select, name='select', daemon=True).start() # 啓動監視進程 # 啓動selector,用於監視事件的發生 def select(self): while not self.event.is_set(): events = self.selector.select() for key, event in events: key.data(key.fileobj) def accept(self, sock: socket.socket): sock, client = sock.accept() # 將client的讀寫也加入到監事列表中 self.selector.register(sock, selectors.EVENT_READ, self.recv) # 當對應的socket有寫操做,會直接直接觸發執行,因此這裏根本不須要死循環 def recv(self, sock: socket.socket): client_ip = sock.getpeername() data = sock.recv(1024) if data == b'quit' or data == b'': self.selector.unregister(sock) # 客戶端退出,則取消監控當前 socket 事件 sock.close() return msg = '{}:{} {}'.format(*client_ip, data.decode()) logging.info(msg) # 羣發消息,若是data綁定的是recv(排除accept),那麼就經過socket羣發消息 for sock in self.selector.get_map().values(): if sock.data == self.recv: sock.fileobj.send(msg.encode()) # 關閉進程時,將selectors中註冊的事件取消掉,再關閉監視器 def stop(self): self.event.set() event_list = set() for event in self.selector.get_map(): event_list.add(event) for event in event_list: self.selector.unregister(event) self.selector.close() if __name__ == '__main__': es = ChatSocketServer('127.0.0.1', 9999) es.start() while True: cmd = input('>>>:').strip() if cmd == 'quit': es.stop() break