Using zerorpc in Django
Preface
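As system architecture migrates from centralized single-server deployments toward distributed microservices, RPC becomes an unavoidable topic. How to introduce an RPC service that is developer-friendly and performs reliably deserves careful thought.
After evaluating Thrift, gRPC, and zerorpc, we chose zerorpc for the following two reasons:
- Thrift and gRPC have a steep learning curve, and developers would have to redefine return structures, which adds work.
- zerorpc fits Python very well, allows rapid development, and also supports Node.js, which suits our current technology stack.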
Problems
Although zerorpc can be embedded directly into the current framework, there are still some problems to consider and solve:
- How to define rpc interfaces
- How to start the rpc service
- Client reliability under high concurrency
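Server side
The current system makes heavy use of Celery. django-celery defines tasks by placing a tasks.py file in each installed app and generating tasks with the @task decorator. To make defining rpc interfaces just as convenient, we designed a convention similar to Celery's: an app that needs to expose rpc interfaces creates an rpcs.py file.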

    # rpcs.py
    # coding: utf-8
    from eebo.core.utils.zrpc import rpc

    from .models import Ticket
    from .serializers import TicketSerializer


    @rpc.register()
    def get_ticket():
        t = Ticket.objects.first()
        s = TicketSerializer(t)
        return s.data


    @rpc.register(name='ticket_list', stream=True)
    def get_tickets(n):
        qs = Ticket.objects.all()[:n]
        s = TicketSerializer(qs, many=True)
        return iter(s.data)

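The rpc.register decorator registers a function on the rpc service. Optional parameters:
- name: the method name the client calls; if omitted, the function name is used, e.g. get_ticket
- stream: defaults to False; if True, zerorpc's streaming response is used. Use it when the data volume is large; the function must return an iterable.
Let's see how eebo.core.utils.zrpc implements this registration: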

    # coding: utf-8
    import zerorpc


    class RPC(object):

        @classmethod
        def register(cls, name=None, stream=False):
            def _wrapper(func):
                if stream:
                    # zerorpc.stream marks the method as a streaming response
                    method = zerorpc.stream(
                        lambda self, *args, **kwargs: func(*args, **kwargs))
                else:
                    method = staticmethod(func)
                setattr(cls, name or func.__name__, method)
                return func
            return _wrapper


    rpc = RPC()

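A class method binds each function onto the class. Note that every name must be globally unique, because a later registration under the same name overwrites the earlier one.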
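Since setattr silently replaces an existing attribute, a name clash between two apps would go unnoticed. One possible safeguard is to reject duplicates at registration time. The following is a minimal sketch that leaves out zerorpc and streaming; RPCRegistry and the duplicate check are assumptions for illustration, not part of the original code.

```python
class RPCRegistry(object):
    """Hypothetical stand-in for the RPC class; no zerorpc required."""

    @classmethod
    def register(cls, name=None):
        def _wrapper(func):
            method_name = name or func.__name__
            # cls.__dict__ holds only attributes set on this class itself,
            # so this also protects `register` from being overwritten.
            if method_name in cls.__dict__:
                raise ValueError(
                    "RPC name %r is already registered" % method_name)
            setattr(cls, method_name, staticmethod(func))
            return func
        return _wrapper


registry = RPCRegistry()


@registry.register()
def ping():
    return "pong"


@registry.register(name="echo")
def echo_value(value):
    return value


try:
    @registry.register(name="ping")  # clashes with the first registration
    def another_ping():
        return "other"
except ValueError as exc:
    duplicate_error = str(exc)
```

Failing at import time makes the clash visible when the server starts instead of when a client gets an unexpected result.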
Now that we have a way to define rpc interfaces, let's see how to start the rpc server.

    # runrpc.py
    # coding: utf-8
    import re
    import sys
    import imp as _imp  # note: imp is deprecated and was removed in Python 3.12
    import importlib

    from django.conf import settings
    from django.core.management.base import BaseCommand, CommandError

    from eebo.core.utils.zrpc import rpc, ServerExecMiddleware

    naiveip_re = re.compile(r"""^(?:
    (?P<addr>
        (?P<ipv4>\d{1,3}(?:\.\d{1,3}){3}) |         # IPv4 address
        (?P<ipv6>\[[a-fA-F0-9:]+\]) |               # IPv6 address
        (?P<fqdn>[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)*) # FQDN
    ):)?(?P<port>\d+)$""", re.X)


    class Command(BaseCommand):
        help = "Starts a lightweight RPC server for development."

        default_addr = '127.0.0.1'
        default_addr_ipv6 = '::1'  # referenced in handle(); missing in the original
        default_port = '4242'

        def add_arguments(self, parser):
            parser.add_argument('addrport', nargs='?',
                                help='Optional port number, or ipaddr:port')

        def handle(self, *args, **options):
            self.use_ipv6 = False
            if not options['addrport']:
                self.addr = ''
                self.port = self.default_port
            else:
                m = re.match(naiveip_re, options['addrport'])
                if m is None:
                    raise CommandError('"%s" is not a valid port number '
                                       'or address:port pair.' % options['addrport'])
                self.addr, _ipv4, _ipv6, _fqdn, self.port = m.groups()
                if not self.port.isdigit():
                    raise CommandError("%r is not a valid port number." % self.port)
                if self.addr:
                    if _ipv6:
                        # strip the surrounding brackets
                        self.addr = self.addr[1:-1]
                        self.use_ipv6 = True
                        self._raw_ipv6 = True
                    elif self.use_ipv6 and not _fqdn:
                        raise CommandError('"%s" is not a valid IPv6 address.'
                                           % self.addr)
            if not self.addr:
                self.addr = self.default_addr_ipv6 if self.use_ipv6 else self.default_addr
                self._raw_ipv6 = self.use_ipv6
            self.run(**options)

        def run(self, **options):
            """Discover the rpc interfaces, then run the server."""
            self.autodiscover_rpc()
            server = self.get_server()
            try:
                server.run()
            except KeyboardInterrupt:
                server.close()
                sys.exit(0)

        def autodiscover_rpc(self, related_name='rpcs'):
            for pkg in settings.INSTALLED_APPS:
                try:
                    pkg_path = importlib.import_module(pkg).__path__
                except AttributeError:
                    # not a package, so it cannot contain an rpcs module
                    continue
                try:
                    _imp.find_module(related_name, pkg_path)
                except ImportError:
                    # this app does not define rpcs.py
                    continue
                try:
                    # importing the module runs its @rpc.register decorators
                    importlib.import_module('{0}.{1}'.format(pkg, related_name))
                except ImportError:
                    pass

        def get_server(self, *args, **options):
            """Return the default zerorpc server for the runner."""
            import zerorpc
            server = zerorpc.Server(rpc, heartbeat=30)
            server.bind("tcp://{0}:{1}".format(self.addr, self.port))

            # close django old connections
            zerorpc.Context.get_instance().register_middleware(ServerExecMiddleware())

            # for sentry
            try:
                from raven.contrib.zerorpc import SentryMiddleware
                if hasattr(settings, 'RAVEN_CONFIG'):
                    sentry = SentryMiddleware(hide_zerorpc_frames=False,
                                              dsn=settings.RAVEN_CONFIG['dsn'])
                    zerorpc.Context.get_instance().register_middleware(sentry)
            except ImportError:
                pass

            return server

runrpc.py is a Django management command file. It must be placed in the management/commands directory of an installed app. Start the server with:

    python manage.py runrpc 0.0.0.0:4242

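The addrport argument accepts either a bare port or an address:port pair. The parsing can be tried in isolation with the sketch below, which reuses the naiveip_re pattern from the command. parse_addrport and its default values are a hypothetical helper written for illustration, not part of the original code.

```python
import re

naiveip_re = re.compile(r"""^(?:
(?P<addr>
    (?P<ipv4>\d{1,3}(?:\.\d{1,3}){3}) |         # IPv4 address
    (?P<ipv6>\[[a-fA-F0-9:]+\]) |               # IPv6 address
    (?P<fqdn>[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)*) # FQDN
):)?(?P<port>\d+)$""", re.X)


def parse_addrport(addrport, default_addr="127.0.0.1", default_port="4242"):
    """Return (addr, port) as strings; fall back to the defaults."""
    if not addrport:
        return default_addr, default_port
    m = naiveip_re.match(addrport)
    if m is None:
        raise ValueError(
            "%r is not a valid port number or address:port pair." % addrport)
    addr, _ipv4, ipv6, _fqdn, port = m.groups()
    if addr and ipv6:
        addr = addr[1:-1]  # strip the surrounding brackets
    return addr or default_addr, port


host, port = parse_addrport("0.0.0.0:4242")
```

A bare port such as "8000" leaves the address group empty, so the default address is used.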
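autodiscover_rpc automatically discovers the rpc interface registrations by importing the rpcs module of every installed app, and get_server creates the zerorpc server object.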
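The imp module used by autodiscover_rpc is no longer available on Python 3.12 and later. One possible way to express the same discovery step with importlib.util.find_spec is sketched below. The autodiscover function and its packages argument are illustrative assumptions, and unlike the original it lets errors raised inside an rpcs module propagate instead of swallowing them.

```python
import importlib
import importlib.util


def autodiscover(packages, related_name="rpcs"):
    """Import `<package>.<related_name>` for every package that ships one.

    Returns the names of the modules that were imported.
    """
    imported = []
    for pkg in packages:
        module_name = "{0}.{1}".format(pkg, related_name)
        try:
            # find_spec imports the parent package but not the submodule.
            spec = importlib.util.find_spec(module_name)
        except (ImportError, ValueError):
            # The parent is missing, or it is a plain module, not a package.
            continue
        if spec is None:
            continue  # the package has no such submodule
        # Importing the module runs its registration decorators.
        importlib.import_module(module_name)
        imported.append(module_name)
    return imported


# Demonstration with standard library packages instead of Django apps.
found = autodiscover(["json", "no_such_package_xyz"], related_name="decoder")
```

In the management command, the packages argument would be settings.INSTALLED_APPS.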
get_server registers two middlewares with zerorpc: SentryMiddleware captures exceptions raised by rpc interfaces and sends them to Sentry, and ServerExecMiddleware handles Django db connections. Here is the code:

    # zrpc.py
    # coding: utf-8
    from django.db import close_old_connections


    class ServerExecMiddleware(object):

        def server_before_exec(self, request_event):
            close_old_connections()

        def server_after_exec(self, request_event, reply_event):
            close_old_connections()

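close_old_connections is called before and after every rpc interface call to close db connections that are no longer usable. This mirrors the signals that django.db connects before and after request handling: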

    # django/db/__init__.py
    signals.request_started.connect(close_old_connections)
    signals.request_finished.connect(close_old_connections)

The goal is to make sure that when the ORM is used inside an rpc interface, the connection has not timed out and been dropped.
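Client side
Because rpc calls are blocking, a single global client is not enough, but creating a new client for every request is not acceptable either. So, following the client implementation in redis-py, we define a zerorpc client backed by a connection pool.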

    # zrpc.py
    # coding: utf-8
    import os

    import zerorpc
    from redis.connection import BlockingConnectionPool
    from gevent.queue import LifoQueue


    class Connection(object):

        def __init__(self, connect_to, heartbeat=30):
            self.client = zerorpc.Client(heartbeat=heartbeat)
            self.client.connect(connect_to)
            self.pid = os.getpid()

        def disconnect(self):
            self.client.close()


    class RPCClient(object):

        def __init__(self, connect_to, heartbeat=30):
            self.connection_pool = BlockingConnectionPool(
                connection_class=Connection,
                queue_class=LifoQueue,
                timeout=heartbeat,
                connect_to=connect_to,
                heartbeat=heartbeat)

        def close(self):
            self.connection_pool.disconnect()

        def __getattr__(self, name):
            return lambda *args, **kwargs: self(name, *args, **kwargs)

        def __call__(self, name, *args, **kwargs):
            connection = self.connection_pool.get_connection('')
            try:
                return getattr(connection.client, name)(*args, **kwargs)
            finally:
                self.connection_pool.release(connection)

This directly reuses the connection pool defined by redis-py. The current system runs Django with gunicorn + gevent, so queue_class is set to gevent's LifoQueue.
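The pooling idea can be tried without zerorpc, redis-py, or gevent. The sketch below is a simplified stand-in built on the standard library queue.LifoQueue. SimplePool, DummyConnection, and PooledClient are hypothetical names, and the real BlockingConnectionPool does more than this (for example, process-id checks).

```python
import queue


class DummyConnection(object):
    """Stand-in for the zerorpc-backed Connection class."""

    def __init__(self, connect_to):
        self.connect_to = connect_to

    def call(self, name, *args):
        # A real connection would send the request over the network.
        return (name, args)


class SimplePool(object):
    def __init__(self, connection_class, max_connections=2, timeout=1, **kwargs):
        self.connection_class = connection_class
        self.kwargs = kwargs
        self.timeout = timeout
        self.created = 0
        # Fill the queue with None placeholders; connections are made lazily.
        self._queue = queue.LifoQueue(max_connections)
        for _ in range(max_connections):
            self._queue.put_nowait(None)

    def get_connection(self):
        # Blocks for up to `timeout` seconds when every slot is checked out,
        # then raises queue.Empty.
        connection = self._queue.get(block=True, timeout=self.timeout)
        if connection is None:
            connection = self.connection_class(**self.kwargs)
            self.created += 1
        return connection

    def release(self, connection):
        self._queue.put_nowait(connection)


class PooledClient(object):
    def __init__(self, pool):
        self.pool = pool

    def __getattr__(self, name):
        # Any unknown attribute is treated as a remote method name.
        return lambda *args: self(name, *args)

    def __call__(self, name, *args):
        connection = self.pool.get_connection()
        try:
            return connection.call(name, *args)
        finally:
            # Always hand the connection back, even if the call failed.
            self.pool.release(connection)


pool = SimplePool(DummyConnection, max_connections=2, timeout=0.1,
                  connect_to="tcp://127.0.0.1:4242")
client = PooledClient(pool)
result = client.get_ticket(42)
```

With a LIFO queue the most recently released connection is handed out first, so sequential calls keep reusing one connection and the remaining slots are only filled under concurrent load.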
In use we also ran into a problem with the zmq garbage-collection thread under gevent, which needs a patch:

    import zmq.green as zmq

    # patch zmq garbage-collection Thread to use green Context:
    from zmq.utils.garbage import gc
    gc.context = zmq.Context()

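Summary
Technology choices need to fit the project's actual situation; do not blindly adopt new technology and introduce unnecessary cost. To get a solution adopted, you have to consider as a whole whether it is easy to use and easy to deploy.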
Full code:
https://gist.github.com/zhu327/5b6c06eccc5758d4e642ee899a518687