基於ActiveMQ的系統間通訊示例

一、使用到的jar包

activemq-all-5.15.1.jar
commons-lang3-3.0.jar
commons-logging-1.1.3.jar
fastjson-1.1.31.jar
jackson-core-asl-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
jboss-logging-3.1.3.GA.jar
jconsole.jar
log4j-1.2.16.jar
slf4j-api-1.6.2.jar
slf4j-log4j12-1.6.2.jar

二、項目結構

三、System1-MQ-topic-S1完整代碼

GlobalVar.java

package com.system1.mq.p2p;

/**
 * Holder for settings shared across the messaging classes.
 */
public interface GlobalVar {
	/**
	 * General switches. A member class of an interface is implicitly public and static.
	 */
	final class Common {
		/** Logging switch, initialised to 1; deliberately left non-final so it can be reassigned. */
		public static int loggerDebug = 1;
	}
}

JMSTest.java

package com.system1.mq.p2p;

import org.junit.Test;

public class JMSTest {
	@Test
	public void testMyMQ() {
		MQ.getInstance().sendMsg("TOP1-TOP2:你好,我不是一條單純的消息");
	}

	public static void main(String[] args) {
		MQ.getInstance().new ConsumerListener().start();
	}
}

Logger.java

package com.system1.mq.p2p;

import org.slf4j.LoggerFactory;

/**
 * Static facade over a single slf4j logger. Every method except
 * {@link #exception(Throwable)} is a no-op unless
 * {@code GlobalVar.Common.loggerDebug} equals 1.
 */
public class Logger {
	private static final org.slf4j.Logger logger = LoggerFactory.getLogger(Logger.class);

	/** @return true when the global switch is anything other than 1 */
	private static boolean disabled() {
		return GlobalVar.Common.loggerDebug != 1;
	}

	/** Logs a formatted message at TRACE level when the switch is on. */
	public static void trace(String format, Object... args) {
		if (disabled()) {
			return;
		}
		logger.trace(format, args);
	}

	/** Logs a message with its throwable at TRACE level when the switch is on. */
	public static void trace(String msg, Throwable e) {
		if (disabled()) {
			return;
		}
		logger.trace(msg, e);
	}

	/** Logs a formatted message at DEBUG level when the switch is on. */
	public static void debug(String format, Object... args) {
		if (disabled()) {
			return;
		}
		logger.debug(format, args);
	}

	/** Logs a message with its throwable at DEBUG level when the switch is on. */
	public static void debug(String msg, Throwable e) {
		if (disabled()) {
			return;
		}
		logger.debug(msg, e);
	}

	/** Logs a formatted message at INFO level when the switch is on. */
	public static void info(String format, Object... args) {
		if (disabled()) {
			return;
		}
		logger.info(format, args);
	}

	/** Logs a message with its throwable at INFO level when the switch is on. */
	public static void info(String msg, Throwable e) {
		if (disabled()) {
			return;
		}
		logger.info(msg, e);
	}

	/** Logs a formatted message at WARN level when the switch is on. */
	public static void warn(String format, Object... args) {
		if (disabled()) {
			return;
		}
		logger.warn(format, args);
	}

	/** Logs a message with its throwable at WARN level when the switch is on. */
	public static void warn(String msg, Throwable e) {
		if (disabled()) {
			return;
		}
		logger.warn(msg, e);
	}

	/** Logs a formatted message at ERROR level when the switch is on. */
	public static void error(String format, Object... args) {
		if (disabled()) {
			return;
		}
		logger.error(format, args);
	}

	/** Logs a message with its throwable at ERROR level when the switch is on. */
	public static void error(String msg, Throwable e) {
		if (disabled()) {
			return;
		}
		logger.error(msg, e);
	}

	/** Same output as {@link #error(String, Throwable)}: ERROR level, gated by the switch. */
	public static void exception(String msg, Throwable e) {
		if (disabled()) {
			return;
		}
		logger.error(msg, e);
	}

	/** Logs the throwable at ERROR level using its own message; not gated by the switch. */
	public static void exception(Throwable e) {
		logger.error(e.getMessage(), e);
	}
}

MQ.java

package com.system1.mq.p2p;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.StringUtils;

/**
 * Shared gateway to the ActiveMQ broker for this system.
 *
 * <p>Class initialisation opens one connection to the default broker URL, creates a
 * producer on topic {@code TOP1-TOP2} and a consumer on topic {@code TOP2-TOP1}.
 * Messages are sent with {@link #sendMsg(String)} and received by a
 * {@link ConsumerListener} thread. If initialisation fails, the failure is reported on
 * standard error and later send/receive calls report "not connected" instead of
 * throwing {@link NullPointerException}.
 */
