技術博客:github.com/yongxinz/te…html
同時,也歡迎關注個人微信公衆號 AlwaysBeta,更多精彩內容等你來。 python
引用官方說法:ZMQ(如下 ZeroMQ 簡稱 ZMQ)是一個簡單好用的傳輸層,像框架同樣的一個 socket library,他使得 Socket 編程更加簡單、簡潔和性能更高。git
是一個消息處理隊列庫,可在多個線程、內核和主機盒之間彈性伸縮。github
ZMQ 的明確目標是「成爲標準網絡協議棧的一部分,以後進入 Linux 內核」。如今還未看到它們的成功。可是,它無疑是極具前景的、而且是人們更加須要的「傳統」 BSD 套接字之上的一 層封裝。ZMQ 讓編寫高性能網絡應用程序極爲簡單和有趣。算法
它跟 RabbitMQ,ActiveMQ 之類有着至關本質的區別,ZeroMQ 根本就不是一個消息隊列服務器,更像是一組底層網絡通信庫,對原有的 Socket API 加上一層封裝,使咱們操做更簡便。編程
說到「請求-應答」模式,不得不說的就是它的消息流動模型。消息流動模型指的是該模式下,必須嚴格遵照「一問一答」的方式。segmentfault
發出消息後,若沒有收到回覆,再發出第二條消息時就會拋出異常。一樣的,對於 Rep 也是,在沒有接收到消息前,不容許發出消息。緩存
基於此構成「一問一答」的響應模式。服務器
server:微信
# -*- coding=utf-8 -*-
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
message = socket.recv()
print("Received: %s" % message)
socket.send("I am OK!")
複製代碼
client:
# -*- coding=utf-8 -*-
import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
socket.send('Are you OK?')
response = socket.recv()
print("response: %s" % response)
複製代碼
「發佈-訂閱」模式下,「發佈者」綁定一個指定的地址,例如「192.168.10.1:5500」,「訂閱者」鏈接到該地址。該模式下消息流是單向的,只容許從「發佈者」流向「訂閱者」。且「發佈者」只管發消息,不理會是否存在「訂閱者」。一個「發佈者」能夠擁有多個訂閱者,一樣的,一個「訂閱者」也可訂閱多個發佈者。
雖然咱們知道「發佈者」在發送消息時是不關心「訂閱者」的存在於否,因此先啓動「發佈者」,再啓動「訂閱者」是很容易致使部分消息丟失的。那麼可能會提出一個說法「我先啓動‘訂閱者’,再啓動‘發佈者’,就能解決這個問題了?」
對於 ZeroMQ 而言,這種作法也並不能保證 100% 的可靠性。在 ZeroMQ 領域中,有一個叫作「慢木匠」的術語,就是說即便我是先啓動了「訂閱者」,再啓動「發佈者」,「訂閱者」老是會丟失第一批數據。由於在「訂閱者」與端點創建 TCP 鏈接時,會包含幾毫秒的握手時間,雖然時間短,可是是存在的。再加上 ZeroMQ 後臺 IO 是以一部方式執行的,因此若不在雙方之間施加同步策略,消息丟失是不可避免的。
關於「發佈-訂閱」模式在 ZeroMQ 中的一些其餘特色:
server:
# -*- coding=utf-8 -*-
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")
for i in range(10):
print('send message...' + str(i))
socket.send('message' + str(i))
time.sleep(1)
複製代碼
client:
# -*- coding=utf-8 -*-
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.SUBSCRIBE, '')
while True:
response = socket.recv()
print("response: %s" % response)
複製代碼
在說明「管道模式」前,須要明確的是在 ZeroMQ 中並無絕對的服務端與客戶端之分,全部的數據接收與發送都是以鏈接爲單位的,只區分 ZeroMQ 定義的類型。就像套接字綁定地址時,可使用 bind
,也可使用 connect
,只是一般咱們將理解中的服務端 bind
到一個地址,而理解中的客戶端 connec
到該地址。
「管道模式」通常用於任務分發與結果收集,由一個任務發生器來產生任務,「公平」的派發到其管轄下的全部 worker,完成後再由結果收集器來回收任務的執行結果。
總體流程比較好理解,worker 鏈接到任務發生器上,等待任務的產生,完成後將結果發送至結果收集器。若是要以客戶端服務端的概念來區分,這裏的任務發生器與結果收集器是服務端,而 worker 是客戶端。
前面說到了這裏任務的派發是「公平的」,由於內部採用了 LRU 的算法來找到最近最久未工做的閒置 worker。可是公平在這裏是相對的,當任務發生器啓動後,第一個鏈接到它的 worker 會在一瞬間承受整個任務發生器產生的 tasks。
總結來講由三部分組成,push 進行數據推送,work 進行數據緩存,pull 進行數據競爭獲取處理。區別於 Publish-Subscribe 存在一個數據緩存和處理負載。
當鏈接被斷開,數據不會丟失,重連後數據繼續發送到對端。
server:
# -*- coding=utf-8 -*-
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")
for i in range(10):
socket.send('message' + str(i))
# 沒啓 worker 時不會發消息
print('send message...' + str(i))
time.sleep(1)
複製代碼
work:
# -*- coding=utf-8 -*-
import zmq
context = zmq.Context()
receive = context.socket(zmq.PULL)
receive.connect('tcp://127.0.0.1:5557')
sender = context.socket(zmq.PUSH)
sender.connect('tcp://127.0.0.1:5558')
while True:
data = receive.recv()
print('transform...' + data)
sender.send(data)
複製代碼
client:
# -*- coding=utf-8 -*-
import zmq
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:5558")
while True:
response = socket.recv()
print("response: %s" % response)
複製代碼
以上。
參考文檔: