python grpc 應用

一、rpc介紹python

二、grpclinux

三、基於grpc協議文件傳輸shell

四、基於grpc協議jmeter壓測獲取實時結果apache

五、基於grcp協議獲取jmeter最終壓測報告,並將報告保存至client端json

六、壓測中途中止jmeter
vim

grpc server (jmeter server)  192.168.18.128
rpc client (本機)

一、rpc協議介紹後端

RPC(Remote Procedure Call Protocol)——遠程過程調用協議,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,爲通訊程序之間攜帶信息數據。在OSI網絡通訊模型中,RPC跨越了傳輸層和應用層。RPC使得開發包括網絡分佈式多程序在內的應用程序更加容易。bash

RPC採用客戶機/服務器模式。請求程序就是一個客戶機,而服務提供程序就是一個服務器。首先,客戶機調用進程發送一個有進程參數的調用信息到服務進程,而後等待應答信息。在服務器端,進程保持睡眠狀態直到調用信息到達爲止。當一個調用信息到達,服務器得到進程參數,計算結果,發送答覆信息,而後等待下一個調用信息,最後,客戶端調用進程接收答覆信息,得到進程結果,而後調用執行繼續進行。 -- 摘自 百度百科服務器

二、grpc介紹網絡

gRPC 是一款高性能、開源的 RPC 框架,產自 Google,基於 ProtoBuf 序列化協議進行開發,支持多種語言(Golang、Python、Java等),本篇只介紹 Python 的 gRPC 使用。由於 gRPC 對 HTTP/2 協議的支持使其在 Android、IOS 等客戶端後端服務的開發領域具備良好的前景。gRPC 提供了一種簡單的方法來定義服務,同時客戶端能夠充分利用 HTTP2 stream 的特性,從而有助於節省帶寬、下降 TCP 的鏈接次數、節省CPU的使用等。

三、基於grpc協議文件傳輸

目標:將本地的文件(test.file)傳輸至rpc server (192.168.18.128)

python 版本 2.7.13

3.一、

gRPC 的安裝:

[root@vm6 rpc]# pip2.7 install requests
[root@vm6 rpc]# pip2.7  install grpcio

安裝 ProtoBuf 相關的 python 依賴庫:

[root@vm6 rpc]# pip2.7 install protobuf

安裝 python grpc 的 protobuf 編譯工具:

[root@vm6 rpc]# pip2.7 install grpcio-tools

定義 gRPC 接口:

[root@vm6 ~]# mkdir rpc rpc/__init__.py
[root@vm6 ~]# cd rpc/
[root@vm6 rpc]# vim rpc.proto 
// grpc 版本
syntax = "proto3";   
package rpc;

//定義接口
service RPC {
//至關於定義接口方法 
    rpc sendConfFile(Content) returns (Status) {}
}

//至關於定義類屬性,此屬性用於接受文本
message  Content {
  string  text = 1;
}
//此屬性用於return 結果狀態嗎
message Status {
    int64 code = 1;
}

編譯 protobuf:

[root@vm6 rpc]# python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./rpc.proto

server 端代碼:

# -*- encoding=utf-8 -*-
from concurrent import futures
import time
import subprocess
import codecs
import sys
import os

import grpc
import rpc_pb2
import rpc_pb2_grpc

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
#server端文件保存的位置
jmeter_config = os.path.join(os.getcwd(),r'conf/config.jmx')

class Performance(rpc_pb2_grpc.RPCServicer):

    def sendConfFile(self, content,context):
        ''' 保存配置文件,如config.jmx '''
        text = content.text
        try:
            print jmeter_config
            conf_handle = codecs.open(jmeter_config,'w',encoding='utf-8')
            conf_handle.write(text)
            return rpc_pb2.Status(code=0)
        except Exception,e:
            print e
            return rpc_pb2.Status(code=1)

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    rpc_pb2_grpc.add_RPCServicer_to_server(Performance(),server)
    server.add_insecure_port('[::]:50051')
    server.start()
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    serve()

client 端代碼:

# -*- encoding=utf-8 -*-
from concurrent import futures
import time
import subprocess
import codecs
import sys
import os
import logging
import json
import grpc
import rpc_pb2
import rpc_pb2_grpc

logging.basicConfig(level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
    filename='rpc.log',
    filemode='a+')

