在講ZeroMQ前先給你們講一下什麼是消息隊列。java
消息隊列中間件是分佈式系統中重要的組件,主要解決應用耦合,異步消息,流量削鋒等問題。實現高性能,高可用,可伸縮和最終一致性架構。是大型分佈式系統不可缺乏的中間件。目前在生產環境,使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。其實簡單點說,消息隊列就是如何使各分載器如何實現負載均衡使得完成分佈式目標。服務器
ZeroMQ是一種基於消息隊列的多線程網絡庫,其對套接字類型、鏈接處理、幀、甚至路由的底層細節進行抽象,提供跨越多種傳輸協議的套接字。ZeroMQ是網絡通訊中新的一層,介於應用層和傳輸層之間(按照TCP/IP劃分),其是一個可伸縮層,可並行運行,分散在分佈式系統間。ZeroMQ幾乎全部的I/O操做都是異步的,主線程不會被阻塞。ZeroMQ會根據用戶調用zmq_init函數時傳入的接口參數,建立對應數量的I/O Thread。每一個I/O Thread都有與之綁定的Poller,Poller採用經典的Reactor模式實現,Poller根據不一樣操做系統平臺使用不一樣的網絡I/O模型(select、poll、epoll、devpoll、kequeue等)。主線程與I/O線程經過Mail Box傳遞消息來進行通訊。Server開始監聽或者Client發起鏈接時,在主線程中建立zmq_connecter或zmq_listener,經過Mail Box發消息的形式將其綁定到I/O線程,I/O線程會把zmq_connecter或zmq_listener添加到Poller中用以偵聽讀/寫事件。Server與Client在第一次通訊時,會建立zmq_init來發送identity,用以進行認證。認證結束後,雙方會爲這次鏈接建立Session,之後雙方就經過Session進行通訊。每一個Session都會關聯到相應的讀/寫管道, 主線程收發消息只是分別從管道中讀/寫數據。Session並不實際跟kernel交換I/O數據,而是經過plugin到Session中的Engine來與kernel交換I/O數據。網絡
【1】Request-Response多線程
由請求端發起請求,而後等待迴應端應答。一個請求必須對應一個迴應,從請求端的角度來看是發-收配對,從迴應端的角度是收-發對。跟一對一結對模型的區別在於請求端能夠是1~N個。該模型主要用於遠程調用及任務分配等。Echo服務就是這種經典模型的應用。架構
下面經過Java實現這一模型:負載均衡
server port異步
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;socket
public class Server {tcp
public static void main(String[] args) throws InterruptedException { //實現服務器端的上下文及套接字 Context context = ZMQ.context(1); Socket responder = context.socket(ZMQ.REP); //使服務器端經過tcp協議通訊,監聽5555端口 responder.bind("tcp://*:5555"); while (!Thread.currentThread().isInterrupted()) { byte[] request = responder.recv(0); System.out.println("Received Hello"); Thread.sleep(1000); String reply = "World"; responder.send(reply.getBytes(), 0); } //關閉服務器端的上下文及套接字 responder.close(); context.close(); }
}分佈式
client port
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class Client {
public static void main(String[] args) { //創立客戶端的上下文捷套接字 Context context = ZMQ.context(1); System.out.println("Connecting to hello world server…"); Socket requester = context.socket(ZMQ.REQ); //講客戶端綁定在5555端口 requester.connect("tcp://localhost:5555"); for (int requestNbr = 0; requestNbr != 100; requestNbr++) { String request = "Hello"; System.out.println("Sending Hello " + requestNbr); requester.send(request.getBytes(), 0); byte[] reply = requester.recv(0); System.out.println("Received " + new String(reply) + " " + requestNbr); } //關閉客戶端的上下文套接字 requester.close(); context.term(); }
}
【2】Publisher/Subscriber model
發佈端單向分發數據,且不關心是否把所有信息發送給訂閱端。若是發佈端開始發佈信息時,訂閱端還沒有鏈接上來,則這些信息會被直接丟棄。訂閱端未鏈接致使信息丟失的問題,能夠經過與請求迴應模型組合來解決。訂閱端只負責接收,而不能反饋,且在訂閱端消費速度慢於發佈端的狀況下,會在訂閱端堆積數據。該模型主要用於數據分發。天氣預報、微博明星粉絲能夠應用這種經典模型。
Server Port
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class ZMQ_PUB {
public static void main(String[] args) throws InterruptedException { Context context = ZMQ.context(1); Socket publisher = context.socket(ZMQ.PUB); publisher.bind("tcp://*:5555"); Thread.sleep(3000); for(int i=0;i<100;i++){ publisher.send(("admin " + i).getBytes(), ZMQ.NOBLOCK); System.out.println("pub msg " + i); Thread.sleep(1000); } context.close(); publisher.close(); }
}
Client Port
import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; public class ZMQ_SUB { public static void main(String[] args) { Context context = ZMQ.context(1); Socket subscriber = context.socket(ZMQ.SUB); subscriber.connect("tcp://localhost:5555"); subscriber.subscribe("".getBytes()); for (int i=0;i<100;i++) { //Receive a message. String string = new String(subscriber.recv(0)); System.out.println("recv 1" + string); } //關閉套接字和上下文 subscriber.close(); context.term(); } }
【3】push/pull
push port import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; public class Push { public static void main(String[] args) { Context context = ZMQ.context(1); Socket push = context.socket(ZMQ.PUSH); push.bind("ipc://fjs"); for (int i = 0; i < 10000000; i++) { push.send("hello".getBytes(), i); } push.close(); context.term(); } }
pull port
import java.util.concurrent.atomic.AtomicInteger;
import org.zeromq.ZMQ;
public class Pull {
public static void main(String args[]) { final AtomicInteger number = new AtomicInteger(0); for (int i = 0; i < 5; i++) { new Thread(new Runnable(){ private int here = 0; public void run() { // TODO Auto-generated method stub ZMQ.Context context = ZMQ.context(1); ZMQ.Socket pull = context.socket(ZMQ.PULL); pull.connect("ipc://fjs"); //pull.connect("ipc://fjs"); while (true) { String message = new String(pull.recv()); int now = number.incrementAndGet(); here++; if (now % 1000000 == 0) { System.out.println(now + " here is : " + here); } } } }).start(); } }
}
備註說明:
【1】如何利用Java使用ZeroMQ
首先下載zmq所需的zip包,解壓之後將libzmq.dll和jzmq.dll文件放到本身電腦中的jdk安裝路徑中的bin文件夾下,最後須要將以前解壓後的zmq.jar包放在項目的lib中或者
zeromq資源下載:
連接:http://pan.baidu.com/s/1miuvSfQ 密碼:ttss
項目源碼下載連接:
連接:http://pan.baidu.com/s/1dE5Plr7 密碼:vqze