本文主要譯自 zguide - chapter two. 但並非照本翻譯.linux
上一章咱們簡單的介紹了一個ZMQ, 並給出了三個套路的例子: 請求-迴應, 訂閱-發佈, 流水線(分治). 這一章, 咱們將深刻的探索一下ZMQ中的socket, 以及"套路"程序員
若是熟悉linux socket編程的同窗閱讀完了第一章, 必定有一種說不上來的彆扭感受.由於一般狀況下, 當咱們討論socket的時候, 咱們通常指的是操做系統提供的網絡編程接口裏的那個socket概念. 而在ZMQ中, 只是借用了這個概念的名字, 在ZMQ中, 咱們討論到socket的時候, 通常指代的是調用zmq_socket()
接口返回的那個socket, 具體一點: zmq socket.編程
zmq socket比起linux socket來講, 邏輯理解起來比較相似, 雖然二者內部徹底就不是同一種東西.windows
zmq_socket()
, zmq_close()
zmq_setsockopt()
, zmq_getsockopt()
zmq_bind()
, zmq_connect()
zmq_msg_send()
, zmq_msg_recv()
, zmq_send()
, zmq_recv()
但與linux socket不一樣的是, zmq socket沒有listen這個邏輯概念.後端
須要注意的是, zmq socket是void指針, 而消息則是結構實例. 這就意味着, 在C語言的API中, 須要zmq socket的地方, 傳遞的必定是值, 而須要傳遞消息的時候, 好比使用zmq_msg_send()
和zmq_msg_recv()
這樣的接口, 消息參數則傳遞其地址. 其設計哲學是: 在zmq中, socket不歸程序員掌控, 因此你可能拿到一個句柄(地址), 但不能看到它長什麼樣(不能看到socket實例), 但消息是程序員建立的, 是受程序員掌控的.api
在兩個結點上用ZMQ實現通信, 你須要分別爲兩個結點建立socket, 並在其中一個結點上調用zmq_bind()
, 在另外一個結點上建立對應的zmq_connect()
. 在ZMQ中, 請不要再以死板的"客戶端", "服務端"來區分網絡結點. 而要這樣理解: zmq_bind()
調用應該發生在網絡拓撲中那些不易變的結點上, 而zmq_connect()
應該發生在網絡拓撲中那些易變的結點上.緩存
ZMQ創建起的數據鏈接和常見的TCP鏈接有一些不一樣, 但也有一些共通之處, 以下:安全
在請求-迴應套路中, 咱們把比較不易變的邏輯結點稱爲服務端, 把易變, 也就是會常常性的退出, 或從新加入網絡拓撲的結點稱爲客戶端. 服務端向外提供服務, 必須提供一個"地址"供客戶端去上門, 換句話說, 在這個套路拓撲中, 那些常常來來去去的客戶端應該知道去哪找服務端. 但反過來, 服務端徹底不關心去哪找客戶端, 你愛來不來, 不來就滾, 不要打擾我飛昇. 對於不易變的結點, 應該使用zmq_bind()
函數, 對於易變的結點, 應該採用zmq_connect
bash
在傳統的linux socket編程中, 若是服務端尚未上線工做, 這個時候去啓動客戶端程序, 客戶端程序的connect()
調用會返回錯誤. 但在ZMQ中, 它妥善處理了這種狀況. 客戶端調用zmq_connect()
, 不會報錯, 僅會致使消息被阻塞而發不出去.服務器
不要小看這一點設計, 它反映出ZMQ的設計思想: 在請求-應答套路中, 它不光容許客戶端能夠隨時退出, 再回來. 甚至容許服務端去上個廁所.
另外, 一個服務端能夠屢次調用zmq_bind()
以將本身關聯到多個endpoint上.(所謂的endpoint, 就是通信協議+通信地址的組合, 它通常狀況下指代了在這種通信協議中的一個網絡結點, 但這個結點能夠是邏輯性的, 不必定只是一臺機器).這就意味着, zmq socket能夠同時接受來自多個不一樣通信協議的多簇請求消息.
zmq_bind(socket, "tcp://*:5555"); zmq_bind(socket, "tcp://*:999"); zmq_bind(socket, "inproc://suprise_motherfucker");
可是, 對於同一種通信協議裏的同一個endpoint, 你只能對其執行一次zmq_bind()
操做. 這裏有個例外, 就是ipc進程間通訊. 邏輯上容許另一個進程去使用以前一個進程已經使用過的ipc endpoint, 但不要濫用這特性: 這只是ZMQ提供給程序崩潰後恢復現場的一種手段, 在正常的代碼邏輯中, 不要作這樣的事情.
因此看到這裏你大概能理解zmq對bind和connect這兩個概念的態度: ZMQ努力的將這兩個概念之間的差別抹平, 但很遺憾, zmq並無將這兩個操做抽象成一個相似於touch的操做. 但仍是請謹記, 在你的網絡拓撲中, 讓不易變結點去使用zmq_bind()
, 讓易變結點去使用zmq_connect
zmq socket是分類型的, 不一樣類型的socket提供了差別化的服務, socket的類型與結點在拓撲中的角色有關, 也影響着消息的出入, 以及緩存策略. 不一樣類型的socket之間, 有些能夠互相鏈接, 但有些並不能, 這些規則, 以及如何在套路中爲各個結點安排合適類型的socket, 都是後續咱們將要講到的內容.
若是從網絡通信的角度來說, zmq是一個將傳統傳輸層封裝起來的網絡庫. 但從數據傳輸, 消息傳輸, 以及消息緩存這個角度來說, zmq彷佛又稱得上是一個消息隊列庫. 總之, zmq是一個優秀的庫, 優秀不是指它的實現, 它的性能, 而是它能解決的問題, 它的設計思路.
在第一章裏, 咱們接觸到了兩個有關消息收發的函數, zmq_send()
和zmq_recv()
, 如今, 咱們須要把術語規範一下.
zmq_send()
與zmq_recv()
是用來傳輸"數據"的接口. 而"消息"這個術語, 在zmq中有指定含義, 傳遞消息的接口是zmq_msg_send()
與zmq_msg_recv()
當咱們提及"數據"的時候, 咱們指的是二進制串. 當咱們說"消息"的時候, 指提是zmq中的一種特定結構體.
須要額外注意的是, 不管是調用zmq_send()
仍是zmq_msg_send()
, 當調用返回時, 消息並無真正被髮送出去, 更沒有被對方收到. 調用返回只表明zmq將你要發送的"消息"或"數據"放進了一個叫"發送緩衝區"的地方. 這是zmq實現收發異步且帶緩衝隊列的一個設計.
ZMQ底層封裝了三種單播通信協議, 分別是: 共享內存實現的線程間通信(inproc), 進程間通訊(ipc), 以及TCP/IP協議棧裏的TCP協議(tcp). 另外ZMQ底層還封裝了兩種廣播協議: PGM, EPGM. 多播咱們在很是後面的章節纔會介紹到, 在你瞭解它以前, 請不要使用多播協議, 即使你是在作一些相似於發佈-訂閱套路的東西.
對於多數場景來講, 底層協議選用tcp都是沒什麼問題的. 須要注意的是, zmq中的tcp, 被稱爲 "無鏈接的tcp協議", 而之因此起這麼一個精神分裂的名字, 是由於zmq容許在對端不存在的狀況下, 結點去zmq_connect()
. 你大體能夠想象zmq作了多少額外工做, 但這些對於你來講, 對於上層應用程序來講, 是透明瞭, 你沒必要去關心具體實現.
IPC通信相似於tcp, 也是"無鏈接"的, 目前, 這種方式不能在windows上使用, 很遺憾. 而且, 按照慣例, 在使用ipc做爲通信方式時, 咱們通常給endpoint加上一個.ipc
的後綴. 另外, 在Unix操做系統上, 使用ipc鏈接還請格外的注意不一樣進程的權限問題, 特別是從屬於兩個不一樣用戶的進程.
最後來講一下inproc, 也就是線程間通訊, 它只能用於同一進程內的不一樣線程通信. 比起tcp和ipc, 這種通信方式快的飛起. 它與tcp和ipc最大的區別是: 在有客戶端調用connect以前, 必須確保已經有一個服務端在對應的endpoint上調用了bind, 這個缺陷可能會在將來的某個版本被修正, 但就目前來說, 請務必當心注意.
很遺憾的是, ZMQ對於其底層封裝的網絡協議是有侵入性的, 換句話說, 你無法使用ZMQ去實現一個HTTP服務器. HTTP做爲一個五層協議, 使用TCP做爲傳輸層協議, 對TCP裏的報文格式是有規約限制的, 而ZMQ做爲一個封裝了TCP的4.5層協議, 其在數據交互時, 已經侵入了TCP的報文格式. 你沒法讓TCP裏的報文既知足HTTP的格式要求, 還知足ZMQ的格式要求.
關心ZMQ究竟是如何侵入它封裝的通信協議的, 這個在第三章, 當咱們接觸到ZMQ_ROUTER_RAW
這種socket配置項的時候纔會深刻討論, 目前你只須要明白, ZMQ對其底層封裝的通信協議有侵入.
這意味着, 你沒法無損的將ZMQ引入到一些現成的項目中. 這很遺憾.
咱們先前提到過, ZMQ在後臺使用獨立的線程來實現異步I/O處理. 通常狀況下吧, 一個I/O線程就應該足以處理當前進程的全部socket的I/O做業, 可是這個凡事總有個極限狀況, 因此總會存在一些很荀的場景, 你須要多開幾個I/O線程.
當你建立一個context的時候, ZMQ就在背後建立了一個I/O處理線程. 若是這麼一個I/O線程不能知足你的需求, 那麼就須要在建立context的時候加一些料, 讓ZMQ多建立幾個I/O處理線程. 通常有一個簡單估算I/O線程數量的方法: 每秒你的程序有幾個G字節的吞吐量, 你就開幾個I/O線程.
下面是自定義I/O線程數量的方法:
int io_threads = 4; void * context = zmq_ctx_new(); zmq_ctx_set(context, ZMQ_IO_THREADS, io_threads); assert(zmq_ctx_get(context, ZMQ_IO_THREADS) == io_threads);
回想一下你用linux socket + epoll編寫服務端應用程序的套路, 通常都是一個tcp鏈接專門開一個線程. ZMQ不同, ZMQ容許你在一個進程裏持有上千個鏈接(不必定是TCP哦), 但處理這上千個鏈接的I/O做業, 可能只有一個, 或者幾個線程而已, 而且事實也證實這樣作是可行的. 可能你的進程裏只有十幾個線程, 但就是能處理超過上千個鏈接.
當你的程序只使用inproc做爲通信手段的時候, 實際上是不須要線程來處理異步I/O的, 由於inproc是經過共享內存實現通信的. 這個時候你能夠手動設置I/O線程的數量爲0. 這是一個小小的優化手段, 嗯, 對性能的提高基本爲0.
ZMQ的設計是親套路的, ZMQ的核心其實在於路由與緩存, 這也是爲何做爲一個網絡庫, 它更多的被人從消息隊列這個角度瞭解到的緣由. 要用ZMQ實現套路, 關鍵在於使用正確的socket類型, 而後把拓撲中的socket組裝配對起來. 因此, 要懂套路, 就須要懂zmq裏的socket類型.
zmq提供了你構建以下套路的手段:
咱們在第一章中已經大體接觸了套路, 除了一夫一妻沒有接觸到, 這章稍後些部分咱們也將接觸這種套路.要了解具體socket的各個類型都是幹嗎用的, 能夠去閱讀zmq_socket()
的manpage, 我建議你去閱讀, 而且仔細閱讀, 反覆閱讀.下面列出的是能夠互相組合的socket類型. 雙方能夠替換bind
與connect
操做.
後續你還會看到有XPUB與XSUB兩種類型的socket. 就目前來講, 只有上面的socket配對鏈接是有效的, 其它沒列出的組合的行爲是未定義的, 但就目前的版原本說, 錯誤的組合socket類型並不會致使鏈接時出錯, 甚至可能會碰巧按你的預期運行, 但強烈不建議你這個瞎jb搞. 在將來的版本中, 組合非法的socket類型可能會致使API調用出錯.
libzmq有兩套收發消息的API接口, 這個以前咱們已經講過. 而且在第一章裏建議你多使用zmq_send()
與zmq_recv()
, 建議你規避zmq_msg_send()
與zmq_msg_recv()
. 但zmq_recv
有一個缺陷, 就是當你提供給zmq_recv()
接口的接收buffer不夠長時, zmq_recv()
會把數據截斷. 若是你沒法預測你要收到的二進制數據的長度, 那麼你只能使用zmq_msg_xxx()
接口.
從接口名上的msg
三個字母就能看出, 這個系列的接口是操縱結構體, 也就是"消息"(實際上是幀, 後面會講到), 而不是"數據", 而非緩衝區的接口, 實際上它們操縱的是zmq_msg_t
類型的結構. 這個系列的接口功能更爲豐富, 但使用起來也請務必萬分當心.
zmq_msg_init()
, zmq_msg_init_size()
, zmq_msg_init_data()
zmq_msg_send()
, zmq_msg_recv()
zmq_close()
zmq_msg_data()
, zmq_msg_size()
, zmq_msg_more()
zmq_msg_get()
, zmq_msg_set()
zmq_msg_copy()
, zmq_msg_move()
消息結構中封裝的數據是二進制的, 依然由程序員本身解釋. 關於zmq_msg_t
結構類型, 下面是你須要知道的基礎知識:
zmq_msg_t *
. 也就是說這是一個內部實現不對外開放的類型, 建立, 傳遞, 都應當以指針類型進行操做.zmq_msg_init()
建立一個消息對象, 而後將這個消息對象傳遞給zmq_msg_recv()
接口zmq_msg_init_size()
建立一個數據容量指定的消息對象, 而後把你要寫入的二進制數據經過內存拷貝函數, 好比memcpy()
寫入消息中, 最後調用zmq_msg_send()
, 看到這裏你應該明白, zmq_msg_init_size()
接口內部進行了內存分配.zmq_msg_t
實際上是引用計數方式實現的共享對象類型, "釋放"是指當前上下文放棄了對該消息的引用, 內部致使了實例的引用計數-1, 而"銷燬"則是完全把實例自己給free掉了. 當你"釋放"一個消息的時候, 應當調用zmq_msg_close()
接口. 若是消息實例在釋放後引用計數歸0, 那麼這個消息實例會被ZMQ自動銷燬掉.zmq_msg_data()
接口, 要獲取消息中數據的長度, 調用zmq_msg_size()
zmq_msg_move()
, zmq_msg_copy()
, zmq_msg_init_data()
這三個接口zmq_msg_send()
調用將消息發送給socket後, 這個消息內部包裝的數據會被清零, 也就是zmq_msg_size() == 0
, 因此, 你不該該連續兩次使用同一個zmq_msg_t *
值調用zmq_msg_send()
. 但須要注意的是, 這裏的"清零", 並不表明消息被"釋放", 也不表明消息被"銷燬". 消息仍是消息, 只是其中的數據被扔掉了.若是你想把同一段二進制數據發送屢次, 正確的作法是下面這樣:
zmq_msg_init_size()
, 建立第一個消息, 再經過memcpy
或相似函數將二進制數據寫入消息中zmq_msg_init()
建立第二個消息, 再調用zmq_msg_copy()
從第一個消息將數據"複製"過來zmq_msg_send()
發送上面的多個消息ZMQ還支持所謂的"多幀消息", 這種消息容許你把多段二進制數據一次性發送給對端. 這個特性在第三章咱們再講. (P.S.: 這是一個很重要的特性, 路由代理等高級套路就嚴重依賴這種多幀消息.). ZMQ中的消息有三層邏輯概念: 消息, 幀, 二進制數據. 用戶自定義的二進制數據被包裝成幀, 而後一個或多個幀組成一個消息. 消息是ZMQ拓撲網絡中兩個結點收發的單位, 但在ZMQ底層的傳輸協議中, 最小單位是幀.
換一個角度來說, ZMQ使用其底層的傳輸協議, 好比tcp, 好比inproc, 好比ipc來傳輸數據, 當ZMQ調用這些傳輸協議傳遞數據的時候, 最小單元是幀. 幀的完整性由傳輸協議來保證, 便是ZMQ自己不關心這個幀會不會破損, 幀的完整傳輸應當由這些傳輸協議去保證. 而在使用ZMQ構建應用程序的程序員眼中, 最小的傳輸單位是消息, 一個消息裏可能會有多個幀, 程序員不去關心消息從一端到另外一端是否會出現丟幀, 消息的完整性與原子性應當由ZMQ庫去保證.
前面咱們講過, ZMQ對其底層的傳輸協議是有侵入性的. 若是要了解ZMQ究竟是如何在傳輸協議的基礎上規定幀傳輸格式的, 能夠去閱讀這個規範.
在咱們到達第三章以前, 咱們所討論的消息中都僅包含一個幀. 這就是爲何在這一小節的描述中, 咱們幾乎有引導性的讓你以爲, zmq_msg_t
類型, 就是"消息", 其實不是, 其實zmq_msg_t
消息只是"幀".
zmq_msg_t
對象zmq_msg_send()
, zmq_msg_recv()
, 你能夠一幀一幀的發送數據. 能夠用屢次調用這些接口的方式來發送一個完整的消息, 或者接收一個完整的消息: 在發送時傳入ZMQ_SNDMORE
參數, 或在接收時, 經過zmq_getsockopt()
來獲取ZMQ_RCVMORE
選項的值. 更多關於如何使用低級API收發多幀消息的信息, 請參見相關接口的manpage關於消息或幀, 還有下面的一些特性:
zmq_send()
是一致的.zmq_msg_close()
接口來釋放這個zmq_msg_t
對象最後再強調一下, 在你不理解zmq_msg_t
的原理以前, 不要使用zmq_msg_init_data()
接口, 這是一個0拷貝接口, 若是不熟悉zmq_msg_t
結構的原理, 瞎jb用, 是會core dump的
在先前的全部例子程序中, 大多程序裏乾的都是這樣的事情
若是你接觸過linux中的select, pselect, epoll等多路IO複用接口, 你必定會好奇, 在使用zmq的時候, 如何實現相似的效果呢? 畢竟ZMQ不光把linux socket的細節給你封裝了, 連文件描述符都給你屏蔽封裝掉了, 顯然你無法直接調用相似於select, pselect, epoll這種接口了.
答案是, ZMQ本身搞了一個相似的玩意, zmq_poll()
瞭解一下.
咱們先看一下, 若是沒有多路IO接口, 若是咱們要從兩個socket上接收數據, 咱們會怎樣作. 下面是一個沒什麼卵用的示例程序, 它試圖從兩個socket上讀取數據, 使用了異步I/O. (若是你有印象的話, 應該記得對應的兩個endpoint其實是咱們在第一章寫的兩個示例程序的數據生產方: 天氣預報程序與村口的大喇叭)
#include <zmq.h> #include <stdio.h> int main(void) { void * context = zmq_ctx_new(); void * receiver = zmq_socket(context, ZMQ_PULL); zmq_connect(receiver, "tcp://localhost:5557"); void * subscriber = zmq_socket(context, ZMQ_SUB); zmq_connect(subscriber, "tcp://localhost:5556"); zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "10001 ", 6); while(1) { char msg[256]; while(1) { int size = zmq_recv(receiver, msg, 255, ZMQ_DONTWAIT); if(size != -1) { // 接收數據成功 } else { break; } } while(1) { int size = zmq_recv(subscriber, msg, 255, ZMQ_DONTWAIT); if(size == -1) { // 接收數據成功 } else { break; } } sleep(1); // 休息一下, 避免瘋狂循環 } zmq_close(receiver); zmq_close(subscriber); zmq_ctx_destroy(context); return 0; }
在沒有多路IO手段以前, 這基本上就是你能作到的最好情形了. 大循環裏的sleep()
讓人渾身難受. 不加sleep()
吧, 在沒有數據的時候, 這個無限空循環能把一個核心的cpu佔滿. 加上sleep()
吧, 收包又會有最壞狀況下1秒的延時.
但有了zmq_poll()
接口就不同了, 代碼就會變成這樣:
#include <zmq.h> #include <stdio.h> int main(void) { void * context = zmq_ctx_new(); void * receiver = zmq_socket(context, ZMQ_PULL); zmq_connect(receiver, "tcp://localhost:5557"); void * subscriber = zmq_socket(context, ZMQ_SUB); zmq_connect(subscriber, "tcp://localhost:5556"); zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "10001 ", 6); while(1) { char msg[256]; zmq_pollitem_t items[] = { {receiver, 0, ZMQ_POLLIN, 0}, {subscriber,0, ZMQ_POLLIN, 0}, }; zmq_poll(items, 2, -1); if(items[0].revents & ZMQ_POLLIN) { int size = zmq_recv(receiver, msg, 255, 0); if(size != -1) { // 接收消息成功 } } if(items[1].revents & ZMQ_POLLIN) { int size = zmq_recv(subscriber, msg, 255, 0); if(size != -1) { // 接收消息成功 } } } zmq_close(receiver); zmq_close(subscriber); zmq_ctx_destroy(context); return 0; }
zmq_pollitem_t
類型定義以下, 這個定義能夠從zmq_poll()
的manpage裏查到
typedef struct{ void * socket; // ZMQ的socket int fd; // 是的, zmq_poll()還能夠用來讀寫linux file descriptor short events; // 要被監聽的事件, 基礎事件有 ZMQ_POLLIN 和 ZMQ_POLLOUT, 分別是可讀可寫 short revents; // 從zmq_poll()調用返回後, 這裏存儲着觸發返回的事件 } zmq_pollitem_t;
咱們以前提到過, 用戶數據被包裝成zmq_msg_t
對象, 也就是幀, 而在幀上, 還有一個邏輯概念叫"消息". 那麼在具體編碼中, 如何發送多幀消息呢? 而又如何接收多幀消息呢? 簡單的講, 兩點:
zmq_msg_send()
傳入ZMQ_SNDMORE
選項, 告訴發送接口, "我後面還有其它幀"zmq_msg_recv()
接收一個幀, 就調用一次zmq_msg_more()
或者zmq_getsockopt() + ZMQ_RCVMORE
來判斷是否這是消息的最後一個幀發送示例:
zmq_msg_send(&msg, socket, ZMQ_SNDMORE); zmq_msg_send(&msg, socket, ZMQ_SNDMORE); zmq_msg_send(&msg, socket, 0); // 消息的最後一個幀
接收示例:
while(1) { zmq_msg_t msg; zmq_msg_init(&msg); zmq_msg_recv(&msg, socket, 0); // 作處理 zmq_msg_close(&msg); if(!zmq_msg_more(&msg)) // 注意, zmq_msg_more能夠在zmq_msg_close後被安全的調用 { break; } }
這裏有一個須要注意的有趣小細節: 要判斷一個收來的幀是否是消息的最後一個幀, 有兩種途徑, 一種是zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size)
, 另一種是zmq_msg_more(&msg)
. 前一種途徑的入參是socket, 後一種途徑的入參是msg. 這真是很因缺思汀. 目前來講, 兩種方法均可以, 不過我建議你使用zmq_getsockopt()
, 至於緣由嘛, 由於在zmq_msg_recv()
的manpage中, 是這樣建議的.
關於多幀消息, 你須要注意如下幾點:
zmq_poll()
時, 當socket可讀, 而且用zmq_msg_recv()
讀出一個幀時, 表明着不用等待下一次循環, 你直接繼續讀取, 必定能讀取能整個消息中剩餘的其它全部幀zmq_msg_more()
或zmq_getsockopt() + ZMQ_RCVMORE
檢查消息是否接收完整, 你一幀幀的收, 也會把整個消息裏的全部幀收集齊. 因此從這個角度看, zmq_msg_more()
能夠在把全部可讀的幀從socket裏統一接收到手以後, 再慢慢判斷這些幀應該怎麼拼裝. 因此這樣看, 它和zmq_getsockopt()
的功能也不算是徹底重複.ZMQ的目標是創建去中心化的消息通訊網絡拓撲. 但不要誤解"去中心"這三個字, 這並不意味着你的網絡拓撲在中心圈內空無一物. 實際上, 用ZMQ搭建的網絡拓撲中經常充滿了各類非業務處理的網絡結點, 咱們把這些感知消息, 傳遞消息, 分發消息, 但不實際處理消息的結點稱爲"中介", 在ZMQ構建的網絡中, 它們按應用場景有多個細化的名字, 好比"代理", "中繼", "裝置", "掮客"等.
這套邏輯在現實世界裏也很常見, 中間人, 中介公司, 它們不實際生產社會價值, 表面上看它們的存在是在吸兩頭的血, 這些皮條客在社會中的存在乎義在於: 它們減小了溝通的複雜度, 對通訊雙方進行了封裝, 提升了社會運行效率.
當構建一個稍有規模的頒式系統的時候, 一個避不開的問題就是, 網絡中的結點是如何感知其它結點的存在的? 結點會當機, 會擴容, 在這些變化發生的時候, 網絡中的其它正在工做的結點如何感知這些變化, 並保持系統總體正常運行呢? 這就是經典的"動態探索問題".
動態探索問題有一系列很經典的解決方案, 最簡單的解決方案就是把問題自己解決掉: 把網絡拓撲設計死, 代碼都寫死, 別讓它瞎jb來回變, 問題消滅了, done!. 這種解決方案的缺點就是若是網絡拓撲要有變動, 好比業務規模擴展了, 或者有個結點當機了, 網絡配置管理員會罵娘.
拓撲規模小的時候, 消滅問題的思路沒什麼壞處, 但拓撲稍微複雜一點, 顯然這就是一個很好笑的解決方案.好比說, 網絡中有一個發佈者, 有100多個訂閱者, 發佈者bind到endpoint上, 訂閱者connect到endpoint上. 若是代碼是寫死的, 若是發佈者自己出了點什麼問題, 或者發佈者一臺機器搞不住了, 須要橫向擴容, 你就得改代碼, 而後手動部署到100多臺訂閱者上. 這樣的運維成本太大了.
這種場景, 你就須要一個"中介", 對發佈者而言, 它今後無需關心訂閱者是誰, 在哪, 有多少人, 只須要把消息給中介就好了. 對於訂閱者而言, 它今後無需關注發佈者有幾個, 是否使用了多個endpoint, 在哪, 有多少人. 只須要向中介索取消息就好了. 雖然這時發佈者身上的問題轉嫁到的中介身上: 即中介是網絡中最易碎的結點, 若是中介掛了整個拓撲就掛了, 但因爲中介不處理業務邏輯, 只是一個相似於交換機的存在, 因此一樣的機器性能, 中介在單位時間能轉發的消息數量, 比發佈者和訂閱者能處理的消息高一個甚至幾個數量級. 是的, 使用中介引入了新的問題, 但解決了老的問題.
中介並無解決全部問題, 當你引入中介的時候, 中介又變成了網絡中最易碎的點, 因此在實際應用中, 要控制中介的權重, 避免整個網絡拓撲嚴重依賴於一箇中介這種狀況出現: ZMQ提倡去中心化, 不要把中介變成一個壟斷市場的掮客.
對於發佈者而言, 中介就是訂閱者, 而對於訂閱者而言, 中介就是發佈者. 中介使用兩種額外的socket類型: XPUB與XSUB. XSUB與真實的發佈者鏈接, XPUB與真實的訂閱者鏈接.
在咱們以前寫的請求-迴應套路程序中, 咱們有一個客戶端, 一個服務端. 這是一個十分簡化的例子, 實際應用場景中的請求-迴應套路中, 通常會有多個客戶端與多個服務端.
請求-應答模式有一個隱含的條件: 服務端是無狀態的. 不然就不能稱之爲"請求-應答"套路, 而應該稱之爲"嘮嗑套路".
要鏈接多個客戶端與多個服務端, 有兩種思路.
第一種暴力思路就是: 讓N個客戶端與M個服務端創建起N*M的全鏈接. 這確實是一個辦法, 雖然不是很優雅. 在ZMQ中, 實現起來還輕鬆很多: 由於ZMQ的socket能夠向多個endpoint發起鏈接, 這對於客戶端來講, 編碼難度下降了. 客戶端應用程序中能夠建立一個zmq_socket, 而後connect到多個服務端的endpoint上就好了. 這種思路作的話, 客戶端數量擴張很容易, 直接部署就能夠, 代碼不用改. 可是缺陷有兩個:
總的來講, 這是一種很暴力的解決辦法, 不適合用於健壯的生產環境. 可是這確實是一個辦法.
爲了解決上面兩個缺陷, 天然而然的咱們就會想到: 爲何不能把服務端抽象出來呢? 讓一個掮客來作那個惟一的endpoint, 以供全部客戶端connect, 而後掮客在背後再把請求體分發給各個服務端, 服務端作出迴應後掮客再代替服務端把迴應返回給客戶端, 這樣就解決了上面的問題:
而且, 掮客還能夠作到如下
因此, 在請求迴應套路中加入掮客, 是一個很明智的選擇, 這就是第二種思路, 這種思路不是沒有缺陷, 有, 並且很明顯: 掮客是整個系統中最脆弱的部分.
但這個缺陷能夠在必定程度上克服掉:
ZMQ中, 有兩個特殊的socket類型特別適合掮客使用:
關於這兩種特殊的socket的特性, 後續咱們會仔細深刻, 目前來講, 你只須要了解
多說無益, 來看代碼. 下面是在客戶端與服務端中插入掮客的代碼實例:
客戶端
#include <zmq.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_REQ); zmq_connect(socket, "tcp://localhost:5559"); for(int i = 0; i < 10; ++i) { s_send(socket, "Hello"); char * strRsp = s_recv(socket); printf("Received reply %d [%s]\n", i, strRsp); free(strRsp); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }
服務端
#include <zmq.h> #include <unistd.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_REP); zmq_connect(socket, "tcp://localhost:5560"); while(1) { char * strReq = s_recv(socket); printf("Received request: [%s]\n", strReq); free(strReq); sleep(1); s_send(socket, "World"); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }
掮客
#include <zmq.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket_for_client = zmq_socket(context, ZMQ_ROUTER); void * socket_for_server = zmq_socket(context, ZMQ_DEALER); zmq_bind(socket_for_client, "tcp://*:5559"); zmq_bind(socket_for_server, "tcp://*:5560"); zmq_pollitem_t items[] = { { socket_for_client, 0, ZMQ_POLLIN, 0 }, { socket_for_server, 0, ZMQ_POLLIN, 0 }, }; while(1) { zmq_msg_t message; zmq_poll(items, 2, -1); if(items[0].revents & ZMQ_POLLIN) { while(1) { zmq_msg_init(&message); zmq_msg_recv(&message, socket_for_client, 0); int more = zmq_msg_more(&message); zmq_msg_send(&message, socket_for_server, more ? ZMQ_SNDMORE : 0); zmq_msg_close(&message); if(!more) { break; } } } if(items[1].revents & ZMQ_POLLIN) { while(1) { zmq_msg_init(&message); zmq_msg_recv(&message, socket_for_server, 0); int more = zmq_msg_more(&message); zmq_msg_send(&message, socket_for_client, more ? ZMQ_SNDMORE : 0); zmq_msg_close(&message); if(!more) { break; } } } } zmq_close(socket_for_client); zmq_close(socket_for_server); zmq_ctx_destroy(context); return 0; }
客戶端和服務端因爲掮客的存在, 代碼都簡單了很多, 對於掮客的代碼, 有如下幾點須要思考:
s_send
與s_recv
互相傳遞字符串, 但在掮客那裏就須要用zmq_msg_t
進行轉發呢?上面三點實際上是同一個問題: 掮客是如何實現帶會話追蹤的轉發消息的?
另外, 若是你先啓動掮客, 再啓動客戶端, 再啓動服務端. 你會看到在服務端正確啓動後, 客戶端顯示它收到了回包.那麼:
這就是有關掮客的第二個問題: 如何配置緩衝區.
本章目前暫時不會對這三個問題作出解答, 你們先思考一下. 咱們將在下一章深刻掮客的細節進行進一步探索.
在上面的掮客代碼示例中, 核心代碼就是zmq_poll
對兩個socket的監聽, 以及while(1)
循環. ZMQ將這兩坨操做統一封裝到了一個函數中, 免得你們每次都要寫boring code.
int zmq_proxy (const void *frontend, const void *backend, const void *capture);
參數frontend
與backend
分別是與客戶端相連的socket
和與服務端相連的socket
. 在使用zmq_proxy
函數以前, 這兩個socket必須被正確配置好, 該調用connect就調用connect, 該調用bind就調用bind. 簡單來說, zmq_proxy
負責把frontend
與backend
之間的數據互相遞送給對方. 而若是僅僅是單純的遞送的話, 第三個參數capture
就應當被置爲NULL
, 而若是還想監聽一下數據, 那麼就再建立一個socket, 並將其值傳遞給capture
, 這樣, frontend
與backend
之間的數據都會有一份拷貝被送到capture
上的socket.
當咱們用zmq_proxy
重寫上面的掮客代碼的話, 代碼會很是簡潔, 會變成這樣:
#include <zmq.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket_for_client = zmq_socket(context, ZMQ_ROUTER); void * socket_for_server = zmq_socket(context, ZMQ_DEALER); zmq_bind(socket_for_client, "tcp://*:5559"); zmq_bind(socket_for_server, "tcp://*:5560"); zmq_proxy(socket_for_client, socket_for_server, NULL); zmq_close(socket_for_client); zmq_close(socket_for_server); zmq_ctx_destroy(context); return 0; }
橋接是服務器後端的一種經常使用技巧. 所謂的橋接有點相似於掮客, 可是解決問題的側重點不同. 掮客主要解決了三個問題:
而橋接解決的問題的側重點主要在:
這種設計思路經常使用於後臺服務的接口層. 接口層一方面鏈接着後端內部局域網, 另一方面對公提供服務. 這種服務能夠是請求-迴應式的服務, 也能夠是發佈-訂閱式的服務(顯然發佈方在後端內部的局域網裏). 這個時候接口層其實就完成了橋接的工做.
其實這種應用場景裏, 把這種技巧稱爲橋接
並非很合適. 由於橋接
是一個計算機網絡中硬件層的術語, 最初是用於線纜過長信號衰減時, 在線纜末端再加一個信號放大器之類的設備, 爲通訊續命用的.
原版ZMQ文檔在這裏提出bridging
這個術語, 也只是爲了說明一下, zmq_proxy
的適用場景不只侷限於作掮客, 而是應該在理解上更寬泛一點, zmq_proxy
函數就是互相傳遞兩個socket之間數據函數, 僅此而已, 而具體這個函數能應用在什麼樣的場景下, 掮客與橋接場景都可以使用, 但毫不侷限於此. 寫代碼思惟要活.
ZMQ庫對待錯誤, 或者叫異常, 的設計哲學是: 見光死. 前文中寫的多數示例代碼, 都沒有認真的檢查ZMQ庫函數調用的返回值, 也沒有關心它們執行失敗後會發生什麼. 通常狀況下, 這些函數都能正常工做, 但凡事總有個萬一, 萬一建立socket失敗了, 萬一bind或connect調用失敗了, 會發生什麼?
按照見光死的字面意思: 按咱們上面寫代碼的風格, 一旦出錯, 程序就掛掉退出了.
因此正確使用ZMQ庫的姿式是: 生產環境運行的代碼, 務必爲每個ZMQ庫函數的調用檢查返回值, 考慮調用失敗的狀況. ZMQ庫函數的設計也繼續了POSIX接口風格里的一些設計, 這些設計包括:
errno
中, 或zmq_errno()
中zmq_strerror()
可能得到真正健壯的代碼, 應該像下面這樣寫, 是的, 它很囉嗦, 但它很健壯:
// ... void * context = zmq_ctx_new(); assert(context); void * socket = zmq_socket(context, ZMQ_REP); assert(socket); int rc = zmq_bind(socket, "tcp://*:5555"); if(rc == -1) { printf("E: bind failed: %s\n", strerror(errno)); return -1; } // ...
有兩個比較例外的狀況須要你注意一下:
ZMQ_DONTWAIT
的函數返回-1時, 通常狀況下不是一個致命錯誤, 不該當致使程序退出. 好比在收包函數裏帶上這個標誌, 那麼語義只是說"沒數據可收", 是的, 收包函數會返回-1, 而且會置error
值爲EAGAIN
, 但這並不表明程序發生了不可逆轉的錯誤.zmq_ctx_destroy()
時, 若是此時有其它線程在忙, 好比在寫數據或者收數據什麼的, 那麼這會直接致使這些在幹活的線程, 調用的這些阻塞式接口函數返回-1, 而且errno
被置爲ETERM
. 這種狀況在實際編碼過程當中不該當出現.下面咱們寫一個健壯的分治套路, 和咱們在第一章中寫過的相似, 不一樣的是, 此次, 在監理收到"全部工做均完成"的消息以後, 會發消息給各個工程隊, 讓工程隊中止運行. 這個例子主要有兩個目的:
原先的分治套路代碼, 使用PUSH/PULL這兩種socket類型, 將任務分發給多個工程隊. 但在工做作完以後, 工程隊的程序還在運行, 工程隊的程序沒法得知任務什麼進修終止. 這裏咱們再摻入發佈-訂閱套路, 在工做作完以後, 監理向廣大工程隊, 經過PUB類型的socket發送"活幹活了"的消息, 而工程隊用SUB類型的socket一旦收到監理的消息, 就中止運行.
包工頭ventilator的代碼和上一章的一毛同樣, 只是對全部的ZMQ庫函數調用增長了錯誤處理. 照顧你們, 這裏再帖一遍
#include <zmq.h> #include <stdio.h> #include <time.h> #include <assert.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); assert(context); void * socket_to_sink = zmq_socket(context, ZMQ_PUSH); assert(socket_to_sink); void * socket_to_worker = zmq_socket(context, ZMQ_PUSH); assert(socket_to_worker); if(zmq_connect(socket_to_sink, "tcp://localhost:5558") == -1) { printf("E: connect failed: %s\n", strerror(errno)); return -1; } if(zmq_bind(socket_to_worker, "tcp://*:5557") == -1) { printf("E: bind failed: %s\n", strerror(errno)); return -1; } printf("Press Enter when all workers get ready:"); getchar(); printf("Sending tasks to workers...\n"); if(s_send(socket_to_sink, "Get ur ass up") == -1) { printf("E: s_send failed: %s\n", strerror(errno)); return -1; } srandom((unsigned)time(NULL)); int total_ms = 0; for(int i = 0; i < 100; ++i) { int workload = randof(100) + 1; total_ms += workload; char string[10]; snprintf(string, sizeof(string), "%d", workload); if(s_send(socket_to_worker, string) == -1) { printf("E: s_send failed: %s\n", strerror(errno)); return -1; } } printf("Total expected cost: %d ms\n", total_ms); zmq_close(socket_to_sink); zmq_close(socket_to_worker); zmq_ctx_destroy(context); return 0; }
接下來是工程隊worker的代碼, 這一版新增了一個socket_to_sink_of_control
來接收來自監理的中止消息:
#include <zmq.h> #include <assert.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); assert(context); void * socket_to_ventilator = zmq_socket(context, ZMQ_PULL); assert(socket_to_ventilator); if(zmq_connect(socket_to_ventilator, "tcp://localhost:5557") == -1) { printf("E: connect failed: %s\n", strerror(errno)); return -1; } void * socket_to_sink = zmq_socket(context, ZMQ_PUSH); assert(socket_to_sink); if(zmq_connect(socket_to_sink, "tcp://localhost:5558") == -1) { printf("E: connect failed: %s\n", strerror(errno)); return -1; } void * socket_to_sink_of_control = zmq_socket(context, ZMQ_SUB); assert(socket_to_sink_of_control); if(zmq_connect(socket_to_sink_of_control, "tcp://localhost:5559") == -1) { printf("E: connect failed: %s\n", strerror(errno)); return -1; } if(zmq_setsockopt(socket_to_sink_of_control, ZMQ_SUBSCRIBE, "", 0) == -1) { printf("E: setsockopt failed: %s\n", strerror(errno)); } zmq_pollitem_t items [] = { { socket_to_ventilator, 0, ZMQ_POLLIN, 0 }, { socket_to_sink_of_control, 0, ZMQ_POLLIN, 0 }, }; while(1) { if(zmq_poll(items, 2, -1) == -1) { printf("E: poll failed: %s\n", strerror(errno)); return -1; } if(items[0].revents & ZMQ_POLLIN) { char * strWork = s_recv(socket_to_ventilator); assert(strWork); printf("%s.", strWork); fflush(stdout); s_sleep(atoi(strWork)); free(strWork); if(s_send(socket_to_sink, "") == -1) { printf("E: s_send failed %s\n", strerror(errno)); return -1; } } if(items[1].revents & ZMQ_POLLIN) { break; } } zmq_close(socket_to_ventilator); zmq_close(socket_to_sink); zmq_close(socket_to_sink_of_control); zmq_ctx_destroy(context); return 0; }
接下來是監理的代碼, 這一版新增了socket_to_worker_of_control
來在任務結束以後給工程隊發佈中止消息:
#include <zmq.h> #include <assert.h> #include <stdint.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); assert(context); void * socket_to_worker = zmq_socket(context, ZMQ_PULL); if(zmq_bind(socket_to_worker, "tcp://*:5558") == -1) { printf("E: bind failed: %s\n", strerror(errno)); return -1; } void * socket_to_worker_of_control = zmq_socket(context, ZMQ_PUB); if(zmq_bind(socket_to_worker_of_control, "tcp://*:5559") == -1) { printf("E: bind failed: %s\n", strerror(errno)); return -1; } char * strBeginMsg = s_recv(socket_to_worker); assert(strBeginMsg); free(strBeginMsg); int64_t i64StartTime = s_clock(); for(int i = 0; i < 100; ++i) { char * strRes = s_recv(socket_to_worker); assert(strRes); free(strRes); if(i % 10 == 0) { printf(":"); } else { printf("."); } fflush(stdout); } printf("Total elapsed time: %d msec\n", (int)(s_clock() - i64StartTime)); if(s_send(socket_to_worker_of_control, "STOP") == -1) { printf("E: s_send failed: %s\n", strerror(errno)); return -1; } zmq_close(socket_to_worker); zmq_close(socket_to_worker_of_control); zmq_ctx_destroy(context); return 0; }
這個例子也展現瞭如何將多種套路揉合在一個場景中. 因此說寫代碼, 思惟要靈活.
通常狀況下, Linux上的程序在接收到諸如SIGINT
和SIGTERM
這樣的信號時, 其默認動做是讓進程退出. 這種退出信號的默認行爲, 只是簡單的把進程幹掉, 不會管什麼緩衝區有沒有正確刷新, 也不會管文件以及其它資源句柄是否是正確被釋放了.
這對於實際應用場景中的程序來講是不可接受的, 因此在編寫後臺應用的時候必定要注意這一點: 要妥善的處理POSIX Signal. 限於篇幅, 這裏不會對Signal進行進一步討論, 若是對這部份內容不是很熟悉的話, 請參閱<Unix環境高級編程>(<Advanced Programming in the UNIX Environment>)第十章(chapter 10. Signals).
下面是妥善處理Signal的一個例子
#include <stdlib.h> #include <stdio.h> #include <signal.h> #include <unistd.h> #include <fcntl.h> #include <assert.h> #include <string.h> #include <zmq.h> #define S_NOTIFY_MSG " " #define S_ERROR_MSG "Error while writing to self-pipe.\n" static int s_fd; static void s_signal_handler(int signal_value) { int rc = write(s_fd, S_NOTIFY_MSG, sizeof(S_NOTIFY_MSG)); if(rc != sizeof(S_NOTIFY_MSG)) { write(STDOUT_FILENO, S_ERROR_MSG, sizeof(S_ERROR_MSG) - 1); exit(1); } } static void s_catch_signals(int fd) { s_fd = fd; struct sigaction action; action.sa_handler = s_signal_handler; action.sa_flags = 0; sigemptyset(&action.sa_mask); sigaction(SIGINT, &action, NULL); sigaction(SIGTERM, &action, NULL); } int main(void) { int rc; void * context = zmq_ctx_new(); assert(context); void * socket = zmq_socket(context, ZMQ_REP); assert(socket); if(zmq_bind(socket, "tcp://*:5555") == -1) { printf("E: bind failed: %s\n", strerror(errno)); return -__LINE__; } int pipefds[2]; rc = pipe(pipefds); if(rc != 0) { printf("E: creating self-pipe failed: %s\n", strerror(errno)); return -__LINE__; } for(int i = 0; i < 2; ++i) { int flags = fcntl(pipefds[0], F_GETFL, 0); if(flags < 0) { printf("E: fcntl(F_GETFL) failed: %s\n", strerror(errno)); return -__LINE__; } rc = fcntl(pipefds[0], F_SETFL, flags | O_NONBLOCK); if(rc != 0) { printf("E: fcntl(F_SETFL) failed: %s\n", strerror(errno)); return -__LINE__; } } s_catch_signals(pipefds[1]); zmq_pollitem_t items[] = { { 0, pipefds[0], ZMQ_POLLIN, 0 }, { socket, 0, ZMQ_POLLIN, 0 }, }; while(1) { rc = zmq_poll(items, 2, -1); if(rc == 0) { continue; } else if(rc < 0) { if(errno == EINTR) { continue; } else { printf("E: zmq_poll failed: %s\n", strerror(errno)); return -__LINE__; } } // Signal pipe FD if(items[0].revents & ZMQ_POLLIN) { char buffer[2]; read(pipefds[0], buffer, 2); // clear notifying bytes printf("W: interrupt received, killing server...\n"); break; } // Read socket if(items[1].revents & ZMQ_POLLIN) { char buffer[255]; rc = zmq_recv(socket, buffer, 255, ZMQ_NOBLOCK); if(rc < 0) { if(errno == EAGAIN) { continue; } if(errno == EINTR) { continue; } printf("E: zmq_recv failed: %s\n", strerror(errno)); return -__LINE__; } printf("W: recv\n"); // Now send message back; // ... } } printf("W: cleaning up\n"); zmq_close(socket); zmq_ctx_destroy(context); return 0; }
上面這個程序的邏輯流程是這樣的:
ZMQ_REP
的zmq socket, 並將之bind在本地5555端口上而後程序爲信號SIGINT
與SIGTERM
掛載了自定義的信號處理函數, 信號處理函數作的事以下:
" "
"Err while writing to self-pipe"
並調用exit()
退出程序而後將zmq socket與管道1讀端均加入zmq_poll
SIGINT
或SIGTERM
信號, 則退出數據處理循環, 以後將依次調用zmq_close()
與zmq_ctx_destroy()
這種寫法使用了管道, 邏輯上清晰了, 代碼上繁瑣了, 但這都不是重點, 重點是這個版本的服務端程序在接收到SIGINT
與SIGTERM
時, 雖然也會退出進程, 但在退出以前會妥善的關閉掉zmq socket與zmq context.
而還有一種更簡潔的寫法(這種簡潔的寫在實際上是有潛在的漏洞的, 詳情請參見<Unix環境高級編程>(<Advanced Programming in the UNIX Environment>) 第十章(chapter 10. Signals) )
s_interrupted
SIGINT
之類的信號時, 置s_interrupted
爲1s_interrupted
的值, 若該值爲1, 則進入退出流程大體以下:
s_catch_signals(); // 註冊事件回調 client = zmq_socket(...); while(!s_interrupted) // 時刻檢查 s_interrupted 的值 { char * message = s_recv(client); if(!message) { break; // 接收消息異常時也退出 } // 處理業務邏輯 } zmq_close(close);
服務端應用程序最蛋疼的問題就是內存泄漏了, 這個問題已經困擾了C/C++程序員二三十年了, ZMQ在這裏建議你使用工具去檢測你的代碼是否有內存泄漏的風險. 這裏建議你使用的工具是: valgrind
默認狀況下, ZMQ自己會致使valgrind報一大堆的警告, 首先先屏蔽掉這些警告. 在你的工程目錄下新建一個文件名爲 vg.supp
, 寫入下面的內容
{ <socketcall_sendto> Memcheck:Param socketcall.sendto(msg) fun:send ... } { <socketcall_sendto> Memcheck:Param socketcall.send(msg) fun:send }
而後記得妥善處理掉諸如SIGINT
與SIGTERM
這樣的Signal. 不然valgrind會認爲不正確的退出程序會有內存泄漏風險. 最後, 在編譯你的程序時, 加上 -DDEBUG
選項. 而後以下運行valgrind
valgrind --tool=memcheck --leak-check=full --suppression=vg.supp <你的程序>
若是你的代碼寫的沒有什麼問題, 會獲得下面這樣的讚揚
==30536== ERROR SUMMARY: 0 errors from 0 contexts...
啊, 多線程, 給你們講一個笑話, 小明有一個問題, 而後小明決定使用多線程編程解決這個問題. 最後小明問題兩個了有.
傳統的多線程編程中, 或多或少都會摻入同步手段. 而這此同步手段通常都是程序員的噩夢, 信號量, 鎖. ZMQ則告誡廣大程序員: 不要使用信號量, 也不要使用鎖, 不要使用除了 zmq inproc
以外的任何手段進行線程間的數據交互.
ZMQ在多線程上的哲學是這樣的:
更細節的, 在進行多線程編程時, 你應當遵循如下的幾個點:
若是你程序要用到多個掮客, 好比, 多個線程都擁有本身獨立的掮客, 一個常見的錯誤就是: 在A線程裏建立掮客的左右兩端socket, 而後將socket傳遞給B線程裏的掮客. 這違反了上面的規則: 不要在線程間傳遞socket. 這種錯誤很難發覺, 而且出錯是隨機的, 出現問題後很難排查.
ZMQ對線程庫是沒有侵入性的, ZMQ沒有內置線程庫, 也沒有使用其它的線程實例. 使用ZMQ寫多線程應用程序, 多線程接口就是操做系統操做的線程接口. 因此它對線程相關的檢查工具是友好的: 好比Intel的Thread Checker. 這種設計的壞處就是你寫的程序在線程接口這一層的可移植性須要你本身去保證. 或者你須要使用其它第三方的可移植線程庫.
這裏咱們寫一個例子吧, 咱們把最初的請求-迴應套路代碼改形成多線程版的. 原始版的服務端是單進程單線程程序, 若是請求量比較低的話, 是沒有什麼問題的, 單線程的ZMQ應用程序吃滿一個CPU核心是沒有問題的, 但請求量再漲就有點捉襟見肘了, 這個時候就須要讓程序吃滿多個核心. 固然多進程服務也能完成任務, 但這裏主要是爲了介紹在多線程編程中使用ZMQ, 因此咱們把服務端改形成多線程模式.
另外, 顯然你能夠使用一個掮客, 再外加一堆服務端結點(不管結點是獨立的進程, 仍是獨立的機器)來讓服務端的處理能力更上一層樓. 但這更跑偏了.
仍是看代碼吧. 服務端代碼以下:
#include <pthread.h> #include <unistd.h> #include <assert.h> #include "zmq_helper.h" static void * worker_routine(void * context) { void * socket_to_main_thread = zmq_socket(context, ZMQ_REP); assert(socket_to_main_thread); zmq_connect(socket_to_main_thread, "inproc://workers"); while(1) { char * strReq = s_recv(socket_to_main_thread); printf("Received request: [%s]\n", strReq); free(strReq); sleep(1); s_send(socket_to_main_thread, "World"); } zmq_close(socket_to_main_thread); return NULL; } int main(void) { void * context = zmq_ctx_new(); assert(context); void * socket_to_client = zmq_socket(context, ZMQ_ROUTER); assert(socket_to_client); zmq_bind(socket_to_client, "tcp://*:5555"); void * socket_to_worker_thread = zmq_socket(context, ZMQ_DEALER); assert(socket_to_worker_thread); zmq_bind(socket_to_worker_thread, "inproc://workers"); for(int i = 0; i < 5; ++i) { pthread_t worker; pthread_create(&worker, NULL, worker_routine, context); } zmq_proxy(socket_to_client, socket_to_worker_thread, NULL); zmq_close(socket_to_client); zmq_close(socket_to_worker_thread); zmq_ctx_destroy(context); return 0; }
這就是一個很正統的設計思路, 多個線程之間是互相獨立的, worker線程自己很容易能改形成獨立的進程, 主線程作掮客.
來, 下面就是一個例子, 使用PAIR socket完成線程同步, 內部通訊使用的是inproc
#include <zmq.h> #include <pthread.h> #include "zmq_helper.h" static void * thread1_routine(void * context) { printf("thread 1 start\n"); void * socket_to_thread2 = zmq_socket(context, ZMQ_PAIR); zmq_connect(socket_to_thread2, "inproc://thread_1_2"); printf("thread 1 ready, send signal to thread 2\n"); s_send(socket_to_thread2, "READY"); zmq_close(socket_to_thread2); printf("thread 1 end\n"); return NULL; } static void * thread2_routine(void * context) { printf("thread 2 start\n"); void * socket_to_thread1 = zmq_socket(context, ZMQ_PAIR); zmq_bind(socket_to_thread1, "inproc://thread_1_2"); pthread_t thread1; pthread_create(&thread1, NULL, thread1_routine, context); char * str = s_recv(socket_to_thread1); free(str); zmq_close(socket_to_thread1); void * socket_to_mainthread = zmq_socket(context, ZMQ_PAIR); zmq_connect(socket_to_mainthread, "inproc://thread_2_main"); printf("thread 2 ready, send signal to main thread\n"); s_send(socket_to_mainthread, "READY"); zmq_close(socket_to_mainthread); printf("thread 2 end\n"); return NULL; } int main(void) { printf("main thread start\n"); void * context = zmq_ctx_new(); void * socket_to_thread2 = zmq_socket(context, ZMQ_PAIR); zmq_bind(socket_to_thread2, "inproc://thread_2_main"); pthread_t thread2; pthread_create(&thread2, NULL, thread2_routine, context); char * str = s_recv(socket_to_thread2); free(str); zmq_close(socket_to_thread2); printf("Test over\n"); zmq_ctx_destroy(context); printf("main thread end\n"); return 0; }
這個簡單的程序包含了幾個編寫多線程同步時的潛規則:
須要注意的是, 上面這種寫法的多線程, 很難拆成多個進程, 上面這種寫法通常用於壓根就不許備拆分的服務端應用程序. inproc很快, 性能很好, 可是不能用於多進程或多結點通訊.
另一種常見的設計就是使用tcp來傳遞同步信息. 使用tcp使得多線程拆分紅多進程成爲一種可能. 另一種同步場景就是使用發佈-訂閱套路. 而不使用PAIR. 甚至能夠使用掮客使用的ROUTER/DEALER進行同步. 但須要注意下面幾點:
zmq_send
接口發送的消息將變成一個多幀消息被髮出去. 若是你發的同步消息不帶語義, 那麼還好, 若是你發送的消息帶語義, 那麼請特別當心這一點, 多幀消息的細節將在第三章進行進一步討論. 而DEALER則會把消息廣播給全部對端, 這一點和PUSH同樣, 請額外注意. 總之創建在閱讀第三章以前, 不要用ROUTER或DEALER作線程同步.zmq_setsockopt
設置過濾器, 不然SUB端收不到任何消息, 這一點很煩.因此總的來講, 用PAIR是最方便的選擇.
當你須要同步, 或者協調的兩個結點位於兩個不一樣的機器上時, PAIR就不那麼好用了, 直接緣由就是: PAIR不支持斷線重連. 在同一臺機器上, 多個進程之間同步, 沒問題, 多個線程之間同步, 也沒問題. 由於單機內創建起的通信鏈接基本不可能發生意外中斷, 而一旦發生中斷, 必定是進程掛了, 這個時候麻煩事是進程爲何掛了, 而不是通信鏈接爲何掛了.
可是在跨機器的結點間進行同步, 就須要考慮到網絡波動的緣由了. 結點自己上運行的服務可能沒什麼問題, 但就是網線被剪了, 這種狀況下使用PAIR就再也不合適了, 你就必須使用其它socket類型了.
另外, 線程同步與跨機器結點同步之間的另一個重大區別是: 線程數量通常是固定的, 服務穩定運行期間, 線程數目通常不會增長, 也不會減小. 但跨機器結點可能會橫向擴容. 因此要考慮的事情就又我了一坨.
咱們下面會給出一個示例程序, 向你展現跨機器結點之間的同步到底應該怎麼作. 還記得上一章咱們講發佈-訂閱套路的時候, 提到的, 在訂閱方創建鏈接的那段短暫的時間內, 全部發布方發佈的消息都會被丟棄嗎? 這裏咱們將改進那個程序, 在下面改進版的發佈-訂閱套路中, 發佈方會等待全部訂閱方都創建鏈接完成後, 纔開始廣播消息. 下面將要展現的代碼主要作了如下的工做:
來看代碼:
發佈方代碼:
#include <zmq.h> #include "zmq_helper.h" #define SUBSCRIBER_EXPECTED 10 int main(void) { void * context = zmq_ctx_new(); void * socket_for_pub = zmq_socket(context, ZMQ_PUB); int sndhwm = 1100000; zmq_setsockopt(socket_for_pub, ZMQ_SNDHWM, &sndhwm, sizeof(int)); zmq_bind(socket_for_pub, "tcp://*:5561"); void * socket_for_sync = zmq_socket(context, ZMQ_REP); zmq_bind(socket_for_sync, "tcp://*:5562"); printf("waiting for subscribers\n"); int subscribers_count = 0; while(subscribers_count < SUBSCRIBER_EXPECTED) { char * str = s_recv(socket_for_sync); free(str); s_send(socket_for_sync, ""); subscribers_count++; } printf("broadingcasting messages\n"); for(int i = 0; i < 1000000; ++i) { s_send(socket_for_pub, "Lalalal"); } s_send(socket_for_pub, "END"); zmq_close(socket_for_pub); zmq_close(socket_for_sync); zmq_ctx_destroy(context); return 0; }
訂閱方代碼
#include <zmq.h> #include <unistd.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket_for_sub = zmq_socket(context, ZMQ_SUB); zmq_connect(socket_for_sub, "tcp://localhost:5561"); zmq_setsockopt(socket_for_sub, ZMQ_SUBSCRIBE, "", 0); sleep(1); void * socket_for_sync = zmq_socket(context, ZMQ_REQ); zmq_connect(socket_for_sync, "tcp://localhost:5562"); s_send(socket_for_sync, ""); char * str = s_recv(socket_for_sync); free(str); int i = 0; while(1) { char * str = s_recv(socket_for_sub); if(strcmp(str, "END") == 0) { free(str); break; } free(str); i++; } printf("Received %d broadcast message\n", i); zmq_close(socket_for_sub); zmq_close(socket_for_sync); zmq_ctx_destroy(context); return 0; }
最後帶一個啓動腳本:
#! /bin/bash echo "Starting subscribers..." for((a=0; a<10; a++)); do ./subscriber & done echo "Starting publisher..." ./publisher
運行啓動腳本以後, 你大概會獲得相似於下面的結果:
Starting subscribers... Starting publisher... waiting for subscribers broadingcasting messages Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message
你看, 此次有了同步手段, 每一個訂閱者都確實收到了100萬條消息, 一條很多
上面的代碼還有一個細節須要你注意一下:
注意到在訂閱者的代碼中, 有一行sleep(1)
, 若是去掉這一行, 運行結果可能(很小的機率)不是咱們指望的那樣. 之因此這樣作是由於:
先建立用於接收消息的socket_for_sub
, 而後connect
之. 再去作同步操做. 有可能: 同步的REQ與REP對話已經完成, 可是socket_for_sub
的鏈接過程尚未結束. 這個時候仍是會丟掉消息. 也就是說, 這個sleep(1)
操做是爲了確認: 在同步操做完成以後, 用於發佈-訂閱套路的通信鏈接必定創建好了.
接觸過與性能有關的網絡編程的*nix端後臺開發的同步必定據說這這樣的一個術語: 零拷貝(Zero-Copy). 你仔細回想咱們經過網絡編程接收, 發送消息的過程. 若是咱們要發送一個消息, 咱們須要把這個消息傳遞給發送相關的接口, 若是咱們須要接收一個消息, 咱們須要把咱們的緩衝區提供給接收消息的函數.
這裏就有一個性能痛點, 特別是在接收消息的時候: 在網絡接口API底層, 必定有另一個緩衝區率先接收了數據, 以後, 你調用收包函數, 諸如recv
這樣的函數, 將你的緩衝區提供給函數, 而後, 數據須要從事先收到數據的緩衝區, 拷貝至你本身提供給API的緩衝區.
若是咱們向更底層追究一點, 會發現網絡編程中, 最簡單的發收消息模型裏, 至少存在着兩到三次拷貝, 不光收包的過程當中有, 發包也有. 上面講到的只是離應用開發者最近的一層發生的拷貝動做. 而實際上, 可能發生拷貝的地方有: 應用程序與API交互層, API與協議棧交互層, 協議棧/內核空間交互層, 等等.
對於更深層次來說, 不是咱們應用程序開發者應該關心的地方, 而且時至今日, 從協議棧到離咱們最近的那一層, 操做系統基本上都作了避免拷貝的優化. 那麼, ZMQ做爲一個網絡庫, 在使用的進修, 應用程序開發就應當避免離咱們最近的那一次拷貝.
這也是爲何ZMQ庫除了zmq_send
與zmq_recv
以外, 又配套zmq_msg_t
類型再提供了zmq_msg_send
與zmq_msg_recv
接口的緣由. zmq_msg_t
內置了一個緩衝區, 能夠用來收發消息, 當你使用msg系的接口時, 收與發都發生在zmq_msg_t
實例的緩衝區中, 不存在拷貝問題.
總之, 要避免拷貝, 須要如下幾步:
zmq_msg_init_data()
建立一個zmq_msg_t
實例. 接口返回的是zmq_msg_t
的句柄. 應用開發者看不到底層實現.memcpy
之類的接口寫入zmq_msg_t
中, 再傳遞給zmq_msg_send
. 接收數據時, 直接將zmq_msg_t
句柄傳遞給zmq_msg_recv
zmq_msg_t
被髮送以後, 其中的數據就自動被釋放了. 也就是, 對於同一個zmq_msg_t
句柄, 你不能連續兩次調用zmq_msg_send
zmq_msg_t
內部使用了引用計數的形式來指向真正存儲數據的緩衝區, 也就是說, zmq_msg_send
會將這個計數減一. 當計數爲0時, 數據就會被釋放. ZMQ庫對於zmq_msg_t
的具體實現並無作過多介紹, 也只點到這一層爲止.zmq_msg_t
是有可能共享同一段二進制數據的. 這也是zmq_msg_copy
作的事情. 若是你須要將同一段二進制數據發送屢次, 那麼請使用zmq_msg_copy
來生成額外的zmq_msg_t
句柄. 每次zmq_msg_copy
操做都將致使真正的數據的引用計數被+1. 每次zmq_msg_send
則減1, 引用計數爲0, 數據自動釋放.zmq_msg_close
接口. 注意: 在zmq_msg_send
被調用以後, ZMQ庫自動調用了zmq_msg_close
, 你能夠理解爲, 在zmq_msg_send
內部, 完成數據發送後, 自動調用了zmq_msg_close
zmq_msg_t
的內部實現是一個黑盒, 因此若是要接收數據, 雖然調用zmq_msg_recv
的過程當中沒有發生拷貝, 但應用程序開發者最終仍是須要把數據讀出來. 這就必須有一次拷貝. 這是沒法避免的. 或者換一個角度來描述這個蛋疼的點: ZMQ沒有向咱們提供真正的零拷貝收包接口. 收包時的拷貝是無可避免的.最後給你們一個忠告: 拷貝確實是一個後端服務程序的性能問題. 但瓶頸通常不在調用網絡庫時發生的拷貝, 而在於其它地方的拷貝. zmq_msg_t
的使用重心不該該在"優化拷貝, 提高性能"這個點上, 而是第三章要提到和進一步深刻講解的多幀消息.
以前咱們講到的發佈-訂閱套路里, 發佈者廣播的消息全是字符串, 而訂閱者篩選過濾消息也是按字符串匹配前幾個字符, 這種策略有點土. 假如咱們能把發佈者廣播的消息分紅兩段: 消息頭與消息體. 消息頭裏寫明信息類型, 消息體裏再寫具體的信息內容. 這樣過濾器直接匹配消息頭就能決定這個消息要仍是不要, 這就看起來洋氣多了.
ZMQ中使用多幀消息支持這一點. 發佈者發佈多幀消息時, 訂閱者的過濾器在匹配時, 只匹配第一幀.
多說無益, 來看例子, 在具體展現發佈者與訂閱者代碼以前, 須要爲咱們的zmq_help.h
文件再加一個函數, 用於發送多帖消息的s_sendmore
/* * 把字符串做爲字節數據, 發送至zmq socket, 但不發送字符串末尾的'\0'字節 * 而且通知socket, 後續還有幀要發送 * 發送成功時, 返回發送的字節數 */ static inline int s_sendmore(void * socket, const char * string) { return zmq_send(socket, string, strlen(string), ZMQ_SNDMORE); }
下面是發佈者的代碼:
#include <zmq.h> #include <unistd.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_PUB); zmq_bind(socket, "tcp://*:5563"); while(1) { s_sendmore(socket, "A"); s_send(socket, "We don't want to see this"); s_sendmore(socket, "B"); s_send(socket, "We would like to see this"); sleep(1); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }
下面是訂閱者的代碼:
#include <zmq.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_SUB); zmq_connect(socket, "tcp://localhost:5563"); zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "B", 1); while(1) { char * strMsgType = s_recv(socket); char * strMsgContent = s_recv(socket); printf("[%s] %s\n", strMsgType, strMsgContent); free(strMsgType); free(strMsgContent); } zmq_close(socket); zmq_ctx_destroy(socket); return 0; }
這裏有兩點:
消息愈加越快, 愈加越多, 你慢慢的就會意識到一個問題: 內存資源很寶貴, 而且很容易被用盡. 若是你不注意到這一點, 服務器上某個進程阻塞個幾秒鐘, 就炸了.
想象一下這個場景: 在同一臺機器上, 有一個進程A在瘋狂的向進程B發送消息. 忽然, B以爲很累, 休息了3秒(好比CPU過載, 或者B在跑GC吧, 無所謂什麼緣由), 這3秒鐘B處理不過來A發送的數據了. 那麼在這3秒鐘, A依然瘋狂的試圖向B發送消息, 會發生什麼? 若是B有收包緩衝區的話, 這個緩衝區確定被塞滿了, 若是A有發送緩衝區的話, 這個緩衝區也應該被塞滿了. 剩餘的沒被髮出去的消息就堆積到A進程的內存空間裏, 這個時候若是A程序寫的很差, 那麼A進程因爲內存被瘋狂佔用, 很快就會掛掉.
這是一個消息隊列裏的經典問題, 就是消息生產者和消費者的速度不匹配的時候, 消息中間件應當怎麼設計的問題. 這個問題的根實際上是在B身上, 但B對於消息隊列的設計者來講是不可控的: 這是消息隊列使用者寫的B程序, 你怎麼知道那波屌人寫的啥屌代碼? 因此雖然問題由B產生, 但最好仍是在A那裏解決掉.
最簡單的策略就是: A保留一些緩存能力, 應對突發性的情況. 超過必定限度的話, 就要扔消息了. 不能把這些生產出來的消息, 發不出去還存着. 這太蠢了.
另外還有一種策略, 若是A只是一個消息中轉者, 能夠在超過限度後, 告訴生產消息的上流, 你停一下, 我這邊滿了, 請不要再給我發消息了. 這種狀況下的解決方案, 其實就是經典的"流控"問題. 這個方案其實也很差, A只能向上遊發出一聲呻吟, 但上游若是執意仍是要發消息給A, A也沒辦法去剪網線, 因此轉一圈又回來了: 仍是得扔消息.
ZMQ裏, 有一個概念叫"高水位閾值", (high-water mark. HWM), 這個值實際上是網絡結點自身能緩存的消息的能力. 在ZMQ中, 每個活動的鏈接, 即socket, 都有本身的消息緩衝隊列, HWM指的就是這個隊列的容量. 對於某些socket類型, 諸如SUB/PULL/REQ/REP來講, 只有收包隊列. 對於某此socket類型來講, 諸如DEALER/ROUTER/PAIR, 既能收還能發, 就有兩個隊列, 一個用於收包, 一個用於發包.
在ZMQ 2.X版本中, HWM的值默認是無限的. 這種狀況下很容易出現咱們這一小節開頭講的問題: 發送消息的api接口永遠不會報錯, 對端假死以後內存就會炸. 在ZMQ 3.X版本中, 這個值默認是1000, 這就合理多了.
當socket的HWM被觸及後, 再調用發送消息接口, ZMQ要麼會阻塞接口, 要麼就扔掉消息. 具體哪一種行爲取決於sokcet的類型.
顯然在這種狀況下, 若是以非阻塞形式發包, 接口會返回失敗.
另外, 很特殊的是, inproc類型兩端的兩個socket共享同一個隊列: 真實的HWM值是雙方設置的HWM值的總和. 你能夠將inproc方式想象成一根管子, 雙方設置HWM時只是在宣稱我須要佔用多長的管子, 但真實的管子長度是兩者的總和.
最後, 很反直覺的是, HWM的單位是消息個數, 而不是字節數. 這就頗有意思了. 另外, HWM觸頂時, 隊列中的消息數量通常很差恰好就等於你設置的HWM值, 真實狀況下, 可能會比你設置的HWM值小, 極端狀況下可能只有你設置的HWM的一半.
當你寫代碼, 編譯, 連接, 運行, 而後發現收不到消息, 這個時候你應當這樣排查:
zmq_setsockopt
設置過濾器若是你使用的是SUB類型的socket, 上面兩點你都作正確了, 仍是有可能收不到消息. 這是由於ZMQ內部的隊列在鏈接創建以後可能尚未初始化完成. 這種狀況沒什麼好的解決辦法, 有兩個土辦法
sleep(1)
最後, 若是你確實找不到出錯的緣由, 但就是看不到消息, 請考慮向ZeroMQ 社區提問.