# 這裏異步服務器的實現是藉助於select,有關select模塊在我上邊的博客中有體現。# zen_utils也是咱們本身寫的一個腳本。也在上邊的磨課中import select, zen_utils,queue,timedef serve(listener,inpouts,outputs,message_queues): while inputs: print('等待開始第一個線程鏈接1111') # 開始select監聽,對input_list中的服務器端的server開始進行監聽 # 一旦調用socket的send,recv函數,將會再次調用此模塊 # 這裏監控三個參數,第一個返回的是可讀的list, 第二個存儲的是可寫的list, 第三個存儲的是錯誤信息的 readable, writeable, errorinfo = select.select(inputs, outputs, inputs) # 這裏判斷的是有沒有客戶端進行鏈接。 for s in readable: # 而後在進行判斷是服務端觸發的鏈接仍是客戶端觸發的連接。 if s is listener: # 接收的消息和客戶端的IP端口 print(s) connection, client_address = s.accept() print('client:', connection, client_address) # 設置爲非阻塞模式。 connection.setblocking(0) # 將客戶端鏈接也加入到inputs監聽列表中。 inputs.append(connection) # 加入到要發送的消息隊列中。 message_queues[connection] = queue.Queue() else: # 這是客戶端發送消息觸發的請求。 data = s.recv(1024) if data != b'?' and data != b'': print('客戶端{}發送了{}消息', s.getpeername(), data.decode('utf-8')) # 將接收到的消息須要進行回覆的答案。放到對應的消息隊列中。 print(zen_utils.aphorisms.get(data,b'')) message_queues[s].put(zen_utils.aphorisms.get(data,b'')) if s not in outputs: outputs.append(s) else: print("一個客戶端斷開了鏈接", client_address) # 輸出端不在監聽。 if s in outputs: outputs.remove(s) # 輸入端再也不監聽 inputs.remove(s) # 關閉這個套接字。 s.close() # 刪除消息隊列中的消息 del message_queues[s] # 下邊是處理輸出。 for s in writeable: try: message_queue = message_queues.get(s) sent_data = '' if message_queue is not None: # 若是消息隊列爲空,不阻塞 sent_data = message_queue.get_nowait() else: # 這裏觸發的客戶端關閉鏈接。 print('客戶端關閉鏈接') except queue.Empty: # 客戶端發送完了消息。 print('%s' % s.getpeername()) outputs.remove(s) else: if sent_data != '': s.sendall(sent_data) else: print('客戶端關閉鏈接') # 處理異常狀況。 for s in errorinfo: print('有一個異常客戶端的鏈接', s.getpeername()) inputs.remove(s) if s in outputs: outputs.remove(s) s.close() del message_queues[s] time.sleep(1)if __name__ == '__main__': address = zen_utils.parse_command_line('獲取IP端口') listener = zen_utils.create_srv_socket(address) # 將定義好的一個套接字傳入到列表中。 inputs = [listener] # 處理要發送的數據。 outputs = [] # 要發送的數據 message_queues = {} serve(listener,inputs,outputs,message_queues)