ZeroMQ(也稱爲 ØMQ,0MQ 或 zmq)是一個可嵌入的網絡通信庫(對 Socket 進行了封裝)。 它提供了攜帶跨越多種傳輸協議(如:進程內,進程間,TCP 和多播)的原子消息的 sockets 。 有了ZeroMQ,咱們能夠經過發佈-訂閱、任務分發、和請求-回覆等模式來創建 N-N 的 socket 鏈接。 ZeroMQ 的異步 I / O 模型爲咱們提供可擴展的基於異步消息處理任務的多核應用程序。 它有一系列語言API(幾乎囊括全部編程語言),並可以在大多數操做系統上運行。java
傳統的 TCP Socket 的鏈接是1對1的,能夠認爲「1個 socket = 1個鏈接」,每個線程獨立維護一個 socket 。可是 ZMQ 摒棄了這種1對1的模式,ZMQ的 Socket 能夠很輕鬆地實現1對N和N對N的鏈接模式,一個 ZMQ 的 socket 能夠自動地維護一組鏈接,用戶沒法操做這些鏈接,用戶只能操做套接字而不是鏈接自己。因此說在 ZMQ 的世界裏,鏈接是私有的。python
ZMQ 提供了三種基本的通訊模型,分別是 Request-Reply 、Publish-Subscribe 和 Parallel Pipeline ,接下來舉例說明三種模型並給出相應的代碼實現。編程
以 「Hello World」 爲例。客戶端發起請求,並等待服務端迴應請求。客戶端發送一個簡單的 「Hello」,服務端則迴應一個 「World」。能夠有 N 個客戶端,一個服務端,所以是 1-N 鏈接。json
服務端代碼以下:服務器
import org.zeromq.ZMQ; public class hwserver { public static void main(String[] args) throws InterruptedException { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket responder = context.socket(ZMQ.REP); responder.bind("tcp://*:5555"); while (!Thread.currentThread().isInterrupted()) { byte[] request = responder.recv(0); System.out.println("Received" + new String(request)); Thread.sleep(1000); String reply = "World"; responder.send(reply.getBytes(),0); } responder.close(); context.term(); } }
import time import zmq context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True: message = socket.recv() print("Received request: %s" % message) # Do some 'work' time.sleep(1) socket.send(b"World")
客戶端代碼以下:網絡
import org.zeromq.ZMQ; public class hwclient { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket requester = context.socket(ZMQ.REQ); requester.connect("tcp://localhost:5555"); for (int requestNbr = 0; requestNbr != 10; requestNbr++) { String request = "Hello"; System.out.println("Sending Hello" + requestNbr); requester.send(request.getBytes(),0); byte[] reply = requester.recv(0); System.out.println("Reveived " + new String(reply) + " " + requestNbr); } requester.close(); context.term(); } }
import zmq context = zmq.Context() print("Connecting to hello world server...") socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") for request in range(10): print("Sending request %s ..." % request) socket.send(b"Hello") message = socket.recv() print("Received reply %s [ %s ]" % (request,message))
從以上的過程,咱們能夠了解到使用 ZMQ 寫基本的程序的方法,須要注意的是:dom
下面以一個天氣預報的例子來介紹該模式。異步
服務端不斷地更新各個城市的天氣,客戶端能夠訂閱本身感興趣(經過一個過濾器)的城市的天氣信息。socket
服務端代碼以下:tcp
import org.zeromq.ZMQ; import java.util.Random; public class wuserver { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket publisher = context.socket(ZMQ.PUB); publisher.bind("tcp://*:5556"); publisher.bind("icp://weather"); Random srandom = new Random(System.currentTimeMillis()); while (!Thread.currentThread().isInterrupted()) { int zipcode, temperature, relhumidity; zipcode = 10000 + srandom.nextInt(10000); temperature = srandom.nextInt(215) - 80 + 1; relhumidity = srandom.nextInt(50) + 10 + 1; String update = String.format("%05d %d %d", zipcode, temperature, relhumidity); } publisher.close(); context.term(); } }
from random import randrange import zmq context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:5556") while True: zipcode = randrange(1, 100000) temperature = randrange(-80, 135) relhumidity = randrange(10, 60) socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))
客戶端代碼以下:
import org.zeromq.ZMQ; import java.util.StringTokenizer; public class wuclient { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket suscriber = context.socket(ZMQ.SUB); suscriber.connect("tcp://localhost:5556"); String filter = (args.length > 0) ? args[0] : "10001"; suscriber.suscribe(filter.getBytes()); //過濾條件 int update_nbr; long total_temp = 0; for (update_nbr = 0; update_nbr < 100; update_nbr++) { String string = suscriber.recvStr(0).trim(); StringTokenizer sscanf = new StringTokenizer(string, " "); int zipcode = Integer.valueOf(sscanf.nextToken()); int temperature = Integer.valueOf(sscanf.nextToken()); int relhumidity = Integer.valueOf(sscanf.nextToken()); total_temp += temperature; } System.out.println("Average temperature for zipcode '" + filter + "' was " + (int) (total_temp / update_nbr)); suscriber.close(); context.term(); } }
import sys import zmq context = zmq.Context() socket = context.socket(zmq.SUB) print("Collecting updates from weather server...") socket.connect("tcp://localhost:5556") zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001" if isinstance(zip_filter, bytes): zip_filter = zip_filter.decode('ascii') socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter) total_temp = 0 for update_nbr in range(5): string = socket.recv_string() zipcode, temperature, relhumidity = string.split() total_temp += int(temperature) print("Average temperature for zipcode '%s' was %dF" % (zip_filter, total_temp / (update_nbr + 1)))
服務器端生成隨機數 zipcode、temperature、relhumidity 分別表明城市代碼、溫度值和溼度值,而後不斷地廣播信息。而客戶端經過設置過濾參數,接受特定城市代碼的信息,最終將收集到的溫度求平均值。
須要注意的是:
Parallel Pipeline 處理模式以下:
import org.zeromq.ZMQ; import java.io.IOException; import java.util.Random; import java.util.StringTokenizer; public class taskvent { public static void main(String[] args) throws IOException { ZMQ.Context context = new ZMQ.context(1); ZMQ.Socket sender = context.socket(ZMQ.PUSH); sender.bind("tcp://*:5557"); ZMQ.Socket sink = context.socket(ZMQ.PUSH); sink.connect("tcp://localhost:5558"); System.out.println("Please enter when the workers are ready: "); System.in.read(); System.out.println("Sending task to workes\n"); sink.send("0",0); Random srandom = new Random(System.currentTimeMillis()); int task_nbr; int total_msec = 0; for (task_nbr = 0; task_nbr < 100; task_nbr++) { int workload; workload = srandom.nextInt(100) + 1; total_msec += workload; System.out.print(workload + "."); String string = String.format("%d", workload); sender.send(string, 0); } System.out.println("Total expected cost: " + total_msec + " msec"); sink.close(); sender.close(); context.term(); } }
import zmq import time import random try: raw_input except NameError: raw_input = input context = zmq.Context() sender = context.socket(zmq.PUSH) sender.bind("tcp://*:5557") sink = context.socket(zmq.PUSH) sink.connect("tcp://localhost:5558") print("Please enter when workers are ready: ") _ = raw_input() print("Sending tasks to workers...") sink.send(b'0') random.seed() total_msec = 0 for task_nbr in range(100): workload = random.randint(1, 100) total_msec += workload sender.send_string(u'%i' % workload) print("Total expected cost: %s msec" % total_msec) time.sleep(1)
import org.zeromq.ZMQ; public class taskwork { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket receiver = context.socket(ZMQ.PULL); receiver.connect("tcp://localhost:5557"); ZMQ.Socket sender = context.socket(ZMQ.PUSH); sender.connect("tcp://localhost:5558"); while (!Thread.currentThread().isInterrupted()) { String string = receiver.recv(0).trim(); Long mesc = Long.parseLong(string); System.out.flush(); System.out.print(string + "."); Sleep(mesc); sender.send("".getBytes(), 0); } sender.close(); receiver.close(); context.term(); } }
import zmq import time import sys context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.connect("tcp://localhost:5557") sender = context.socket(zmq.PUSH) sender.connect("tcp://localhost:5558") while True: s = receiver.recv() sys.stdout.write('.') sys.stdout.flush() time.sleep(int(s) * 0.001) sender.send(b'')
import org.zeromq.ZMQ; public class tasksink { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket receiver = context.socket(ZMQ.PULL); receiver.bind("tcp://*:5558"); String string = new String(receiver.recv(0)); long tstart = System.currentTimeMillis(); int task_nbr; int total_mesc = 0; for (task_nbr = 0; task_nbr < 100; task_nbr++) { string = new String(receiver.recv(0).trim()); if ((task_nbr / 10) * 10 == task_nbr) { System.out.print(":"); } else { System.out.print("."); } } long tend = System.currentTimeMillis(); System.out.println("\nTotal elapsed time: " + (tend - tstart) + "msec"); receiver.close(); context.term(); } }
import time import zmq import sys context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.bind("tcp://*:5558") s = receiver.recv() tstart = time.time() for task_nbr in range(1, 100): s = receiver.recv() if task_nbr % 10 == 0: sys.stdout.write(':') else: sys.stdout.write('.') sys.stdout.flush() tend = time.time() print("Total elapsed time: %d msec" % ((tend - tstart) * 1000))
如下兩點須要注意:
歡迎進入博客 :linbingdong.com 獲取最新文章哦~
歡迎關注公衆號: FullStackPlan 獲取更多幹貨哦~