activemq-all-5.15.1.jar
commons-lang3-3.0.jar
commons-logging-1.1.3.jar
fastjson-1.1.31.jar
jackson-core-asl-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
jboss-logging-3.1.3.GA.jar
jconsole.jar
log4j-1.2.16.jar
slf4j-api-1.6.2.jar
slf4j-log4j12-1.6.2.jar
GlobalVar.java
package com.system1.mq.p2p; public interface GlobalVar { public static final class Common { public static int loggerDebug = 1; } }
JMSTest.java
package com.system1.mq.p2p; import org.junit.Test; public class JMSTest { @Test public void testMyMQ() { MQ.getInstance().sendMsg("TOP1-TOP2:你好,我不是一條單純的消息"); } public static void main(String[] args) { MQ.getInstance().new ConsumerListener().start(); } }
Logger.java
package com.system1.mq.p2p;

import org.slf4j.LoggerFactory;

/**
 * Static facade over a single SLF4J logger.
 *
 * <p>Every method except {@link #exception(Throwable)} forwards to SLF4J only while
 * {@code GlobalVar.Common.loggerDebug} equals {@code 1}; otherwise the call is dropped.
 */
public class Logger {

    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(Logger.class);

    /** Tells whether the global logging switch currently has the value {@code 1}. */
    private static boolean switchedOn() {
        return GlobalVar.Common.loggerDebug == 1;
    }

    /** Forwards a formatted message at TRACE level when the switch is on. */
    public static void trace(String format, Object... args) {
        if (!switchedOn()) {
            return;
        }
        LOG.trace(format, args);
    }

    /** Forwards a message and throwable at TRACE level when the switch is on. */
    public static void trace(String msg, Throwable e) {
        if (!switchedOn()) {
            return;
        }
        LOG.trace(msg, e);
    }

    /** Forwards a formatted message at DEBUG level when the switch is on. */
    public static void debug(String format, Object... args) {
        if (!switchedOn()) {
            return;
        }
        LOG.debug(format, args);
    }

    /** Forwards a message and throwable at DEBUG level when the switch is on. */
    public static void debug(String msg, Throwable e) {
        if (!switchedOn()) {
            return;
        }
        LOG.debug(msg, e);
    }

    /** Forwards a formatted message at INFO level when the switch is on. */
    public static void info(String format, Object... args) {
        if (!switchedOn()) {
            return;
        }
        LOG.info(format, args);
    }

    /** Forwards a message and throwable at INFO level when the switch is on. */
    public static void info(String msg, Throwable e) {
        if (!switchedOn()) {
            return;
        }
        LOG.info(msg, e);
    }

    /** Forwards a formatted message at WARN level when the switch is on. */
    public static void warn(String format, Object... args) {
        if (!switchedOn()) {
            return;
        }
        LOG.warn(format, args);
    }

    /** Forwards a message and throwable at WARN level when the switch is on. */
    public static void warn(String msg, Throwable e) {
        if (!switchedOn()) {
            return;
        }
        LOG.warn(msg, e);
    }

    /** Forwards a formatted message at ERROR level when the switch is on. */
    public static void error(String format, Object... args) {
        if (!switchedOn()) {
            return;
        }
        LOG.error(format, args);
    }

    /** Forwards a message and throwable at ERROR level when the switch is on. */
    public static void error(String msg, Throwable e) {
        if (!switchedOn()) {
            return;
        }
        LOG.error(msg, e);
    }

    /** Same as {@link #error(String, Throwable)}: ERROR level, only when the switch is on. */
    public static void exception(String msg, Throwable e) {
        if (!switchedOn()) {
            return;
        }
        LOG.error(msg, e);
    }