public class MQ {
	/**
	 * Singleton instance. Created eagerly during class initialisation so that handler
	 * threads calling {@link #getInstance()} concurrently always observe the same object.
	 */
	private static final MQ INSTANCE = new MQ();

	private MQ() {
	}

	/**
	 * @return the single shared instance
	 */
	public static MQ getInstance() {
		return INSTANCE;
	}

	// Default connection user name
	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
	// Default connection password
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
	// Default broker URL
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
	// Connection factory
	static ConnectionFactory connectionFactory;
	// Connection
	static Connection connection = null;

	// Session used for producing messages
	static Session session;
	// Session used only by the consumer. The JMS API describes a Session as a
	// single-threaded context, and the consumer blocks in receive() on its own thread
	// while handler threads send, so the two sides must not share one session.
	private static Session consumerSession;
	// Message destinations (only referenced by the commented-out queue variant below)
	static Destination dest1, dest2;
	// Topics
	static Topic top1, top2;
	// Consumer
	static MessageConsumer consumer;
	// Producer
	static MessageProducer producer;

	// Serialises sendMsg(): the producer session may be reached from several handler threads.
	private static final Object SEND_LOCK = new Object();

	// Destination names
	public interface QUENENAME {
		// Queue destinations
		public static final String DESTINATION1 = "S1-S2";
		public static final String DESTINATION2 = "S2-S1";
		// Topic names
		public static final String TOP1 = "TOP1-TOP2";
		public static final String TOP2 = "TOP2-TOP1";
	}

	// Open the connection and create producer and consumer
	static {
		// Instantiate the connection factory
		connectionFactory = new ActiveMQConnectionFactory(MQ.USERNAME, MQ.PASSWORD, MQ.BROKEURL);
		try {
			// Obtain a connection from the factory
			connection = connectionFactory.createConnection();
			// Start the connection
			connection.start();
			// Producer session: not transacted, received messages are acknowledged automatically
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// First argument: whether the session is transacted. With CLIENT_ACKNOWLEDGE the
			// client has to call message.acknowledge() itself to confirm receipt:
			// session = connection.createSession(true,
			// Session.CLIENT_ACKNOWLEDGE);
			// Producer (queue variant kept for the point-to-point setup)
			// dest1 = session.createQueue(MQ.QUENENAME.DESTINATION1);
			// producer = session.createProducer(dest1);
			top1 = session.createTopic(MQ.QUENENAME.TOP1);
			producer = session.createProducer(top1);
			// Consumer, on its own session (queue variant kept for the point-to-point setup)
			consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// dest2 = consumerSession.createQueue(MQ.QUENENAME.DESTINATION2);
			// consumer = consumerSession.createConsumer(dest2);
			top2 = consumerSession.createTopic(MQ.QUENENAME.TOP2);
			consumer = consumerSession.createConsumer(top2);
		} catch (JMSException e) {
			// Report on stderr like every other failure in this class, and keep the cause.
			System.err.println("建立鏈接異常: " + e);
		}
	}

	/**
	 * Producer: sends {@code message} as a text message to the producer topic.
	 * Failures are reported on standard error; nothing is thrown to the caller.
	 *
	 * @param message text to send
	 */
	public void sendMsg(String message) {
		synchronized (SEND_LOCK) {
			// Initialisation may have failed part-way, leaving these unset.
			if (session == null || producer == null) {
				System.err.println("發送消息異常: 鏈接未建立");
				return;
			}
			try {
				TextMessage txtMsg = session.createTextMessage(message);
				producer.send(txtMsg);
			} catch (JMSException e) {
				System.err.println("發送消息異常: " + e);
			}
		}
	}

	/**
	 * Consumer: blocks on the consumer in a loop and passes every non-blank text
	 * message to {@code ProtocolParse}. The loop ends when the consumer is closed or a
	 * {@link JMSException} occurs.
	 */
	public class ConsumerListener extends Thread {
		@Override
		public void run() {
			super.run();
			if (consumer == null) {
				System.err.println("消費者接收消息異常: 鏈接未建立");
				return;
			}
			try {
				while (true) {
					Object received = consumer.receive();
					if (received == null) {
						// receive() yields null when the consumer is closed while waiting.
						break;
					}
					if (!(received instanceof TextMessage)) {
						// Skip instead of letting a ClassCastException end the thread.
						System.err.println("消費者接收到非文本消息: " + received.getClass().getName());
						continue;
					}
					String msg = ((TextMessage) received).getText();
					if (StringUtils.isNotBlank(msg)) {
						try {
							new ProtocolParse(msg).handler();
						} catch (Exception e) {
							// One bad message must not stop the listener; keep the cause visible.
							System.err.println("協議處理異常! " + e);
						}
					} else {
						System.err.println("消費者接收消息爲空!");
					}
				}
			} catch (JMSException e) {
				System.err.println("消費者接收消息異常: " + e);
			}
		}
	}
}

