使用django+rpc進行服務內部交互

1、爲何使用rpc。django

1)相比uwsgi,使用rpc的長鏈接能夠不須要頻繁建立鏈接,提升傳輸效率。json

2)rpc支持同步和異步,對於不須要等待返回的消息能夠不等待返回繼續運行,減小客戶端等待時間。app

3)使用rpc入口是咱們本身定義的,能夠根據不一樣消息類型定製不一樣的策略。dom

 

2、設計思路異步

使用統一入口,採用django的url resolve匹配,而後完成調用,不改變django rest接口的開發模式。性能

服務端處理採用同步異步分離,異步任務用單獨的進程處理,併爲異步任務制定處理策略:ui

1)對於同步任務,仍然須要當即調用返回。this

2)對於異步任務,能夠進行任務分級:url

      一級是重要任務,屬於系統能力不足時必須優先保障的;spa

      二級任務,在系統能力足夠時仍然須要執行,一旦能力不足,優先保障一級任務;

3)對異步任務,制定執行策略:

      一是必須執行的任務,這部分任務即便積壓也有一條條所有執行完成;

      二是隻須要執行最後一條的,常見於更新信息,對於積壓多條的同一消息,丟棄前面的,保留最後一條;

      三是可丟棄的,遇到性能不足,這一類消息不執行,直接丟棄。

 

3、 grpc的proto文件

syntax = "proto3";
package rpc;
service RPCServer {
  rpc handel(Input) returns (Output){}
}

message Input {
  string params = 1;
}

message Output {
  string content = 1;
}

入參爲Input,返回爲Output,全部接口調用都走這邊。

 

4、客戶端調用

import grpc
import time
import json
import traceback
import threading
import uuid
from datetime import datetime

from . import data_pb2, data_pb2_grpc

_HOST = ''
_PORT = ''
CHANNEL = grpc.insecure_channel(_HOST + ':' + _PORT)


class ManoEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return str(obj)
        if isinstance(obj, uuid.UUID):
            return str(obj)
        return json.JSONEncoder.default(self, obj)


def mano_encode(data):
    return json.dumps(data, cls=ManoEncoder)


def call_rpc(url, headers, resource, content, logger):
    try:
        params = json.dumps({
            'url': url,
            'headers': headers,
            'method': resource['method'],
            'content': content
        })
        timeout = resource.get('timeout', 5)
        client = data_pb2_grpc.RPCServerStub(CHANNEL)
        response = client.handel.future(data_pb2.Input(params=params), timeout)
        while not response.done():
            time.sleep(0.01)
        result = json.loads(response.result().content)
        print(result['status_code'])
        return result['status_code'], mano_encode(result['data'])
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('call url %s failed, msg is %s' % (url, err.message))
        return '409', err.message

入參params需包含:rest url,頭信息headers,rest類型,以及request body;

結果採用異步獲取,不持續佔用鏈接,對於不須要結果的,能夠不等待,這邊沒寫。

 

5、服務端實現

import os
import django

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "*.settings")
django.setup()

import grpc
import json
import time
import random
import traceback
import threading
import uuid
import logging
from datetime import datetime

from concurrent import futures
from multiprocessing import Process, Queue, Value
from Queue import Queue as ManoQueue

from . import data_pb2, data_pb2_grpc
from django.urls import get_resolver
from django.utils.functional import cached_property

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
_HOST = '[::]'
_PORT = '12330'
_PROCESS_COUNT = 2
RESOLVER = get_resolver()
logger = logging.getLogger(__name__)

message_queue = Queue()  # 異步任務隊列,用於進程通訊
status_level2 = Value('I', 1)  # 二級隊列狀態,用於進程通訊


class ManoEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return str(obj)
        if isinstance(obj, uuid.UUID):
            return str(obj)
        return json.JSONEncoder.default(self, obj)


def mano_encode(data):
    return json.dumps(data, cls=ManoEncoder)


class RPCServer(data_pb2_grpc.RPCServerServicer):
    def handel(self, request, context):
        input_info = json.loads(request.params)
        if input_info.get('reply', True) is True:  # reply爲True表明同步,不然異步
            res_url = input_info['url']
            headers = input_info['headers']
            method = input_info['method']
            content = input_info['content']
            status_code, data = self.call_sync(res_url, headers, method, content)
            return data_pb2.Output(content=mano_encode({'data': data, 'status_code': status_code}))
        else:
            if input_info['queue_detail']['level'] == 2 and not status_level2:
                data = 'queue of status level2 is not active'
                status_code = '409'
            else:
                message_queue.put(request.params)
                data = 'success'
                status_code = '201'
            return data_pb2.Output(content=mano_encode({'data': data, 'status_code': status_code}))

    @staticmethod
    def call_sync(res_url, headers, method, content):
        try:
            resp_status, resp_body = call_inner(res_url, headers, method, content, logger)
            return resp_status, resp_body
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('call url %s failed, msg is %s' % (res_url, err.message))
            return '409', err.message


def main():  # rpc 服務主進程
    bind_address = '%s:%s' % (_HOST, _PORT)
    _run_server(bind_address)  # 啓動rpc進程
    _run_queue_process()  # 啓動異步任務處理進程


def _run_server(bind_address):
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=100, ))
    data_pb2_grpc.add_RPCServerServicer_to_server(RPCServer(), grpc_server)
    grpc_server.add_insecure_port(bind_address)
    grpc_server.start()


