ZeroMQ初探

概述

ZeroMQ(也稱爲 ØMQ,0MQ 或 zmq)是一個可嵌入的網絡通信庫(對 Socket 進行了封裝)。 它提供了攜帶跨越多種傳輸協議(如:進程內,進程間,TCP 和多播)的原子消息的 sockets 。 有了ZeroMQ,咱們能夠經過發佈-訂閱、任務分發、和請求-回覆等模式來創建 N-N 的 socket 鏈接。 ZeroMQ 的異步 I / O 模型爲咱們提供可擴展的基於異步消息處理任務的多核應用程序。 它有一系列語言API(幾乎囊括全部編程語言),並可以在大多數操做系統上運行。java

傳統的 TCP Socket 的鏈接是1對1的,能夠認爲「1個 socket = 1個鏈接」,每個線程獨立維護一個 socket 。可是 ZMQ 摒棄了這種1對1的模式,ZMQ的 Socket 能夠很輕鬆地實現1對N和N對N的鏈接模式,一個 ZMQ 的 socket 能夠自動地維護一組鏈接,用戶沒法操做這些鏈接,用戶只能操做套接字而不是鏈接自己。因此說在 ZMQ 的世界裏,鏈接是私有的。python

三種基本模型

ZMQ 提供了三種基本的通訊模型,分別是 Request-Reply 、Publish-Subscribe 和 Parallel Pipeline ,接下來舉例說明三種模型並給出相應的代碼實現。編程

Request-Reply(請求-回覆)

以 「Hello World」 爲例。客戶端發起請求,並等待服務端迴應請求。客戶端發送一個簡單的 「Hello」,服務端則迴應一個 「World」。能夠有 N 個客戶端,一個服務端,所以是 1-N 鏈接。json

Request-Reply

服務端代碼以下:服務器

  • hwserver.java
import org.zeromq.ZMQ;
public class hwserver {
    public static void main(String[] args) throws InterruptedException {
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket responder = context.socket(ZMQ.REP);
        responder.bind("tcp://*:5555");
        while (!Thread.currentThread().isInterrupted()) {
            byte[] request = responder.recv(0);
            System.out.println("Received" + new String(request));
            Thread.sleep(1000);
            String reply = "World";
            responder.send(reply.getBytes(),0);
        }
        responder.close();
        context.term();
    }
}
  • hwserver.py
import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
    message = socket.recv()
    print("Received request: %s" % message)
    # Do some 'work'
    time.sleep(1)
    socket.send(b"World")

客戶端代碼以下:網絡

  • hwclient.java
import org.zeromq.ZMQ;
public class hwclient {
    public static void main(String[] args) {
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket requester = context.socket(ZMQ.REQ);
        requester.connect("tcp://localhost:5555");
        for (int requestNbr = 0; requestNbr != 10; requestNbr++) {
            String request = "Hello";
            System.out.println("Sending Hello" + requestNbr);
            requester.send(request.getBytes(),0);
            byte[] reply = requester.recv(0);
            System.out.println("Reveived " + new String(reply) + " " + requestNbr);
        }
        requester.close();
        context.term();
    }
}
  • hwclient.py
import zmq
context = zmq.Context()
print("Connecting to hello world server...")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
for request in range(10):
    print("Sending request %s ..." % request)
    socket.send(b"Hello")
    message = socket.recv()
    print("Received reply %s [ %s ]" % (request,message))

從以上的過程,咱們能夠了解到使用 ZMQ 寫基本的程序的方法,須要注意的是:dom

  1. 服務端和客戶端不管誰先啓動,效果是相同的,這點不一樣於 Socket。
  2. 在服務端收到信息之前,程序是阻塞的,會一直等待客戶端鏈接上來。
  3. 服務端收到信息後,會發送一個 「World」 給客戶端。值得注意的是必定是客戶端鏈接上來之後,發消息給服務端,服務端接受消息而後響應客戶端,這樣一問一答。
  4. ZMQ 的通訊單元是消息,它只知道消息的長度,並不關心消息格式。所以,你可使用任何你以爲好用的數據格式,如 Xml、Protocol Buffers、Thrift、json 等等。

Publish-Subscribe(發佈-訂閱)

