IO阻塞模型(blocking IO)python
在linux中,默認狀況下全部的socket都是blocking,一個典型的讀操做流程大概是這樣:linux
因此,blocking IO的特色就是在IO執行的兩個階段(等待數據和拷貝數據兩個階段)都被block了。web
from socket import * server = socket(AF_INET,SOCK_STREAM) server.bind(('127.0.0.1',8080)) server.listen(5) while True: conn,addr = server.accept() print(addr) while True: try: data = conn.recv(1024) if not data:break conn.send(data.upper()) except ConnectionResetError: break conn.close()
from socket import * client = socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg = input('>>:').strip() if not msg:continue client.send(msg.encode('utf-8')) data = client.recv(1024) print(data.decode('utf-8')) client.close()
非阻塞IO模型服務器
Linux下,能夠經過設置socket使其變爲non-blocking。當對一個non-blocking socket執行讀操做時,流程是這個樣子:網絡
因此,在非阻塞式IO中,用戶進程實際上是須要不斷的主動詢問kernel數據準備好了沒有。app
# 1.對cpu的佔用率過多,可是是無用的佔用 # 2.在連接數過多的狀況下不能及時響應客戶端的消息 from socket import * server = socket(AF_INET,SOCK_STREAM) server.bind(('127.0.0.1',8080)) server.listen(5) server.setblocking(False) # 非阻塞型,默認爲阻塞型True conn_l = [] while True: try: conn,addr = server.accept() conn_l.append(conn) print(addr) except BlockingIOError: # print('幹其它活去了') # time.sleep(2) del_l = [] for conn in conn_l: try: data = conn.recv(1024) if not data: # 針對linux系統 conn.close() del_l.append(conn) continue conn.send(data.upper()) except BlockingIOError: pass except ConnectionResetError: conn.close() del_l.append(conn) for conn in del_l: conn_l.remove(conn)
from socket import * client = socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8081)) while True: msg = input('>>:').strip() if not msg:continue client.send(msg.encode('utf-8')) data = client.recv(1024) print(data.decode('utf-8')) client.close()
IO多路複用ssh
IO multiplexing這個詞可能有點陌生,可是若是我說select/epoll,大概就都能明白了。有些地方也稱這種IO方式爲事件驅動IO(event driven IO)。咱們都知道,select/epoll的好處就在於單個process就能夠同時處理多個網絡鏈接的IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的全部socket,當某個socket有數據到達了,就通知用戶進程。它的流程如圖:socket
當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。
這個圖和blocking IO的圖其實並無太大的不一樣,事實上還更差一些。由於這裏須要使用兩個系統調用(select和recvfrom),而blocking IO只調用了一個系統調用(recvfrom)。可是,用select的優點在於它能夠同時處理多個connection。ide
強調:函數
1. 若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。
2. 在多路複用模型中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。
結論: select的優點在於能夠處理多個鏈接,不適用於單個鏈接
from socket import * import select server = socket(AF_INET,SOCK_STREAM) server.bind(('127.0.0.1',8080)) server.listen(5) server.setblocking(False) # 非阻塞型,默認爲阻塞型True read_l = [server,] print('strating....') while True: rl,wl,xl = select.select(read_l,[],[]) # 總體的返回值是一個元組,rl爲元組裏的一個列表 # print('===>',rl) # rl裏的值就是server對象或conn對象 for r in rl: if r is server: conn,addr = r.accept() read_l.append(conn) else: try: data = r.recv(1024) if not data: r.close() read_l.remove(r) r.send(data.upper()) except ConnectionResetError: r.close() read_l.remove(r)
from socket import * client = socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8081)) while True: msg = input('>>:').strip() if not msg:continue client.send(msg.encode('utf-8')) data = client.recv(1024) print(data.decode('utf-8')) client.close()
socketserver模塊
TCP
import socketserver class MyTCPHandler(socketserver.BaseRequestHandler): def handle(self): print('========?>',self.request) # self.request is conn while True: data = self.request.recv(1024) self.request.send(data.upper()) if __name__ == '__main__': # socketserver.ForkingTCPServer 這個模塊的多進程只能在linux上用 server = socketserver.ThreadingTCPServer(('127.0.0.1',8080),MyTCPHandler) server.serve_forever()
from socket import * client = socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8081)) while True: msg = input('>>:').strip() if not msg:continue client.send(msg.encode('utf-8')) data = client.recv(1024) print(data.decode('utf-8')) client.close()
UDP
import socketserver class MyTCPHandler(socketserver.BaseRequestHandler): def handle(self): print('========?>',self.request) # self.request 是一個元組,第一個值是客戶端發來的消息,第二個值是一個套接字對象 client_data=self.request[0] self.request[1].sendto(client_data.upper(),self.client_address) if __name__ == '__main__': # socketserver.ForkingTCPServer 這個模塊的多進程只能在linux上用 server = socketserver.ThreadingUDPServer(('127.0.0.1',8080),MyTCPHandler) server.serve_forever()
from socket import * client = socket(AF_INET,SOCK_DGRAM) while True: msg = input('>>:').strip() if not msg:continue client.sendto(msg.encode('utf-8'),('127.0.0.1',8080)) data,server_addr = client.recvfrom(1024) print(data.decode('utf-8')) client.close()
paramiko模塊
paramiko是一個用於作遠程控制的模塊,使用該模塊能夠對遠程服務器進行命令或文件操做,值得一說的是,fabric和ansible內部的遠程管理就是使用的paramiko來現實
下載安裝
pip3 install paramiko #在python3中
SSHClient
用於鏈接遠程服務器並執行基本命令
基於用戶名密碼鏈接:
import paramiko # 建立SSH對象 ssh = paramiko.SSHClient() # 容許鏈接不在know_hosts文件中的主機 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 鏈接服務器 ssh.connect(hostname='120.92.84.249', port=22, username='root', password='xxx') # 執行命令 stdin, stdout, stderr = ssh.exec_command('df') # 獲取命令結果 result = stdout.read() print(result.decode('utf-8')) # 關閉鏈接 ssh.close()
基於公鑰密鑰鏈接:
客戶端文件名:id_rsa
服務端必須有文件名:authorized_keys(在用ssh-keygen時,必須製做一個authorized_keys,能夠用ssh-copy-id來製做)
import paramiko private_key = paramiko.RSAKey.from_private_key_file('/tmp/id_rsa') # 建立SSH對象 ssh = paramiko.SSHClient() # 容許鏈接不在know_hosts文件中的主機 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 鏈接服務器 ssh.connect(hostname='120.92.84.249', port=22, username='root', pkey=private_key) # 執行命令 stdin, stdout, stderr = ssh.exec_command('df') # 獲取命令結果 result = stdout.read() print(result.decode('utf-8')) # 關閉鏈接 ssh.close()
SFTPClient
用於鏈接遠程服務器並執行上傳下載
基於用戶名密碼上傳下載
import paramiko transport = paramiko.Transport(('120.92.84.249',22)) transport.connect(username='root',password='xxx') sftp = paramiko.SFTPClient.from_transport(transport) # 將location.py 上傳至服務器 /tmp/test.py sftp.put('/tmp/id_rsa', '/etc/test.rsa') # 將remove_path 下載到本地 local_path sftp.get('remove_path', 'local_path') transport.close()