本文主要譯自 zguide - chapter one. 但並非照本翻譯.linux
介紹性的話我這裏就不翻譯了, 總結起來就是zmq很cool, 你應該嘗試一下.git
在Linux和Mac OS上, 請經過隨機附帶的包管理軟件, 或者home brew安裝zmq. 包名通常就叫zmq, 安裝上就好.程序員
安裝後, 以Mac OS爲例, 會出現一個新的頭文件 /usr/local/include/zmq.h
, 和一個連接庫 /usr/local/lib/libzmq.a
.github
因此, 若是你使用C語言, 那麼很簡單, 寫代碼的時候加上頭文件 #include <zmq.h>
就行了, 連接的時候加上庫 -lzmq
就行了.面試
若是你使用的不是C語言, 那麼也很簡單, 去複習一下C語言, 而後再回來看這個教程. 須要注意的是, 這個教程裏的全部示例代碼在編譯的時候須要指定 -std=c99.apache
先放一個一問一答的例子來讓你感覺一下編程
這是服務端代碼設計模式
#include <zmq.h> #include <stdio.h> #include <unistd.h> #include <string.h> #include <assert.h> int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_REP); zmq_bind(socket, "tcp://*:5555"); while(1) { char buffer[10]; int bytes = zmq_recv(socket, buffer, 10, 0); buffer[bytes] = '\0'; printf("[Server] Recevied Request Message: %d bytes, content == \"%s\"\n", bytes, buffer); sleep(1); const char * replyMsg = "World"; bytes = zmq_send(socket, replyMsg, strlen(replyMsg), 0); printf("[Server] Sended Reply Message: %d bytes, content == \"%s\"\n", bytes, replyMsg); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }
這是客戶端代碼api
#include <zmq.h> #include <string.h> #include <stdio.h> #include <unistd.h> int main(void) { printf("Connecting to server...\n"); void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_REQ); zmq_connect(socket, "tcp://localhost:5555"); for(int i = 0; i < 10; ++i) { char buffer[10]; const char * requestMsg = "Hello"; int bytes = zmq_send(socket, requestMsg, strlen(requestMsg), 0); printf("[Client][%d] Sended Request Message: %d bytes, content == \"%s\"\n", i, bytes, requestMsg); bytes = zmq_recv(socket, buffer, 10, 0); buffer[bytes] = '\0'; printf("[Client][%d] Received Reply Message: %d bytes, content == \"%s\"\n", i, bytes, buffer); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }
這是makefile緩存
all: client server %: %.c gcc -std=c99 $^ -o $@ -lzmq
這個例子就很簡單, 要點有如下
服務端上:
客戶端上
看起來套路和你從<Unix 網絡編程>裏學到的差很少嘛. 不過, 你能夠試試, 先啓動客戶端, 而後再啓動服務端, 你會發現, 程序沒有崩潰. 這就是zmq高明的地方, 把操做系統原生脆弱的網絡編程接口進行了封裝. 而且實際上不止於此, 後面咱們會學到更多. 這只是開胃小菜.
你可能注意到了咱們上面的例子裏, 其實客戶端與服務端互相傳輸的數據裏, 並無包含C風格字符串最後一位的'\0'. 請時刻謹記這一點, 網絡通訊中, 流動在網絡編程API上的數據, 對於API自己來講, 都是字節序列而已, 如何解釋這些字節序列, 是網絡編程API的使用者的責任. 好比上面, 咱們須要在每次接收數據的時候記錄接收的數據的大小, 而且在buffer中爲接收到的數據以後的一個字節賦值爲0, 即人爲的把接收到的數據解釋爲字符串. 而對於zmq_send
與zmq_recv
來講, 它並不關心客戶端與服務端傳輸的數據具體是什麼.
這在全部網絡編程API中都是這個套路, 不光是zmq, linux socket, winsock, 都是這樣. 字符串? 不存在的. 我能看見的, 只是字節序列而已.
當你要把zmq應用到實際項目中的時候, 版本號注是一個你必須關注的事情了. 固然, 項目初期你能夠不關心它, 或者項目規模較小的時候你能夠不關心它. 但隨着項目的進展, 項目中使用到的庫的版本號就成了全部人必須關心的事情. 實際上全部第三方庫的版本都是一個須要項目owner關心的事情, 由於總有一些sb會作出如下的事情:
因此, 在這裏衷心的建議你, 時刻關注你項目中使用的全部第三方庫, 搞清楚你的項目構造工具鏈的運行過程. 而對於zmq來講, 要得到zmq的版本, 須要以下調用一些函數
#include <zmq.h> #include <stdio.h> int main(void) { int major = 0; int minor = 0; int patch = 0; zmq_version(&major, &minor, &patch); printf("ZMQ_VERSION == %d.%d.%d\n", major, minor, patch); return 0; }
在我寫(抄)這個教程的時候, 我使用的版本號是4.2.5
有三件事我建議你養成習慣
如今我要寫三個工具函數, 這三個函數都不完美, 但它們都會出現大後續的示例程序裏, 用於縮減示例程序的篇幅:
第一個工具函數: 向zmq socket發送字符串數據, 但不帶結尾的'\0'
/* * 把字符串做爲字節數據, 發送至zmq socket, 但不發送字符串末尾的'\0'字節 * 發送成功時, 返回發送的字節數 */ static inline int s_send(void * socket, const char * string) { return zmq_send(socket, string, strlen(string), 0); }
第二個工具函數: 從zmq socket中接收數據, 並把其解釋爲一個字符串
/* * 從zmq socket中接收數據, 並將其解釋爲C風格字符串 * 注意: 該函數返回的字符串是爲在堆區建立的字符串 * 請在使用結束後手動調用free將其釋放 */ static inline char * s_recv(void * socket) { char buffer[256]; int length = zmq_recv(socket, buffer, 255, 0); if(length == -1) { return NULL; } buffer[length] = '\0'; return strndup(buffer, sizeof(buffer) - 1); }
第三個函數: 在取值範圍 [0, x) 中隨機生成一個整數
/* * 生成一個位於 [0, num)區間的隨機數 */ #define randof(num) (int)((float)(num) * random() / (RAND_MAX + 1.0))
這些工具函數都會以靜態內聯函數的形式寫在一個名爲 "zmq_helper.h" 的頭文件中, 在後續用得着這些工具函數的時候, 示例程序將直接使用, 而不作額外的說明. 對應的, 當新增一個工具函數的時候, 工具函數自己的源代碼會在合適的時候貼出
相信以Java爲主要工做語言的同窗, 在畢業面試的時候基本上都被面試官問過各類設計模式, design patterns. 不知道大家有沒有思考過一個哲學問題: 什麼是模式? 什麼是pattern? 爲何咱們須要設計模式?
我在這裏給出個人理解: 模式並不高大上, 模式其實就是"套路". 所謂的設計模式就是在面向對象程序設計架構中, 前人總結出來的一些慣用套路.
網絡編程中也有這樣的套路, 也被稱之爲模式, pattern. ZMQ做爲一個像消息庫的網絡庫, 致力於向你提供套路, 或者說, 向你提供一些便於實現套路的工具集. 下面, 咱們來看咱們接觸的第二個套路: 發佈-訂閱套路. (第一個套路是 請求-應答 套路)
發佈-訂閱套路中有兩個角色: 發佈者, 訂閱者. 或者通俗一點: 村口的大喇叭, 與村民.
發佈者, 與村口的大喇叭的共性是: 只生產消息, 不接收消息. 而訂閱者與村民的共性是: 只接收消息, 而不生產消息(好嗎, 村民會生產八卦消息, 擡槓就沒意思了). ZMQ提供了兩種特殊的socket用於實現這個模式, 這個套路, 下面是一個例子:
村口的大喇叭循環播放天氣預報, 播放的內容很簡單: 郵編+溫度+相對溫度. 各個村民只關心本身村的天氣狀況, 他們村的郵編是10001, 對於其它地區的天氣, 村民不關心.
發佈者/村口的大喇叭:
#include <zmq.h> #include <stdio.h> #include <stdlib.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_PUB); zmq_bind(socket, "tcp://*:5556"); srandom((unsigned)time(NULL)); while(1) { int zipcode = randof(100000); // 郵編: 0 ~ 99999 int temp = randof(84) - 42; // 溫度: -42 ~ 41 int relhumidity = randof(50) + 10; // 相對溼度: 10 ~ 59 char msg[20]; snprintf(msg, sizeof(msg), "%5d %d %d", zipcode, temp, relhumidity); s_send(socket, msg); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }
訂閱者/村民:
#include <zmq.h> #include <stdio.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_SUB); zmq_connect(socket, "tcp://localhost:5556"); char * zipcode = "10001"; zmq_setsockopt(socket, ZMQ_SUBSCRIBE, zipcode, strlen(zipcode)); for(int i = 0; i < 50; ++i) { char * string = s_recv(socket); printf("[Subscriber] Received weather report msg: %s\n", string); free(string); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }
makefile
all: publisher subscriber %: %.c gcc -std=c99 $^ -o $@ -lzmq
這個例子中須要特別注意的點有:
zmq_setsockopt
函數設置一個過濾器, 以說明關心哪些消息. 若是不設置過濾器, 那麼什麼消息都不會收到另外, 關於這個例子中的兩種socket類型, 有如下特色
ZMQ_PUB
類型的socket, 若是沒有任何村民與其相連, 其全部消息都將被簡單就地拋棄ZMQ_SUB
類型的socket, 便是村民, 能夠與多個ZMQ_PUB
類型的socket相連, 即村民能夠同時收聽多個喇叭, 但必須爲每一個喇叭都設置過濾器. 不然默認狀況下, zmq認爲村民不關心喇叭裏的全部內容.tcp
或ipc
這種面向鏈接的協議, 則堆積的消息緩存在喇叭裏, 當使用epgm
這種協議時, 堆積的消息緩存了村民裏. 在ZMQ 大版本號爲2的版本中, 全部狀況下, 消息都將堆積在村民裏. 後續章節咱們會學習到, 如何以"高水位閾值"來保護喇叭.ZMQ裏的ZMQ_PUB
型的發佈者, 也就是喇叭, 其發送消息的能力是很炸的, zmq的做者在官方的guide裏講到, 發佈者與訂閱者位於同臺機器上, 經過tcp://locahost鏈接, 發佈者發佈一千萬條消息, 大概用時4秒多. 這仍是一臺2011年的i5處理器的筆記本電腦. 還不是IDC機房裏的服務器...你大體感覺一下..這個時候有人就跳出來講了, 這同臺機器走了loopback, 確定效率高啊.
若是你也冒出這樣的想法, pong友, 看來你沒理解zmq的做者想表達的意思. 顯然, 若是採用以太網做鏈路層, 這個數據不可能這麼炸裂, 但做者只是想向你表達: ZMQ自己絕對不會成爲性能的瓶頸, 瓶頸確定在網絡IO上, 而不是ZMQ庫, 甚至於說操做系統協議棧上. 應用程序的性能瓶頸, 99.9999%都不在協議棧與網絡庫上, 而是受限於物理規格的網絡IO.
性能低? 你不買個幾百張82599武裝你的機房, 性能低你怪誰? 內心沒一點i3數嗎?
分治套路里有三個角色:
在介紹這一節的示例代碼以前, 咱們先引入了兩個工具函數:
/* * 獲取當時時間戳, 單位ms */ static inline int64_t s_clock(void) { struct timeval tv; gettimeofday(&tv, NULL); return (int64_t)(tv.tv_sec * 1000 + tv.tv_usec / 1000); } /* * 使當前進程睡眠指定毫秒 */ static inline void s_sleep(int ms) { struct timespec t; t.tv_sec = ms/1000; t.tv_nsec = (ms % 1000) * 1000000; nanosleep(&t, NULL); }
分治套路也被稱爲流水線套路. 下面是示例代碼:
包工頭代碼:
#include <zmq.h> #include <stdio.h> #include <time.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket_to_sink = zmq_socket(context, ZMQ_PUSH); void * socket_to_worker = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket_to_sink, "tcp://localhost:5558"); zmq_bind(socket_to_worker, "tcp://*:5557"); printf("Press Enter when all workers get ready:"); getchar(); printf("Sending tasks to workers...\n"); s_send(socket_to_sink, "Get ur ass up"); // 通知監理, 幹活了 srandom((unsigned)time(NULL)); int total_ms = 0; for(int i = 0; i < 100; ++i) { int workload = randof(100) + 1; // 工做須要的耗時, 單位ms total_ms += workload; char string[10]; snprintf(string, sizeof(string), "%d", workload); s_send(socket_to_worker, string); // 將工做分派給工程隊 } printf("Total expected cost: %d ms\n", total_ms); zmq_close(socket_to_sink); zmq_close(socket_to_worker); zmq_ctx_destroy(context); return 0; }
工程隊代碼:
#include <zmq.h> #include <stdio.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket_to_ventilator = zmq_socket(context, ZMQ_PULL); void * socket_to_sink = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket_to_ventilator, "tcp://localhost:5557"); zmq_connect(socket_to_sink, "tcp://localhost:5558"); while(1) { char * msg = s_recv(socket_to_ventilator); printf("Received msg: %s\n", msg); fflush(stdout); s_sleep(atoi(msg)); // 幹活, 即睡眠指定毫秒 free(msg); s_send(socket_to_sink, "DONE"); // 活幹完了通知監理 } zmq_close(socket_to_ventilator); zmq_close(socket_to_sink); zmq_ctx_destroy(context); return 0; }
監理代碼:
#include <zmq.h> #include <stdio.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket_to_worker_and_ventilator = zmq_socket(context, ZMQ_PULL); zmq_bind(socket_to_worker_and_ventilator, "tcp://*:5558"); char * msg = s_recv(socket_to_worker_and_ventilator); printf("Received msg: %s", msg); // 接收來自包工頭的開始幹活的消息 free(msg); int64_t start_time = s_clock(); for(int i = 0; i < 100; ++i) { // 接收100個worker幹完活的消息 char * msg = s_recv(socket_to_worker_and_ventilator); free(msg); if(i / 10 * 10 == i) printf(":"); else printf("."); fflush(stdout); } printf("Total elapsed time: %d ms]\n", (int)(s_clock() - start_time)); zmq_close(socket_to_worker_and_ventilator); zmq_ctx_destroy(context); return 0; }
這個示例程序的邏輯流程是這樣的:
包工頭裏輸出的預計耗時是100個任務的共計耗時, 在監理那裏統計的實際耗時則是由多個工程隊並行處理100個任務實際的耗時.
這裏個例子中須要注意的點有:
ZMQ_PULL
與ZMQ_PUSH
兩種socket. 分別供消息分發方與消息接收方使用. 看起來略微有點相似於發佈-訂閱套路, 具體之間的區別後續章節會講到.PUSH/PULL
模式雖然和PUB/SUB
不同, 不會丟失消息. 但若是不手動同步的話, 最早創建鏈接的工程隊將幾乎把全部任務都接收到手, 致使後續完成鏈接的工程隊拿不到任務, 任務分配不平衡.因此, 你大體能看出來, 分治套路里有一個核心問題, 就是任務分發者與任務執行者之間的同步. 若是在全部執行者均與分發者創建鏈接後, 進行分發, 那麼任務分發是比較公平的. 這就須要應用程序開發者本身負責同步事宜. 關於這個話題進一步的技巧將在第三章進一步討論.
如今咱們寫了三個例子, 分別是請求-迴應套路, 發佈-訂閱套路, 流水線套路. 在繼續進一步學習以前, 有必要對一些點進行強調
你大體注意到了, 在上面的全部示例代碼中, 每次都以zmq_ctx_new()
函數建立出一個名爲context
的變量, 目前你不須要了解它的細節, 這只是ZMQ庫的標準套路. 甚至於你未來都不須要了解這個context裏面究竟是什麼. 但你必需要遵循zmq中關於這個context的一些編程規定:
zmq_ctx_new()
建立contextzmq_ctx_destroy()
銷燬掉它每一個進程, 應該持有, 且應該只持有, 一個context. 固然, 目前來講, 你這樣理解就好了, 後續章節或許咱們會深刻探索一下context, 但目前, 請謹記, one context per process.
若是你在代碼中調用了fork
系統調用, 那麼請在子進程代碼區的開始處調用zmq_ctx_new()
, 爲子進程建立本身的context
網絡編程和內存泄漏簡直就是一對狗男女, 要避免這些狗血的場景, 寫代碼的時候, 時刻要謹記: 把屁股擦乾淨.在使用ZMQ編程的過程當中, 我建議你:
zmq_ctx_destroy()
以前, 先調用zmq_close()
關閉掉全部的zmq socket. 不然zmq_ctx_destroy
可能會被一直阻塞着zmq_send()
與zmq_recv()
來收發消息, 儘可能避免使用與zmq_msg_t
相關的API接口. 是的, 那些接口有額外的特性, 有額外的性能提高, 但在性能瓶頸不在這些細枝末節的時候, 不要過分造做.zmq_msg_t
相關的接口收發消息, 那麼請在調用zmq_msg_recv()
以後, 儘快的調用zmq_msg_close()
釋放掉消息對象固然, 上面主要是對C語言做者的一些建議, 對於其它語言, 特別是有GC的語言, 使用ZMQ相關接口以前建議確認相關的binding接口是否正確處理了資源句柄.
網絡編程, 特別是*nix平臺的網絡編程, 99%程序員的啓蒙始於<Unix網絡編程>這本書, 90%裏的項目充斥着linux socket, epoll與fd. 是的, 2018年了, 他們仍是這麼幹的. 咱們就從這個視角來列舉一下, 使用*nix平臺原生的網絡API與多路IO接口, 你在寫服務端程序時須要頭疼的事情:
我問你, 你頭大不大? 想不想死?
讀過開源項目嗎? 好比Hadoop Zookeeper, 你去觀摩一下zookeeper.c, 真是看的人頭大想死. 你再翻翻其它開源項目, 特別是用C/C++寫的Linux端程序, 每一個都要把網絡庫事件庫從新寫一遍.
因此矛盾很突出, 爲何不能造一個你們都用的輪子呢? 緣由很簡單, 有兩個方面:
那麼ZMQ解決了什麼問題呢? ZMQ給上面提出的問題都給了完美答案嗎? 理性的說, 確定沒有, 可是ZMQ是這樣回答這些問題的:
總之, 就是很好, 固然了沒有一個框架庫的做者會說本身的產品很差, 而具體好很差, 學了用了以後纔會知道, 上面的點看一看得了, 別當真.
在發佈-訂閱套路由, 當你開啓多個村民的時候, 你會發現, 全部村民都能收到消息, 而村口的喇叭也工做正常. 這就是zmq socket的可擴展性. 對於發佈端來說, 開發人員始終面對的是一個socket, 而不用去管鏈接我到底下面會有多少訂閱用戶. 這樣極大簡化了開發人員的工做, 實際發佈端程序跑起來的時候, 會自主進行適應, 並執行最合理的行爲. 更深層次一點, 你可能會說, 這樣的功能, 我用epoll在linux socket上也能實現, 可是, 當多個訂閱者開始接收數據的時候, 你仔細觀察你cpu的負載, 你會發現發佈端進程不光正確接納了全部訂閱者, 更重要的是把工做負載經過多線程均衡到了你電腦的多個核心上. 日最大程度的榨乾了你的cpu性能. 若是你單純的用epoll和linux socket來實現這個功能, 發佈端只會佔用一個核心, 除非你再寫一坨代碼以實現多線程或多進程版的村口大喇叭.