rpc_server = r'192.168.18.128'
rpc_port = '50051'

class performance():

  '''性能測試的客戶端接口'''

  def __init__(self,ip,port):

    '''初始化,鏈接RPC服務'''

    logging.info("performance_client init")
    conn = grpc.insecure_channel(ip + ':' + port)
    self.stub_client = rpc_pb2_grpc.RPCStub(channel=conn)

  def sendConfig(self,filename):
      file_handle = codecs.open(filename, 'r', encoding='utf-8')
      content = file_handle.read()
      '''向RPC server發送測試的配置文件'''
      response =  self.stub_client.sendConfFile(rpc_pb2.Content(text=content))
      print response.code

if __name__ == '__main__':
    client = performance(rpc_server,rpc_port)
    ##將本地的rpc_client.py文件當作測試文件發送到server端
    client.sendConfig('rpc_client.py')

啓動server端:

[root@vm6 rpc]# python rpc_server.py

啓動client端:

D:\xisuo\rpc>python rpc_client.py
0

查看server端的文件:

[root@vm6 rpc]# more conf/config.jmx 
# -*- encoding=utf-8 -*-
from concurrent import futures
import time
import subprocess
import codecs
import sys
import os
import logging
。。。略

四、基於grpc協議jmeter壓測獲取實時結果

4.一、server端部署jdk,jmeter

將jdk解壓到/usr/local/ 配置環境變量

將jmeter解壓到/usr/local

4.二、由於咱們是有jmeter的no gui模式在Linux執行,故須要jmeter的jmx文件,咱們能夠在本地使用gui模式先生成jmx文件

最終結果文件爲 test.jmx,內容以下:

<?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version="1.2" properties="2.4" jmeter="2.9 r1437961">
  <hashTree>
    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="測試計劃" enabled="true">
      <stringProp name="TestPlan.comments"></stringProp>
      <boolProp name="TestPlan.functional_mode">false</boolProp>
      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="用戶定義的變量" enabled="true">
        <collectionProp name="Arguments.arguments"/>
      </elementProp>
      <stringProp name="TestPlan.user_define_classpath"></stringProp>
    </TestPlan>
    <hashTree>
      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="測試" enabled="true">
        <stringProp name="TestPlan.comments">測試</stringProp>
        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="循環控制器" enabled="true">
          <boolProp name="LoopController.continue_forever">false</boolProp>
          <stringProp name="LoopController.loops">50</stringProp>
        </elementProp>
        <stringProp name="ThreadGroup.num_threads">2</stringProp>
        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
        <longProp name="ThreadGroup.start_time">1496464278000</longProp>
        <longProp name="ThreadGroup.end_time">1496464278000</longProp>
        <boolProp name="ThreadGroup.scheduler">false</boolProp>
        <stringProp name="ThreadGroup.duration"></stringProp>
        <stringProp name="ThreadGroup.delay"></stringProp>
      </ThreadGroup>
      <hashTree>
        <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="HTTP請求" enabled="true">
          <elementProp name="HTTPsampler.Arguments" elementType="Arguments" guiclass="HTTPArgumentsPanel" testclass="Arguments" testname="用戶定義的變量" enabled="true">
            <collectionProp name="Arguments.arguments"/>
          </elementProp>
          <stringProp name="HTTPSampler.domain">lansgg.blog.51cto.com</stringProp>
          <stringProp name="HTTPSampler.port">80</stringProp>
          <stringProp name="HTTPSampler.connect_timeout"></stringProp>
          <stringProp name="HTTPSampler.response_timeout"></stringProp>
          <stringProp name="HTTPSampler.protocol"></stringProp>
          <stringProp name="HTTPSampler.contentEncoding"></stringProp>
          <stringProp name="HTTPSampler.path"></stringProp>
          <stringProp name="HTTPSampler.method">GET</stringProp>
          <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
          <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
          <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
          <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
          <stringProp name="HTTPSampler.implementation">HttpClient4</stringProp>
          <boolProp name="HTTPSampler.monitor">false</boolProp>
          <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
          <stringProp name="TestPlan.comments">HTTP請求</stringProp>
        </HTTPSamplerProxy>
        <hashTree/>
        <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="察看結果樹" enabled="true">
          <boolProp name="ResultCollector.error_logging">false</boolProp>
          <objProp>
            <name>saveConfig</name>
            <value class="SampleSaveConfiguration">
              <time>true</time>
              <latency>true</latency>
              <timestamp>true</timestamp>
              <success>true</success>
              <label>true</label>
              <code>true</code>
              <message>true</message>
              <threadName>true</threadName>
              <dataType>true</dataType>
              <encoding>false</encoding>
              <assertions>true</assertions>
              <subresults>true</subresults>
              <responseData>false</responseData>
              <samplerData>false</samplerData>
              <xml>false</xml>
              <fieldNames>false</fieldNames>
              <responseHeaders>false</responseHeaders>
              <requestHeaders>false</requestHeaders>
              <responseDataOnError>false</responseDataOnError>
              <saveAssertionResultsFailureMessage>false</saveAssertionResultsFailureMessage>
              <assertionsResultsToSave>0</assertionsResultsToSave>
              <bytes>true</bytes>
            </value>
          </objProp>
          <stringProp name="TestPlan.comments">察看結果樹</stringProp>
          <stringProp name="filename"></stringProp>
        </ResultCollector>
        <hashTree/>
      </hashTree>
    </hashTree>
  </hashTree>