    /**
     * Logs the throwable at ERROR level using its own message as the text.
     * This overload does not consult the logging switch.
     */
    public static void exception(Throwable e) {
        LOG.error(e.getMessage(), e);
    }
}
MQ.java
package com.system1.mq.p2p; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.StringUtils; public class MQ { /** * 單例模式 */ private static MQ instance; private MQ() { } public static MQ getInstance() { if (instance == null) { instance = new MQ(); } return instance; } // 默認鏈接用戶名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默認鏈接密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默認鏈接地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 鏈接工廠 static ConnectionFactory connectionFactory; // 鏈接 static Connection connection = null; // 會話 接受或者發送消息的線程 static Session session; // 消息的目的地 static Destination dest1, dest2; // topic static Topic top1, top2; // 消費者 static MessageConsumer consumer; // 生產者 static MessageProducer producer; // 隊列名稱 public interface QUENENAME { // quene目的地 public static final String DESTINATION1 = "S1-S2"; public static final String DESTINATION2 = "S2-S1"; // topic地址 public static final String TOP1 = "TOP1-TOP2"; public static final String TOP2 = "TOP2-TOP1"; } // 建立鏈接 static { // 實例化鏈接工廠 connectionFactory = new ActiveMQConnectionFactory(MQ.USERNAME, MQ.PASSWORD, MQ.BROKEURL); try { // 經過鏈接工廠獲取鏈接 connection = connectionFactory.createConnection(); // 啓動鏈接 connection.start(); // 建立session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 自動通知MQ服務器消息已收到 // 第一個參數表示是否開啓事務,第二個參數表示由客戶端自行決定通知MQ服務器消息已經收到,須要調用message.acknowledge()來通知服務器. 
// session = connection.createSession(true, // Session.CLIENT_ACKNOWLEDGE); // 生產者 // dest1 = session.createQueue(MQ.QUENENAME.DESTINATION1); // producer = session.createProducer(dest1); top1 = session.createTopic(MQ.QUENENAME.TOP1); producer = session.createProducer(top1); // 消費者 // dest2 = session.createQueue(MQ.QUENENAME.DESTINATION2); // consumer = session.createConsumer(dest2); top2 = session.createTopic(MQ.QUENENAME.TOP2); consumer = session.createConsumer(top2); } catch (JMSException e) { System.out.println("建立鏈接異常"); } } /** * 生產者 */ public void sendMsg(String message) { try { TextMessage txtMsg = session.createTextMessage(message); producer.send(txtMsg); } catch (JMSException e) { System.err.println("發送消息異常"); } } /** * 消費者 */ public class ConsumerListener extends Thread { @Override public void run() { super.run(); try { while (true) { TextMessage textMessage = (TextMessage) consumer.receive(); String msg = textMessage.getText(); if (StringUtils.isNotBlank(msg)) { try { new ProtocolParse(msg).handler(); } catch (Exception e) { System.err.println("協議處理異常!"); } } else { System.err.println("消費者接收消息爲空!"); } } } catch (JMSException e) { System.err.println("消費者接收消息異常"); } } } }
ProtocolConstant.java
package com.system1.mq.p2p; public class ProtocolConstant { // 命令字 public interface CMD { public static final String HEARTBEAT = "heartbeat"; } }
ProtocolParse.java
package com.system1.mq.p2p; import com.alibaba.fastjson.JSONObject; import com.system1.mq.handler.ExceptionHandler; import com.system1.mq.handler.HeartbeatHandler; public class ProtocolParse { private String protocol; public ProtocolParse(String protocol) { super(); this.protocol = protocol; } public void handler() throws Exception { JSONObject jsonObj = JSONObject.parseObject(protocol); if (jsonObj == null) { throw new Exception("不符合json格式的數據!"); } // 命令字 String cmd = jsonObj.getString("cmd"); switch (cmd) { case ProtocolConstant.CMD.HEARTBEAT: new HeartbeatHandler(protocol).start(); break; default: new ExceptionHandler(protocol).start(); break; } } public String getProtocol() { return protocol; } public void setProtocol(String protocol) { this.protocol = protocol; } }
BaseMessageHandler.java
package com.system1.mq.handler; public abstract class BaseMessageHandler extends Thread { private String message; public BaseMessageHandler(String message) { super(); this.message = message; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
ExceptionHandler.java
package com.system1.mq.handler;

import com.system1.mq.p2p.Logger;

/**
 * Handler thread that logs the message text it was given at INFO level.
 */
public class ExceptionHandler extends BaseMessageHandler {

    /**
     * @param message text of the message to report
     */
    public ExceptionHandler(String message) {
        super(message);
    }

    /**
     * Logs a fixed prefix followed by the stored message text.
     */
    @Override
    public void run() {
        super.run();
        String report = "===異常的協議數據===" + super.getMessage();
        Logger.info(report);
    }
}
HeartbeatHandler.java
package com.system1.mq.handler;

import com.system1.mq.p2p.Logger;
import com.system1.mq.p2p.MQ;

/**
 * Handler thread for heartbeat messages: logs a marker and sends a fixed reply.
 */
public class HeartbeatHandler extends BaseMessageHandler {

    /**
     * @param message text of the heartbeat message
     */
    public HeartbeatHandler(String message) {
        super(message);
    }

    /**
     * Logs the heartbeat marker at INFO level, then sends the reply text through the shared
     * {@link MQ} instance.
     */
    @Override
    public void run() {
        super.run();
        Logger.info("===HEARTBEAT===");
        MQ broker = MQ.getInstance();
        broker.sendMsg("收到心跳");
    }
}
GlobalVar.java
package com.system1.mq.p2p; public interface GlobalVar { public static final class Common { public static int loggerDebug = 1; } }
JMSTest.java
package com.system1.mq.p2p; import org.junit.Test; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; public class JMSTest { @Test public void testMyMQ() { JSONObject jsonObj = new JSONObject(); jsonObj.put("cmd", "heartbeat"); MQ.getInstance().sendMsg(JSON.toJSONString(jsonObj)); } public static void main(String[] args) { MQ.getInstance().new ConsumerListener().start(); } }
Logger.java
package com.system1.mq.p2p;

import org.slf4j.LoggerFactory;

/**
 * Static facade over a single SLF4J logger.
 *
 * <p>Every method except {@link #exception(Throwable)} forwards to SLF4J only while
 * {@code GlobalVar.Common.loggerDebug} equals {@code 1}; otherwise the call is dropped.
 */
public class Logger {

    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(Logger.class);

    /** Tells whether the global logging switch currently has the value {@code 1}. */
    private static boolean switchedOn() {
        return GlobalVar.Common.loggerDebug == 1;
    }

    /** Forwards a formatted message at TRACE level when the switch is on. */
    public static void trace(String format, Object... args) {
        if (!switchedOn()) {
            return;
        }
        LOG.trace(format, args);
    }

    /** Forwards a message and throwable at TRACE level when the switch is on. */
    public static void trace(String msg, Throwable e) {
        if (!switchedOn()) {
            return;
        }
        LOG.trace(msg, e);
    }

    /** Forwards a formatted message at DEBUG level when the switch is on. */
    public static void debug(String format, Object... args) {
        if (!switchedOn()) {
            return;
        }
        LOG.debug(format, args);
    }

    /** Forwards a message and throwable at DEBUG level when the switch is on. */
    public static void debug(String msg, Throwable e) {
        if (!switchedOn()) {
            return;
        }
        LOG.debug(msg, e);
    }

    /** Forwards a formatted message at INFO level when the switch is on. */
    public static void info(String format, Object... args) {
        if (!switchedOn()) {
            return;
        }
        LOG.info(format, args);
    }

    /** Forwards a message and throwable at INFO level when the switch is on. */
    public static void info(String msg, Throwable e) {
        if (!switchedOn()) {
            return;
        }
        LOG.info(msg, e);
    }

    /** Forwards a formatted message at WARN level when the switch is on. */
    public static void warn(String format, Object... args) {
        if (!switchedOn()) {
            return;
        }
        LOG.warn(format, args);
    }

    /** Forwards a message and throwable at WARN level when the switch is on. */
    public static void warn(String msg, Throwable e) {
        if (!switchedOn()) {
            return;
        }
        LOG.warn(msg, e);
    }

    /** Forwards a formatted message at ERROR level when the switch is on. */
    public static void error(String format, Object... args) {
        if (!switchedOn()) {
            return;
        }
        LOG.error(format, args);
    }

    /** Forwards a message and throwable at ERROR level when the switch is on. */
    public static void error(String msg, Throwable e) {
        if (!switchedOn()) {
            return;
        }
        LOG.error(msg, e);
    }

    /** Same as {@link #error(String, Throwable)}: ERROR level, only when the switch is on. */
    public static void exception(String msg, Throwable e) {
        if (!switchedOn()) {
            return;
        }
        LOG.error(msg, e);
    }

    /**
     * Logs the throwable at ERROR level using its own message as the text.
     * This overload does not consult the logging switch.
     */
    public static void exception(Throwable e) {
        LOG.error(e.getMessage(), e);
    }
}
MQ.java
package com.system1.mq.p2p; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.StringUtils; public class MQ { /** * 單例模式 */ private static MQ instance; private MQ() { } public static MQ getInstance() { if (instance == null) { instance = new MQ(); } return instance; } // 默認鏈接用戶名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默認鏈接密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默認鏈接地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 鏈接工廠 static ConnectionFactory connectionFactory; // 鏈接 static Connection connection = null; // 會話 接受或者發送消息的線程 static Session session; // 消息的目的地 static Destination dest1, dest2; // topic static Topic top1, top2; // 消費者 static MessageConsumer consumer; // 生產者 static MessageProducer producer; // 隊列名稱 public interface QUENENAME { public static final String DESTINATION1 = "S1-S2"; public static final String DESTINATION2 = "S2-S1"; // topic地址 public static final String TOP1 = "TOP1-TOP2"; public static final String TOP2 = "TOP2-TOP1"; } // 建立鏈接 static { // 實例化鏈接工廠 connectionFactory = new ActiveMQConnectionFactory(MQ.USERNAME, MQ.PASSWORD, MQ.BROKEURL); try { // 經過鏈接工廠獲取鏈接 connection = connectionFactory.createConnection(); // 啓動鏈接 connection.start(); // 建立session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 生產者 // dest1 = session.createQueue(MQ.QUENENAME.DESTINATION2); // producer = session.createProducer(dest1); top1 = session.createTopic(MQ.QUENENAME.TOP2); producer = session.createProducer(top1); // 消費者 // dest2 = session.createQueue(MQ.QUENENAME.DESTINATION1); // consumer = 
session.createConsumer(dest2); top2 = session.createTopic(MQ.QUENENAME.TOP1); consumer = session.createConsumer(top2); } catch (JMSException e) { System.out.println("建立鏈接異常"); } } /** * 生產者 */ public void sendMsg(String message) { try { TextMessage txtMsg = session.createTextMessage(message); producer.send(txtMsg); } catch (JMSException e) { System.err.println("發送消息異常"); } } /** * 消費者 */ public class ConsumerListener extends Thread { @Override public void run() { super.run(); try { while (true) { TextMessage textMessage = (TextMessage) consumer.receive(); String msg = textMessage.getText(); if (StringUtils.isNotBlank(msg)) { System.out.println(msg); } else { System.err.println("消費者接收消息爲空!"); } } } catch (JMSException e) { System.err.println("消費者接收消息異常"); } } } }
(1)打開ActiveMQ服務器
(2)運行System1-MQ-topic-S1下JMSTest.java的main方法
(3)運行System2-MQ-topic-S2下JMSTest.java的testMyMQ()方法發送消息
將System1-MQ-topic-S1和System2-MQ-topic-S2中的生產者和消費者的隊列建立方式改變便可
https://my.oschina.net/u/3416597/blog/1554950