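Gunicorn ("Green Unicorn"), derived from Unicorn in the Ruby community, is a WSGI HTTP server. Once we understand gunicorn, we can properly deploy the Bottle application from the earlier articles. As usual, this article walks through the source in several parts.
The gunicorn source version examined here is 20.0.0. The main files and packages are as follows: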
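File | Description |
---|---|
app package | gunicorn's Application (not the application defined by WSGI) |
http package | gunicorn's handling of the HTTP protocol |
workers package | gunicorn's worker implementations, including the synchronous sync worker, the thread-pool gthread worker, and the asynchronous geventlet, gevent, etc. |
arbiter.py | gunicorn's master implementation |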
According to gunicorn's design notes:
Gunicorn is based on the pre-fork worker model. This means that there is a central master process that manages a set of worker processes. The master never knows anything about individual clients. All requests and responses are handled completely by worker processes.
Gunicorn uses the pre-fork worker model: the master forks a preset number of workers in advance and manages that set of workers. All requests and responses are handled by the worker processes.
We will focus on how gunicorn implements its server, and on how the master and the workers are implemented and cooperate.
Write a test app. As you can see, it is an application that conforms to the WSGI specification:
# myapp.py
def app(environ, start_response):  # environ dict, and the callback that sets the HTTP status and headers
    data = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data)))
    ])
    return iter([data])  # return the response body
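To make the WSGI contract concrete, here is a minimal sketch of what a server does with such a callable for one request: build an `environ`, pass a `start_response` callback, and collect the returned body chunks. `call_wsgi` is a hypothetical helper written for illustration, not part of gunicorn, and its `environ` holds only two of the keys a real server would supply.

```python
def app(environ, start_response):
    data = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data))),
    ])
    return iter([data])


def call_wsgi(application, path="/"):
    """Invoke a WSGI callable once and collect status, headers and body."""
    captured = {}

    def start_response(status, headers, exc_info=None):
        # The application reports status and headers through this callback.
        captured["status"] = status
        captured["headers"] = headers

    # Assumption: a deliberately tiny environ, enough for this app only.
    environ = {"REQUEST_METHOD": "GET", "PATH_INFO": path}
    body = b"".join(application(environ, start_response))
    return captured["status"], dict(captured["headers"]), body


if __name__ == "__main__":
    print(call_wsgi(app))
```

The same split shows up later in the article: the server owns the socket and the request loop, while the application only sees `environ` and `start_response`.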
Start the service with 4 workers at the debug log level, loading myapp:app:
# gunicorn -w 4 --log-level debug myapp:app
[2021-02-23 17:58:57 +0800] [50258] [DEBUG] Current configuration: # configuration is prepared
...
[2021-02-23 18:01:12 +0800] [50462] [INFO] Starting gunicorn 20.0.0 # gunicorn starts
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] Arbiter booted # master started
[2021-02-23 18:01:12 +0800] [50462] [INFO] Listening at: http://127.0.0.1:8000 (50462) # listening address
[2021-02-23 18:01:12 +0800] [50462] [INFO] Using worker: sync
[2021-02-23 18:01:12 +0800] [50464] [INFO] Booting worker with pid: 50464 # worker started
[2021-02-23 18:01:12 +0800] [50465] [INFO] Booting worker with pid: 50465
[2021-02-23 18:01:12 +0800] [50466] [INFO] Booting worker with pid: 50466
[2021-02-23 18:01:12 +0800] [50467] [INFO] Booting worker with pid: 50467
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] 4 workers
Test the service with curl:
# curl http://127.0.0.1:8000
Hello, World!
Meanwhile, the gunicorn log shows that worker 50465 handled this HTTP request:
[2021-02-24 16:09:39 +0800] [50465] [DEBUG] GET /
While the service is running, you can also add workers manually by sending a signal:
# kill -TTIN 50462
In the service log you can see that the master process (50462) handled the ttin signal and grew the number of workers to 5:
...
[2021-02-24 18:02:56 +0800] [50462] [INFO] Handling signal: ttin
[2021-02-24 18:02:56 +0800] [75918] [INFO] Booting worker with pid: 75918
[2021-02-24 18:02:56 +0800] [50462] [DEBUG] 5 workers
Press Ctrl+C to stop the service. Again it is the master process (50462) that handles the int signal; it shuts down the workers and then exits itself:
^C[2021-02-25 15:06:54 +0800] [50462] [INFO] Handling signal: int
[2021-02-25 15:06:54 +0800] [50464] [INFO] Worker exiting (pid: 50464)
[2021-02-25 15:06:54 +0800] [50465] [INFO] Worker exiting (pid: 50465)
[2021-02-25 15:06:54 +0800] [50466] [INFO] Worker exiting (pid: 50466)
[2021-02-25 15:06:54 +0800] [50467] [INFO] Worker exiting (pid: 50467)
[2021-02-25 15:06:54 +0800] [75918] [INFO] Worker exiting (pid: 75918)
[2021-02-25 15:06:54 +0800] [50462] [INFO] Shutting down: Master
If you are unfamiliar with gunicorn's options, the following command prints the help:
# gunicorn -h
usage: gunicorn [OPTIONS] [APP_MODULE]
optional arguments:
-h, --help show this help message and exit
...
-w INT, --workers INT
The number of worker processes for handling requests. [1]
The help is implemented with the familiar argparse module.
class Setting(object):
    def add_option(self, parser):
        args = tuple(self.cli)
        help_txt = "%s [%s]" % (self.short, self.default)
        help_txt = help_txt.replace("%", "%%")
        kwargs = {
            "dest": self.name,
            "action": self.action or "store",
            "type": self.type or str,
            "default": None,
            "help": help_txt
        }
        ...
        parser.add_argument(*args, **kwargs)  # register the option

class Workers(Setting):  # the setting class behind --workers
    name = "workers"
    section = "Worker Processes"
    cli = ["-w", "--workers"]
    meta = "INT"
    validator = validate_pos_int
    type = int
    default = int(os.environ.get("WEB_CONCURRENCY", 1))
    desc = """\
        The number of worker processes for handling requests.
        A positive integer generally in the ``2-4 x $(NUM_CORES)`` range.
        You'll want to vary this a bit to find the best for your particular
        application's work load.
        By default, the value of the ``WEB_CONCURRENCY`` environment variable.
        If it is not defined, the default is ``1``.
        """

class Config(object):
    ...
    def parser(self):
        kwargs = {
            "usage": self.usage,
            "prog": self.prog
        }
        parser = argparse.ArgumentParser(**kwargs)
        parser.add_argument("-v", "--version",
                            action="version", default=argparse.SUPPRESS,
                            version="%(prog)s (version " + __version__ + ")\n",
                            help="show program's version number and exit")
        parser.add_argument("args", nargs="*", help=argparse.SUPPRESS)
        keys = sorted(self.settings, key=self.settings.__getitem__)  # add one option per registered setting
        for k in keys:
            self.settings[k].add_option(parser)
        return parser
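The pattern above, where each setting is a small class that knows how to register itself with argparse, can be tried in isolation. The sketch below is a trimmed-down imitation under stated assumptions: `Setting`, `Workers`, `Bind`, `build_parser` and `parse` are hypothetical stand-ins written for this example, and the fallback to defaults is handled by hand rather than the way gunicorn's `Config` does it.

```python
import argparse


class Setting:
    """Hypothetical, trimmed-down stand-in for a settings class."""
    name = None
    cli = None
    type = str
    default = None
    short = ""

    def add_option(self, parser):
        help_txt = "%s [%s]" % (self.short, self.default)
        parser.add_argument(
            *self.cli,
            dest=self.name,
            type=self.type,
            default=None,  # None means "not given on the command line"
            help=help_txt.replace("%", "%%"),
        )


class Workers(Setting):
    name = "workers"
    cli = ["-w", "--workers"]
    type = int
    default = 1
    short = "The number of worker processes for handling requests."


class Bind(Setting):
    name = "bind"
    cli = ["-b", "--bind"]
    default = "127.0.0.1:8000"
    short = "The socket to bind."


def build_parser(settings):
    parser = argparse.ArgumentParser(prog="demo")
    parser.add_argument("args", nargs="*", help=argparse.SUPPRESS)
    for setting in settings:  # every setting registers its own option
        setting.add_option(parser)
    return parser


def parse(argv):
    settings = [Workers(), Bind()]
    ns = build_parser(settings).parse_args(argv)
    values = {}
    for s in settings:
        given = getattr(ns, s.name)
        # Fall back to the setting's default when the option was omitted.
        values[s.name] = s.default if given is None else given
    return values, ns.args


if __name__ == "__main__":
    print(parse(["-w", "4", "myapp:app"]))
```

Registering `default=None` with argparse keeps "not given" distinguishable from "given with the default value", which matters when command-line options have to be merged with other configuration sources.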
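Gunicorn's application is implemented mainly by three classes. Note that "application" here means the web server's application, whereas Bottle, Flask, Django and the like implement the web framework's application. The former loads the latter dynamically: the former handles the HTTP service, the latter handles individual HTTP requests.
Condensing the three Application classes gives roughly the following code skeleton: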
class WSGIApplication(Application):
    def __init__(self, usage=None, prog=None):
        self.do_load_config()  # load the configuration
    def do_load_config(self):
        ...
        cfg = self.init(parser, args, args.args)  # initialize the configuration
        ...
    def init(...):
        ...
        self.app_uri = args[0]  # the WSGI application argument, e.g. myapp:app
    def load(...):
        util.import_app(self.app_uri)  # load the WSGI application dynamically
        ...
    def run(...):
        self.load()
        Arbiter(self).run()  # start the master, i.e. the Arbiter

def run():  # run the service
    """\
    The ``gunicorn`` command line runner for launching Gunicorn with
    generic WSGI applications.
    """
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()

if __name__ == '__main__':
    run()
The application part is relatively simple, so we will not go into further detail.
The Arbiter is in effect the core of the master process. A condensed code skeleton:
class Arbiter(object):
    def __init__(self, app):
        self.worker_class = self.cfg.worker_class  # worker class
        self.num_workers = self.cfg.workers  # number of workers
        ...
    def start(self):
        self.init_signals()  # set up the signal handlers
        ...
        sock.create_sockets(...)  # create the listening sockets
    def run(self):
        self.start()
        try:
            self.manage_workers()  # start the workers
            while True:  # main loop
                ...
                sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
                if sig is None:
                    self.sleep()  # sleep until a signal arrives or the timeout expires
                    self.murder_workers()
                    self.manage_workers()
                    continue
                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue
                # handle the signal
                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                ...
                handler()
                self.wakeup()  # wake the loop up
        except (StopIteration, KeyboardInterrupt):
            ...
Before looking at how the Arbiter works, a quick word on signals. On Linux, the following command lists the available signals:
# kill -l
1) SIGHUP 2) SIGINT 3) SIGQUIT 4) SIGILL 5) SIGTRAP
6) SIGABRT 7) SIGBUS 8) SIGFPE 9) SIGKILL 10) SIGUSR1
11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM
...
Signals are events provided by the operating system and can be used for inter-process communication. Arbiter.init_signals does the following:
SIGNALS = [getattr(signal, "SIG%s" % x)
           for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]

def init_signals(self):
    ...
    # initialize all signals
    for s in self.SIGNALS:
        signal.signal(s, self.signal)
    signal.signal(signal.SIGCHLD, self.handle_chld)  # register the SIGCHLD handler

def signal(self, sig, frame):
    if len(self.SIG_QUEUE) < 5:
        self.SIG_QUEUE.append(sig)
        self.wakeup()
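The idea of "record the signal in the handler, act on it later in the main loop" can be sketched without touching real processes. `MiniArbiter` below is a hypothetical class written for this example; it calls the handler method directly instead of installing it with `signal.signal`, and its `drain` method stands in for the signal-handling half of the main loop.

```python
import signal


class MiniArbiter:
    """Hypothetical sketch: queue signals, then dispatch them by name."""

    # Map signal number -> lowercase short name, e.g. SIGTTIN -> "ttin".
    SIG_NAMES = {getattr(signal, "SIG%s" % x): x.lower()
                 for x in "HUP TTIN TTOU".split()}

    def __init__(self):
        self.SIG_QUEUE = []
        self.num_workers = 1
        self.handled = []

    def signal(self, sig, frame):
        # Keep the handler tiny: only remember the signal, in a bounded queue.
        if len(self.SIG_QUEUE) < 5:
            self.SIG_QUEUE.append(sig)

    def handle_ttin(self):
        self.num_workers += 1

    def handle_ttou(self):
        if self.num_workers > 1:
            self.num_workers -= 1

    def drain(self):
        """Process queued signals in arrival order."""
        while self.SIG_QUEUE:
            sig = self.SIG_QUEUE.pop(0)
            name = self.SIG_NAMES.get(sig)
            handler = getattr(self, "handle_%s" % name, None)
            if handler is None:
                continue  # no handle_<name> method is defined
            handler()
            self.handled.append(name)


if __name__ == "__main__":
    arb = MiniArbiter()
    arb.signal(signal.SIGTTIN, None)
    arb.drain()
    print(arb.num_workers)
```

Doing almost nothing inside the handler keeps the signal-time code path short; the real work runs in the ordinary control flow of the loop, where it cannot interrupt itself.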
The TTIN scale-up signal demonstrated earlier is handled like this:
def handle_ttin(self):
    """\
    SIGTTIN handling.
    Increases the number of workers by one.
    """
    self.num_workers += 1  # scale up
    self.manage_workers()  # reconcile the worker set
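The handler only changes the target count and then asks `manage_workers` to reconcile. A rough model of that reconciliation, assuming it spawns workers while there are too few and retires the oldest while there are too many, is sketched below. `WorkerPool` is hypothetical and purely in-memory: the "pids" are made-up numbers and no process is created or killed.

```python
import itertools


class WorkerPool:
    """Hypothetical in-memory model; no real processes are involved."""

    def __init__(self, num_workers):
        self.num_workers = num_workers
        self.workers = {}                  # fake pid -> worker age
        self._ages = itertools.count(1)
        self._pids = itertools.count(1000)

    def spawn_worker(self):
        self.workers[next(self._pids)] = next(self._ages)

    def kill_oldest(self):
        # The worker with the smallest age was started first.
        oldest_pid = min(self.workers, key=self.workers.get)
        del self.workers[oldest_pid]

    def manage_workers(self):
        """Bring the actual worker count in line with num_workers."""
        while len(self.workers) < self.num_workers:
            self.spawn_worker()
        while len(self.workers) > self.num_workers:
            self.kill_oldest()

    def handle_ttin(self):
        self.num_workers += 1
        self.manage_workers()

    def handle_ttou(self):
        if self.num_workers > 1:
            self.num_workers -= 1
            self.manage_workers()


if __name__ == "__main__":
    pool = WorkerPool(4)
    pool.manage_workers()
    pool.handle_ttin()
    print(sorted(pool.workers))
```

Separating "desired count" from "reconcile" means the same `manage_workers` call also repairs the pool after a worker dies unexpectedly, which is why the main loop calls it on every idle tick.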
The Arbiter's sleep and wakeup are implemented like this:
self.PIPE = pair = os.pipe()  # create the pipe

def sleep(self):
    """\
    Sleep until PIPE is readable or we timeout.
    A readable PIPE means a signal occurred.
    """
    try:
        ready = select.select([self.PIPE[0]], [], [], 1.0)  # wait with select until the pipe has data
        if not ready[0]:
            return
        while os.read(self.PIPE[0], 1):  # drain the pipe
            pass
    except (select.error, OSError) as e:
        ...

def wakeup(self):
    """\
    Wake up the arbiter by writing to the PIPE
    """
    try:
        os.write(self.PIPE[1], b'.')  # write to the pipe
    except IOError as e:
        ...
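This is the classic self-pipe pattern: a blocked `select` is interrupted by writing one byte to a pipe that the same process is watching. A minimal standalone sketch follows; `Waker` is a hypothetical helper written for this example, and unlike the excerpt it sets both pipe ends non-blocking explicitly so that draining cannot hang.

```python
import os
import select


class Waker:
    """Hypothetical helper showing the self-pipe pattern."""

    def __init__(self):
        self.r, self.w = os.pipe()
        # Non-blocking ends so neither sleep() nor wakeup() can hang.
        os.set_blocking(self.r, False)
        os.set_blocking(self.w, False)

    def wakeup(self):
        try:
            os.write(self.w, b".")
        except BlockingIOError:
            pass  # pipe is full, so a wakeup is already pending

    def sleep(self, timeout=1.0):
        """Return True if woken through the pipe, False on timeout."""
        ready, _, _ = select.select([self.r], [], [], timeout)
        if not ready:
            return False
        try:
            while os.read(self.r, 64):  # drain everything that was queued
                pass
        except BlockingIOError:
            pass  # the pipe is empty again
        return True

    def close(self):
        os.close(self.r)
        os.close(self.w)


if __name__ == "__main__":
    w = Waker()
    w.wakeup()
    print(w.sleep(0.1))
    w.close()
```

Several wakeups before one sleep collapse into a single return, which is fine here because the loop re-checks its queue after every wakeup anyway.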
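Note that the Arbiter creates the sockets through sock.create_sockets, which binds the port and starts listening, and then hands the sockets over to the child process when it forks a worker: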
worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                           self.app, self.timeout / 2.0,
                           self.cfg, self.log)
self.cfg.pre_fork(self, worker)
pid = os.fork()
if pid != 0:
    worker.pid = pid  # record the worker's pid
    self.WORKERS[pid] = worker  # add it to the worker set
    return pid
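The key point, that a listening socket created before `fork()` is usable in every child, can be checked with a small Unix-only sketch. The function names here are hypothetical and the workers are deliberately trivial: each child accepts exactly one connection, replies with its own pid, and exits.

```python
import os
import socket


def serve_once_in_children(num_workers=2):
    """Fork workers that share one listening socket; return (port, pids)."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(("127.0.0.1", 0))   # port 0: let the OS pick a free port
    listener.listen(16)
    port = listener.getsockname()[1]

    pids = []
    for _ in range(num_workers):
        pid = os.fork()
        if pid == 0:
            # Child: the listener's file descriptor was inherited by fork().
            status = 1
            try:
                conn, _addr = listener.accept()
                conn.sendall(str(os.getpid()).encode())
                conn.close()
                status = 0
            finally:
                os._exit(status)      # never fall back into the parent's code
        pids.append(pid)

    listener.close()                   # the parent no longer needs its copy
    return port, pids


def ask(port):
    """Connect once and return the pid of the worker that answered."""
    with socket.create_connection(("127.0.0.1", port), timeout=5) as s:
        return int(s.recv(64).decode())


if __name__ == "__main__":
    port, pids = serve_once_in_children(2)
    print(ask(port), ask(port))
    for pid in pids:
        os.waitpid(pid, 0)
```

Because all children block in `accept()` on the same socket, the kernel decides which one gets each connection; the parent never touches client traffic, which matches the "master never knows anything about individual clients" statement quoted earlier.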
Workers are shut down by sending them a signal:
def kill_workers(self, sig):
    """\
    Kill all workers with the signal `sig`
    :attr sig: `signal.SIG*` value
    """
    worker_pids = list(self.WORKERS.keys())
    for pid in worker_pids:
        os.kill(pid, sig)
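Next, let's look at the worker, mainly the sync worker implementation. The worker classes relate to each other roughly as follows:
[Figure: worker class relationships]
Continuing from the Arbiter's fork code above, the newly created worker enters init_process: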
# Process Child
worker.pid = os.getpid()
try:
    util._setproctitle("worker [%s]" % self.proc_name)
    self.log.info("Booting worker with pid: %s", worker.pid)
    self.cfg.post_fork(self, worker)
    worker.init_process()
    sys.exit(0)
# (the except/finally clauses are omitted in this excerpt)
A condensed version of the worker's init_process:
def init_process(self):
    """\
    If you override this method in a subclass, the last statement
    in the function should be to call this method with
    super().init_process() so that the ``run()`` loop is initiated.
    """
    # For waking ourselves up
    self.PIPE = os.pipe()  # create the pipe
    ...
    self.wait_fds = self.sockets + [self.PIPE[0]]  # watch both the sockets and the pipe
    ...
    self.init_signals()  # set up the signal handlers
    ...
    self.load_wsgi()  # load the WSGI application
    ...
    # Enter main run loop
    self.booted = True
    self.run()  # the work loop
The worker sets up signal handling in the same way:
SIGNALS = [getattr(signal, "SIG%s" % x)
           for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]

def init_signals(self):
    # reset signaling
    for s in self.SIGNALS:
        signal.signal(s, signal.SIG_DFL)
    # init new signaling
    signal.signal(signal.SIGQUIT, self.handle_quit)
    signal.signal(signal.SIGTERM, self.handle_exit)
    signal.signal(signal.SIGINT, self.handle_quit)
    ...
    if hasattr(signal, 'set_wakeup_fd'):
        signal.set_wakeup_fd(self.PIPE[1])  # a signal writes to the pipe, which wakes up select()
The worker's most important piece, the run loop:
def run(self, timeout):  # in sync.py this loop is run_for_one(), which run() calls when there is a single listener
    listener = self.sockets[0]
    while self.alive:
        ...
        # Accept a connection. If we get an error telling us
        # that no connection is waiting we fall down to the
        # select which is where we'll wait for a bit for new
        # workers to come give us some love.
        try:
            self.accept(listener)  # accept a client connection
            # Keep processing clients until no one is waiting. This
            # prevents the need to select() for every client that we
            # process.
            continue
        except EnvironmentError as e:
            ...
        try:
            self.wait(timeout)  # sleep until there is something to do
        except StopWaiting:
            return
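The loop's strategy, "keep accepting until the non-blocking listener says nothing is waiting, and only then fall back to select", can be sketched on its own. `drain_accept` is a hypothetical function for this example; unlike the worker loop it returns after the first idle timeout so that it terminates, and it catches `BlockingIOError` rather than inspecting errno values.

```python
import select
import socket


def drain_accept(listener, handle, timeout=0.1):
    """Handle clients until none is waiting and one select() times out."""
    handled = 0
    while True:
        try:
            client, addr = listener.accept()
        except BlockingIOError:
            # Nothing queued: wait for activity instead of spinning.
            ready, _, _ = select.select([listener], [], [], timeout)
            if not ready:
                return handled      # idle for `timeout` seconds, stop here
            continue
        with client:
            client.setblocking(True)
            handle(client, addr)
        handled += 1


if __name__ == "__main__":
    listener = socket.socket()
    listener.bind(("127.0.0.1", 0))
    listener.listen(8)
    listener.setblocking(False)     # accept() must not block in this pattern
    port = listener.getsockname()[1]
    clients = [socket.create_connection(("127.0.0.1", port)) for _ in range(3)]
    print(drain_accept(listener, lambda c, a: c.sendall(b"hi")))
    for c in clients:
        c.close()
    listener.close()
```

Trying `accept()` first saves one `select()` call per request when the worker is busy; the `select()` is only paid when the accept queue is empty.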
Handling the client connection is much like the HTTP handling covered earlier, so we will not repeat it here.
def accept(self, listener):
    client, addr = listener.accept()
    client.setblocking(1)
    util.close_on_exec(client)
    self.handle(listener, client, addr)
Once the worker has finished handling requests, it goes into a wait:
def wait(self, timeout):
    try:
        ret = select.select(self.wait_fds, [], [], timeout)
        if ret[0]:
            if self.PIPE[0] in ret[0]:
                os.read(self.PIPE[0], 1)
            return ret[0]
    except select.error as e:
        if e.args[0] == errno.EINTR:
            return self.sockets
        if e.args[0] == errno.EBADF:
            if self.nr < 0:
                return self.sockets
            else:
                raise StopWaiting
        raise
The diagram below shows gunicorn's workflow and serves as our brief summary:
[Figure: gunicorn workflow]
A thread can be used to implement a timer:
# reloader.py
import os
import threading
import time

class Reloader(threading.Thread):
    def __init__(self, extra_files=None, interval=1, callback=None):
        super().__init__()
        self.setDaemon(True)
        self._interval = interval
        self._callback = callback

    def run(self):
        mtimes = {}
        while True:
            for filename in self.get_files():
                try:
                    mtime = os.stat(filename).st_mtime
                except OSError:
                    continue
                old_time = mtimes.get(filename)
                if old_time is None:
                    mtimes[filename] = mtime
                    continue
                elif mtime > old_time:
                    if self._callback:
                        self._callback(filename)
            time.sleep(self._interval)
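The core of the polling loop is a single scan that compares each file's current mtime with the one seen last time. The sketch below isolates that step so it can be exercised without a thread. `scan_changes` and `demo` are hypothetical names, and one deliberate difference from the excerpt is that the recorded mtime is updated after a change, so the same edit is reported only once.

```python
import os
import tempfile


def scan_changes(filenames, mtimes):
    """Return files whose mtime grew since the last scan; update mtimes."""
    changed = []
    for filename in filenames:
        try:
            mtime = os.stat(filename).st_mtime
        except OSError:
            continue                 # file vanished or is unreadable
        old = mtimes.get(filename)
        mtimes[filename] = mtime
        if old is not None and mtime > old:
            changed.append(filename)
    return changed


def demo():
    """Scan a temp file three times, faking one edit before the second scan."""
    with tempfile.NamedTemporaryFile(delete=False) as f:
        path = f.name
    try:
        seen = {}
        first = scan_changes([path], seen)    # first sight: only recorded
        st = os.stat(path)
        os.utime(path, (st.st_atime, st.st_mtime + 10))  # pretend an edit
        second = scan_changes([path], seen)
        third = scan_changes([path], seen)
    finally:
        os.remove(path)
    return first, second == [path], third


if __name__ == "__main__":
    print(demo())
```

A timer thread then only needs to call the scan, invoke a callback for each changed file, and sleep for the polling interval.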
When running the gunicorn myapp:app command, myapp:app is not imported statically; it is loaded dynamically along these lines:
# util.py
klass = components.pop(-1)
mod = importlib.import_module('.'.join(components))
return getattr(mod, klass)
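Resolving a `module:attribute` string at runtime needs little more than `importlib.import_module` and `getattr`. The sketch below is a hypothetical helper, not gunicorn's own loader: `load_object`, its handling of dotted attribute paths, and the fallback attribute name `application` are assumptions made for illustration.

```python
import importlib


def load_object(uri, default_attr="application"):
    """Resolve a 'package.module:attr' string to the object it names."""
    module_name, _, attr = uri.partition(":")
    module = importlib.import_module(module_name)
    obj = module
    # Walk dotted attribute paths such as "Class.method" step by step.
    for part in (attr or default_attr).split("."):
        obj = getattr(obj, part)
    return obj


if __name__ == "__main__":
    dumps = load_object("json:dumps")
    print(dumps({"ok": True}))
```

Because the target is named by a string, the server does not need to know about the application at import time, which is what lets one `gunicorn` executable serve any WSGI callable.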