下面以一個天氣預報的例子來介紹該模式。異步

服務端不斷地更新各個城市的天氣,客戶端能夠訂閱本身感興趣(經過一個過濾器)的城市的天氣信息。socket

Publish-Subscribe

服務端代碼以下:tcp

  • wuserver.java
import org.zeromq.ZMQ;
import java.util.Random;
public class wuserver {
    public static void main(String[] args) {
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket publisher = context.socket(ZMQ.PUB);
        publisher.bind("tcp://*:5556");
        publisher.bind("icp://weather");
        Random srandom = new Random(System.currentTimeMillis());
        while (!Thread.currentThread().isInterrupted()) {
            int zipcode, temperature, relhumidity;
            zipcode = 10000 + srandom.nextInt(10000);
            temperature = srandom.nextInt(215) - 80 + 1;
            relhumidity = srandom.nextInt(50) + 10 + 1;
            String update = String.format("%05d %d %d", zipcode, temperature, relhumidity);
        }
        publisher.close();
        context.term();
    }
}
  • wuserver.py
from random import randrange
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")
while True:
    zipcode = randrange(1, 100000)
    temperature = randrange(-80, 135)
    relhumidity = randrange(10, 60)
    socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))

客戶端代碼以下:

  • wuclient.java
import org.zeromq.ZMQ;
import java.util.StringTokenizer;
public class wuclient {
    public static void main(String[] args) {
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket suscriber = context.socket(ZMQ.SUB);
        suscriber.connect("tcp://localhost:5556");
        String filter = (args.length > 0) ? args[0] : "10001";
        suscriber.suscribe(filter.getBytes());  //過濾條件
        int update_nbr;
        long total_temp = 0;
        for (update_nbr = 0; update_nbr < 100; update_nbr++) {
            String string = suscriber.recvStr(0).trim();
            StringTokenizer sscanf = new StringTokenizer(string, " ");
            int zipcode = Integer.valueOf(sscanf.nextToken());
            int temperature = Integer.valueOf(sscanf.nextToken());
            int relhumidity = Integer.valueOf(sscanf.nextToken());
            total_temp += temperature;
        }
        System.out.println("Average temperature for zipcode '" + filter + "' was " + (int) (total_temp / update_nbr));
        suscriber.close();
        context.term();
    }
}
  • wuclient.py
import sys
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
print("Collecting updates from weather server...")
socket.connect("tcp://localhost:5556")
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"
if isinstance(zip_filter, bytes):
    zip_filter = zip_filter.decode('ascii')
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)
total_temp = 0
for update_nbr in range(5):
    string = socket.recv_string()
    zipcode, temperature, relhumidity = string.split()
    total_temp += int(temperature)
print("Average temperature for zipcode '%s' was %dF" % (zip_filter, total_temp / (update_nbr + 1)))

服務器端生成隨機數 zipcode、temperature、relhumidity 分別表明城市代碼、溫度值和溼度值,而後不斷地廣播信息。而客戶端經過設置過濾參數,接受特定城市代碼的信息,最終將收集到的溫度求平均值。

須要注意的是:

  1. 與 「Hello World」 例子不一樣的是,Socket 的類型變成 ZMQ.PUB 和 ZMQ.SUB 。
  2. 客戶端須要設置一個過濾條件,接收本身感興趣的消息。
  3. 發佈者一直不斷地發佈新消息,若是中途有訂閱者退出,其餘均不受影響。當訂閱者再鏈接上來的時候,收到的就是後來發送的消息了。這樣比較晚加入的或者是中途離開的訂閱者必然會丟失掉一部分信息。這是該模式的一個問題,即所謂的 "Slow joiner" 。

Parallel Pipeline

Parallel Pipeline 處理模式以下:

  • ventilator 分發任務到各個 worker
  • 每一個 worker 執行分配到的任務
  • 最後由 sink 收集從 worker 發來的結果

Parallel Pipeline

  • taskvent.java
