Gunicorn 源碼閱讀

gunicorn 「Green Unicorn」,脫胎於ruby社區的Unicorn,是一個 WSGI HTTP Server。學習gunicorn後,咱們能夠把以前的 Bottle 程序正式部署起來。老規矩,本文分下面幾個部分:html

  • gunicorn 項目結構簡介
  • gunicorn 使用
  • gunicorn-application 實現
  • arbiter實現
  • sync-worker實現
  • 小結
  • 小技巧

gunicorn 項目結構簡介

gunicorn 源碼選擇的版本是 20.0.0,主要的文件及包以下:python

文件 描述
app包 guincorn 的 Application (不是wsgi定義的applicaton)
http包 gunicorn 對 http協議的一些處理
workers包 gunicorn 的工做類實現 ,包括同步sync實現,線程池版本實現gthread,以及異步版本實現 geventlet,gevent等
arbiter.py guicorn 的master實現

根據gunicorn的設計特色:linux

Gunicorn is based on the pre-fork worker model. This means that there is a central master process that manages a set of worker processes. The master never knows anything about individual clients. All requests and responses are handled completely by worker processes.nginx

gunicorn使用pre-fork 工做模型,也就是master提早fork出預約數量的work,管理worker集合。全部的request和response都由worker進程處理。web

咱們重點放在:gunicorn的服務實現,master-worker如何實現和協做上。django

gunicorn 使用

編寫測試app,能夠看到這是一個符合wsgi規範的application:flask

# myapp.py

def app(environ, start_response):  # env 和 http 狀態及頭設定回調
    data = b"Hello, World!\n"
    start_response("200 OK", [
            ("Content-Type", "text/plain"),
            ("Content-Length", str(len(data)))
    ])
    return iter([data])  # 返回數據
複製代碼

使用4個work節點,日誌級別debug的方式啓動服務,加載 myapp:appruby

# gunicorn -w 4 --log-level debug  myapp:app
[2021-02-23 17:58:57 +0800] [50258] [DEBUG] Current configuration:  # 準備配置
...
[2021-02-23 18:01:12 +0800] [50462] [INFO] Starting gunicorn 20.0.0  # 啓動gunicorn
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] Arbiter booted  # 啓動master
[2021-02-23 18:01:12 +0800] [50462] [INFO] Listening at: http://127.0.0.1:8000 (50462)  # 監聽端口
[2021-02-23 18:01:12 +0800] [50462] [INFO] Using worker: sync
[2021-02-23 18:01:12 +0800] [50464] [INFO] Booting worker with pid: 50464 # 啓動worker
[2021-02-23 18:01:12 +0800] [50465] [INFO] Booting worker with pid: 50465
[2021-02-23 18:01:12 +0800] [50466] [INFO] Booting worker with pid: 50466
[2021-02-23 18:01:12 +0800] [50467] [INFO] Booting worker with pid: 50467
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] 4 workers
複製代碼

使用 curl 測試服務markdown

# curl http://127.0.0.1:8000
Hello, World!
複製代碼

同時gunicorn中能夠看到 worker=50465 處理了這個http請求session

[2021-02-24 16:09:39 +0800] [50465] [DEBUG] GET /
複製代碼

運行時候,還能夠經過發送信號,手動擴充work節點數

# kill -TTIN 50462
複製代碼

觀察服務日誌,會發現 master=50462 進程處理了 ttin 信號,而且擴展worker節點數到5

...
[2021-02-24 18:02:56 +0800] [50462] [INFO] Handling signal: ttin
[2021-02-24 18:02:56 +0800] [75918] [INFO] Booting worker with pid: 75918
[2021-02-24 18:02:56 +0800] [50462] [DEBUG] 5 workers
複製代碼

使用 Ctrl+C 關閉服務,能夠看到也是 master=50462 進程處理了 int 信號,而且在關閉worker節點後關閉本身

^C[2021-02-25 15:06:54 +0800] [50462] [INFO] Handling signal: int
[2021-02-25 15:06:54 +0800] [50464] [INFO] Worker exiting (pid: 50464)
[2021-02-25 15:06:54 +0800] [50465] [INFO] Worker exiting (pid: 50465)
[2021-02-25 15:06:54 +0800] [50466] [INFO] Worker exiting (pid: 50466)
[2021-02-25 15:06:54 +0800] [50467] [INFO] Worker exiting (pid: 50467)
[2021-02-25 15:06:54 +0800] [75918] [INFO] Worker exiting (pid: 75918)
[2021-02-25 15:06:54 +0800] [50462] [INFO] Shutting down: Master
複製代碼

