直接上代碼socket
package com.guo.server;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class Publisher {
public void start(){
System.out.println("===========publisher start=============");
Context context = ZMQ.context(1);
Socket socket = context.socket(ZMQ.PUB);
socket.setLinger(5000);
socket.setSndHWM(0);
socket.bind("tcp://192.168.124.130:6666");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
for(int i=0;i<10;i++){
String pubstr ="WORK task"+i;
socket.send(pubstr,0);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
socket.send("END", 0);
System.out.println("===========publisher end=============");
socket.close();
context.term();
}
}
tcp
package com.guo.client;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class Subscriber {
public void start(){
System.out.println("===========subscriber start=============");
Context context = ZMQ.context(1);
Socket socket = context.socket(ZMQ.SUB);
socket.connect("tcp://192.168.124.130:6666");
socket.subscribe("".getBytes());
while(true){
byte[] res =socket.recv(0);
String resStr = new String(res);
System.out.println("substring is ="+resStr);
if("END".equalsIgnoreCase(resStr)){
break;
}
}
System.out.println("===========subscriber end=============");
socket.close();
context.term();
}
}ide
說明:server
在啓動Subscriber程序後,啓動publisher程序。重要的是這個模式下publisher的消息能夠被同時多個subscriber程序接收並處理。實現廣播的方式。
get