ProtocolConstant.java

package com.system1.mq.p2p;

public class ProtocolConstant {
	// 命令字
	public interface CMD {
		public static final String HEARTBEAT = "heartbeat";
	}
}

ProtocolParse.java

package com.system1.mq.p2p;

import com.alibaba.fastjson.JSONObject;
import com.system1.mq.handler.ExceptionHandler;
import com.system1.mq.handler.HeartbeatHandler;

/**
 * Parses one received protocol string as JSON and dispatches it to a handler thread
 * chosen by its {@code "cmd"} field.
 */
public class ProtocolParse {
	private String protocol;

	/**
	 * @param protocol raw message text as received
	 */
	public ProtocolParse(String protocol) {
		super();
		this.protocol = protocol;
	}

	/**
	 * Parses the protocol text and starts the matching handler:
	 * {@code HeartbeatHandler} for the heartbeat command, {@code ExceptionHandler} for
	 * any other command and for messages that carry no {@code "cmd"} field.
	 *
	 * @throws Exception if parsing yields no JSON object; the parser itself may also
	 *                   throw for malformed text (presumably an unchecked exception —
	 *                   verify against the fastjson version in use)
	 */
	public void handler() throws Exception {
		JSONObject jsonObj = JSONObject.parseObject(protocol);
		if (jsonObj == null) {
			throw new Exception("不符合json格式的數據!");
		}
		// Command word
		String cmd = jsonObj.getString("cmd");
		if (cmd == null) {
			// A switch on a null String throws NullPointerException, so route a missing
			// command to the same handler the default branch uses.
			new ExceptionHandler(protocol).start();
			return;
		}
		switch (cmd) {
		case ProtocolConstant.CMD.HEARTBEAT:
			new HeartbeatHandler(protocol).start();
			break;
		default:
			new ExceptionHandler(protocol).start();
			break;
		}

	}

	/**
	 * @return the raw protocol text
	 */
	public String getProtocol() {
		return protocol;
	}

	/**
	 * @param protocol the raw protocol text to parse on the next {@link #handler()} call
	 */
	public void setProtocol(String protocol) {
		this.protocol = protocol;
	}

}

BaseMessageHandler.java

package com.system1.mq.handler;

/**
 * Base class for message handlers: a thread that carries the message text it was
 * created for. Subclasses supply the work in {@code run()}.
 */
public abstract class BaseMessageHandler extends Thread {
	/** Message text handed to this handler. */
	private String message;

	/**
	 * @param message message text to handle
	 */
	public BaseMessageHandler(String message) {
		// The implicit super() call creates the thread with default settings.
		this.message = message;
	}

	/**
	 * @return the message text currently held
	 */
	public String getMessage() {
		return this.message;
	}

	/**
	 * @param message replacement message text
	 */
	public void setMessage(String message) {
		this.message = message;
	}
}

ExceptionHandler.java

package com.system1.mq.handler;

import com.system1.mq.p2p.Logger;

/**
 * Handler for protocol data that matched no known command: logs the raw message.
 */
public class ExceptionHandler extends BaseMessageHandler {

	/**
	 * @param message the protocol text that could not be dispatched
	 */
	public ExceptionHandler(String message) {
		super(message);
	}

	/**
	 * Logs the held message at INFO level behind a fixed marker prefix.
	 */
	@Override
	public void run() {
		super.run();
		String raw = super.getMessage();
		String line = "===異常的協議數據===" + raw;
		Logger.info(line);
	}

}

HeartbeatHandler.java

package com.system1.mq.handler;

import com.system1.mq.p2p.Logger;
import com.system1.mq.p2p.MQ;

/**
 * Handler for the heartbeat command: logs a marker line and sends a fixed reply
 * through the shared {@code MQ} instance.
 */
public class HeartbeatHandler extends BaseMessageHandler {

	/**
	 * @param message the protocol text that carried the heartbeat command
	 */
	public HeartbeatHandler(String message) {
		super(message);
	}

	/**
	 * Logs the heartbeat marker, then sends the acknowledgement text.
	 */
	@Override
	public void run() {
		super.run();
		Logger.info("===HEARTBEAT===");
		MQ mq = MQ.getInstance();
		String reply = "收到心跳";
		mq.sendMsg(reply);
	}

}

 