若是對gunicon的參數不瞭解,可使用下面命令查看幫助

# gunicorn -h
usage: gunicorn [OPTIONS] [APP_MODULE]

optional arguments:
  -h, --help            show this help message and exit
  ...
  -w INT, --workers INT
                        The number of worker processes for handling requests. [1]
複製代碼

幫助使用咱們熟悉的 argparse 實現。

class Setting(object):
			
	def add_option(self, parser):
        args = tuple(self.cli)

        help_txt = "%s [%s]" % (self.short, self.default)
        help_txt = help_txt.replace("%", "%%")

        kwargs = {
            "dest": self.name,
            "action": self.action or "store",
            "type": self.type or str,
            "default": None,
            "help": help_txt
        }
        ...
        parser.add_argument(*args, **kwargs)  # 添加選項

class Workers(Setting):  # --workers 的選項類
    name = "workers"
    section = "Worker Processes"
    cli = ["-w", "--workers"]
    meta = "INT"
    validator = validate_pos_int
    type = int
    default = int(os.environ.get("WEB_CONCURRENCY", 1))
    desc = """\
        The number of worker processes for handling requests.

        A positive integer generally in the ``2-4 x $(NUM_CORES)`` range.
        You'll want to vary this a bit to find the best for your particular
        application's work load.

        By default, the value of the ``WEB_CONCURRENCY`` environment variable.
        If it is not defined, the default is ``1``.
        """

def parser(self):
    kwargs = {
        "usage": self.usage,
        "prog": self.prog
    }
    parser = argparse.ArgumentParser(**kwargs)
    parser.add_argument("-v", "--version",
            action="version", default=argparse.SUPPRESS,
            version="%(prog)s (version " + __version__ + ")\n",
            help="show program's version number and exit")
    parser.add_argument("args", nargs="*", help=argparse.SUPPRESS)

    keys = sorted(self.settings, key=self.settings.__getitem__)  # 動態添加參數選項
    for k in keys:
        self.settings[k].add_option(parser)

    return parser
複製代碼

gunicorn-application 實現

gunicorn的application主要是下面三個類實現。須要注意的是這裏的application能夠理解爲web-server的application;bottle/flask/django等實現的是web-framework的applicaiton。前者動態加載後者,前者處理http服務,後者處理單次的http請求。

  • BaseApplication
    • Application
      • WSGIApplication

3個Application梳理後,大概的代碼模版以下:

class WSGIApplication(Application)
	
	def __init__(self, usage=None, prog=None):
		self.do_load_config()  # 加載配置
			
	def do_load_config():
		...
		cfg = self.init(parser, args, args.args)  # 初始化配置
		...
	
	def init(...):
	    ...
	    self.app_uri = args[0]  # 獲取wsgi-application參數
	
  def load(...):
  		util.import_app(self.app_uri)  # 動態加載wsgi-application
  		...
	
	def run(...):
		self.load()
		Arbiter(self).run()  # 啓動master,也就是Arbiter

def run():  # 運行服務
    """\
    The ``gunicorn`` command line runner for launching Gunicorn with
    generic WSGI applications.
    """
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()

if __name__ == '__main__':
    run()
複製代碼

application部分的實現,相對比較簡單,就再也不贅述。

arbiter實現

Arbiter 仲裁者,事實上的master進程核心,整理後代碼模版以下:

class Arbiter(object):
	def __init__(self, app):
		self.worker_class = self.cfg.worker_class  # worker類
		self.num_workers = self.cfg.worker  # worker數量
        ...
    
    def start():
		self.init_signals()  # 初始化信號監聽
		...
		sock.create_socket(...) # 建立socket服務
    
    def run(self):  
		self.start()
		try:
            self.manage_workers() # 啓動節點

            while True: #  無限循環
                ...
                sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
                if sig is None:
                    self.sleep()  # 持續休眠
                    self.murder_workers()
                    self.manage_workers()
                    continue

                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue
    							# 處理信號
                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                ...
                handler()
                self.wakeup()  # 喚醒
        except (StopIteration, KeyboardInterrupt):
           ...
    		
