pip install websocket pip install threadpool pip install websocket-client pip install multiprocessing
#!/usr/bin/python #-*- coding:utf-8 -*- #__author__ == 'chenmingle' import websocket import time import threading import json import multiprocessing from threadpool import ThreadPool, makeRequests #修改爲本身的websocket地址 WS_URL = "wss://ws.test.com/" #定義進程數 processes=5 #定義線程數(每一個文件可能限制1024個,能夠修改fs.file等參數) thread_num=1000 def on_message(ws, message): print(message) pass def on_error(ws, error): print(error) pass def on_close(ws): print("### closed ###") pass def on_open(ws): def send_trhead(): #設置你websocket的內容 send_info = {"cmd": "refresh", "data": {"room_id": "58", "wx_user_id": 56431}} #每隔10秒發送一下數據使連接不中斷 while True: time.sleep(10) ws.send(json.dumps(send_info)) t = threading.Thread(target=send_trhead) t.start() def on_start(num): time.sleep(num%20) websocket.enableTrace(True) ws = websocket.WebSocketApp(WS_URL + str(num), on_message=on_message, on_error=on_error, on_close=on_close) ws.on_open = on_open ws.run_forever() def thread_web_socket(): #線程池 pool = ThreadPool(thread_num) num = list() #設置開啓線程的數量 for ir in range(thread_num): num.append(ir) requests = makeRequests(on_start, num) [pool.putRequest(req) for req in requests] pool.wait() if __name__ == "__main__": #進程池 pool = multiprocessing.Pool(processes=processes) #設置開啓進程的數量 for i in xrange(processes): pool.apply_async(thread_web_socket) pool.close() pool.join()
這樣就完成一個高併發的websocket測試腳本,若是以爲這文章不錯的,請在本人Github上點個star,感謝!!python
參考資料:git