四、System2-MQ-topic-S2完整代碼

GlobalVar.java

package com.system1.mq.p2p;

/**
 * Holder for settings shared across the messaging classes.
 */
public interface GlobalVar {
	/**
	 * General switches. A member class of an interface is implicitly public and static.
	 */
	final class Common {
		/** Logging switch, initialised to 1; deliberately left non-final so it can be reassigned. */
		public static int loggerDebug = 1;
	}
}

JMSTest.java

package com.system1.mq.p2p;

import org.junit.Test;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class JMSTest {
	@Test
	public void testMyMQ() {
		JSONObject jsonObj = new JSONObject();
		jsonObj.put("cmd", "heartbeat");
		MQ.getInstance().sendMsg(JSON.toJSONString(jsonObj));
	}

	public static void main(String[] args) {
		MQ.getInstance().new ConsumerListener().start();
	}
}

Logger.java

package com.system1.mq.p2p;

import org.slf4j.LoggerFactory;

/**
 * Static facade over a single slf4j logger. All methods except
 * {@link #exception(Throwable)} forward only while
 * {@code GlobalVar.Common.loggerDebug} equals 1.
 */
public class Logger {
	private static final org.slf4j.Logger logger = LoggerFactory.getLogger(Logger.class);

	/** @return true when the global switch equals 1 */
	private static boolean enabled() {
		return GlobalVar.Common.loggerDebug == 1;
	}

	/** TRACE, formatted message; forwarded only while the switch is on. */
	public static void trace(String format, Object... args) {
		if (enabled()) {
			logger.trace(format, args);
		}
	}

	/** TRACE, message plus throwable; forwarded only while the switch is on. */
	public static void trace(String msg, Throwable e) {
		if (enabled()) {
			logger.trace(msg, e);
		}
	}

	/** DEBUG, formatted message; forwarded only while the switch is on. */
	public static void debug(String format, Object... args) {
		if (enabled()) {
			logger.debug(format, args);
		}
	}

	/** DEBUG, message plus throwable; forwarded only while the switch is on. */
	public static void debug(String msg, Throwable e) {
		if (enabled()) {
			logger.debug(msg, e);
		}
	}

	/** INFO, formatted message; forwarded only while the switch is on. */
	public static void info(String format, Object... args) {
		if (enabled()) {
			logger.info(format, args);
		}
	}

	/** INFO, message plus throwable; forwarded only while the switch is on. */
	public static void info(String msg, Throwable e) {
		if (enabled()) {
			logger.info(msg, e);
		}
	}

	/** WARN, formatted message; forwarded only while the switch is on. */
	public static void warn(String format, Object... args) {
		if (enabled()) {
			logger.warn(format, args);
		}
	}

	/** WARN, message plus throwable; forwarded only while the switch is on. */
	public static void warn(String msg, Throwable e) {
		if (enabled()) {
			logger.warn(msg, e);
		}
	}

	/** ERROR, formatted message; forwarded only while the switch is on. */
	public static void error(String format, Object... args) {
		if (enabled()) {
			logger.error(format, args);
		}
	}

	/** ERROR, message plus throwable; forwarded only while the switch is on. */
	public static void error(String msg, Throwable e) {
		if (enabled()) {
			logger.error(msg, e);
		}
	}

	/** Same output as {@link #error(String, Throwable)}: ERROR level, gated by the switch. */
	public static void exception(String msg, Throwable e) {
		if (enabled()) {
			logger.error(msg, e);
		}
	}

	/** Logs the throwable at ERROR level under its own message; ignores the switch. */
	public static void exception(Throwable e) {
		logger.error(e.getMessage(), e);
	}
}

MQ.java

package com.system1.mq.p2p;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.StringUtils;

/**
 * Shared gateway to the ActiveMQ broker for this system.
 *
 * <p>Class initialisation opens one connection to the default broker URL, creates a
 * producer on topic {@code TOP2-TOP1} and a consumer on topic {@code TOP1-TOP2}.
 * Messages are sent with {@link #sendMsg(String)} and received by a
 * {@link ConsumerListener} thread, which prints them. If initialisation fails, the
 * failure is reported on standard error and later send/receive calls report
 * "not connected" instead of throwing {@link NullPointerException}.
 */
public class MQ {
	/**
	 * Singleton instance. Created eagerly during class initialisation so that
	 * concurrent callers of {@link #getInstance()} always observe the same object.
	 */
	private static final MQ INSTANCE = new MQ();

	private MQ() {
	}

	/**
	 * @return the single shared instance
	 */
	public static MQ getInstance() {
		return INSTANCE;
	}