複製代碼

在瞭解Arbiter工做前先了解一下信號, linux 系統可使用下面命令查看信號清單

# kill -l
 1) SIGHUP	 2) SIGINT	 3) SIGQUIT	 4) SIGILL	 5) SIGTRAP
 6) SIGABRT	 7) SIGBUS	 8) SIGFPE	 9) SIGKILL	10) SIGUSR1
 11) SIGSEGV	12) SIGUSR2	13) SIGPIPE	14) SIGALRM	15) SIGTERM
...
複製代碼
  • 1 (SIGHUP): terminate a connection, or reload the configuration for daemons
  • 2 (SIGINT): interrupt the session from the dialogue station
  • 3 (SIGQUIT): terminate the session from the dialogue station
  • 4 (SIGILL): illegal instruction was executed
  • 5 (SIGTRAP): do a single instruction (trap)
  • 6 (SIGABRT): abnormal termination
  • 7 (SIGBUS): error on the system bus
  • 8 (SIGFPE): floating point error
  • 9 (SIGKILL): immmediately terminate the process
  • 10 (SIGUSR1): user-defined signal
  • 11 (SIGSEGV): segmentation fault due to illegal access of a memory segment
  • 12 (SIGUSR2): user-defined signal
  • 13 (SIGPIPE): writing into a pipe, and nobody is reading from it
  • 14 (SIGALRM): the timer terminated (alarm)
  • 15 (SIGTERM): terminate the process in a soft way

信號是操做系統提供的事件,能夠用來進行跨進程的通訊。Arbiter.init_signals 作的工做以下:

SIGNALS = [getattr(signal, "SIG%s" % x)
               for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]
               
def init_signals(self):
    ...

    # initialize all signals
    for s in self.SIGNALS:
        signal.signal(s, self.signal)
    signal.signal(signal.SIGCHLD, self.handle_chld)  # 添加信號監聽器

def signal(self, sig, frame):
	if len(self.SIG_QUEUE) < 5:
			self.SIG_QUEUE.append(sig)
			self.wakeup()
複製代碼

以前演示的擴容信號 TTIN 是這樣處理的 :

def handle_ttin(self):
    """\
    SIGTTIN handling.
    Increases the number of workers by one.
    """
    self.num_workers += 1  # 擴容 
    self.manage_workers()  # 管理worker 
複製代碼

Arbiter的sleep和warkeup是這樣實現的:

self.PIPE = pair = os.pipe()  # 建立管道
	
def sleep(self):
	"""\
    Sleep until PIPE is readable or we timeout.
    A readable PIPE means a signal occurred.
    """
    try:
        ready = select.select([self.PIPE[0]], [], [], 1.0)  # 使用select監聽管道的數據變化
        if not ready[0]:
            return
        while os.read(self.PIPE[0], 1):  # 讀取管道數據
            pass
    except (select.error, OSError) as e:
        ...
        
def wakeup(self):
    """\
    Wake up the arbiter by writing to the PIPE
    """
    try:
        os.write(self.PIPE[1], b'.')  # 管道寫入
    except IOError as e:
        ...
    
複製代碼

須要說明的是Arbiter經過 sock.create_sockets 建立了socket,並綁定端口和監聽,而後在fork-worker的時候,將socket傳遞給了子進程。

worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                                   self.app, self.timeout / 2.0,
                                   self.cfg, self.log)
self.cfg.pre_fork(self, worker)
pid = os.fork()
if pid != 0:
		worker.pid = pid  # 記錄worker的pid
    self.WORKERS[pid] = worker # 添加到worker集合
    return pid
複製代碼

銷燬worker是使用信號:

def kill_workers(self, sig):
    """\
    Kill all workers with the signal `sig`
    :attr sig: `signal.SIG*` value
    """
    worker_pids = list(self.WORKERS.keys())
    for pid in worker_pids:
        os.kill(pid, sig)
複製代碼

sync-worker實現

接下來,咱們看看worker,主要是sync-worker的實現。worker的關係主要以下:

  • Worker 處理信號
    • SyncWorker 同步處理http請求
    • ThreadWorker 使用線程處理http請求

