Using zerorpc in Django


Preface

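As system architecture moves from a centralized single server toward distributed microservices, RPC becomes a topic that cannot be avoided. How to introduce an RPC service that is friendly to developers and performs reliably deserves careful thought.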

After evaluating Thrift, gRPC, zerorpc and other options, we chose zerorpc for the following two reasons:

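  • Thrift and gRPC have a steep learning curve, and developers would have to redefine their response structures, which adds work
  • zerorpc fits Python well, allows rapid development, and also supports Node.js, so it suits our current tech stack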

Problems

Although zerorpc can be embedded directly into the existing framework, a few problems still have to be considered and solved:

  • How RPC interfaces are defined
  • How the RPC service is started
  • How reliable the client is under high concurrency

 

Server side

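Our system makes heavy use of Celery. With django-celery, tasks are defined by placing a tasks.py file in each installed app and creating tasks with the @task decorator. To make RPC interfaces just as easy to define, we designed a similar convention: any app that needs to expose RPC interfaces creates an rpcs.py file.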

# rpcs.py
# coding: utf-8

from eebo.core.utils.zrpc import rpc
from .models import Ticket
from .serializers import TicketSerializer


@rpc.register()
def get_ticket():
    t = Ticket.objects.first()
    s = TicketSerializer(t)
    return s.data


@rpc.register(name='ticket_list', stream=True)
def get_tickets(n):
    qs = Ticket.objects.all()[:n]
    s = TicketSerializer(qs, many=True)
    return iter(s.data)

  

The rpc.register decorator registers a function on the RPC service. Optional parameters:

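  • name: the method name that clients call; if omitted, the function name is used, e.g. get_ticket
  • stream: defaults to False; if True, zerorpc's streaming response is used, which suits large result sets; the function must return an iterable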

Let's look at how eebo.core.utils.zrpc implements this registration:

# zrpc.py
# coding: utf-8

import zerorpc


class RPC(object):
    @classmethod
    def register(cls, name=None, stream=False):
        def _wrapper(func):
            setattr(cls, name or func.__name__, zerorpc.stream(
                lambda self, *args, **kwargs: func(*args, **kwargs)) if stream
                    else staticmethod(func))
            return func

        return _wrapper


rpc = RPC()

  

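A class method binds the functions onto the class. Note that every name must be globally unique, because setattr silently replaces an earlier registration that used the same name.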
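The uniqueness requirement could also be enforced at registration time. Below is a minimal sketch, not the author's code: Registry is a hypothetical stand-in for the RPC class that keeps functions in a dict (without zerorpc), so that a duplicate name raises an error when the module is imported.

```python
class Registry(object):
    """Hypothetical stand-in for the RPC class; stores functions in a dict."""

    def __init__(self):
        self.methods = {}

    def register(self, name=None):
        def _wrapper(func):
            key = name or func.__name__
            if key in self.methods:
                # Fail at import time instead of silently replacing
                # the interface that was registered first.
                raise ValueError("rpc name %r is already registered" % key)
            self.methods[key] = func
            return func

        return _wrapper


registry = Registry()


@registry.register()
def get_ticket():
    return {"id": 1}


try:
    # Same public name as above, so this registration is rejected.
    @registry.register(name="get_ticket")
    def get_ticket_v2():
        return {"id": 2}
except ValueError as exc:
    print(exc)
```

The same check could be added to RPC.register with hasattr(cls, name) before the setattr call.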

Now that we have a way to define RPC interfaces, let's see how to start the RPC server.

# runrpc.py
# coding: utf-8

import re
import sys
import imp as _imp
import importlib
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError

from eebo.core.utils.zrpc import rpc, ServerExecMiddleware

naiveip_re = re.compile(r"""^(?:
(?P<addr>
    (?P<ipv4>\d{1,3}(?:\.\d{1,3}){3}) |         # IPv4 address
    (?P<ipv6>\[[a-fA-F0-9:]+\]) |               # IPv6 address
    (?P<fqdn>[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)*) # FQDN
):)?(?P<port>\d+)$""", re.X)


class Command(BaseCommand):
    help = "Starts a lightweight RPC server for development."

    default_addr = '127.0.0.1'
    default_port = '4242'

    def add_arguments(self, parser):
        parser.add_argument('addrport',
                            nargs='?',
                            help='Optional port number, or ipaddr:port')

    def handle(self, *args, **options):

        self.use_ipv6 = False
        if not options['addrport']:
            self.addr = ''
            self.port = self.default_port
        else:
            m = re.match(naiveip_re, options['addrport'])
            if m is None:
                raise CommandError('"%s" is not a valid port number '
                                   'or address:port pair.' %
                                   options['addrport'])
            self.addr, _ipv4, _ipv6, _fqdn, self.port = m.groups()
            if not self.port.isdigit():
                raise CommandError("%r is not a valid port number." %
                                   self.port)
            if self.addr:
                if _ipv6:
                    self.addr = self.addr[1:-1]
                    self.use_ipv6 = True
                    self._raw_ipv6 = True
                elif self.use_ipv6 and not _fqdn:
                    raise CommandError('"%s" is not a valid IPv6 address.' %
                                       self.addr)
        if not self.addr:
            self.addr = self.default_addr_ipv6 if self.use_ipv6 else self.default_addr
            self._raw_ipv6 = self.use_ipv6
        self.run(**options)

    def run(self, **options):
        """Run the server, using the autoreloader if needed."""
        self.autodiscover_rpc()

        server = self.get_server()

        try:
            server.run()
        except KeyboardInterrupt:
            server.close()
            sys.exit(0)

    def autodiscover_rpc(self, related_name='rpcs'):
        for pkg in settings.INSTALLED_APPS:
            try:
                pkg_path = importlib.import_module(pkg).__path__
            except AttributeError:
                continue

            try:
                _imp.find_module(related_name, pkg_path)
            except ImportError:
                continue

            try:
                importlib.import_module('{0}.{1}'.format(pkg, related_name))
            except ImportError:
                pass

    def get_server(self, *args, **options):
        """Return the default zerorpc server for the runner."""
        import zerorpc
        server = zerorpc.Server(rpc, heartbeat=30)
        server.bind("tcp://{0}:{1}".format(self.addr, self.port))
        # close django old connections
        zerorpc.Context.get_instance().register_middleware(ServerExecMiddleware())

        # for sentry
        try:
            from raven.contrib.zerorpc import SentryMiddleware
            if hasattr(settings, 'RAVEN_CONFIG'):
                sentry = SentryMiddleware(hide_zerorpc_frames=False,
                                          dsn=settings.RAVEN_CONFIG['dsn'])
                zerorpc.Context.get_instance().register_middleware(sentry)
        except ImportError:
            pass

        return server

  

runrpc.py is a Django management command file; it has to be placed under management/commands in one of the installed apps. To start the server:

python manage.py runrpc 0.0.0.0:4242

  • autodiscover_rpc: automatically discovers and imports the functions registered as RPC interfaces
  • get_server: builds the zerorpc server object
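The imp module used by autodiscover_rpc was removed in Python 3.12. The same discovery step can be sketched with importlib.util.find_spec; this is an alternative under that assumption, not the code from the article, and the function name autodiscover is made up for illustration. Unlike the original, it lets import errors raised inside an rpcs.py propagate instead of swallowing them.

```python
import importlib
import importlib.util


def autodiscover(app_names, related_name="rpcs"):
    """Import `<app>.<related_name>` for every app that ships one.

    Returns the list of module names that were imported.
    """
    imported = []
    for app in app_names:
        try:
            package = importlib.import_module(app)
        except ImportError:
            continue  # the app itself is not importable; skip it
        if not hasattr(package, "__path__"):
            continue  # a plain module cannot contain a submodule
        target = "{0}.{1}".format(app, related_name)
        if importlib.util.find_spec(target) is None:
            continue  # this app has no rpcs.py
        # Errors raised while importing rpcs.py propagate on purpose,
        # so a broken interface file is not silently skipped.
        importlib.import_module(target)
        imported.append(target)
    return imported
```

Inside the management command it would be called as autodiscover(settings.INSTALLED_APPS).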

get_server registers two middlewares with zerorpc: SentryMiddleware captures exceptions raised by RPC interfaces and sends them to Sentry, and ServerExecMiddleware manages Django's DB connections. Here is the code:

# zrpc.py
# coding: utf-8

from django.db import close_old_connections

class ServerExecMiddleware(object):

    def server_before_exec(self, request_event):
        close_old_connections()

    def server_after_exec(self, request_event, reply_event):
        close_old_connections()

  

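close_old_connections is called before and after every RPC interface call. This reproduces what django.db does for HTTP requests, where the same function is connected to the request start and finish signals: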

# django/db/__init__.py

signals.request_started.connect(close_old_connections)
signals.request_finished.connect(close_old_connections)

  

The goal is to make sure that when an RPC interface uses the ORM, it does not pick up a connection that has already timed out and been dropped.
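Why the cleanup runs on both sides of a call can be shown with a toy model. The sketch below does not use Django; FakeConnection and call_with_cleanup are hypothetical names, and the age check only approximates the idea of discarding connections that have outlived their maximum age.

```python
import time


class FakeConnection(object):
    """Hypothetical stand-in for a database connection with a maximum age."""

    def __init__(self, max_age, clock=time.monotonic):
        self.max_age = max_age
        self.clock = clock
        self.opened_at = None  # None means "not connected"

    def ensure_open(self):
        if self.opened_at is None:
            self.opened_at = self.clock()

    def close_if_obsolete(self):
        if self.opened_at is None:
            return
        if self.clock() - self.opened_at >= self.max_age:
            self.opened_at = None


def call_with_cleanup(conn, handler):
    """Run handler between two cleanup passes, like the two middleware hooks."""
    conn.close_if_obsolete()  # before: drop a connection that aged out while idle
    try:
        conn.ensure_open()
        return handler()
    finally:
        conn.close_if_obsolete()  # after: drop one that aged out during the call
```

Without the first pass, a long-idle worker would hand a stale connection to the handler; without the second, a connection that expired during a slow call would linger until the next request.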

Client side

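Because RPC calls are blocking, a single global client is not enough, yet creating a new client for every request is not acceptable either. So, following the client implementation in redis-py, we define a zerorpc client backed by a connection pool.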

# zrpc.py
# coding: utf-8

import os
import zerorpc

from redis.connection import BlockingConnectionPool
from gevent.queue import LifoQueue

class Connection(object):
    def __init__(self, connect_to, heartbeat=30):
        self.client = zerorpc.Client(heartbeat=heartbeat)
        self.client.connect(connect_to)
        self.pid = os.getpid()

    def disconnect(self):
        self.client.close()


class RPCClient(object):
    def __init__(self, connect_to, heartbeat=30):
        self.connection_pool = BlockingConnectionPool(connection_class=Connection,
            queue_class=LifoQueue, timeout=heartbeat, connect_to=connect_to, heartbeat=heartbeat)

    def close(self):
        self.connection_pool.disconnect()

    def __getattr__(self, name):
        return lambda *args, **kwargs: self(name, *args, **kwargs)

    def __call__(self, name, *args, **kwargs):
        connection = self.connection_pool.get_connection('')
        try:
            return getattr(connection.client, name)(*args, **kwargs)
        finally:
            self.connection_pool.release(connection)

  

This directly reuses the connection pool defined by redis-py. Our system runs Django under gunicorn + gevent, so queue_class is set to gevent's LifoQueue.
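The behavior the client relies on (block when every connection is busy, hand back the most recently released one first) can be sketched with the standard library alone. SimplePool below is a hypothetical illustration, not the redis-py implementation; it uses queue.LifoQueue where the article uses gevent's LifoQueue.

```python
import queue


class SimplePool(object):
    """Hypothetical blocking pool built on a LIFO queue."""

    def __init__(self, factory, max_connections=10, timeout=30):
        self.factory = factory
        self.timeout = timeout
        # Pre-fill with None placeholders so connections are created lazily.
        self._slots = queue.LifoQueue(max_connections)
        for _ in range(max_connections):
            self._slots.put_nowait(None)

    def get_connection(self):
        # Blocks until a slot is free; raises queue.Empty after `timeout` seconds.
        conn = self._slots.get(block=True, timeout=self.timeout)
        if conn is None:
            conn = self.factory()
        return conn

    def release(self, conn):
        # Put the connection back on top so it is reused first.
        self._slots.put_nowait(conn)
```

LIFO order keeps a small set of recently used connections busy, so the rest can stay unopened when load is low.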

While using it we also ran into this issue:

https://github.com/0rpc/zerorpc-python/issues/123

It takes a small patch to work around it:

import zmq.green as zmq

# patch zmq garbage-collection Thread to use green Context:
from zmq.utils.garbage import gc
gc.context = zmq.Context()

  

Summary

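Technology choices have to fit the actual circumstances of the project; do not adopt new technology blindly and take on unnecessary cost. For a solution to be adopted widely, you must consider across the board whether it is easy to use and easy to deploy.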

Full code:

https://gist.github.com/zhu327/5b6c06eccc5758d4e642ee899a518687