def _run_queue_process():
    worker = Process(target=_handle_no_wait_request, args=(message_queue, status_level2,))
    worker.start()
    worker.join()


def _handle_no_wait_request(q, status_2):  # 異步任務分類
    first_order_queue = [ManoQueue(maxsize=0), list()]
    second_order_queue = [ManoQueue(maxsize=1000), list()]
    mano_queue = [first_order_queue, second_order_queue]
    thread_pool = futures.ThreadPoolExecutor(max_workers=50)
    threading.Thread(target=_start_message_monitor, args=(q, mano_queue, status_2,)).start()  # 根據策略進行異步任務分類
    while True:
        num_threads = len(thread_pool._threads)
        if num_threads < 50:
            input_info = _get_request(mano_queue)  # 獲取本次需執行的任務,每一個隊列機會均等
            res_url = input_info['url']
            headers = input_info['headers']
            method = input_info['method']
            content = input_info['content']
            thread_pool.submit(RPCServer.call_sync, res_url, headers, method, content)  # 交給工做線程
            logger.info('handle success')
        else:
            logger.info('process busy')
            time.sleep(0.1)


def _start_message_monitor(q, mano_queue, status_2):
    while True:
        data = q.get()
        _handel_by_queue(data, mano_queue, status_2)


def _get_request(mano_queue):
    active_index = _get_active_queue(mano_queue)
    if active_index:
        index = random.choice(active_index)
        i, k = int(index.split('_')[0]), int(index.split('_')[1])
        q = mano_queue[i][k]
        if isinstance(q, ManoQueue):
            request_info = json.loads(q.get())
        else:
            request_info = json.loads(q.pop(0))
    else:
        request_info = {}
    return request_info


def _get_active_queue(mano_queue):
    active_index = []
    if not mano_queue[0][0].empty():
        active_index.append('0_0')
    if not mano_queue[1][0].empty():
        active_index.append('1_0')
    if len(mano_queue[0][1]) != 0:
        active_index.append('0_1')
    if len(mano_queue[1][1]) != 0:
        active_index.append('1_1')
    return active_index


def _handel_by_queue(data, mano_queue, status_2):  # 根據請求級別進行消息分類
    input_info = json.loads(data)
    level = input_info['queue_detail']['level']
    policy = input_info['queue_detail']['limit_policy']
    if level == 1:
        _handel_by_policy(mano_queue[0], policy, data)
    elif level == 2:
        request_queue = mano_queue[1]
        _handel_by_policy(mano_queue[1], policy, data)
        if request_queue[0].qsize() > 0.8 * request_queue[0].maxsize:
            status_2.value = 0
        elif request_queue[0].qsize() < 0.6 * request_queue[0].maxsize:
            status_2.value = 1


def _handel_by_policy(request_queue, policy, data):  # 根據請求策略進行消息分類
    if policy == 'execute':  # 必須執行的異步任務
        request_queue[0].put(data)
    elif policy == 'last':  # 阻塞時能夠只執行最後一次的異步任務
        try:
            while True:
                request_queue[1].remove(data)
        except ValueError:
            request_queue[1].append(data)
    else:  # 阻塞時能夠丟棄的異步任務
        if request_queue[0].qsize < request_queue[0].maxsize * 0.6:
            request_queue[0].put(data)  # 先丟棄前面的


def call_inner(res_url, headers, method, content, logger):
    logger.info('[call_inner] url is %s' % res_url)
    url, params = get_url_and_params(res_url)
    meta = get_meta(headers)
    request = Request(url=url, full_url=res_url, params=params, content=content, meta=meta, method=method)
    resolver_match = RESOLVER.resolve(url)  # URL 匹配
    callback, callback_args, callback_kwargs = resolver_match
    call_method = getattr(callback.view_class(), method.lower())
    if not method:
        return '404', 'not support this operate'
    try:
        if callback_kwargs:
            result = call_method(request, '', **callback_kwargs)
        else:
            result = call_method(request)
    except BaseException as err:
        logger.error(traceback.format_exc())
        logger.error('call url %s failed, msg is %s' % (res_url, err.message))
        return '409', err.message
    return str(result.status_code), result.data


def get_url_and_params(full_url):
    params = {}
    if '?' in full_url:
        url, params_str = full_url.split('?')[0], full_url.split('?')[1]
        for key_value in params_str.split('&'):
            key, value = key_value.split('=')[0], key_value.split('=')[1]
            params[key] = value
    else:
        url = full_url
    return url, params


def get_meta(headers):
    meta = {}
    # custom
    return meta


class Request(object):
    def __init__(self, **kwargs):
        self.data = self.get_content(kwargs['content'])
        self.query_params = kwargs['params']
        self.path = kwargs['url']
        self.full_path = kwargs['full_url']
        self.FILES = {}
        self.META = kwargs['meta']
        self.COOKIES = {}
        self._request = InnerOBJ(kwargs['method'])

    @staticmethod
    def get_content(content):
        if not content:
            req_data = {}
        else:
            req_data = content if isinstance(content, dict) else json.loads(content)
        return req_data

    def __str__(self):
        return '<Request> %s' % self.path

    @cached_property
    def GET(self):
        return self.query_params

    def get_full_path(self):
        return self.full_path


class InnerOBJ(object):
    def __init__(self, method):
        self.method = method.upper()


if __name__ == '__main__':
    main()
相關文章
相關標籤/搜索