import org.zeromq.ZMQ;
import java.io.IOException;
import java.util.Random;
import java.util.StringTokenizer;
public class taskvent {
    public static void main(String[] args) throws IOException {
        ZMQ.Context context = new ZMQ.context(1);
        ZMQ.Socket sender = context.socket(ZMQ.PUSH);
        sender.bind("tcp://*:5557");
        ZMQ.Socket sink = context.socket(ZMQ.PUSH);
        sink.connect("tcp://localhost:5558");
        System.out.println("Please enter when the workers are ready: ");
        System.in.read();
        System.out.println("Sending task to workes\n");
        sink.send("0",0);
        Random srandom = new Random(System.currentTimeMillis());
        int task_nbr;
        int total_msec = 0;
        for (task_nbr = 0; task_nbr < 100; task_nbr++) {
            int workload;
            workload = srandom.nextInt(100) + 1;
            total_msec += workload;
            System.out.print(workload + ".");
            String string = String.format("%d", workload);
            sender.send(string, 0);
        }
        System.out.println("Total expected cost: " + total_msec + " msec");
        sink.close();
        sender.close();
        context.term();
    }
}
  • taskvent.py
import zmq
import time
import random
try:
    raw_input
except NameError:
    raw_input = input
context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")
print("Please enter when workers are ready: ")
_ = raw_input()
print("Sending tasks to workers...")
sink.send(b'0')
random.seed()
total_msec = 0
for task_nbr in range(100):
    workload = random.randint(1, 100)
    total_msec += workload
    sender.send_string(u'%i' % workload)
print("Total expected cost: %s msec" % total_msec)
time.sleep(1)
  • taskwork.java
import org.zeromq.ZMQ;
public class taskwork {
    public static void main(String[] args) {
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket receiver = context.socket(ZMQ.PULL);
        receiver.connect("tcp://localhost:5557");
        ZMQ.Socket sender = context.socket(ZMQ.PUSH);
        sender.connect("tcp://localhost:5558");
        while (!Thread.currentThread().isInterrupted()) {
            String string = receiver.recv(0).trim();
            Long mesc = Long.parseLong(string);
            System.out.flush();
            System.out.print(string + ".");
            Sleep(mesc);
            sender.send("".getBytes(), 0);
        }
        sender.close();
        receiver.close();
        context.term();
    }
}
  • taskwork.py
import zmq
import time
import sys
context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")
while True:
    s = receiver.recv()
    sys.stdout.write('.')
    sys.stdout.flush()
    time.sleep(int(s) * 0.001)
    sender.send(b'')
  • tasksink.java
import org.zeromq.ZMQ;
public class tasksink {
    public static void main(String[] args) {
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket receiver = context.socket(ZMQ.PULL);
        receiver.bind("tcp://*:5558");
        String string = new String(receiver.recv(0));
        long tstart = System.currentTimeMillis();
        int task_nbr;
        int total_mesc = 0;
        for (task_nbr = 0; task_nbr < 100; task_nbr++) {
            string = new String(receiver.recv(0).trim());
            if ((task_nbr / 10) * 10 == task_nbr) {
                System.out.print(":");
            } else {
                System.out.print(".");
            }
        }
        long tend = System.currentTimeMillis();
        System.out.println("\nTotal elapsed time: " + (tend - tstart) + "msec");
        receiver.close();
        context.term();
    }
}
  • tasksink.py
import time
import zmq
import sys
context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")
s = receiver.recv()
tstart = time.time()
for task_nbr in range(1, 100):
    s = receiver.recv()
    if task_nbr % 10 == 0:
        sys.stdout.write(':')
    else:
        sys.stdout.write('.')
    sys.stdout.flush()
tend = time.time()
print("Total elapsed time: %d msec" % ((tend - tstart) * 1000))

如下兩點須要注意:

  1. ventilator 使用 ZMQ.PUSH 來分發任務;worker 用 ZMQ.PULL 來接收任務,用 ZMQ.PUSH 來發送結果;sink 用 ZMQ.PULL 來接收 worker 發來的結果。
  2. ventilator 既是服務端,也是客戶端(此時服務端是 sink);worker 在兩個過程當中都是客戶端;sink 也一直都是服務端。

參考資料

linbingdong.com
歡迎進入博客 :linbingdong.com 獲取最新文章哦~

FullStackPlan 歡迎關注公衆號: FullStackPlan 獲取更多幹貨哦~

相關文章
相關標籤/搜索