</jmeterTestPlan>

當咱們在linux終端上執行時,結果以下,

[root@vm6 rpc]# /usr/local/apache-jmeter-3.2/bin/jmeter -n -t test.jmx -l text.jtl

Creating summariser <summary>
Created the tree successfully using test.jmx
Starting the test @ Sat Jun 03 20:41:25 CST 2017 (1496493685813)
Waiting for possible Shutdown/StopTestNow/Heapdump message on port 4445
summary +     46 in 00:00:18 =    2.5/s Avg:  2745 Min:   423 Max:  3135 Err:     0 (0.00%) Active: 49 Started: 50 Finished: 1
summary +     54 in 00:00:08 =    7.2/s Avg:  2018 Min:   413 Max:  8000 Err:     0 (0.00%) Active: 0 Started: 50 Finished: 50
summary =    100 in 00:00:26 =    3.9/s Avg:  2353 Min:   413 Max:  8000 Err:     0 (0.00%)
Tidying up ...    @ Sat Jun 03 20:42:07 CST 2017 (1496493727553)
... end of run

而咱們要將這些結果實時的在rpc client展現出來

接口文件:

syntax = "proto3";
package rpc;


service RPC {
// send config jmx
    rpc sendConfFile(Content) returns (Status) {}
//  run jmeter test
    rpc runJMeter(Content) returns (stream Content) {}
}

message  Content {
  string  text = 1;
}

message Status {
    int64 code = 1;
}

server端:

# -*- encoding=utf-8 -*-
from concurrent import futures
import time
import subprocess
import codecs
import sys
import os
import json
import logging
import grpc
import rpc_pb2
import rpc_pb2_grpc

logging.basicConfig(level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
    filename='rpc_server.log',
    filemode='a+')

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
jmeter_config = os.path.join(os.getcwd(),r'conf/config.jmx')
jmeter_path = r'/usr/local/apache-jmeter-3.2/bin/jmeter'
jmeter_result = os.path.join(os.getcwd(),r'result/result.jtl')

class Performance(rpc_pb2_grpc.RPCServicer):

    def sendConfFile(self, content,context):
        ''' 保存配置文件,如config.jmx '''
        text = content.text
        try:
            conf_handle = codecs.open(jmeter_config,'w',encoding='utf-8')
            conf_handle.write(text)
            logging.info("sendConfFile Success!")
            return rpc_pb2.Status(code=0)
        except Exception,e:
            print e
            return rpc_pb2.Status(code=1)

    def runJMeter(self, content,context):
        logging.info("begin runJmeter.")
        '''執行測試任務,並將實時結果返回'''
        iplist = content.text
        if iplist:
            cmd = jmeter_path + " -n -t " + jmeter_config + " -l " + jmeter_result + " -R "  +  iplist
        else:
            cmd = jmeter_path + " -n -t " + jmeter_config + " -l " + jmeter_result
        logging.info(cmd)
        popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True,shell=True)
        for stdout_line in iter(popen.stdout.readline, ""):
            log_line = rpc_pb2.Content(text=stdout_line)
            yield log_line
        popen.stdout.close()
        return_code = popen.wait()
        if return_code:
            logging.warn(return_code)
            raise subprocess.CalledProcessError(return_code, cmd)



