如
前文所述,Arbiter是gunicorn master進程的核心。Arbiter主要負責管理worker進程,包括啓動、監控、殺掉Worker進程;同時,Arbiter在某些信號發生的時候還能夠熱更新(reload)App應用,或者在線升級gunicorn。Arbiter的核心代碼在一個文件裏面,代碼量也不大,源碼在此:
https://github.com/benoitc/gunicorn。
Arbiter主要有如下方法:
setup:
處理配置項,最重要的是worker數量和worker工做模型
init_signal:
註冊信號處理函數
handle_xxx:
各個信號具體的處理函數
kill_worker,kill_workers:
向worker進程發信號
spawn_worker, spawn_workers:
fork出新的worker進程
murder_workers:
殺掉一段時間內未響應的worker進程
manage_workers:
根據配置文件的worker數量,以及當前active的worker數量,決定是要fork仍是kill worker進程
reexec:
接收到信號SIGUSR2調用,在線升級gunicorn
reload:
接收到信號SIGHUP調用,會根據新的配置新啓動worker進程,並殺掉以前的worker進程
sleep:
在沒有信號處理的時候,利用select的timeout進行sleep,可被喚醒
wakeup:
經過向管道寫消息,喚醒進程
run:
主循環
Arbiter真正被其餘代碼(Application)調用的函數只有__init__和run方法,在一句代碼裏:
Arbiter(self).run()
上面代碼中的self即爲Application實例,其中__init__調用setup進行配置項設置。下面是run方法僞代碼
def run()
self.init_signal()
self.LISTENERS = create_sockets(self.cfg, self.log)
self.manage_workers()
while True:
if no signal in SIG_QUEUE
self.sleep()
else:
handle_signal()
關於fork子進程
fork子進程的代碼在 spawn_worker, 源碼以下:
Arbiter.spawn_worker
主要流程:
(1)加載worker_class並實例化(默認爲同步模型 SyncWorker)
(2)父進程(master進程)fork以後return,以後的邏輯都在子進程中運行
(3)調用worker.init_process 進入循環,worker的全部工做都在這個循環中
(4)循環結束以後,調用sys.exit(0)
(5)最後,在finally中,記錄worker進程的退出
下面是我本身寫的一點代碼,把主要的fork流程簡化了一下
1 # prefork.py
2 import sys
3 import socket
4 import select
5 import os
6 import time
7
8 def do_sub_process():
9 pid = os.fork()
10 if pid < 0:
11 print 'fork error'
12 sys.exit(-1)
13 elif pid > 0:
14 print 'fork sub process %d' % pid
15 return
16
17 # must be child process
18 time.sleep(1)
19 print 'sub process will exit', os.getpid(), os.getppid()
20 sys.exit(0)
21
22 def main():
23 sub_num = 2
24 for i in range(sub_num):
25 do_sub_process()
26 time.sleep(10)
27 print 'main process will exit', os.getpid()
28
29 if __name__ == '__main__':
30 main()
在測試環境下輸出:
fork sub process 9601
fork sub process 9602
sub process will exit 9601 9600
sub process will exit 9602 9600
main process will exit 9600
須要注意的是第20行調用了
sys.exit, 保證子進程的結束,不然會繼續main函數中for循環,以及以後的邏輯。註釋掉第19行從新運行,看輸出就明白了。
關於kill子進程
master進程要kill worker進程就很簡單了,直接發信號,源碼以下:
1 def kill_worker(self, pid, sig):
2 """\
3 Kill a worker
4
5 :attr pid: int, worker pid
6 :attr sig: `signal.SIG*` value
7 """
8 try:
9 os.kill(pid, sig)
10 except OSError as e:
11 if e.errno == errno.ESRCH:
12 try:
13 worker = self.WORKERS.pop(pid)
14 worker.tmp.close()
15 self.cfg.worker_exit(self, worker)
16 return
17 except (KeyError, OSError):
18 return
19 raise
關於sleep與wakeup
咱們再來看看Arbiter的sleep和wakeup。Arbiter在沒有信號須要處理的時候會"sleep",固然,不是真正調用time.sleep,不然信號來了也不能第一時間處理。這裏得實現比較巧妙,利用了管道和select的timeout。看代碼就知道了
def sleep(self):
"""\
Sleep until PIPE is readable or we timeout.
A readable PIPE means a signal occurred.
"""
ready = select.select([self.PIPE[0]], [], [], 1.0) # self.PIPE = os.pipe()
if not ready[0]:
return
while os.read(self.PIPE[0], 1):
pass
代碼裏面的註釋寫得很是清楚,要麼PIPE可讀當即返回,要麼等待超時。管道可讀是由於有信號發生。這裏看看pipe函數html
-
os.
pipe
()
-
Create a pipe. Return a pair of file descriptors (r,w)
usable for reading and writing, respectively.git
那咱們看一下何時管道可讀:確定是往管道寫入的東西,這就是wakeup函數的功能
def wakeup(self):
"""
Wake up the arbiter by writing to the PIPE
"""
os.write(self.PIPE[1], b'.')
最後附上Arbiter的信號處理:github
- QUIT, INT: Quick shutdown
- TERM: Graceful shutdown. Waits for workers to finish their current requests up to the graceful timeout.
- HUP: Reload the configuration, start the new worker processes with a new configuration and gracefully shutdown older workers. If the application is not preloaded (using the
--preload
option), Gunicorn will also load the new version.
- TTIN: Increment the number of processes by one
- TTOU: Decrement the number of processes by one
- USR1: Reopen the log files
- USR2: Upgrade the Gunicorn on the fly. A separate TERM signal should be used to kill the old process. This signal can also be used to use the new versions of pre-loaded applications.
- WINCH: Gracefully shutdown the worker processes when Gunicorn is daemonized.
reference: