今天老師要給你們介紹一個比較特別的 RPC 服務器模型,這個模型不一樣於 Nginx、不一樣於 Redis、不一樣於 Apache、不一樣於 Tornado、不一樣於 Netty,它的原型是 Node Cluster 的多進程併發模型。這種併發模型 Java 同窗看完後是很憂傷的,由於他們永遠享受不了。python
咱們知道 Nginx 的併發模型是一個多進程併發模型,它的 Master 進程在綁定監聽地址端口後 fork 出了多個 Slave 進程共同競爭處理這個服務端套接字接收到的不少客戶端鏈接。json
這多個 Slave 進程會共享同一個處於操做系統內核態的套接字隊列,操做系統的網絡模塊在處理完三次握手後就會將套接字塞進這個隊列。這是一個生產者消費者模型,生產者是操做系統的網絡模塊,消費者是多個 Slave 進程,隊列中的對象是客戶端套接字。bash
這種模型在負載均衡上有一個缺點,那就是套接字分配不均勻,造成了相似於貧富分化的局面,也就是「閒者愈閒,忙者愈忙」的狀態。這是由於當多個進程競爭同一個套接字隊列時,操做系統採用了 LIFO 的策略,最後一個來 accept 的進程最優先拿到 套接字。越是繁忙的進程越是有更多的機會調用 accept,它能拿到的套接字也就越多。服務器
Node Cluster 爲了解決負載均衡問題,它採用了不一樣的策略。它也是多進程併發模型,Master 進程會 fork 出多個子進程來處理客戶端套接字。可是不存在競爭問題,由於負責 accept 套接字的只能是 Master 進程,Slave 進程只負責處理客戶端套接字請求。那就存在一個問題,Master 進程拿到的客戶端套接字如何傳遞給 Slave 進程。微信
這時,神奇的 sendmsg 登場了。它是操做系統提供的系統調用,能夠在不一樣的進程之間傳遞文件描述符。sendmsg 會搭乘一個特殊的「管道」將 Master 進程的套接字描述符傳遞到 Slave 進程,Slave 進程經過 recvmsg 系統調用從這個「管道」中將描述符取出來。這個「管道」比較特殊,它是 Unix 域套接字。普通的套接字能夠跨機器傳輸消息,Unix 域套接字只能在同一個機器的不一樣進程之間傳遞消息。同管道同樣,Unix 域套接字也分爲有名套接字和無名套接字,有名套接字會在文件系統指定一個路徑名,無關進程之間均可以經過這個路徑來訪問 Unix 域套接字。而無名套接字通常用於父子進程之間,父進程會經過 socketpair 調用來建立套接字,而後 fork 出來子進程,這樣子進程也會同時持有這個套接字的引用。後續父子進程就能夠經過這個套接字互相通訊。網絡
注意這裏的傳遞描述符,本質上不是傳遞,而是複製。父進程的描述符並不會在 sendmsg 自動關閉自動消失,子進程收到的描述符和父進程的描述符也不是同一個整數值。可是父子進程的描述符都會指向同一個內核套接字對象。併發
有了描述符的傳遞能力,父進程就能夠將 accept 到的客戶端套接字輪流傳遞給多個 Slave 進程,負載均衡的目標就能夠順利實現了。app
接下來咱們就是用 Python 代碼來擼一遍 Node Cluster 的併發模型。由於 sendmsg 和 recvmsg 方法到了 Python3.5 才內置進來,因此下面的代碼須要使用 Python3.5+才能夠運行。負載均衡
咱們看 sendmsg 方法的定義socket
socket.sendmsg(buffers[, ancdata[, flags[, address]]])
複製代碼
咱們只須要關心第二個參數 ancdata,描述符是經過ancdata 參數傳遞的,它的意思是 「輔助數據」,而 buffers 表示須要傳遞的消息內容,由於消息內容這裏沒有意義,因此這個字段能夠任意填寫,可是必需要有內容,若是沒有內容,sendmsg 方法就是一個空調用。
import socket, struct
def send_fds(sock, fd):
return sock.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack("i", fd))])
# ancdata 參數是一個三元組的列表,三元組的第一個參數表示網絡協議棧級別 level,第二個參數表示輔助數據的類型 type,第三個參數纔是攜帶的數據,level=SOL_SOCKET 表示傳遞的數據處於 TCP 協議層級,type=SCM_RIGHTS 就表示攜帶的數據是文件描述符。咱們傳遞的描述符 fd 是一個整數,須要使用 struct 包將它序列化成二進制。
複製代碼
再看 recvmsg 方法的定義
msg, ancdata, flags, addr = socket.recvmsg(bufsize[, ancbufsize[, flags]])
複製代碼
一樣,咱們只須要關心返回的 ancdata 數據,它裏面包含了咱們須要的文件描述符。可是須要提供消息體的長度和輔助數據的長度參數。輔助數據的長度比較特殊,須要使用 CMSG_LEN 方法來計算,由於輔助數據裏面還有咱們看不到的額外的頭部信息。
bufsize = 1 # 消息內容的長度
ancbufsize = socket.CMSG_LEN(struct.calcsize('i')) # 輔助數據的長度
msg, ancdata, flags, addr = socket.recvmsg(bufsize, ancbufsize) # 收取消息
level, type, fd_bytes = ancdata[0] # 取第一個元祖,注意發送消息時咱們傳遞的是一個三元組的列表
fd = struct.unpack('i', fd_bytes) # 反序列化
複製代碼
下面我來獻上完整的服務器代碼,爲了簡單起見,咱們在 Slave 進程中處理 RPC 請求使用同步模型。
# coding: utf
# sendmsg recvmsg python3.5+才能夠支持
import os
import json
import struct
import socket
def handle_conn(conn, addr, handlers):
print(addr, "comes")
while True:
# 簡單起見,這裏就沒有使用循環讀取了
length_prefix = conn.recv(4)
if not length_prefix:
print(addr, "bye")
conn.close()
break # 關閉鏈接,繼續處理下一個鏈接
length, = struct.unpack("I", length_prefix)
body = conn.recv(length)
request = json.loads(body)
in_ = request['in']
params = request['params']
print(in_, params)
handler = handlers[in_]
handler(conn, params)
def loop_slave(pr, handlers):
while True:
bufsize = 1
ancsize = socket.CMSG_LEN(struct.calcsize('i'))
msg, ancdata, flags, addr = pr.recvmsg(bufsize, ancsize)
cmsg_level, cmsg_type, cmsg_data = ancdata[0]
fd = struct.unpack('i', cmsg_data)[0]
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd)
handle_conn(sock, sock.getpeername(), handlers)
def ping(conn, params):
send_result(conn, "pong", params)
def send_result(conn, out, result):
response = json.dumps({"out": out, "result": result}).encode('utf-8')
length_prefix = struct.pack("I", len(response))
conn.sendall(length_prefix)
conn.sendall(response)
def loop_master(serv_sock, pws):
idx = 0
while True:
sock, addr = serv_sock.accept()
pw = pws[idx % len(pws)]
# 消息數據,whatever
msg = [b'x']
# 輔助數據,攜帶描述符
ancdata = [(
socket.SOL_SOCKET,
socket.SCM_RIGHTS,
struct.pack('i', sock.fileno()))]
pw.sendmsg(msg, ancdata)
sock.close() # 關閉引用
idx += 1
def prefork(serv_sock, n):
pws = []
for i in range(n):
# 開闢父子進程通訊「管道」
pr, pw = socket.socketpair()
pid = os.fork()
if pid < 0: # fork error
return pws
if pid > 0:
# 父進程
pr.close() # 父進程不用讀
pws.append(pw)
continue
if pid == 0:
# 子進程
serv_sock.close() # 關閉引用
pw.close() # 子進程不用寫
return pr
return pws
if __name__ == '__main__':
serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serv_sock.bind(("localhost", 8080))
serv_sock.listen(1)
pws_or_pr = prefork(serv_sock, 10)
if hasattr(pws_or_pr, '__len__'):
if pws_or_pr:
loop_master(serv_sock, pws_or_pr)
else:
# fork 所有失敗,沒有子進程,Game Over
serv_sock.close()
else:
handlers = {
"ping": ping
}
loop_slave(pws_or_pr, handlers)
複製代碼
父進程使用 fork 調用建立了多個子進程,而後又使用 socketpair 調用爲每個子進程都建立一個無名套接字用來傳遞描述符。父進程使用 roundrobin 策略平均分配接收到的客戶端套接字。子進程接收到的是一個描述符整數,須要將描述符包裝成套接字對象後方可讀寫。打印對比發送和接收到的描述符,你會發現它們倆的值並不相同,這是由於 sendmsg 將描述符發送到內核後,內核給描述符指向的內核套接字又從新分配了一個新的描述符對象。
本文節選之在線技術小冊《深刻理解 RPC》,感興趣的讀者請繼續閱讀《深刻理解 RPC》全書內容。
閱讀更多深度技術文章,微信掃一掃上面的二維碼或者搜索關注公衆號「碼洞」或 「codehole」