def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    rpc_pb2_grpc.add_RPCServicer_to_server(Performance(),server)
    server.add_insecure_port('[::]:50051')
    server.start()
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    serve()

client端:

# -*- encoding=utf-8 -*-
from concurrent import futures
import time
import subprocess
import codecs
import sys
import os
import logging
import json
import grpc
import rpc_pb2
import rpc_pb2_grpc

logging.basicConfig(level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
    filename='rpc.log',
    filemode='a+')

rpc_server = r'192.168.18.128'
rpc_port = '50051'
jmeter_config = os.path.join(os.getcwd(),r'conf/config.jmx')
jmeter_path = r'/usr/local/apache-jmeter-3.2/bin/jmeter'
jmeter_result = os.path.join(os.getcwd(),r'conf/result.jtl')

class performance():

  '''性能測試的客戶端接口'''

  def __init__(self,ip,port):

    '''初始化,鏈接RPC服務'''

    logging.info("performance_client init")
    conn = grpc.insecure_channel(ip + ':' + port)
    self.stub_client = rpc_pb2_grpc.RPCStub(channel=conn)

  def sendConfig(self,filename):
      file_handle = codecs.open(filename, 'r', encoding='utf-8')
      content = file_handle.read()
      '''向RPC server發送測試的配置文件'''
      response =  self.stub_client.sendConfFile(rpc_pb2.Content(text=content))
      return response.code


  def runJmeter(self,iplist):

      '''運行測試 返回一個生成器,內容爲測試過程當中的實時輸出'''
      content = iplist
      logging.info("iplist %s" %content)
      for log in self.stub_client.runJMeter(rpc_pb2.Content(text=content)):
          yield log


if __name__ == '__main__':
    client = performance(rpc_server,rpc_port)
    code = client.sendConfig('test.jmx')
    iplist = r'10.1.1.1,10.1.1.2'
    for real_time_results in client.runJmeter(None):
        print "get realtime log from server : %s" % real_time_results.text

結果:

wKioL1kybpPh0wajAAA2IbnRzKk342.png-wh_50

五、基於grcp協議獲取jmeter最終壓測報告,並將報告保存至client端

六、壓測中途中止jmeter

接口文件

syntax = "proto3";
package rpc;


service RPC {
// send config jmx
    rpc sendConfFile(Content) returns (Status) {}
//  run jmeter test
    rpc runJMeter(Content) returns (stream Content) {}
//  generateResult
    rpc generateResult(empty) returns (stream Content) {}
//  getResult
    rpc getResult(empty) returns (stream Content) {}
//  getResult
    rpc stopJMeter(empty) returns (stream Content) {}
}

message  Content {
  string  text = 1;
}

message Status {
    int64 code = 1;
}
message empty {

}

server端:

# -*- encoding=utf-8 -*-
from concurrent import futures
import time
import subprocess
import codecs
import sys
import os
import json
import logging
import grpc
import rpc_pb2
import rpc_pb2_grpc

logging.basicConfig(level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
    filename='rpc_server.log',
    filemode='a+')

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
jmeter_config = os.path.join(os.getcwd(),r'conf/config.jmx')
jmeter_path = r'/usr/local/apache-jmeter-3.2/bin/jmeter'
jmeter_result_file = os.path.join(os.getcwd(),r'result/result.jtl')
jmeter_result_dir = os.path.join(os.getcwd(),r'result/summary/')

