最近運維跟我反饋我負責的應用服務線上監控到消費RabbitMQ消息隊列過慢,目前只有20左右,監控平臺會有消息積壓的告警。html
開發修改了一版應用服務的版本,提交給我作壓測驗證。web
以前沒有作過消息中間件的壓測,網上找了一圈測試方法,而且和開發溝通,最終確認經過壓測RabbitMQ event消息處理的接口來完成本次的壓測驗證。併發
壓測腳本:運維
import pika import multiprocessing as mp import time def main(counter): routing_key = "busi.mc.event.XXXX" # 被壓測的應用服務的key,指定消息的消費者 credentials = pika.PlainCredentials('guest', 'guest') parameters = pika.ConnectionParameters('XXX.XX.XXX.XX', 5672, '/', credentials) connection = pika.BlockingConnection(parameters) # 鏈接 RabbitMQ channel = connection.channel() # 建立頻道 for i in range(1, counter): # 循環生產信息,供消費者(被壓測的應用服務)消費 channel.basic_publish(exchange='mc-direct-exchange', routing_key=routing_key, body='{"clientId":"5e8J8aoi4F380gpDS4sdfd","eventType":1}', properties=pika.BasicProperties( content_type="text/plain", delivery_mode=1)) time.sleep(0.1) # if counter % 600 == 0: # time.sleep(1) connection.close() # 關閉鏈接 def loop_test(counter): for i in range(1, counter): main() if counter % 100 == 0: time.sleep(1) # 單個頻率 if __name__ == "__main__": # Define an output queue output = mp.Queue() # Setup a list of processes that we want to run processes = [mp.Process(target=main, args=(100000,)) for x in range(20)] # 消息總條數 併發數 # Run processes for p in processes: p.start() # Exit the completed processes for p in processes: p.join() # Get process results from the output queue # results = [output.get() for p in processes] # print(results)
腳本運行後,經過RabbitMQ的web管理後臺,查看消費消息的TPS已經能夠穩定在200左右,本次驗證經過了~~oop