1、環境搭建
wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
tar -xzf zeromq-2.1.7.tar.gz
cd zeromq-2.1.7
./configure
make
sudo make install

git clone https://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
sudo make install

如果沒有安裝 libtool、libuuid-devel,則需要先安裝,否則安裝會失敗:
yum install libtool
yum install libuuid-devel
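常見問題:
執行時出現 java.lang.UnsatisfiedLinkError: /usr/local/lib/libjzmq.so.0.0.0: libzmq.so.1: cannot open shared object file: No such file or directory 異常。
解決方法:動態連結器找不到 /usr/local/lib 下的 libzmq.so.1,需將該目錄加入函式庫搜尋路徑,例如執行 export LD_LIBRARY_PATH=/usr/local/lib,或將 /usr/local/lib 寫入 /etc/ld.so.conf.d/ 下的設定檔後執行 sudo ldconfig。

2、使用jzmq進行編程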
1.建立maven項目,pom.xml的內容參見下文的pom.xml
注意:jzmq的版本不能過高,建議使用2.1.0,目前storm也是使用這個版本的jzmq-2.1.0.jar,
否則會報錯: java.lang.UnsatisfiedLinkError: org.zeromq.ZMQ$Socket.nativeInit()V
2.編寫Publisher.java、Subscriber.java,參見下面的源代碼
Publisher.java
package com.catt.mqtest.pubsub;

import java.nio.charset.Charset;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
 * Demo ZeroMQ publisher.
 *
 * <p>Binds a PUB socket on {@code tcp://*:5557}, pauses briefly, then publishes a start
 * marker, ten {@code "Hello world <i>"} messages and an end marker before shutting down.
 */
public class Publisher {

    /** Class-wide logger obtained from LoggerFactory. */
    private static final Logger log = LoggerFactory.getLogger(Publisher.class);

    /** Explicit charset so the bytes on the wire do not depend on the platform default. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /** Endpoint the PUB socket binds to. */
    private static final String ENDPOINT = "tcp://*:5557";

    /** Pause between bind and the first send, in milliseconds. */
    private static final long WARMUP_MILLIS = 1000L;

    /** Number of "Hello world" messages published between the start and end markers. */
    private static final int MESSAGE_COUNT = 10;

    /**
     * Runs the publisher once and exits.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted during the warm-up pause;
     *         the socket and context are still released before the exception propagates
     */
    public static void main(String[] args) throws InterruptedException {
        Context context = ZMQ.context(1);
        try {
            Socket publisher = context.socket(ZMQ.PUB);
            try {
                publisher.bind(ENDPOINT);

                // zmq sends very quickly: without this pause publishing would begin before
                // subscribers have established their connection to the publisher.
                Thread.sleep(WARMUP_MILLIS);

                publish(publisher, "send start......", 0);
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    publish(publisher, "Hello world " + i, ZMQ.NOBLOCK);
                }
                publish(publisher, "send end......", 0);
            } finally {
                // Always release the socket, even when bind/send fails or the sleep is interrupted.
                publisher.close();
            }
        } finally {
            context.term();
        }
    }

    /** Sends one text message and logs a warning when the socket reports it was not sent. */
    private static void publish(Socket socket, String text, int flags) {
        boolean sent = socket.send(text.getBytes(UTF_8), flags);
        if (!sent) {
            log.warn("Message was not sent: {}", text);
        }
    }
}
Subscriber.java
package com.catt.mqtest.pubsub;

import java.nio.charset.Charset;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
 * Demo ZeroMQ subscriber.
 *
 * <p>Connects a SUB socket to a publisher, subscribes with an empty prefix and prints every
 * received message with a running count until the {@code "send end......"} marker arrives.
 */
public class Subscriber {

    /** Class-wide logger obtained from LoggerFactory. */
    private static final Logger log = LoggerFactory.getLogger(Subscriber.class);

    /** Explicit charset so decoding does not depend on the platform default. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /** Publisher endpoint used when none is given on the command line. */
    private static final String DEFAULT_ENDPOINT = "tcp://192.168.230.128:5557";

    /** Message text that makes the receive loop stop. */
    private static final String END_MARKER = "send end......";

    /**
     * Receives and prints messages until the end marker is seen.
     *
     * @param args optional; {@code args[0]} overrides the publisher endpoint
     *             (defaults to {@code tcp://192.168.230.128:5557})
     */
    public static void main(String[] args) {
        String endpoint = (args != null && args.length > 0) ? args[0] : DEFAULT_ENDPOINT;

        Context context = ZMQ.context(1);
        try {
            Socket subscriber = context.socket(ZMQ.SUB);
            try {
                subscriber.connect(endpoint);
                // Empty prefix: no filtering by message prefix.
                subscriber.subscribe(new byte[0]);

                int total = 0;
                while (true) {
                    byte[] payload = subscriber.recv(0);
                    if (payload == null) {
                        // NOTE(review): a null result is not expected from a blocking recv;
                        // guard anyway instead of failing with a NullPointerException.
                        log.warn("recv returned no data after {} updates; stopping", total);
                        break;
                    }
                    String message = new String(payload, UTF_8);
                    if (END_MARKER.equals(message)) {
                        break;
                    }
                    total++;
                    System.out.println("Received " + total + " updates. :" + message);
                }
            } finally {
                // Always release the socket, even when connect/recv throws.
                subscriber.close();
            }
        } finally {
            context.term();
        }
    }
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates -->
    <groupId>com.catt</groupId>
    <artifactId>mqtest</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>mqtest</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Java binding for ZeroMQ -->
        <dependency>
            <groupId>org.zeromq</groupId>
            <artifactId>jzmq</artifactId>
            <version>2.1.0</version>
        </dependency>

        <!-- Logback logging -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.1</version>
        </dependency>

        <!-- Test-scope only -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>