java調用zeromq PUB-SUB模式

直接上代碼socket

package com.guo.server;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

public class Publisher {

   public void start(){
       System.out.println("===========publisher start=============");
       Context context = ZMQ.context(1);
       Socket socket = context.socket(ZMQ.PUB);
       socket.setLinger(5000);
       socket.setSndHWM(0);
       socket.bind("tcp://192.168.124.130:6666");
       try {
           Thread.sleep(10000);
       } catch (InterruptedException e) {
       }
       for(int i=0;i<10;i++){
           String pubstr ="WORK task"+i;
           socket.send(pubstr,0);
           try {
               Thread.sleep(1000);
           } catch (InterruptedException e) {
           }
       }
       socket.send("END", 0);
       System.out.println("===========publisher end=============");
       socket.close();
       context.term();
   }

}

tcp

package com.guo.client;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

public class Subscriber {

   public void start(){
       System.out.println("===========subscriber start=============");
       Context context = ZMQ.context(1);
       Socket socket = context.socket(ZMQ.SUB);
       socket.connect("tcp://192.168.124.130:6666");
       socket.subscribe("".getBytes());
       while(true){
           byte[] res =socket.recv(0);
           String resStr = new String(res);
           System.out.println("substring is ="+resStr);
           if("END".equalsIgnoreCase(resStr)){
               break;
           }
       }
       System.out.println("===========subscriber end=============");
       socket.close();
       context.term();
   }

}ide


說明:server

     在啓動Subscriber程序後,啓動publisher程序。重要的是這個模式下publisher的消息能夠被同時多個subscriber程序接收並處理。實現廣播的方式。
get

相關文章
相關標籤/搜索