Pitfalls encountered while load testing WebSocket long-lived connections

WebSocket protocol load test notes

Background:

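    The company's market-data system uses the WebSocket protocol. A client can obtain the latest quotes from the server in two ways: request and subscription. A request is one-shot; with a subscription, once the connection is established the server pushes quotes to the client at regular intervals.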

Initial test plan:

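WebSocket is a full-duplex, long-lived connection, so the performance metric for this load test is the maximum number of connections the system can establish, with the server continuing to push quotes to every client after its connection is established.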

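For these reasons I decided to create the connections from Python using multiple threads. To verify that the pushed messages were actually received, the returned quotes are saved to a text file. The Python script is as follows: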

 

import gzip
import threading
import time

import websocket  # third-party package: websocket-client

#import json
#from threadpool import ThreadPool, makeRequests
#from websocket import create_connection

SERVER_URL = "ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws"
#SERVER_URL = "wss://i.cg.net/wi/ws"
#SERVER_URL = "wss://www.exshell.com/r1/main/ws"


# Callbacks for the WebSocketApp variant that is commented out in on_start().
def on_message(ws, message):
    print(message)


def on_error(ws, error):
    print(error)


def on_close(ws):
    print("### closed ###")


def on_open(ws):
    def send_thread():
        send_info = '{"sub": "market.ethusdt.kline.1min","id": "id10"}'
        #send_info = '{"event":"subscribe", "channel":"btc_usdt.deep"}'
        #time.sleep(5)
        #ws.send(json.dumps(send_info))
        ws.send(send_info)
        # Frames arrive gzip-compressed; keep reading until the connection drops.
        while True:
            compressData = ws.recv()
            result = gzip.decompress(compressData).decode('utf-8')
            if result[:7] == '{"ping"':
                # Heartbeat: echo the 13-character timestamp back as a pong,
                # then send the subscription again.
                ts = result[8:21]
                pong = '{"pong":' + ts + '}'
                ws.send(pong)
                ws.send(send_info)
            else:
                #print(result)
                # Save pushed quotes so that delivery can be verified later.
                with open('./test_result.txt', 'a') as f:
                    f.write(threading.current_thread().name + '\n')
                    f.write(result + '\n')

    # One receiver thread per connection.
    t = threading.Thread(target=send_thread)
    t.start()
    print(threading.current_thread().name)


def on_start(a):
    # time.sleep(2)
    # websocket.enableTrace(True)
    # ws = websocket.WebSocketApp(SERVER_URL,
    #                             on_message=on_message,
    #                             on_error=on_error,
    #                             on_close=on_close)
    # ws.on_open = on_open
    # ws.run_forever()
    #print(a[2])
    try:
        ws = websocket.create_connection(SERVER_URL)
        on_open(ws)
    except Exception as e:
        print('error is :', e)
        print('connect ws error,retry...')
        time.sleep(5)


if __name__ == "__main__":
    # pool = ThreadPool(3)
    # test = list()
    # for ir in range(3):
    #     test.append(ir)
    #
    # requests = makeRequests(on_start, test)
    # [pool.putRequest(req) for req in requests]
    # pool.wait()
    # # #on_start(1)

    # Open 20 connections, 0.1 s apart.
    for ir in range(20):
        on_start(1)
        time.sleep(0.1)
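The heartbeat handling in the script slices the decoded string at fixed offsets. As an alternative, here is a minimal sketch that parses each frame as JSON instead, assuming every frame is a gzip-compressed JSON object and that a heartbeat carries its timestamp under a "ping" key, as the script above implies. The helper name handle_frame and the sample data frame's field names are hypothetical.

```python
import gzip
import json


def handle_frame(compressed: bytes):
    """Decode one gzip-compressed frame; return (payload, reply_or_None)."""
    payload = json.loads(gzip.decompress(compressed).decode("utf-8"))
    if "ping" in payload:
        # Heartbeat: answer with the same timestamp under the "pong" key.
        return payload, json.dumps({"pong": payload["ping"]})
    # Anything else is treated as pushed data and needs no reply.
    return payload, None


# Usage with locally built frames (no server needed).
# The "ch" and "tick" fields are made up for illustration.
ping_frame = gzip.compress(b'{"ping":1530000000000}')
data_frame = gzip.compress(b'{"ch":"market.ethusdt.kline.1min","tick":{}}')
print(handle_frame(ping_frame)[1])
print(handle_frame(data_frame)[1])
```

Parsing the frame avoids depending on the timestamp always being exactly 13 characters long.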

 

Initial test results:

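    During the load test, once the number of connections reached a certain level (about 1,400 on a single machine), connections started to drop. Monitoring showed that the load generator's memory was almost exhausted: every connection has to be established and its incoming messages received, so memory use grows with the connection count until connections have to be dropped to free memory.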
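To get a rough feel for how memory grows with the thread count on the load generator, one option is to watch the process's peak resident set size while threads are started. The sketch below is only an illustration under assumptions: it uses idle threads that hold no sockets or receive buffers, so it gives at best a lower bound, and the helper names are hypothetical. The resource module is available on Unix-like systems only.

```python
import resource
import threading


def peak_rss():
    """Peak resident set size of this process (kilobytes on Linux)."""
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss


def measure_idle_threads(n):
    """Start n parked threads and return (peak before, peak after)."""
    stop = threading.Event()
    before = peak_rss()
    threads = [threading.Thread(target=stop.wait) for _ in range(n)]
    for t in threads:
        t.start()
    after = peak_rss()
    # Release the parked threads and wait for them to finish.
    stop.set()
    for t in threads:
        t.join()
    return before, after


before, after = measure_idle_threads(50)
print("peak RSS before:", before, "after:", after)
```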

 

Revised test plan:

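    After discussing with the architects and developers, we decided to use asynchronous I/O (AIO) on the WebSocket client side to generate more load. I had not considered this at first because I was focused on long-lived connections. Reading up on it, I found that WebSocket servers can use asynchronous I/O, so I tried the same on the client side with Locust + Python; the material I found suggested the approach was feasible.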
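The general idea of asynchronous I/O on the client side is that a single thread drives many connections at once. The sketch below illustrates this with the standard library's asyncio and a local echo server. It is an assumption-laden stand-in: it uses plain TCP rather than WebSocket so that it needs no third-party package, and the server, the message format, and all function names are made up for the example.

```python
import asyncio


async def echo(reader, writer):
    """Stand-in server: echo each line back to the client."""
    while line := await reader.readline():
        writer.write(line)
        await writer.drain()
    writer.close()


async def client(port, i):
    """Open one connection, send a line, and wait for the reply."""
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(f"sub {i}\n".encode())
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    return reply.decode().strip()


async def run(n):
    # Port 0 lets the operating system pick a free port.
    server = await asyncio.start_server(echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        # All n connections are driven concurrently by one thread.
        return await asyncio.gather(*(client(port, i) for i in range(n)))


replies = asyncio.run(run(50))
print(len(replies), "connections answered")
```

Because no thread is created per connection, the per-connection cost is mainly the socket and its buffers.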

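    Locust is a scalable, distributed, open-source performance testing tool written in Python. It is convenient for testing HTTP interfaces, and it can also test interfaces that use other protocols, but that requires overriding the Locust class. The script is as follows: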

 

# Written against the pre-1.0 Locust API (Locust, TaskSet, min_wait/max_wait).
from locust import Locust, events, task, TaskSet
import websocket
import time
import gzip


class WebSocketClient():
    def __init__(self, host, port):
        self.host = host
        self.port = port


class WebSocketLocust(Locust):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client = WebSocketClient("172.31.15.85", 9503)


class UserBehavior(TaskSet):
    # Class-level socket: created once and shared by every simulated user.
    ws = websocket.WebSocket()
    #self.ws.connect("ws://10.98.64.103:8807")
    ws.connect("ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws")

    @task(1)
    def buy(self):
        try:
            start_time = time.time()

            #self.ws.send('{"url":"/buy","data":{"id":"123","issue":"20170822","doubled_num":2}}')
            #result = self.ws.recv()

            send_info = '{"sub": "market.ethusdt.kline.1min","id": "id10"}'
            # send_info = '{"event":"subscribe", "channel":"btc_usdt.deep"}'
            # time.sleep(5)
            # ws.send(json.dumps(send_info))
            self.ws.send(send_info)
            # Frames arrive gzip-compressed; keep reading until the connection drops.
            while True:
                compressData = self.ws.recv()
                result = gzip.decompress(compressData).decode('utf-8')
                if result[:7] == '{"ping"':
                    # Heartbeat: echo the timestamp back, then subscribe again.
                    ts = result[8:21]
                    pong = '{"pong":' + ts + '}'
                    self.ws.send(pong)
                    self.ws.send(send_info)
                else:
                    # print(result)
                    with open('./test_result.txt', 'a') as f:
                        #f.write(threading.currentThread().name + '\n')
                        f.write(result + '\n')
        except Exception as e:
            print("error is:", e)


class ApiUser(WebSocketLocust):
    task_set = UserBehavior
    min_wait = 100
    max_wait = 200

 

 

 

Run the script from the command line:

locust -f websocket_client_locust.py --no-web -c 1 -r 1 -t 60s

With a single user the run succeeded and the output file was generated. With multiple users it failed with the following error: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter object at 0x7f01f0594900>>

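The error says the socket is already in use. I had assumed my code used a fresh socket each time and, after a quick look, did not expect a problem. In fact the socket in my code was a single global client, and that is what triggered the error at run time. Thinking it through, I found the cause: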

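gevent is a coroutine library, so this error occurs when multiple coroutines share one socket. I changed the socket to a local variable and the problem was solved.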
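The difference between a shared and a local socket comes down to where the object is created. The sketch below shows this with a stand-in class and no network or gevent involved; FakeSocket, SharedUser, and LocalUser are hypothetical names used only for illustration.

```python
class FakeSocket:
    """Stand-in for a connection object; no network involved."""


class SharedUser:
    # Evaluated once when the class body runs, so all users share it.
    ws = FakeSocket()

    def task(self):
        return self.ws


class LocalUser:
    def task(self):
        # Created inside the task, so each call gets its own object.
        ws = FakeSocket()
        return ws


a, b = SharedUser(), SharedUser()
print(a.task() is b.task())  # True: both users hold the same object

c, d = LocalUser(), LocalUser()
x, y = c.task(), d.task()
print(x is y)  # False: each user has its own object
```

With a class-level attribute, every simulated user (each running in its own greenlet) ends up reading from and writing to the same connection.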


Code after the change:

# Written against the pre-1.0 Locust API (Locust, TaskSet, min_wait/max_wait).
from locust import Locust, events, task, TaskSet
import websocket
import time
import gzip


class WebSocketClient():
    def __init__(self, host):
        self.host = host
        #self.port = port


class WebSocketLocust(Locust):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client = WebSocketClient("172.31.15.85")


class UserBehavior(TaskSet):
    # ws = websocket.WebSocket()
    # #self.ws.connect("ws://10.98.64.103:8807")
    # ws.connect("ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws")

    @task(1)
    def buy(self):
        try:
            # Local socket: each task run opens its own connection.
            ws = websocket.WebSocket()
            # self.ws.connect("ws://10.98.64.103:8807")
            ws.connect("ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws")

            start_time = time.time()

            #self.ws.send('{"url":"/buy","data":{"id":"123","issue":"20170822","doubled_num":2}}')
            #result = self.ws.recv()

            send_info = '{"sub": "market.ethusdt.kline.1min","id": "id10"}'
            # send_info = '{"event":"subscribe", "channel":"btc_usdt.deep"}'
            # time.sleep(5)
            # ws.send(json.dumps(send_info))
            ws.send(send_info)
            # Frames arrive gzip-compressed; keep reading until the connection drops.
            while True:
                compressData = ws.recv()
                result = gzip.decompress(compressData).decode('utf-8')
                if result[:7] == '{"ping"':
                    # Heartbeat: echo the timestamp back, then subscribe again.
                    ts = result[8:21]
                    pong = '{"pong":' + ts + '}'
                    ws.send(pong)
                    ws.send(send_info)
                # else:
                #     # print(result)
                #     with open('./test_result.txt', 'a') as f:
                #         #f.write(threading.currentThread().name + '\n')
                #         f.write(result + '\n')
        except Exception as e:
            print("error is:", e)


class ApiUser(WebSocketLocust):
    task_set = UserBehavior
    min_wait = 100
    max_wait = 200

 

 

 

Once the load test started, as the number of users rose, the load generator began reporting the following errors: HTTP 500 and 502.

 

 

 

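These are failures during the protocol handshake. The back-end market-data application servers were also logging a large number of errors.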

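Checking the server showed that the maximum number of open files was 1024, so it was raised to 65535. The system was tuned with the following steps: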

 

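Step 1: edit /etc/sysctl.conf and add the following line:

  net.ipv4.ip_local_port_range = 1024 65000

  This sets the system's local port range to 1024~65000. Note that the minimum of the local port range must be greater than or equal to 1024, and the maximum should be less than or equal to 65535. Save the file after editing.

  Step 2: run the sysctl command:

  [speng@as4 ~]$ sysctl -p

  If the system reports no error, the new local port range is in effect. With the range above, a single process can in theory open more than 60,000 TCP client connections at the same time.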
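As a quick check of the two limits mentioned above, the sketch below computes how many ports a given ip_local_port_range value covers and reads the open-file limits of the current process. The helper names are hypothetical, and the resource module is available on Unix-like systems only.

```python
import resource


def port_range_size(setting: str) -> int:
    """Number of ports in an ip_local_port_range value such as '1024 65000'."""
    low, high = (int(part) for part in setting.split())
    return high - low + 1


def open_file_limits():
    """Return the (soft, hard) open-file limits of the current process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


print(port_range_size("1024 65000"))  # 63977
print(open_file_limits())
```

Both limits matter for a connection-count test: each connection uses one local port on the client side and one file descriptor on each side.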

 

After the adjustment I retested and still saw the same errors, so I asked the developers to investigate the application itself.
