引入maven依賴:
<!-- activemq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
爲了便於管理mq,這裏統一在xml中配置:
<mq-clients>
    <producer>
        <id>demo.test</id>
        <topic>MQ_TEST</topic>
        <mq.type>1</mq.type>
        <delivery.mode>1</delivery.mode>
        <acknowledge>1</acknowledge>
    </producer>
    <producer>
        <id>demo.test2</id>
        <topic>MQ_TEST2</topic>
        <mq.type>2</mq.type>
        <delivery.mode>1</delivery.mode>
        <acknowledge>1</acknowledge>
    </producer>
    <consumer>
        <id>demo.consumer.test1</id>
        <topic>MQ_TEST</topic>
        <mq.type>1</mq.type>
        <message.listener>com.ule.microtopup.mq.listener.ActiveMQMessageListener</message.listener>
    </consumer>
    <consumer>
        <id>demo.consumer.test2</id>
        <topic>MQ_TEST2</topic>
        <mq.type>2</mq.type>
        <message.listener>com.ule.microtopup.mq.listener.ActiveMQMessageListener2</message.listener>
    </consumer>
</mq-clients>
XMLUtil:用來讀取xml
/** * 獲取全部節點 * @param root 根節點 * @param map 記錄每一個節點及值 */ @SuppressWarnings("unchecked") private static void getNode(Element root, LinkedHashMap<String, String> map) { List<Element> list = root.elements(); Iterator<Element> iterator = list.iterator(); while (iterator.hasNext()) { Element element = iterator.next(); if (element.elements() != null && element.elements().size() > 0) { System.out.println("element:"+element.getName()); getNode(element, map); } else { map.put(element.getParent().getName() + "." + element.getName(), element.getTextTrim()); } } } /** * 讀XML文件指定節點內容 * @param xmlName xml文件名 * @param nodeName 指定節點 * @return * @throws Exception */ public static Map<String, String> reader(String xmlName,String nodeName)throws Exception{ if(StringUtils.isEmpty(xmlName)){ throw new NullPointerException("xmlName cannot be null!"); } LinkedHashMap<String, String> returnValue = new LinkedHashMap<String, String>(); InputStream in = XMLUtil.class.getClassLoader().getResourceAsStream(xmlName); SAXReader reader = new SAXReader(); Document document = reader.read(in); Element root = document.getRootElement(); if(StringUtils.isNotEmpty(nodeName)){ root = document.getRootElement().element(nodeName); } //獲取節點 getNode(root, returnValue); if (returnValue.size()>0) { for (String key : returnValue.keySet()) { System.out.println("key:" + key + " ,value:" + returnValue.get(key)); } } return returnValue; } /** * 讀XML文件全部內容,並將文件轉成對象 * @param xmlName 文件名 * @param cls * @return * @throws Exception */ @SuppressWarnings("unchecked") public static <T> T readerXmlToBean(String xmlName ,Class<?>...cls)throws Exception{ if(StringUtils.isEmpty(xmlName)){ throw new NullPointerException("xmlName cannot be null!"); } InputStream in = XMLUtil.class.getClassLoader().getResourceAsStream(xmlName); JAXBContext context = JAXBContext.newInstance(cls);// 獲取上下文對象 Unmarshaller unmarshaller = context.createUnmarshaller(); T t = (T)unmarshaller.unmarshal(in); return t; }Producer:git
/**
 * Configuration of one message producer; JAXB root element name is {@code producer}.
 */
@XmlRootElement(name="producer")
public class Producer {

    private String id;

    // Topic
    private String topic;

    // Type: 1 - queue, 2 - topic
    private Integer mqType;

    // Persistence mode: 1 - non-persistent, 2 - persistent
    private Integer deliveryMode;

    // Acknowledgement mode: 1 - auto acknowledge, 2 - client acknowledge,
    // 3 - automatic batch acknowledge, 0 - transactional commit and acknowledge
    private Integer acknowledge;

    // getters and setters omitted
}
Consumer:spring
/**
 * Configuration of one message consumer; JAXB root element name is {@code consumer}.
 */
@XmlRootElement(name = "consumer")
public class Consumer {

    private String id;

    private String topic;

    private Integer mqType;

    // Listener implementation class configured for this consumer.
    private Class<? extends MessageListener> messageListener;

    ...
}
MessageUtil:mq消息集中處理類,包括髮送消息,啓動消費監聽等springboot
private static MqConnectionFactory mqFactory = MqConnectionFactory.INSTANCE; private static Connection conn = null; private static Session session = null; public static void init() { try { // 獲取一個鏈接 if (conn == null) { conn = mqFactory.getConnection(); } conn.start(); // 自動提交事務 if (session == null) { /* * Session.AUTO_ACKNOWLEDGE 消息自動簽收 * Session.CLIENT_ACKNOWLEDGE 客戶端調用acknowledge方法手動簽收 * Session.DUPS_OK_ACKNOWLEDGE 不是必須簽收,消息可能會重複發送。在第二次從新傳送消息的時候,消息 * 頭的JmsDelivered會被置爲true標示當前消息已經傳送過一次,客戶端須要進行消息的重複處理控制。 */ session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); } } catch (Exception e) { e.printStackTrace(); } } /** * * @param obj 序列化對象 * @param topic * @param isQueue * @throws Exception */ public static void sendObjectMessage(Serializable obj, String id) throws Exception { init(); Producer p = getProducerById(id); MessageProducer producer = getMessageProducer(getDestination(p), p.getDeliveryMode()); producer.send(session.createObjectMessage(obj)); destroy(producer); } private static Producer getProducerById(String id) { Producer p = MQUtil.getProducerById(id); if (p == null) { throw new NullPointerException("according to id:" + id + ", not found produer."); } return p; } public static void sendTextMessage(String mes, String id) throws Exception { init(); Producer p = getProducerById(id); MessageProducer producer = getMessageProducer(getDestination(p), p.getDeliveryMode()); producer.send(session.createTextMessage(mes)); destroy(producer); } private static MessageProducer getMessageProducer(Destination destination, Integer deliveryMode) throws Exception { MessageProducer producer = session.createProducer(destination); /** * PERSISTENT(持久性消息): * 這是ActiveMQ的默認傳送模式,此模式保證這些消息只被傳送一次和成功使用一次。對於這些消息,可靠性是優先考慮的因素。 * 可靠性的另外一個重要方面是確保持久性消息傳送至目標後,消息服務在向消費者傳送它們以前不會丟失這些消息。這意味着在持久性消息傳送至目標時, * 消息服務將其放入持久性數據存儲。若是消息服務因爲某種緣由致使失敗, * 它能夠恢復此消息並將此消息傳送至相應的消費者。雖然這樣增長了消息傳送的開銷,但卻增長了可靠性。 * NON_PERSISTENT(非持久性消息): * 保證這些消息最多被傳送一次。對於這些消息,可靠性並不是主要的考慮因素。 * 
此模式並不要求持久性的數據存儲,也不保證消息服務因爲某種緣由致使失敗後消息不會丟失。 * */ producer.setDeliveryMode(deliveryMode); return producer; } private static Destination getDestination(Producer p) throws Exception { return getDestination(p.getMqType(), p.getTopic()); } private static Destination getDestination(Consumer c) throws Exception { return getDestination(c.getMqType(), c.getTopic()); } private static Destination getDestination(Integer mqType, String topic) throws Exception { Destination destination = null; if (ActiveMqType.QUEUE == mqType) destination = session.createQueue(topic); else if (ActiveMqType.TOPIC == mqType) destination = session.createTopic(topic); else throw new IllegalArgumentException("mqType must be 1 or 2."); return destination; } /** * 啓動全部監聽 * @param c * @throws Exception */ public static void startConsumer(Consumer c) throws Exception { init(); MessageConsumer consumer = session.createConsumer(MessageUtil.getDestination(c)); MessageListener listener = c.getMessageListener().newInstance(); consumer.setMessageListener(listener); } private static void destroy(MessageProducer producer) throws JMSException { if (producer != null) { producer.close(); } if (session != null) { session.close(); session = null; } if (conn != null) { conn.close(); conn = null; } } public static void destroy(MessageConsumer consumer) throws JMSException { if (consumer != null) { consumer.close(); consumer = null; } if (session != null) { session.close(); session = null; } if (conn != null) { conn.close(); conn = null; } }細節不在贅述,具體代碼已上傳至碼雲:https://gitee.com/savage_xiao/boot.demo/tree/mastersession
有興趣可以下載下來看一下,其中還包含其他springboot的研究
測試代碼:
/**
 * Sends 99 text messages, numbered 102 through 200, through the "demo.test" producer.
 */
public static void main(String[] args) {
    try {
        int seq = 102;
        while (seq <= 200) {
            MessageUtil.sendTextMessage("hello world!" + "," + seq, "demo.test");
            seq++;
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
-----------------------
/**
 * Sends 100 text messages, numbered 1 through 100, through the "demo.test2" producer.
 */
public static void main(String[] args) {
    try {
        int seq = 1;
        while (seq <= 100) {
            MessageUtil.sendTextMessage("hello world2!" + "," + seq, "demo.test2");
            seq++;
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
測試結果:
listener2:hello world2!,1
listener2:hello world2!,2
listener2:hello world2!,3
listener2:hello world2!,4
listener2:hello world2!,5
....
....略
listener2:hello world2!,98
listener2:hello world2!,99
listener2:hello world2!,100
listener:hello world!,102
listener:hello world!,103
...
...略
listener:hello world!,197
listener:hello world!,198
listener:hello world!,199
listener:hello world!,200
ActiveMq如何搭建、如何使用請看上一篇:http://www.javashuo.com/article/p-gfwpvgty-dq.html