接以前Arbiter中fork-worker的代碼,建立完成的work進入 init_process

# Process Child
worker.pid = os.getpid()
try:
    util._setproctitle("worker [%s]" % self.proc_name)
    self.log.info("Booting worker with pid: %s", worker.pid)
    self.cfg.post_fork(self, worker)
    worker.init_process()
    sys.exit(0)
複製代碼

work的init_process模版以下:

def init_process(self):
    """\
    If you override this method in a subclass, the last statement
    in the function should be to call this method with
    super().init_process() so that the ``run()`` loop is initiated.
    """
    # For waking ourselves up
    self.PIPE = os.pipe()  # 建立管道
    ...
    self.wait_fds = self.sockets + [self.PIPE[0]]  # 監聽管道和socket
			...
    self.init_signals()  # 初始化信號監聽
			...
    self.load_wsgi()  # 加載wsgi的應用
    ...
    # Enter main run loop
    self.booted = True
    self.run()  # 工做循環
複製代碼

work同樣的進行信號監聽:

SIGNALS = [getattr(signal, "SIG%s" % x)
            for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]
def init_signals(self):
    # reset signaling
    for s in self.SIGNALS:
    		signal.signal(s, signal.SIG_DFL)
    # init new signaling
    signal.signal(signal.SIGQUIT, self.handle_quit)
    signal.signal(signal.SIGTERM, self.handle_exit)
    signal.signal(signal.SIGINT, self.handle_quit)
    ...

    if hasattr(signal, 'set_wakeup_fd'):
    		signal.set_wakeup_fd(self.PIPE[1])  # 等待select喚醒
複製代碼

work最重要的run循環:

def run(self, timeout):
    listener = self.sockets[0]
    while self.alive:
					...
        # Accept a connection. If we get an error telling us
        # that no connection is waiting we fall down to the
        # select which is where we'll wait for a bit for new
        # workers to come give us some love.
        try:
            self.accept(listener)  # 接受客戶端連接
            # Keep processing clients until no one is waiting. This
            # prevents the need to select() for every client that we
            # process.
            continue

        except EnvironmentError as e:
              ...

        try:
            self.wait(timeout)  # 休眠等待 
        except StopWaiting:
            return
複製代碼

處理客戶端鏈接,這一部分和以前介紹http比較相似,也再也不贅述。

def accept(self, listener):
    client, addr = listener.accept()
    client.setblocking(1)
    util.close_on_exec(client)
    self.handle(listener, client, addr)
複製代碼

work處理完成請求後進入等待

def wait(self, timeout):
    try:
        ret = select.select(self.wait_fds, [], [], timeout)
        if ret[0]:
            if self.PIPE[0] in ret[0]:
                os.read(self.PIPE[0], 1)
            return ret[0]

    except select.error as e:
        if e.args[0] == errno.EINTR:
            return self.sockets
        if e.args[0] == errno.EBADF:
            if self.nr < 0:
                return self.sockets
            else:
                raise StopWaiting
        raise
複製代碼

小結

能夠用下面一張圖展現gunicorn的工做流程,做爲咱們的小結論

Request flow of Django with Gunicorn and Nginx as a reverse proxy.

小技巧

可使用thread,實現一個定時器

# reloader.py

class Reloader(threading.Thread):
    def __init__(self, extra_files=None, interval=1, callback=None):
        super().__init__()
        self.setDaemon(True)
        self._interval = interval
        self._callback = callback

    def run(self):
        mtimes = {}
        while True:
            for filename in self.get_files():
                try:
                    mtime = os.stat(filename).st_mtime
                except OSError:
                    continue
                old_time = mtimes.get(filename)
                if old_time is None:
                    mtimes[filename] = mtime
                    continue
                elif mtime > old_time:
                    if self._callback:
                        self._callback(filename)
            time.sleep(self._interval)
複製代碼

在使用 gunicorn myapp:app 命令的時候, myapp:app 沒有靜態的 import ,而是這樣動態加載的:

# util.py

klass = components.pop(-1)

mod = importlib.import_module('.'.join(components))

return getattr(mod, klass)
複製代碼

參考連接

相關文章
相關標籤/搜索