注:selectors也是包裝了select高級的包裝內置函數,它包裝了select與epoll,優先使用epoll windos內只支持select。併發
import selectors import socket # 生成select實例對象 sel = selectors.DefaultSelector() def accept(sock, mask): # 接收連接 conn, addr = sock.accept() # Should be ready print('accepted', conn, 'from', addr) # 連接設置非阻塞模式 conn.setblocking(False) # 註冊conn,回調 read函數 sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask): data = conn.recv(1024) # Should be ready if data: print('echoing', repr(data), 'to', conn) conn.send(data) # Hope it won't block else: print('closing', conn) # 取消註冊 sel.unregister(conn) # 關閉連接 conn.close() sock = socket.socket() sock.bind(('localhost', 9999)) sock.listen(10000) sock.setblocking(False) # 註冊server事件: # 參數一:sock 進行監聽 # 參數二:selectors.EVENT_READ 執行動做 # 參數三:accept,只要來一個連接就回調這個函數 sel.register(sock, selectors.EVENT_READ, accept) # 第一次調用server,register accept # 第二次調用client,register read while True: # 調用select:優先使用epoll events = sel.select() # 只要不阻塞就有調用的數據,返回一個列表 # 默認阻塞,有活動連接就返回活動的連接列表 for key, mask in events: # callback至關於調accept函數 callback = key.data # 獲取函數內存地址,加入參數 # key.fileobj = 文件句柄 callback(key.fileobj, mask)
客戶端:socket
import socket import sys messages = [ b'This is the message. ', b'It will be sent ', b'in parts.', ] # 傳入連接參數 server_address = ('localhost', 9999) # Create a TCP/IP socket # 使用列表生成式,生成多個請求。 # Winuds使用select支持併發並很少 這裏測試40個併發 # Linux默認使用epoll可支持上萬併發可修改10000 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(40)] print('connecting to %s port %s' % server_address) # 循環socks連續連接4次 for s in socks: s.connect(server_address) # 循環發送數據 for message in messages: # Send messages on both sockets # 每一個客戶端依次發送數據 for s in socks: print('%s: sending "%s"' % (s.getsockname(), message) ) s.send(message) # Read responses on both sockets # 服務端收數據 for s in socks: data = s.recv(1024) # getsockname()服務端返回 print( '%s: received "%s"' % (s.getsockname(), data) ) # 沒有數據打印客戶端要關閉了 if not data: print(sys.stderr, 'closing socket', s.getsockname() )
注:創建一個socket就須要創建一個文件句柄,最大數65535 注:Linux修改文件描述符數值 當前執行 ulimit -SHn 65535 #修改連接端口 ulimit -n #查看連接端口
epoll實現多併發socketide
import socket, logging import select, errno logger = logging.getLogger("network-server") def InitLog(): logger.setLevel(logging.DEBUG) fh = logging.FileHandler("network-server.log") fh.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.ERROR) formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ch.setFormatter(formatter) fh.setFormatter(formatter) logger.addHandler(fh) logger.addHandler(ch) if __name__ == "__main__": InitLog() try: # 建立 TCP socket 做爲監聽 socket listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) except socket.error as msg: logger.error("create socket failed") try: # 設置 SO_REUSEADDR 選項 listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) except socket.error as msg: logger.error("setsocketopt SO_REUSEADDR failed") try: # 進行 bind -- 此處未指定 ip 地址,即 bind 了所有網卡 ip 上 listen_fd.bind(('', 2003)) except socket.error as msg: logger.error("bind failed") try: # 設置 listen 的 backlog 數 listen_fd.listen(10) except socket.error as msg: logger.error(msg) try: # 建立 epoll 句柄 epoll_fd = select.epoll() # 向 epoll 句柄中註冊 監聽 socket 的 可讀 事件 epoll_fd.register(listen_fd.fileno(), select.EPOLLIN) except select.error as msg: logger.error(msg) connections = {} addresses = {} datalist = {} while True: # epoll 進行 fd 掃描的地方 -- 未指定超時時間則爲阻塞等待 epoll_list = epoll_fd.poll() for fd, events in epoll_list: # 若爲監聽 fd 被激活 if fd == listen_fd.fileno(): # 進行 accept -- 得到鏈接上來 client 的 ip 和 port,以及 socket 句柄 conn, addr = listen_fd.accept() logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno())) # 將鏈接 socket 設置爲 非阻塞 conn.setblocking(0) # 向 epoll 句柄中註冊 鏈接 socket 的 可讀 事件 epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET) # 將 conn 和 addr 信息分別保存起來 connections[conn.fileno()] = conn addresses[conn.fileno()] = addr elif select.EPOLLIN & events: # 有 可讀 事件激活 datas = '' while True: try: # 從激活 fd 上 recv 10 字節數據 data = connections[fd].recv(10) # 若當前沒有接收到數據,而且以前的累計數據也沒有 if not data and not datas: # 從 epoll 句柄中移除該 鏈接 fd epoll_fd.unregister(fd) # server 側主動關閉該 鏈接 fd connections[fd].close() logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) break else: # 將接收到的數據拼接保存在 datas 中 datas += data except socket.error as msg: # 在 非阻塞 socket 上進行 recv 須要處理 讀穿 的狀況 # 這裏其實是利用 讀穿 出 異常 的方式跳到這裏進行後續處理 if msg.errno == errno.EAGAIN: logger.debug("%s receive %s" % (fd, datas)) # 將已接收數據保存起來 datalist[fd] = datas # 更新 epoll 句柄中鏈接d 註冊事件爲 可寫 epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT) break else: # 出錯處理 epoll_fd.unregister(fd) connections[fd].close() logger.error(msg) break elif select.EPOLLHUP & events: # 有 HUP 事件激活 epoll_fd.unregister(fd) connections[fd].close() logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) elif select.EPOLLOUT & events: # 有 可寫 事件激活 sendLen = 0 # 經過 while 循環確保將 buf 中的數據所有發送出去 while True: # 將以前收到的數據發回 client -- 經過 sendLen 來控制發送位置 sendLen += connections[fd].send(datalist[fd][sendLen:]) # 在所有發送完畢後退出 while 循環 if sendLen == len(datalist[fd]): break # 更新 epoll 句柄中鏈接 fd 註冊事件爲 可讀 epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET) else: # 其餘 epoll 事件不進行處理 continue