	// Default connection user name
	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
	// Default connection password
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
	// Default broker URL
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
	// Connection factory
	static ConnectionFactory connectionFactory;
	// Connection
	static Connection connection = null;

	// Session used for producing messages
	static Session session;
	// Session used only by the consumer. The JMS API describes a Session as a
	// single-threaded context, and the consumer blocks in receive() on its own thread
	// while another thread may send, so the two sides must not share one session.
	private static Session consumerSession;
	// Message destinations (only referenced by the commented-out queue variant below)
	static Destination dest1, dest2;
	// Topics
	static Topic top1, top2;
	// Consumer
	static MessageConsumer consumer;
	// Producer
	static MessageProducer producer;

	// Serialises sendMsg() so the producer session is never used by two threads at once.
	private static final Object SEND_LOCK = new Object();

	// Destination names
	public interface QUENENAME {
		public static final String DESTINATION1 = "S1-S2";
		public static final String DESTINATION2 = "S2-S1";
		// Topic names
		public static final String TOP1 = "TOP1-TOP2";
		public static final String TOP2 = "TOP2-TOP1";
	}

	// Open the connection and create producer and consumer
	static {
		// Instantiate the connection factory
		connectionFactory = new ActiveMQConnectionFactory(MQ.USERNAME, MQ.PASSWORD, MQ.BROKEURL);
		try {
			// Obtain a connection from the factory
			connection = connectionFactory.createConnection();
			// Start the connection
			connection.start();
			// Producer session: not transacted, received messages are acknowledged automatically
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// Producer (queue variant kept for the point-to-point setup)
			// dest1 = session.createQueue(MQ.QUENENAME.DESTINATION2);
			// producer = session.createProducer(dest1);
			top1 = session.createTopic(MQ.QUENENAME.TOP2);
			producer = session.createProducer(top1);
			// Consumer, on its own session (queue variant kept for the point-to-point setup)
			consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// dest2 = consumerSession.createQueue(MQ.QUENENAME.DESTINATION1);
			// consumer = consumerSession.createConsumer(dest2);
			top2 = consumerSession.createTopic(MQ.QUENENAME.TOP1);
			consumer = consumerSession.createConsumer(top2);
		} catch (JMSException e) {
			// Report on stderr like every other failure in this class, and keep the cause.
			System.err.println("建立鏈接異常: " + e);
		}
	}

	/**
	 * Producer: sends {@code message} as a text message to the producer topic.
	 * Failures are reported on standard error; nothing is thrown to the caller.
	 *
	 * @param message text to send
	 */
	public void sendMsg(String message) {
		synchronized (SEND_LOCK) {
			// Initialisation may have failed part-way, leaving these unset.
			if (session == null || producer == null) {
				System.err.println("發送消息異常: 鏈接未建立");
				return;
			}
			try {
				TextMessage txtMsg = session.createTextMessage(message);
				producer.send(txtMsg);
			} catch (JMSException e) {
				System.err.println("發送消息異常: " + e);
			}
		}
	}

	/**
	 * Consumer: blocks on the consumer in a loop and prints every non-blank text
	 * message to standard output. The loop ends when the consumer is closed or a
	 * {@link JMSException} occurs.
	 */
	public class ConsumerListener extends Thread {
		@Override
		public void run() {
			super.run();
			if (consumer == null) {
				System.err.println("消費者接收消息異常: 鏈接未建立");
				return;
			}
			try {
				while (true) {
					Object received = consumer.receive();
					if (received == null) {
						// receive() yields null when the consumer is closed while waiting.
						break;
					}
					if (!(received instanceof TextMessage)) {
						// Skip instead of letting a ClassCastException end the thread.
						System.err.println("消費者接收到非文本消息: " + received.getClass().getName());
						continue;
					}
					String msg = ((TextMessage) received).getText();
					if (StringUtils.isNotBlank(msg)) {
						System.out.println(msg);
					} else {
						System.err.println("消費者接收消息爲空!");
					}
				}
			} catch (JMSException e) {
				System.err.println("消費者接收消息異常: " + e);
			}
		}
	}
}

 

五、測試

(1)打開ActiveMQ服務器

(2)運行System1-MQ-topic-S1下JMSTest.java的main方法

(3)運行System2-MQ-topic-S2下JMSTest.java的testMyMQ()方法發送消息

六、改用p2p方式發送消息

將System1-MQ-topic-S1和System2-MQ-topic-S2的MQ.java中,生產者和消費者的目的地由createTopic改爲已註釋的createQueue方式即可

七、系統間通訊模型圖見

https://my.oschina.net/u/3416597/blog/1554950

相關文章
相關標籤/搜索