一、rpc介紹python
二、grpclinux
三、基於grpc協議文件傳輸shell
四、基於grpc協議jmeter壓測獲取實時結果apache
五、基於grcp協議獲取jmeter最終壓測報告,並將報告保存至client端json
六、壓測中途中止jmeter
vim
grpc server (jmeter server) 192.168.18.128 rpc client (本機)
一、rpc協議介紹後端
RPC(Remote Procedure Call Protocol)——遠程過程調用協議,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,爲通訊程序之間攜帶信息數據。在OSI網絡通訊模型中,RPC跨越了傳輸層和應用層。RPC使得開發包括網絡分佈式多程序在內的應用程序更加容易。bash
RPC採用客戶機/服務器模式。請求程序就是一個客戶機,而服務提供程序就是一個服務器。首先,客戶機調用進程發送一個有進程參數的調用信息到服務進程,而後等待應答信息。在服務器端,進程保持睡眠狀態直到調用信息到達爲止。當一個調用信息到達,服務器得到進程參數,計算結果,發送答覆信息,而後等待下一個調用信息,最後,客戶端調用進程接收答覆信息,得到進程結果,而後調用執行繼續進行。 -- 摘自 百度百科服務器
二、grpc介紹網絡
gRPC 是一款高性能、開源的 RPC 框架,產自 Google,基於 ProtoBuf 序列化協議進行開發,支持多種語言(Golang、Python、Java等),本篇只介紹 Python 的 gRPC 使用。由於 gRPC 對 HTTP/2 協議的支持使其在 Android、IOS 等客戶端後端服務的開發領域具備良好的前景。gRPC 提供了一種簡單的方法來定義服務,同時客戶端能夠充分利用 HTTP2 stream 的特性,從而有助於節省帶寬、下降 TCP 的鏈接次數、節省CPU的使用等。
三、基於grpc協議文件傳輸
目標:將本地的文件(test.file)傳輸至rpc server (192.168.18.128)
python 版本 2.7.13
3.一、
gRPC 的安裝:
[root@vm6 rpc]# pip2.7 install requests [root@vm6 rpc]# pip2.7 install grpcio
安裝 ProtoBuf 相關的 python 依賴庫:
[root@vm6 rpc]# pip2.7 install protobuf
安裝 python grpc 的 protobuf 編譯工具:
[root@vm6 rpc]# pip2.7 install grpcio-tools
定義 gRPC 接口:
[root@vm6 ~]# mkdir rpc rpc/__init__.py [root@vm6 ~]# cd rpc/ [root@vm6 rpc]# vim rpc.proto // grpc 版本 syntax = "proto3"; package rpc; //定義接口 service RPC { //至關於定義接口方法 rpc sendConfFile(Content) returns (Status) {} } //至關於定義類屬性,此屬性用於接受文本 message Content { string text = 1; } //此屬性用於return 結果狀態嗎 message Status { int64 code = 1; }
編譯 protobuf:
[root@vm6 rpc]# python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./rpc.proto
server 端代碼:
# -*- encoding=utf-8 -*- from concurrent import futures import time import subprocess import codecs import sys import os import grpc import rpc_pb2 import rpc_pb2_grpc _ONE_DAY_IN_SECONDS = 60 * 60 * 24 #server端文件保存的位置 jmeter_config = os.path.join(os.getcwd(),r'conf/config.jmx') class Performance(rpc_pb2_grpc.RPCServicer): def sendConfFile(self, content,context): ''' 保存配置文件,如config.jmx ''' text = content.text try: print jmeter_config conf_handle = codecs.open(jmeter_config,'w',encoding='utf-8') conf_handle.write(text) return rpc_pb2.Status(code=0) except Exception,e: print e return rpc_pb2.Status(code=1) def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) rpc_pb2_grpc.add_RPCServicer_to_server(Performance(),server) server.add_insecure_port('[::]:50051') server.start() try: while True: time.sleep(_ONE_DAY_IN_SECONDS) except KeyboardInterrupt: server.stop(0) if __name__ == '__main__': serve()
client 端代碼:
# -*- encoding=utf-8 -*- from concurrent import futures import time import subprocess import codecs import sys import os import logging import json import grpc import rpc_pb2 import rpc_pb2_grpc logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%a, %d %b %Y %H:%M:%S', filename='rpc.log', filemode='a+') rpc_server = r'192.168.18.128' rpc_port = '50051' class performance(): '''性能測試的客戶端接口''' def __init__(self,ip,port): '''初始化,鏈接RPC服務''' logging.info("performance_client init") conn = grpc.insecure_channel(ip + ':' + port) self.stub_client = rpc_pb2_grpc.RPCStub(channel=conn) def sendConfig(self,filename): file_handle = codecs.open(filename, 'r', encoding='utf-8') content = file_handle.read() '''向RPC server發送測試的配置文件''' response = self.stub_client.sendConfFile(rpc_pb2.Content(text=content)) print response.code if __name__ == '__main__': client = performance(rpc_server,rpc_port) ##將本地的rpc_client.py文件當作測試文件發送到server端 client.sendConfig('rpc_client.py')
啓動server端:
[root@vm6 rpc]# python rpc_server.py
啓動client端:
D:\xisuo\rpc>python rpc_client.py 0
查看server端的文件:
[root@vm6 rpc]# more conf/config.jmx # -*- encoding=utf-8 -*- from concurrent import futures import time import subprocess import codecs import sys import os import logging 。。。略
四、基於grpc協議jmeter壓測獲取實時結果
4.一、server端部署jdk,jmeter
將jdk解壓到/usr/local/ 配置環境變量
將jmeter解壓到/usr/local
4.二、由於咱們是有jmeter的no gui模式在Linux執行,故須要jmeter的jmx文件,咱們能夠在本地使用gui模式先生成jmx文件
最終結果文件爲 test.jmx,內容以下:
<?xml version="1.0" encoding="UTF-8"?> <jmeterTestPlan version="1.2" properties="2.4" jmeter="2.9 r1437961"> <hashTree> <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="測試計劃" enabled="true"> <stringProp name="TestPlan.comments"></stringProp> <boolProp name="TestPlan.functional_mode">false</boolProp> <boolProp name="TestPlan.serialize_threadgroups">false</boolProp> <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="用戶定義的變量" enabled="true"> <collectionProp name="Arguments.arguments"/> </elementProp> <stringProp name="TestPlan.user_define_classpath"></stringProp> </TestPlan> <hashTree> <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="測試" enabled="true"> <stringProp name="TestPlan.comments">測試</stringProp> <stringProp name="ThreadGroup.on_sample_error">continue</stringProp> <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="循環控制器" enabled="true"> <boolProp name="LoopController.continue_forever">false</boolProp> <stringProp name="LoopController.loops">50</stringProp> </elementProp> <stringProp name="ThreadGroup.num_threads">2</stringProp> <stringProp name="ThreadGroup.ramp_time">1</stringProp> <longProp name="ThreadGroup.start_time">1496464278000</longProp> <longProp name="ThreadGroup.end_time">1496464278000</longProp> <boolProp name="ThreadGroup.scheduler">false</boolProp> <stringProp name="ThreadGroup.duration"></stringProp> <stringProp name="ThreadGroup.delay"></stringProp> </ThreadGroup> <hashTree> <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="HTTP請求" enabled="true"> <elementProp name="HTTPsampler.Arguments" elementType="Arguments" guiclass="HTTPArgumentsPanel" testclass="Arguments" testname="用戶定義的變量" enabled="true"> <collectionProp name="Arguments.arguments"/> </elementProp> <stringProp name="HTTPSampler.domain">lansgg.blog.51cto.com</stringProp> <stringProp name="HTTPSampler.port">80</stringProp> <stringProp name="HTTPSampler.connect_timeout"></stringProp> <stringProp name="HTTPSampler.response_timeout"></stringProp> <stringProp name="HTTPSampler.protocol"></stringProp> <stringProp name="HTTPSampler.contentEncoding"></stringProp> <stringProp name="HTTPSampler.path"></stringProp> <stringProp name="HTTPSampler.method">GET</stringProp> <boolProp name="HTTPSampler.follow_redirects">true</boolProp> <boolProp name="HTTPSampler.auto_redirects">false</boolProp> <boolProp name="HTTPSampler.use_keepalive">true</boolProp> <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp> <stringProp name="HTTPSampler.implementation">HttpClient4</stringProp> <boolProp name="HTTPSampler.monitor">false</boolProp> <stringProp name="HTTPSampler.embedded_url_re"></stringProp> <stringProp name="TestPlan.comments">HTTP請求</stringProp> </HTTPSamplerProxy> <hashTree/> <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="察看結果樹" enabled="true"> <boolProp name="ResultCollector.error_logging">false</boolProp> <objProp> <name>saveConfig</name> <value class="SampleSaveConfiguration"> <time>true</time> <latency>true</latency> <timestamp>true</timestamp> <success>true</success> <label>true</label> <code>true</code> <message>true</message> <threadName>true</threadName> <dataType>true</dataType> <encoding>false</encoding> <assertions>true</assertions> <subresults>true</subresults> <responseData>false</responseData> <samplerData>false</samplerData> <xml>false</xml> <fieldNames>false</fieldNames> <responseHeaders>false</responseHeaders> <requestHeaders>false</requestHeaders> <responseDataOnError>false</responseDataOnError> <saveAssertionResultsFailureMessage>false</saveAssertionResultsFailureMessage> <assertionsResultsToSave>0</assertionsResultsToSave> <bytes>true</bytes> </value> </objProp> <stringProp name="TestPlan.comments">察看結果樹</stringProp> <stringProp name="filename"></stringProp> </ResultCollector> <hashTree/> </hashTree> </hashTree> </hashTree> </jmeterTestPlan>
當咱們在linux終端上執行時,結果以下,
[root@vm6 rpc]# /usr/local/apache-jmeter-3.2/bin/jmeter -n -t test.jmx -l text.jtl Creating summariser <summary> Created the tree successfully using test.jmx Starting the test @ Sat Jun 03 20:41:25 CST 2017 (1496493685813) Waiting for possible Shutdown/StopTestNow/Heapdump message on port 4445 summary + 46 in 00:00:18 = 2.5/s Avg: 2745 Min: 423 Max: 3135 Err: 0 (0.00%) Active: 49 Started: 50 Finished: 1 summary + 54 in 00:00:08 = 7.2/s Avg: 2018 Min: 413 Max: 8000 Err: 0 (0.00%) Active: 0 Started: 50 Finished: 50 summary = 100 in 00:00:26 = 3.9/s Avg: 2353 Min: 413 Max: 8000 Err: 0 (0.00%) Tidying up ... @ Sat Jun 03 20:42:07 CST 2017 (1496493727553) ... end of run
而咱們要將這些結果實時的在rpc client展現出來
接口文件:
syntax = "proto3"; package rpc; service RPC { // send config jmx rpc sendConfFile(Content) returns (Status) {} // run jmeter test rpc runJMeter(Content) returns (stream Content) {} } message Content { string text = 1; } message Status { int64 code = 1; }
server端:
# -*- encoding=utf-8 -*- from concurrent import futures import time import subprocess import codecs import sys import os import json import logging import grpc import rpc_pb2 import rpc_pb2_grpc logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%a, %d %b %Y %H:%M:%S', filename='rpc_server.log', filemode='a+') _ONE_DAY_IN_SECONDS = 60 * 60 * 24 jmeter_config = os.path.join(os.getcwd(),r'conf/config.jmx') jmeter_path = r'/usr/local/apache-jmeter-3.2/bin/jmeter' jmeter_result = os.path.join(os.getcwd(),r'result/result.jtl') class Performance(rpc_pb2_grpc.RPCServicer): def sendConfFile(self, content,context): ''' 保存配置文件,如config.jmx ''' text = content.text try: conf_handle = codecs.open(jmeter_config,'w',encoding='utf-8') conf_handle.write(text) logging.info("sendConfFile Success!") return rpc_pb2.Status(code=0) except Exception,e: print e return rpc_pb2.Status(code=1) def runJMeter(self, content,context): logging.info("begin runJmeter.") '''執行測試任務,並將實時結果返回''' iplist = content.text if iplist: cmd = jmeter_path + " -n -t " + jmeter_config + " -l " + jmeter_result + " -R " + iplist else: cmd = jmeter_path + " -n -t " + jmeter_config + " -l " + jmeter_result logging.info(cmd) popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True,shell=True) for stdout_line in iter(popen.stdout.readline, ""): log_line = rpc_pb2.Content(text=stdout_line) yield log_line popen.stdout.close() return_code = popen.wait() if return_code: logging.warn(return_code) raise subprocess.CalledProcessError(return_code, cmd) def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) rpc_pb2_grpc.add_RPCServicer_to_server(Performance(),server) server.add_insecure_port('[::]:50051') server.start() try: while True: time.sleep(_ONE_DAY_IN_SECONDS) except KeyboardInterrupt: server.stop(0) if __name__ == '__main__': serve()
client端:
# -*- encoding=utf-8 -*- from concurrent import futures import time import subprocess import codecs import sys import os import logging import json import grpc import rpc_pb2 import rpc_pb2_grpc logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%a, %d %b %Y %H:%M:%S', filename='rpc.log', filemode='a+') rpc_server = r'192.168.18.128' rpc_port = '50051' jmeter_config = os.path.join(os.getcwd(),r'conf/config.jmx') jmeter_path = r'/usr/local/apache-jmeter-3.2/bin/jmeter' jmeter_result = os.path.join(os.getcwd(),r'conf/result.jtl') class performance(): '''性能測試的客戶端接口''' def __init__(self,ip,port): '''初始化,鏈接RPC服務''' logging.info("performance_client init") conn = grpc.insecure_channel(ip + ':' + port) self.stub_client = rpc_pb2_grpc.RPCStub(channel=conn) def sendConfig(self,filename): file_handle = codecs.open(filename, 'r', encoding='utf-8') content = file_handle.read() '''向RPC server發送測試的配置文件''' response = self.stub_client.sendConfFile(rpc_pb2.Content(text=content)) return response.code def runJmeter(self,iplist): '''運行測試 返回一個生成器,內容爲測試過程當中的實時輸出''' content = iplist logging.info("iplist %s" %content) for log in self.stub_client.runJMeter(rpc_pb2.Content(text=content)): yield log if __name__ == '__main__': client = performance(rpc_server,rpc_port) code = client.sendConfig('test.jmx') iplist = r'10.1.1.1,10.1.1.2' for real_time_results in client.runJmeter(None): print "get realtime log from server : %s" % real_time_results.text
結果:
五、基於grcp協議獲取jmeter最終壓測報告,並將報告保存至client端
六、壓測中途中止jmeter
接口文件
syntax = "proto3"; package rpc; service RPC { // send config jmx rpc sendConfFile(Content) returns (Status) {} // run jmeter test rpc runJMeter(Content) returns (stream Content) {} // generateResult rpc generateResult(empty) returns (stream Content) {} // getResult rpc getResult(empty) returns (stream Content) {} // getResult rpc stopJMeter(empty) returns (stream Content) {} } message Content { string text = 1; } message Status { int64 code = 1; } message empty { }
server端:
# -*- encoding=utf-8 -*- from concurrent import futures import time import subprocess import codecs import sys import os import json import logging import grpc import rpc_pb2 import rpc_pb2_grpc logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%a, %d %b %Y %H:%M:%S', filename='rpc_server.log', filemode='a+') _ONE_DAY_IN_SECONDS = 60 * 60 * 24 jmeter_config = os.path.join(os.getcwd(),r'conf/config.jmx') jmeter_path = r'/usr/local/apache-jmeter-3.2/bin/jmeter' jmeter_result_file = os.path.join(os.getcwd(),r'result/result.jtl') jmeter_result_dir = os.path.join(os.getcwd(),r'result/summary/') class Performance(rpc_pb2_grpc.RPCServicer): def sendConfFile(self, content,context): ''' 保存配置文件,如config.jmx ''' text = content.text try: conf_handle = codecs.open(jmeter_config,'w',encoding='utf-8') conf_handle.write(text) logging.info("sendConfFile Success!") return rpc_pb2.Status(code=0) except Exception,e: print e return rpc_pb2.Status(code=1) def runJMeter(self, content,context): logging.info("begin runJmeter.") '''執行測試任務,並將實時結果返回''' iplist = content.text if iplist: cmd = jmeter_path + " -n -t " + jmeter_config + " -l " + jmeter_result_file + " -R " + iplist else: cmd = jmeter_path + " -n -t " + jmeter_config + " -l " + jmeter_result_file logging.info(cmd) popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True,shell=True) for stdout_line in iter(popen.stdout.readline, ""): log_line = rpc_pb2.Content(text=stdout_line) yield log_line popen.stdout.close() return_code = popen.wait() if return_code: logging.warn(return_code) raise subprocess.CalledProcessError(return_code, cmd) def generateResult(self,empty,context): '''調用jmeter,生成彙總測試結果 ''' cmd = jmeter_path + " -g " + jmeter_result_file + " -o " + jmeter_result_dir logging.info(cmd) popen = subprocess.Popen(cmd, stdout=subprocess.PIPE,shell=True) return_code = popen.wait() return rpc_pb2.Status(code=return_code) def getResult(self, empty, content): '''獲取彙總測試結果,返回給客戶端 ''' summary_file = os.path.join(jmeter_result_dir,r"content/js/dashboard.js") file_handle = codecs.open(summary_file,'r',encoding='utf-8') for line in file_handle.readlines(): yield rpc_pb2.Content(text=line) file_handle.close() def stopJMeter(self, empty, content): '''殺死正在執行的任務''' os.system("ps -ef | grep jmeter | grep -v grep | awk '{print $2}'| xargs kill -9") return rpc_pb2.Status(code=1) def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) rpc_pb2_grpc.add_RPCServicer_to_server(Performance(),server) server.add_insecure_port('[::]:50051') server.start() try: while True: time.sleep(_ONE_DAY_IN_SECONDS) except KeyboardInterrupt: server.stop(0) if __name__ == '__main__': serve()
client端:
# -*- encoding=utf-8 -*- from concurrent import futures import time import subprocess import codecs import sys import os import logging import json import grpc import rpc_pb2 import rpc_pb2_grpc logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%a, %d %b %Y %H:%M:%S', filename='rpc.log', filemode='a+') rpc_server = r'192.168.18.128' rpc_port = '50051' jmeter_config = os.path.join(os.getcwd(),r'conf/config.jmx') jmeter_path = r'/usr/local/apache-jmeter-3.2/bin/jmeter' jmeter_result = os.path.join(os.getcwd(),r'conf/result.jtl') class performance(): '''性能測試的客戶端接口''' def __init__(self,ip,port): '''初始化,鏈接RPC服務''' logging.info("performance_client init") conn = grpc.insecure_channel(ip + ':' + port) self.stub_client = rpc_pb2_grpc.RPCStub(channel=conn) def sendConfig(self,filename): file_handle = codecs.open(filename, 'r', encoding='utf-8') content = file_handle.read() '''向RPC server發送測試的配置文件''' response = self.stub_client.sendConfFile(rpc_pb2.Content(text=content)) return response.code def runJmeter(self,iplist): '''運行測試 返回一個生成器,內容爲測試過程當中的實時輸出''' content = iplist logging.info("iplist %s" %content) for log in self.stub_client.runJMeter(rpc_pb2.Content(text=content)): yield log def generateResult(self): '''在rpc server 端生成 jmeter 最終報告''' response = self.stub_client.generateResult(rpc_pb2.empty()) return response.code def getResult(self, local_file): '''從RPC server回傳測試結果,保存到本地文件local_file''' file_handle = codecs.open(local_file, 'w', encoding='utf-8') for line in self.stub_client.getResult(rpc_pb2.empty()): file_handle.write(line.text) file_handle.close() def stopJmeter(self): '''終止測試任務 ''' self.stub_client.stopJMeter(rpc_pb2.empty()) if __name__ == '__main__': client = performance(rpc_server,rpc_port) code = client.sendConfig('test.jmx') iplist = r'10.1.1.1,10.1.1.2' for real_time_results in client.runJmeter(None): print "get realtime log from server : %s" % real_time_results.text print client.generateResult() print client.getResult('2222222222222222') client.stopJmeter()
結果:
當你想中止jmeter壓測,調用stop便可。