引子: 算法
套接字起源於20世紀70年代加利福尼亞大學伯克利分校版本的Unix,即人們所說的BSD Unix。所以,有時人們也把套接字成爲「伯克利套接字」或「BSD套接字」。一開始,套接字被設計用在一臺主機上多個應用程序之間的通訊,這也被稱做進程間通許或IPC。套接字有兩種(或者稱爲兩個種族),分別是基於文件型和就網絡型。
還有AF_INET6被用於ipv6,還有一些其餘的地址家族,不過,他們要麼是隻用於某個平臺,要麼就是已經被廢棄,或者是不多被使用,或者是根本沒有實現,全部地址家族中,AF_INET是使用最普遍的一個,Python支持不少地址家族,可是因爲咱們只關心網絡編程,因此大部分時候咱們只使用AF_INET(AF:Address Family;INET:Internet)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socket 4 5 phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 買手機;socket.AF_INET:基於網絡協議;socket.SOCK_STREAM:基於流的TCP協議 6 phone.bind(('', 8080)) # 綁定手機卡;元祖形式,ip地址+端口 7 # 注:服務器的ip地址寫本機的ip地址 8 phone.listen(5) # 開機 9 conn, addr = phone.accept() # 等電話 10 msg = conn.recv(1024) # 收消息 11 print('客戶端發來的消息是:', msg) 12 conn.send(msg.upper()) # 發消息 13 conn.close() 14 phone.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socket 4 phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 買手機 5 phone.connect(('', 8080)) # 撥電話 6 # 注:客戶端的ip地址,寫服務器端的ip地址 7 phone.send('hello'.encode('utf-8')) # 發消息 8 data = phone.recv(1024) # 收消息 9 print('收到服務端發來的消息', data)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # import socket 4 from socket import * 5 import time 6 ip_port = ('', 8080) 7 back_log = 5 8 buffer_size = 1024 9 tcp_server = socket(AF_INET, SOCK_STREAM) 10 tcp_server.bind(ip_port) 11 tcp_server.listen(back_log) 12 print('服務端開始運行') 13 conn, addr = tcp_server.accept() # 服務器阻塞 14 print('雙向連接', conn) 15 print('客戶端地址', addr) 16 while True: 17 time.sleep(1) 18 print('[%s]' % time.time()) 19 data = conn.recv(buffer_size) 20 print('客戶端發來的消息是', data.decode('utf-8')) 21 conn.send(data.upper()) 22 conn.close() 23 tcp_server.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 import time 5 ip_port = ('', 8080) 6 back_log = 5 7 buffer_size = 1024 8 tcp_client = socket(AF_INET, SOCK_STREAM) 9 tcp_client.connect(ip_port) 10 while True: 11 time.sleep(2) 12 print('[%s]' % time.time()) 13 msg = input('>>:').strip() 14 tcp_client.send(msg.encode('utf-8')) 15 print('客戶端已經發送消息') 16 data = tcp_client.recv(buffer_size) 17 print('收到服務端發來消息', data.decode('utf-8')) 18 tcp_client.close()
若重啓服務端時,可能會遇到:Address already in use;這個是因爲服務端扔然存在四次揮手的time_wait狀態佔用地址
1 # 加入一條socket配置,重用ip和端口 2 tcp_server = socket(AF_INET, SOCK_STREAM) 3 tcp_server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) # <---就是這條,在bind前加 4 tcp_server.bind(ip_port)
1 發現系統存在大量TIME_WAIT狀態的鏈接,經過調整linux內核參數解決, 2 vi /etc/sysctl.conf 3 4 編輯文件,加入如下內容: 5 net.ipv4.tcp_syncookies = 1 6 net.ipv4.tcp_tw_reuse = 1 7 net.ipv4.tcp_tw_recycle = 1 8 net.ipv4.tcp_fin_timeout = 30 9 10 而後執行 /sbin/sysctl -p 讓參數生效。 11 12 net.ipv4.tcp_syncookies = 1 表示開啓SYN Cookies。當出現SYN等待隊列溢出時,啓用cookies來處理,可防範少許SYN攻擊,默認爲0,表示關閉; 13 14 net.ipv4.tcp_tw_reuse = 1 表示開啓重用。容許將TIME-WAIT sockets從新用於新的TCP鏈接,默認爲0,表示關閉; 15 16 net.ipv4.tcp_tw_recycle = 1 表示開啓TCP鏈接中TIME-WAIT sockets的快速回收,默認爲0,表示關閉。 17 18 net.ipv4.tcp_fin_timeout 修改系統默認的 TIMEOUT 時間
1 ss = socket() # 建立一個服務器的套接字 2 ss.bind() # 綁定服務器套接字 3 while True: # 服務器無限循環 4 cs = ss.recvfrom()/ss.sendto() # 對話(接收與發送) 5 ss.close() # 關閉服務器套接字
1 cs = socket() # 建立客戶套接字 2 while True: 3 cs.sendto()/cs.recvfrom() # 對話(發送/接收) 4 cs.close() # 關閉客戶套接字
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 ip_port = ('', 8080) 5 buffer_size = 1024 6 udp_server = socket(AF_INET, SOCK_DGRAM) # SOCK_DGRAM:數據報式套接字 7 udp_server.bind(ip_port) 8 while True: 9 # data = udp_server.recvfrom(buffer_size) 10 # print(data) # (b'hello', ('', 65047)) 11 data, addr = udp_server.recvfrom(buffer_size) 12 print(data) 13 print(addr) 14 udp_server.sendto(data.upper(), addr)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 ip_port = ('', 8080) 5 buffer_size = 1024 6 udp_client = socket(AF_INET, SOCK_DGRAM) # SOCK_DGRAM:數據報式套接字 7 while True: 8 msg = input('>>').strip() 9 udp_client.sendto(msg.encode('utf-8'), ip_port) 10 data, addr = udp_client.recvfrom(buffer_size) 11 print(data.decode('utf-8')) 12 print(addr)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 import subprocess 5 6 ip_port = ('', 8080) 7 back_log = 5 8 buffer_size = 1024 9 tcp_server = socket(AF_INET, SOCK_STREAM) 10 tcp_server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) 11 tcp_server.bind(ip_port) 12 tcp_server.listen(back_log) 13 while True: 14 conn, addr = tcp_server.accept() 15 while True: 16 try: 17 cmd = conn.recv(buffer_size) 18 # if not cmd:break 19 # 執行命令,獲得命令的運行結果cmd_res 20 res = subprocess.Popen(cmd.decode('utf-8'), shell=True, 21 stderr=subprocess.PIPE, 22 stdout=subprocess.PIPE, 23 stdin=subprocess.PIPE 24 ) 25 err = res.stderr.read() 26 if err: 27 cmd_res = err 28 else: 29 cmd_res = res.stdout.read() 30 if not cmd_res: 31 cmd_res = 'excute success'.encode('utf-8') 32 length = len(cmd_res) 33 conn.send(str(length).encode('utf-8')) 34 client_ready = conn.recv(buffer_size) 35 if client_ready == b'ready': 36 conn.send(cmd_res) 37 38 except Exception as EX: 39 print(EX) 40 break 41 conn.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 5 ip_port = ('', 8080) 6 back_log = 5 7 buffer_size = 1024 8 tcp_client = socket(AF_INET, SOCK_STREAM) 9 tcp_client.connect(ip_port) 10 while True: 11 cmd = input('>>>').strip() 12 if not cmd:continue 13 if cmd == 'quit':break 14 tcp_client.send(cmd.encode('utf-8')) 15 length = tcp_client.recv(buffer_size) 16 tcp_client.send(b'ready') 17 length = int(length.decode('utf-8')) 18 recv_size = 0 19 recv_msg = b'' 20 while recv_size < length: 21 # 第一種寫法 22 # r_m = tcp_client.recv(buffer_size) 23 # recv_msg += r_m 24 # recv_size += len(r_m) 25 26 # 第二種寫法 27 recv_msg += tcp_client.recv(buffer_size) 28 recv_size = len(recv_msg) 29 print(recv_msg.decode('gbk')) 30 tcp_client.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 import subprocess 5 import struct 6 7 ip_port = ('', 8080) 8 back_log = 5 9 buffer_size = 1024 10 tcp_server = socket(AF_INET, SOCK_STREAM) 11 tcp_server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) 12 tcp_server.bind(ip_port) 13 tcp_server.listen(back_log) 14 while True: 15 conn, addr = tcp_server.accept() 16 while True: 17 try: 18 cmd = conn.recv(buffer_size) 19 # if not cmd:break 20 # 執行命令,獲得命令的運行結果cmd_res 21 res = subprocess.Popen(cmd.decode('utf-8'), shell=True, 22 stderr=subprocess.PIPE, 23 stdout=subprocess.PIPE, 24 stdin=subprocess.PIPE 25 ) 26 err = res.stderr.read() 27 if err: 28 cmd_res = err 29 else: 30 cmd_res = res.stdout.read() 31 if not cmd_res: 32 cmd_res = 'excute success'.encode('utf-8') 33 length = len(cmd_res) 34 data_length = struct.pack('i', length) 35 conn.send(data_length) 36 conn.send(cmd_res) 37 38 except Exception as EX: 39 print(EX) 40 break 41 conn.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 import struct 5 6 ip_port = ('', 8080) 7 back_log = 5 8 buffer_size = 1024 9 tcp_client = socket(AF_INET, SOCK_STREAM) 10 tcp_client.connect(ip_port) 11 while True: 12 cmd = input('>>>').strip() 13 if not cmd:continue 14 if cmd == 'quit':break 15 tcp_client.send(cmd.encode('utf-8')) 16 length_data = tcp_client.recv(4) 17 length = struct.unpack('i', length_data)[0] 18 recv_size = 0 19 recv_msg = b'' 20 while recv_size < length: 21 # 第一種寫法 22 # r_m = tcp_client.recv(buffer_size) 23 # recv_msg += r_m 24 # recv_size += len(r_m) 25 26 # 第二種寫法 27 recv_msg += tcp_client.recv(buffer_size) 28 recv_size = len(recv_msg) 29 print(recv_msg.decode('gbk')) 30 tcp_client.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socketserver 4 5 6 class MyServer(socketserver.BaseRequestHandler): 7 def handle(self): 8 print('conn is:', self.request) # <==>conn 9 print('addr is:', self.client_address) # <==> addr 10 while True: 11 try: 12 # 收消息 13 data = self.request.recv(1024) 14 if not data:break 15 print('收到客戶端的消息是', data) 16 # 發消息 17 self.request.sendall(data.upper()) 18 except Exception as EX: 19 print('錯誤提示:',EX) 20 break 21 22 23 if __name__ == '__main__': 24 s = socketserver.ThreadingTCPServer(('', 8080), MyServer) # 多線程;第一個參數,地址+端口;第二個參數,類
25 # s = socketserver.ForkingTCPServer(('', 8080), MyServer) # 多進程;多進程的開銷大於多線程
26 s.serve_forever()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 ip_port = ('', 8080) 5 back_log = 5 6 buffer_size = 1024 7 tcp_client = socket(AF_INET, SOCK_STREAM) 8 tcp_client.connect(ip_port) 9 while True: 10 msg = input('>>').strip() 11 if not msg:continue 12 if msg == 'quit':break 13 tcp_client.send(msg.encode('utf-8')) 14 data = tcp_client.recv(buffer_size) 15 print('收到服務端發來的消息:', data.decode('utf-8')) 16 tcp_client.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socketserver 4 5 6 class MyServer(socketserver.BaseRequestHandler): 7 def handle(self): 8 print(self.request) 9 print('收到客戶端的消息是:', self.request[0].upper()) 10 self.request[1].sendto(self.request[0].upper(), self.client_address) 11 12 13 if __name__ == '__main__': 14 s = socketserver.ThreadingUDPServer(('', 8080), MyServer) # 第一個參數,地址+端口;第二個參數,類 15 s.serve_forever()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 ip_port = ('', 8080) 5 buffer_size = 1024 6 udp_client = socket(AF_INET, SOCK_DGRAM) # SOCK_DGRAM:數據報式套接字 7 while True: 8 msg = input('>>').strip() 9 udp_client.sendto(msg.encode('utf-8'), ip_port) 10 data, addr = udp_client.recvfrom(buffer_size) 11 print(data.decode('utf-8')) 12 print(addr)
1 # _*_coding:utf-8_*_ 2 __author__ = 'Linhaifeng' 3 from socket import * 4 import hmac, os 5 6 secret_key = b'alex bang bang bang' 7 8 9 def conn_auth(conn): 10 ''' 11 認證客戶端連接 12 :param conn: 13 :return: 14 ''' 15 print('開始驗證新連接的合法性') 16 msg = os.urandom(32) # 隨機生成的;b'\xa3\x9d\xaa\x94\x9e\x89\xe9\xc9\xc3r\xf9E\xe0w\x82=\xaac-\x04\xd8:\xea\x07\xad\x1dx\x1er\xe0\x7f\x02' 17 conn.sendall(msg) 18 h = hmac.new(secret_key, msg) # <hmac.HMAC object at 0x000000D5DA4824E0> 19 digest = h.digest() # 隨機生成的;b'\x17!*\xae6\x81\xfe|)\x138\xfa2o%\x1a' 20 respone = conn.recv(len(digest)) 21 return hmac.compare_digest(respone, digest) # 比較第一個參數和第二個參數;相同,返回True,反之也成立 22 23 24 def data_handler(conn, bufsize=1024): 25 if not conn_auth(conn): 26 print('該連接不合法,關閉') 27 conn.close() 28 return 29 print('連接合法,開始通訊') 30 while True: 31 data = conn.recv(bufsize) 32 if not data: break 33 conn.sendall(data.upper()) 34 35 36 def server_handler(ip_port, bufsize, backlog=5): 37 ''' 38 只處理連接 39 :param ip_port: 40 :return: 41 ''' 42 tcp_socket_server = socket(AF_INET, SOCK_STREAM) 43 tcp_socket_server.bind(ip_port) 44 tcp_socket_server.listen(backlog) 45 while True: 46 conn, addr = tcp_socket_server.accept() 47 print('新鏈接[%s:%s]' % (addr[0], addr[1])) 48 data_handler(conn, bufsize) 49 50 51 if __name__ == '__main__': 52 ip_port = ('', 9999) 53 bufsize = 1024 54 server_handler(ip_port, bufsize)
1 # _*_coding:utf-8_*_ 2 __author__ = 'Linhaifeng' 3 from socket import * 4 import hmac, os 5 6 secret_key = b'alex bang bang bang' 7 8 9 def conn_auth(conn): 10 ''' 11 認證客戶端連接 12 :param conn: 13 :return: 14 ''' 15 print('開始驗證新連接的合法性') 16 msg = os.urandom(32) 17 conn.sendall(msg) 18 h = hmac.new(secret_key, msg) 19 digest = h.digest() 20 respone = conn.recv(len(digest)) 21 return hmac.compare_digest(respone, digest) 22 23 24 def data_handler(conn, bufsize=1024): 25 if not conn_auth(conn): 26 print('該連接不合法,關閉') 27 conn.close() 28 return 29 print('連接合法,開始通訊') 30 while True: 31 data = conn.recv(bufsize) 32 if not data: break 33 conn.sendall(data.upper()) 34 35 36 def server_handler(ip_port, bufsize, backlog=5): 37 ''' 38 只處理連接 39 :param ip_port: 40 :return: 41 ''' 42 tcp_socket_server = socket(AF_INET, SOCK_STREAM) 43 tcp_socket_server.bind(ip_port) 44 tcp_socket_server.listen(backlog) 45 while True: 46 conn, addr = tcp_socket_server.accept() 47 print('新鏈接[%s:%s]' % (addr[0], addr[1])) 48 data_handler(conn, bufsize) 49 50 51 if __name__ == '__main__': 52 ip_port = ('', 9999) 53 bufsize = 1024 54 server_handler(ip_port, bufsize)
三、容許用戶在ftp server上隨意切換目錄(cd)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import os,sys 4 PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 5 sys.path.append(PATH) 6 7 8 from core import main 9 10 11 if __name__ == '__main__': 12 main.ArgvHandler()
1 [DEFAULT] 2 3 [alex] 4 Password =123 5 Quotation = 100 6 7 [root] 8 Password = root 9 Quotation = 100
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import os 4 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 5 6 ip = '' 7 port = 8080 8 9 ACCOUNT_PATH = os.path.join(BASE_DIR, 'conf', 'accounts.cfg')
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socketserver 4 import optparse 5 import socketserver 6 from conf import settings 7 from core import server 8 9 10 class ArgvHandler(): 11 def __init__(self): 12 self.op = optparse.OptionParser() 13 # self.op.add_option('-s', '--server', dest='server') 14 # self.op.add_option('-P', '--port', dest='port') 15 options, args = self.op.parse_args() 16 # print(options) # {'server': '', 'port': '8080'} 17 # print(type(options)) # <class 'optparse.Values'> 18 # print(options.server) # 19 # print(args) 20 21 options, args = self.op.parse_args() 22 self.verify_args(options, args) 23 24 def verify_args(self, options, args): 25 cmd = args[0] 26 # 第一種方法 27 # if cmd == 'start': 28 # pass 29 # else: 30 # pass 31 # 第二種方法 32 if hasattr(self, cmd): 33 func = getattr(self, cmd) 34 func() 35 36 def start(self): 37 print('ths server is working...') 38 s = socketserver.ThreadingTCPServer((settings.ip, settings.port), server.ServerHandler) 39 s.serve_forever() 40 41 def help(self): 42 pass
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socketserver 4 import json 5 import configparser 6 from conf import settings 7 import os 8 9 STATUS_CODE = { 10 250: "Invalid cmd format,e.g:{'action':'get','filename':'test.py','size':344}", 11 251: "Invalid cmd", 12 252: "Invalid auth data", 13 253: "Wrong username or password", 14 254: "Passed authentication", 15 255: "Filename doesn't provided", 16 256: "File doesn't exist on server", 17 257: "ready to send file", 18 258: "md5 verification", 19 800: "the file exist,but not enough,is continue?", 20 801: "the file exist!", 21 802: "ready to receive datas", 22 900: "md5 valdate success" 23 } 24 25 26 class ServerHandler(socketserver.BaseRequestHandler): 27 def handle(self): 28 while True: 29 data = self.request.recv(1024).strip() # self.request=conn 30 data = json.loads(data.decode('utf8')) 31 if data.get('action'): 32 if hasattr(self, data.get('action')): 33 func = getattr(self, data.get('action')) 34 func(**data) 35 else: 36 print('Invalid cmd') 37 else: 38 print('Invalid cmd') 39 40 def send_response(self, status_code): 41 response = {'status_code': status_code} 42 self.request.sendall(json.dumps(response).encode('utf8')) 43 44 def auth(self, **data): 45 username = data['username'] 46 password = data['password'] 47 username = self.authenticate(username, password) 48 if username: 49 self.send_response(254) 50 else: 51 self.send_response(253) 52 53 def authenticate(self, username, password): 54 cfg = configparser.ConfigParser() 55 cfg.read(settings.ACCOUNT_PATH) 56 if username in cfg.sections(): 57 if cfg[username]['Password'] == password: 58 self.username = username 59 self.mainPath = os.path.join(settings.BASE_DIR, 'home', self.username) 60 print('passed authenticate') 61 return username 62 63 def put(self, **data): 64 print('data', data) 65 file_name = data.get('file_name') 66 file_size = data.get('file_size') 67 target_path = data.get('target_path') 68 abs_path = os.path.join(self.mainPath, target_path, file_name) 69 has_received = 0 70 if os.path.exists(abs_path): 71 file_has_size = os.stat(abs_path).st_size 72 if file_has_size < file_size: 73 # 斷點續傳 74 self.request.sendall('800'.encode('utf8')) 75 choice = self.request.recv(1024).decode('utf8') 76 if choice == 'Y': 77 self.request.sendall(str(file_has_size).encode('utf8')) 78 has_received = file_has_size 79 f = open(abs_path, 'ab') 80 else: 81 f = open(abs_path, 'wb') 82 else: 83 # 文件徹底存在 84 self.request.sendall('801'.encode('utf8')) 85 return 86 else: 87 self.request.sendall('802'.encode('utf8')) 88 f = open(abs_path, 'wb') 89 90 while has_received < file_size: 91 try: 92 data = self.request.recv(1024) 93 except Exception as EX: 94 break 95 f.write(data) 96 has_received += len(data) 97 f.close() 98 99 def ls(self, **data): 100 file_list = os.listdir(self.mainPath) 101 file_str = '\n'.join(file_list) 102 if not len(file_list): 103 file_str = '<empty dir>' 104 self.request.sendall(file_str.encode('utf8')) 105 106 def cd(self, **data): 107 dirname = data.get('dirname') 108 if dirname == '..': 109 self.mainPath = os.path.dirname(self.mainPath) 110 else: 111 self.mainPath = os.path.join(self.mainPath, dirname) 112 self.request.sendall(self.mainPath.encode('utf8')) 113 114 def mkdir(self, **data): 115 dirname = data.get('dirname') 116 path = os.path.join(self.mainPath, dirname) 117 if not os.path.exists(path): 118 if '/' in path: 119 os.makedirs(path) 120 else: 121 os.mkdir(path) 122 self.request.sendall('create success'.encode('utf8')) 123 else: 124 self.request.sendall('dirname exist'.encode('utf8'))
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # import socket 4 # 5 # sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 6 # sk.connect(('', 8080)) 7 import optparse 8 import socket 9 import json 10 import os, sys 11 STATUS_CODE = { 12 250: "Invalid cmd format,e.g:{'action':'get','filename':'test.py','size':344}", 13 251: "Invalid cmd", 14 252: "Invalid auth data", 15 253: "Wrong username or password", 16 254: "Passed authentication", 17 255: "Filename doesn't provided", 18 256: "File doesn't exist on server", 19 257: "ready to send file", 20 258: "md5 verification", 21 800: "the file exist,but not enough,is continue?", 22 801: "the file exist!", 23 802: "ready to receive datas", 24 900: "md5 valdate success" 25 } 26 27 28 class ClientHandler(object): 29 def __init__(self): 30 self.op = optparse.OptionParser() 31 self.op.add_option('-s', '--server', dest='server') 32 self.op.add_option('-P', '--port', dest='port') 33 self.op.add_option('-u', '--username', dest='username') 34 self.op.add_option('-p', '--password', dest='password') 35 36 self.options, self.args = self.op.parse_args() 37 38 self.verify_args(self.options, self.args) 39 self.make_connection() 40 self.mainPath = os.path.dirname(os.path.abspath(__file__)) 41 self.last = 0 42 43 def verify_args(self, options, args): 44 server = options.server 45 port = options.port 46 username = options.username 47 password = options.password 48 if int(port) > 0 and int(port) < 65535: 49 return True 50 else: 51 exit('the port is in 0-65535') 52 53 def make_connection(self): 54 self.sock = socket.socket() 55 self.sock.connect((self.options.server, int(self.options.port))) 56 57 def interactive(self): 58 print('begin to interactive...') 59 # self.authenticate() 60 if self.authenticate(): 61 while 1: 62 cmd_info = input('[%s]' % self.current_dir).strip() 63 cmd_list = cmd_info.split() 64 if hasattr(self, cmd_list[0]): 65 func = getattr(self, cmd_list[0]) 66 func(*cmd_list) 67 68 def put(self, *cmd_list): 69 # put 12.png images 70 actions, local_path, target_path = cmd_list 71 local_path = os.path.join(self.mainPath, local_path) 72 file_name = os.path.basename(local_path) 73 file_size = os.stat(local_path).st_size 74 data = { 75 'action': 'put', 76 'file_name': file_name, 77 'file_size': file_size, 78 'target_path': target_path 79 } 80 self.sock.send(json.dumps(data).encode('utf8')) 81 is_exist = self.sock.recv(1024).decode('utf8') 82 has_send = 0 83 if is_exist == '800': 84 # 文件不完整 85 choice = input('the file exist,but not enough,is continue?[Y/N]').strip() 86 if choice.upper() == 'Y': 87 self.sock.sendall('Y'.encode('utf8')) 88 continue_position = self.sock.recv(1024).decode('utf8') 89 has_send += int(continue_position) 90 else: 91 self.sock.sendall('N'.encode('utf8')) 92 elif is_exist == '801': 93 # 文件徹底存在 94 print('the file exist') 95 return 96 else: 97 pass 98 f = open(local_path, 'rb') 99 f.seek(has_send) 100 while has_send < file_size: 101 data = f.read(1024) 102 self.sock.sendall(data) 103 has_send += len(data) 104 self.show_progress(has_send, file_size) 105 f.close() 106 print('successfully upload!') 107 108 def show_progress(self, has, total): 109 rate = float(has)/float(total) 110 rate_num = int(rate*100) 111 if self.last != rate_num: 112 sys.stdout.write('%s%% %s\r' % (rate_num, '#'*rate_num)) 113 self.last = rate_num 114 115 def ls(self, *cmd_list): 116 data = { 117 'action': 'ls' 118 } 119 self.sock.sendall(json.dumps(data).encode('utf8')) 120 data = self.sock.recv(1024).decode('utf8') 121 print(data) 122 123 def cd(self, *cmd_list): 124 data = { 125 'action': 'cd', 126 'dirname': cmd_list[1] 127 } 128 self.sock.sendall(json.dumps(data).encode('utf8')) 129 data = self.sock.recv(1024).decode('utf8') 130 self.current_dir = os.path.basename(data) 131 132 def mkdir(self, *cmd_list): 133 data = { 134 'action': 'mkdir', 135 'dirname': cmd_list[1] 136 } 137 self.sock.sendall(json.dumps(data).encode('utf8')) 138 data = self.sock.recv(1024).decode('utf8') 139 140 def authenticate(self): 141 if self.options.username is None or self.options.password is None: 142 username = input('username:') 143 password = input('password:') 144 return self.get_auth_result(username, password) 145 return self.get_auth_result(self.options.username, self.options.password) 146 147 def response(self): 148 data = self.sock.recv(1024).decode('utf8') 149 data = json.loads(data) 150 return data 151 152 def get_auth_result(self, username, password): 153 data = { 154 'action': "auth", 155 'username': username, 156 'password': password 157 } 158 self.sock.send(json.dumps(data).encode('utf8')) 159 response = self.response() 160 print('response:', response['status_code']) 161 if response['status_code'] == 254: 162 self.username = username 163 self.current_dir = username 164 print(STATUS_CODE[response['status_code']]) 165 return True 166 else: 167 print(STATUS_CODE[response['status_code']]) 168 169 170 ch = ClientHandler() 171 ch.interactive()
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
threading 模塊創建在thread 模塊之上。thread模塊以低級、原始的方式來處理和控制線程,而threading 模塊經過對thread進行二次封裝,提供了更方便的api來處理線程。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 6 7 def music(): 8 print('begin to listen % s' % time.ctime()) 9 time.sleep(3) 10 print('stop to listen %s' % time.ctime()) 11 12 13 def game(): 14 print('begin to play game % s \r\n' % time.ctime()) 15 time.sleep(5) 16 print('stop to play game %s \r\n' % time.ctime()) 17 18 19 threads = [] 20 t1 = threading.Thread(target=music) 21 t2 = threading.Thread(target=game) 22 threads.append(t1) 23 threads.append(t2) 24 if __name__ == '__main__': 25 # join()功能:在子線程完成運行以前,這個子線程的父線程講一直被阻塞 26 # t1 = threading.Thread(target=music) 27 # t2 = threading.Thread(target=game) 28 # t1.start() 29 # t2.start() 30 # 31 # t1.join() 32 # t2.join() 33 # print('end') 34 35 # setDaemon():將線程生命爲守護線程 36 t2.setDaemon(True) # 注:必定要在start以前設置 37 for t in threads: 38 t.start() 39 print('end')
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 6 7 class MyThread(threading.Thread): 8 def __init__(self, num): 9 threading.Thread.__init__(self) 10 self.num = num 11 12 def run(self): # 定義每一個線程要運行的函數 13 14 print("running on number:%s" % self.num) 15 16 time.sleep(3) 17 18 19 if __name__ == '__main__': 20 t1 = MyThread(1) 21 t2 = MyThread(2) 22 t1.start() 23 t2.start() 24 25 print("ending......")
1 # run():用於表示線程活動的方法 2 # start():啓動線程活動 3 # isAlive():返回線程是否活動的,返回布爾值,True/False 4 # getName():返回線程名字 5 # setName():設置線程名字 6 7 threading模塊提供的一些方法: 8 # threading.currentThread():返回當前的線程變量 9 # threading.enumerate():返回一個包含正在運行的線程的list。正在運行指線程啓動後-結束前,不包括啓動前和終止後的線程 10 # threading.activeCount():返回正在運行的線程數量,與len(threading.enumerate())有相同的結果
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 6 7 def music(): 8 print('begin to listen % s' % time.ctime()) 9 time.sleep(3) 10 print('stop to listen %s' % time.ctime()) 11 12 13 def game(): 14 print('begin to play game % s \r\n' % time.ctime()) 15 time.sleep(5) 16 print('stop to play game %s \r\n' % time.ctime()) 17 18 19 threads = [] 20 t1 = threading.Thread(target=music) 21 t2 = threading.Thread(target=game) 22 threads.append(t1) 23 threads.append(t2) 24 if __name__ == '__main__': 25 # join()功能:在子線程完成運行以前,這個子線程的父線程講一直被阻塞 26 # t1 = threading.Thread(target=music) 27 # t2 = threading.Thread(target=game) 28 # t1.start() 29 # t2.start() 30 # 31 # t1.join() 32 # t2.join() 33 # print('end') 34 35 # setDaemon():將線程生命爲守護線程 36 t2.setDaemon(True) # 注:必定要在start以前設置 37 for t in threads: 38 t.start() 39 print(t.getName()) 40 print('count:', threading.activeCount()) 41 while threading.activeCount() == 3: 42 print('end')
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 num = 100 6 7 8 def sub(): 9 global num 10 print('ok') 11 lock.acquire() # 加鎖 12 temp = num 13 time.sleep(0.001) 14 num = temp-1 15 lock.release() # 釋放鎖 16 17 18 li = [] 19 lock = threading.Lock() 20 for i in range(100): 21 t1 = threading.Thread(target=sub) 22 t1.start() 23 li.append(t1) 24 for l in li: 25 l.join() 26 print(num)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 6 7 class MyThread(threading.Thread): 8 9 def actionA(self): 10 r_lock.acquire() 11 print(self.name, 'gotA', time.ctime()) # 重寫線程後的self.name --->線程的名字 12 time.sleep(2) 13 r_lock.acquire() 14 print(self.name, 'gotB', time.ctime()) 15 time.sleep(1) 16 r_lock.release() 17 r_lock.release() 18 19 def actionB(self): 20 r_lock.acquire() 21 print(self.name, 'gotB', time.ctime()) # 重寫線程後的self.name --->線程的名字 22 time.sleep(2) 23 r_lock.acquire() 24 print(self.name, 'gotA', time.ctime()) 25 time.sleep(1) 26 r_lock.release() 27 r_lock.release() 28 29 def run(self): 30 self.actionA() 31 self.actionB() 32 33 34 if __name__ == '__main__': 35 r_lock = threading.RLock() 36 li = [] 37 for t in range(3): 38 t = MyThread() 39 t.start() 40 li.append(t) 41 42 for i in li: 43 i.join() 44 45 print('end')
An event is a simple synchronization object;the event represents an internal flag,
and threads can wait for the flag to be set, or set or clear the flag themselves.
event = threading.Event()
# a client thread can wait for the flag to be set
# a server thread can set or reset it
If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 6 7 class Boss(threading.Thread): 8 def run(self): 9 print('Boss:今天加班到22:00!\r\n') 10 print(event.isSet()) # False 11 event.set() 12 time.sleep(6) 13 print('Boss:能夠下班了,明天放假!\r\n') 14 print(event.isSet()) 15 event.set() 16 17 18 class Worker(threading.Thread): 19 def run(self): 20 event.wait() # 一旦event被設定,等同於pass 21 print('Worker:唉···命真苦!\r\n') 22 time.sleep(1) 23 event.clear() 24 event.wait() 25 print('Worker:OhYeah!\r\n') 26 27 28 if __name__ == '__main__': 29 event = threading.Event() 30 threads = [] 31 for i in range(5): 32 threads.append(Worker()) 33 threads.append(Boss()) 34 for t in threads: 35 t.start() 36 for t in threads: 37 t.join() 38 print('end')
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 6 7 class MyThread(threading.Thread): 8 def run(self): 9 if semaphore.acquire(): 10 print(self.name, '\r') 11 time.sleep(5) 12 semaphore.release() 13 14 15 if __name__ == '__main__': 16 semaphore = threading.Semaphore(5) 17 threads = [] 18 for i in range(100): 19 threads.append(MyThread()) 20 for t in threads: 21 t.start()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 6 li = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 7 8 9 def Foo(): 10 while li: 11 try: 12 last_li = li[-1] 13 print(last_li) 14 li.remove(last_li) 15 time.sleep(1) 16 except Exception as EX: 17 print('錯誤提示:', last_li, EX) 18 19 20 t1 = threading.Thread(target=Foo) 21 t1.start() 22 t2 = threading.Thread(target=Foo) 23 t2.start()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import queue # 線程隊列 4 q = queue.Queue() # 建立q對象,同步實現的。隊列長度可爲無限或者有限。可經過Queque的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限 5 q.put(12) # 將一個值放到隊列,調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲1.若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空處一個數據單元。若是block爲0,put方法將引起Full異常 6 q.put('alex') 7 q.put({"age": 15}) 8 print(q.get()) # 將一個值從隊列中取出,調用隊列對象的get()方法,從對頭刪除並返回一個實例。可選參數爲block,默認爲True。若是隊列爲空且block爲True,get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常 9 print(q.qsize()) # 返回隊列的大小 10 print(q.empty()) # 判斷隊列是否爲空,返回布爾值,True/False 11 print(q.full()) # 判斷隊列是否已經滿了,返回布爾值,True/False 12 q.join() # 實際上意味着等到隊列爲空,再執行別的操做 13 14 ''' 15 Queue模塊的三種隊列及構造函數 16 一、Python Queue模塊的FIFO隊列,先進先出 class queue.Queue(maxsize) 17 二、LIFO 相似於堆,即先進後處。 class queue.LifoQueue(maxsize) 18 三、優先級隊列,級別越低月先出來。 class queue.PriorityQueue(maxsize) 19 '''
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import time 4 import random 5 import queue 6 import threading 7 q = queue.Queue() 8 9 10 def Producer(name): 11 count = 0 12 while count < 10: 13 print('making..') 14 time.sleep(random.randrange(3)) 15 q.put(count) 16 print('Producer [%s] has produced %s meat bun。\r' % (name, count)) 17 count += 1 18 print('ok\r') 19 20 21 def Consumer(name): 22 count = 0 23 while count < 10: 24 time.sleep(random.randrange(4)) 25 if not q.empty(): 26 data = q.get() 27 print('lm Consumer [%s] has eat %s meat bun。\r' % (name, data)) 28 else: 29 print('---no meat bun anymore----\r') 30 count += 1 31 32 33 p1 = threading.Thread(target=Producer, args=('alex',)) 34 c1 = threading.Thread(target=Consumer, args=('B',)) 35 p1.start() 36 c1.start()
is a package that supports spawning processes using an API similar to the threading module. The ultiprocessing
package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing
module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.multiprocessing
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from multiprocessing import Process 4 import time 5 6 7 def Foo(name): 8 time.sleep(1) 9 print('hello', name, time.ctime()) 10 11 12 if __name__ == '__main__': 13 p_list = [] 14 for i in range(200): 15 p = Process(target=Foo, args=('alex',)) 16 p_list.append(p) 17 p.start() 18 for i in p_list: 19 p.join() 20 print('end')
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from multiprocessing import Process 4 import time 5 6 7 class MyProcess(Process): 8 def __init__(self): 9 super(MyProcess, self).__init__() 10 11 def run(self): 12 time.sleep(1) 13 print('hello', self.name, time.ctime()) 14 15 16 if __name__ == '__main__': 17 p_list = [] 18 for i in range(3): 19 p = MyProcess() 20 p.start() 21 p_list.append(p) 22 for p in p_list: 23 p.join() 24 print('end')
Process([group [, target [, name [, args [, kwargs]]]]])
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import queue 4 import multiprocessing 5 6 7 def Foo(q): 8 q.put(123) 9 q.put(456) 10 11 12 if __name__ == '__main__': 13 q = multiprocessing.Queue() # 注意:此處需用進程隊列,不能用線程隊列,即q=queue.Queue() 14 p = multiprocessing.Process(target=Foo, args=(q,)) 15 p.start() 16 print(q.get()) 17 print(q.get())
The Pipe()
function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from multiprocessing import Process, Pipe 4 5 6 def Foo(child_conn): 7 child_conn.send([12, {'name': 'alice'}, 'hello']) 8 response = child_conn.recv() 9 print('response:', response) 10 child_conn.close() 11 print('q_id2:', id(child_conn)) 12 13 14 if __name__ == '__main__': 15 parent_conn, child_conn = Pipe() 16 print('q_ID1', id(child_conn)) 17 p = Process(target=Foo, args=(child_conn,)) 18 p.start() 19 print(parent_conn.recv()) 20 parent_conn.send('早上好!') 21 p.join()
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager()
will support types list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
and Array
. For example:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from multiprocessing import Process, Manager 4 5 6 def Foo(dic, li, i): 7 dic[i] = '1' 8 li.append(i) 9 10 11 if __name__ == '__main__': 12 with Manager() as manager: 13 dic = manager.dict() 14 li = manager.list(range(5)) 15 p_list = [] 16 for i in range(10): 17 p = Process(target=Foo, args=(dic, li, i)) 18 p.start() 19 p_list.append(p) 20 for p in p_list: 21 p.join() 22 print(dic) 23 print(li)
Without using the lock output from the different processes is liable to get all mixed up.
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from multiprocessing import Process, Lock 4 5 6 def Foo(lk, i): 7 with lk: # 默認狀況下,已經lk.acquire() 8 print('hello world %s' % i) 9 10 11 if __name__ == '__main__': 12 lock = Lock() 13 for num in range(10): 14 Process(target=Foo, args=(lock, num)).start()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from multiprocessing import Process, Pool 4 import time, os 5 6 7 def Foo(i): 8 time.sleep(1) 9 print('i = \r', i) 10 11 12 def Bar(arg): # 此處arg=Foo()函數的返回值 13 print('pgid-->%s\r' % os.getpid()) 14 print('ppid-->%s\r' % os.getppid()) 15 print('logger:%s\r' % arg) 16 17 18 if __name__ == '__main__': 19 pool = Pool(5) 20 Bar(1) 21 print('------------\r') 22 for i in range(10): 23 # pool.apply(func=Foo, args=(i,)) # 同步接口 24 # pool.apply_async(func=Foo, args=(i,)) 25 pool.apply_async(func=Foo, args=(i,), callback=Bar) # callback-->回調函數:就是某個動做或者函數執行成功後再去執行的函數
26 pool.close() 27 pool.join() # join和close位置不能反 28 print('end\r')
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import time 4 5 6 def consumer(name): 7 print('---->ready to eat meat bun') 8 while True: 9 new_meat_bun = yield 10 print('[%s] is eating meat bun %s' % (name, new_meat_bun)) 11 12 13 def producer(): 14 con1.__next__() 15 con2.__next__() 16 n = 0 17 while 1: 18 time.sleep(1) 19 print('\033[32;1m[producer]\033[0m is making meat bun %s and %s' % (n, n + 1)) 20 con1.send(n) 21 con2.send(n + 1) 22 n += 2 23 24 25 if __name__ == '__main__': 26 con1 = consumer('alex') 27 con2 = consumer('alice') 28 producer()
greenlet是一個用C實現的協程模塊,相比與Python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator。(注:須要用pip安裝包;pip install gevent)
1 from greenlet import greenlet 2 import time 3 4 5 def A(): 6 while 1: 7 print('-------A-------') 8 time.sleep(0.5) 9 g2.switch() 10 11 12 def B(): 13 while 1: 14 print('-------B-------') 15 time.sleep(0.5) 16 g1.switch() 17 18 19 g1 = greenlet(A) # 建立協程g1 20 g2 = greenlet(B) 21 22 g1.switch() # 跳轉至協程g1
1 import gevent 2 import requests, time # 此處requests需安裝模塊;pip --timeout=100 install requests 3 4 start = time.time() 5 6 7 def f(url): 8 print('GET: %s' % url) 9 resp = requests.get(url) # 爬網頁的標籤 10 data = resp.text 11 print('%d bytes received from %s.' % (len(data), url)) 12 13 14 gevent.joinall([ 15 gevent.spawn(f, 'https://www.qq.com/'), 16 gevent.spawn(f, 'https://www.baidu.com/'), 17 gevent.spawn(f, 'https://www.taobao.com/'), 18 ]) 19 20 print("cost time:", time.time() - start)
緩存I/O又被稱做標準I/O,大多數文件系統的默認I/O操做都是緩存I/O。在Linux的緩存I/O機制中,操做系統會將I/O的數據緩存在文件系統的頁緩存(page cache)中,也就是說,數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內的緩衝區拷貝到應用程序的地址空間。用戶空間無法直接訪問內核空間的,內核態到用戶態的數據拷貝。
對於一個network IO(以read舉例),他會涉及到兩個系統對象,一個是調用這個IO的process(or thread),另外一個就是系統內核(kernel)。當一個read操做發生時,它會經歷兩個階段:
一、等待數據準備(Waiting for the data to be read)
二、將數據從內核拷貝到進程中(Copying the data from the kernel to the process)
注:這兩點很重要,由於這些IO Mode的區別就是在這兩個階段上各有不一樣的狀況。
blocking IO(阻塞IO,Linux下)
當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一階段:準備數據。對於network IO來講,不少時候數據在一開始還沒到達(如:還沒收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據準備好了,它就將數據從kernel中拷貝到用戶內存,而後kernel。因此,blocking IO的特色就是在IO執行的兩個階段都被block了。
non-blocking IO(非阻塞IO,Linux下)
在Linux下,能夠經過設置socket使其變爲non-blocking。當對一個non-blocking socket執行時大概的流程:
1 import time 2 import socket 3 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 4 sk.bind(('',8080)) 5 sk.listen(5) 6 sk.setblocking(False) # 設置是否阻塞,默認爲True,非阻塞 7 while True: 8 try: 9 print('waiting client connection...') 10 conn,addr = sk.accept() # 進程主動輪詢 11 print('+++',addr) 12 data = sk.recv(1024) 13 print(str(data,'utf8')) 14 conn.close() 15 except Exception as EX: 16 print('錯誤提示:',EX) 17 time.sleep(2)
1 import time 2 import socket 3 sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 4 while True: 5 sk.connect(('',8080)) 6 print('hello') 7 sk.sendall(bytes('hello','utf8')) 8 time.sleep(1) 9 break
IO multiplexing(IO多路複用)
有些地方也稱爲這種IO方式爲event driven IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的全部socket,當某個socket有數據到達了,就通知用戶進程,大概流程圖:
當用戶進程調用了select,那麼真個進程會被block。而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這時用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。(若是處理的鏈接數不是不少的話,使用select/epoll的web server不必定比使用multi-threading+blocking IO的web server性能更好,可能延遲更大;select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接)
1 import socket 2 import select 3 4 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 5 sk.bind(('', 8080)) 6 sk.listen(5) 7 inputs = [sk, ] 8 while True: 9 r, w, e = select.select(inputs, [], [], 3) 10 for obj in r: 11 print('obj:', obj) 12 if obj == sk: 13 conn, addr = obj.accept() 14 print('已鏈接:', conn) 15 inputs.append(conn) 16 else: 17 data_byte = obj.recv(1024) 18 print(str(data_byte, 'utf8')) 19 inp = input('回答[%s]號客戶端>>:' % inputs.index(obj)) 20 obj.sendall(bytes(inp, 'utf8')) 21 print('>>>', r)
1 import socket 2 3 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 4 sk.connect(('', 8080)) 5 6 while True: 7 inp = input(">>>>") 8 sk.sendall(bytes(inp, "utf8")) 9 data = sk.recv(1024) 10 print(str(data, 'utf8'))
Asynchronous I/O(異步IO)
從圖中能夠看出,用戶進程發起read操做以後,馬上就開始去作其它的事。另外一方面,從kernel的角度,當他受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import selectors 4 import socket 5 6 sel = selectors.DefaultSelector() # 根據系統,默認選擇最優IO多用戶模式 7 8 9 def accept(sock, mask): 10 conn, addr = sock.accept() 11 print('accepted', conn, 'from', addr) 12 conn.setblocking(False) 13 sel.register(conn, selectors.EVENT_READ, read) 14 15 16 def read(conn, mask): 17 try: 18 data = conn.recv(1000) 19 if not data: 20 raise Exception 21 print('收到:', data.decode('utf8')) 22 conn.send(data.upper()) # Hope it won't block 23 except Exception as EX: 24 print('closing:', conn) 25 sel.unregister(conn) # 解除綁定 26 conn.close() 27 28 29 sock = socket.socket() 30 sock.bind(('localhost', 8080)) 31 sock.listen(100) 32 sock.setblocking(False) 33 sel.register(sock, selectors.EVENT_READ, accept) # sock與accept綁定 34 while True: 35 events = sel.select() # 監聽[sock,conn1,conn2....] 36 for key, mask in events: 37 callback = key.data 38 print('>>callback:', callback) 39 callback(key.fileobj, mask)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socket 4 sk = socket.socket() 5 sk.connect(('', 8080)) 6 while True: 7 inp = input('>>>') 8 sk.send(inp.encode('utf8')) 9 data = sk.recv(1024) 10 print(data.decode('utf8'))