class Performance(rpc_pb2_grpc.RPCServicer):

    def sendConfFile(self, content,context):
        ''' 保存配置文件,如config.jmx '''
        text = content.text
        try:
            conf_handle = codecs.open(jmeter_config,'w',encoding='utf-8')
            conf_handle.write(text)
            logging.info("sendConfFile Success!")
            return rpc_pb2.Status(code=0)
        except Exception,e:
            print e
            return rpc_pb2.Status(code=1)

    def runJMeter(self, content,context):
        logging.info("begin runJmeter.")
        '''執行測試任務,並將實時結果返回'''
        iplist = content.text
        if iplist:
            cmd = jmeter_path + " -n -t " + jmeter_config + " -l " + jmeter_result_file + " -R "  +  iplist
        else:
            cmd = jmeter_path + " -n -t " + jmeter_config + " -l " + jmeter_result_file
        logging.info(cmd)
        popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True,shell=True)
        for stdout_line in iter(popen.stdout.readline, ""):
            log_line = rpc_pb2.Content(text=stdout_line)
            yield log_line
        popen.stdout.close()
        return_code = popen.wait()
        if return_code:
            logging.warn(return_code)
            raise subprocess.CalledProcessError(return_code, cmd)

    def generateResult(self,empty,context):
        '''調用jmeter,生成彙總測試結果
        '''
        cmd = jmeter_path + " -g " + jmeter_result_file + " -o " +  jmeter_result_dir
        logging.info(cmd)
        popen = subprocess.Popen(cmd, stdout=subprocess.PIPE,shell=True)
        return_code = popen.wait()
        return rpc_pb2.Status(code=return_code)

    def getResult(self, empty, content):
        '''獲取彙總測試結果,返回給客戶端
        '''
        summary_file = os.path.join(jmeter_result_dir,r"content/js/dashboard.js")
        file_handle = codecs.open(summary_file,'r',encoding='utf-8')
        for line in file_handle.readlines():
            yield rpc_pb2.Content(text=line)
        file_handle.close()

    def stopJMeter(self, empty, content):
        '''殺死正在執行的任務'''
        os.system("ps -ef | grep jmeter | grep -v grep  | awk '{print $2}'| xargs kill -9")
        return rpc_pb2.Status(code=1)



def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    rpc_pb2_grpc.add_RPCServicer_to_server(Performance(),server)
    server.add_insecure_port('[::]:50051')
    server.start()
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    serve()

client端:

# -*- encoding=utf-8 -*-
from concurrent import futures
import time
import subprocess
import codecs
import sys
import os
import logging
import json
import grpc
import rpc_pb2
import rpc_pb2_grpc

logging.basicConfig(level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
    filename='rpc.log',
    filemode='a+')

rpc_server = r'192.168.18.128'
rpc_port = '50051'
jmeter_config = os.path.join(os.getcwd(),r'conf/config.jmx')
jmeter_path = r'/usr/local/apache-jmeter-3.2/bin/jmeter'
jmeter_result = os.path.join(os.getcwd(),r'conf/result.jtl')

class performance():

  '''性能測試的客戶端接口'''

  def __init__(self,ip,port):

    '''初始化,鏈接RPC服務'''

    logging.info("performance_client init")
    conn = grpc.insecure_channel(ip + ':' + port)
    self.stub_client = rpc_pb2_grpc.RPCStub(channel=conn)

  def sendConfig(self,filename):
      file_handle = codecs.open(filename, 'r', encoding='utf-8')
      content = file_handle.read()
      '''向RPC server發送測試的配置文件'''
      response =  self.stub_client.sendConfFile(rpc_pb2.Content(text=content))
      return response.code


  def runJmeter(self,iplist):

      '''運行測試 返回一個生成器,內容爲測試過程當中的實時輸出'''
      content = iplist
      logging.info("iplist %s" %content)
      for log in self.stub_client.runJMeter(rpc_pb2.Content(text=content)):
          yield log

  def generateResult(self):
      '''在rpc server 端生成 jmeter 最終報告'''
      response = self.stub_client.generateResult(rpc_pb2.empty())
      return response.code

  def getResult(self, local_file):
      '''從RPC server回傳測試結果,保存到本地文件local_file'''
      file_handle = codecs.open(local_file, 'w', encoding='utf-8')
      for line in self.stub_client.getResult(rpc_pb2.empty()):
          file_handle.write(line.text)
      file_handle.close()

  def stopJmeter(self):
    '''終止測試任務
    '''
    self.stub_client.stopJMeter(rpc_pb2.empty())

if __name__ == '__main__':
    client = performance(rpc_server,rpc_port)
    code = client.sendConfig('test.jmx')
    iplist = r'10.1.1.1,10.1.1.2'
    for real_time_results in client.runJmeter(None):
        print "get realtime log from server : %s" % real_time_results.text
    print client.generateResult()
    print client.getResult('2222222222222222')
    client.stopJmeter()

結果:

wKiom1kyhayBlly7AAAdbAQudMk914.png-wh_50

當你想中止jmeter壓測,調用stop便可。

相關文章
相關標籤/搜索