續談ActiveMQ之java如何操做ActiveMQ(springBoot項目)

 

引入maven依賴
         <!-- activemq -->
		 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

爲了便於管理mq這裏統一在xml中配置:java

<mq-clients>
	<producer>
		<id>demo.test</id>
		<topic>MQ_TEST</topic>
		<mq.type>1</mq.type>
		<delivery.mode>1</delivery.mode>
		<acknowledge>1</acknowledge>
	</producer>
	<producer>
		<id>demo.test2</id>
		<topic>MQ_TEST2</topic>
		<mq.type>2</mq.type>
		<delivery.mode>1</delivery.mode>
		<acknowledge>1</acknowledge>
	</producer>
	<consumer>
		<id>demo.consumer.test1</id>
		<topic>MQ_TEST</topic>
		<mq.type>1</mq.type>
		<message.listener>com.ule.microtopup.mq.listener.ActiveMQMessageListener</message.listener>
	</consumer>
	<consumer>
		<id>demo.consumer.test2</id>
		<topic>MQ_TEST2</topic>
		<mq.type>2</mq.type>
		<message.listener>com.ule.microtopup.mq.listener.ActiveMQMessageListener2</message.listener>
	</consumer>
</mq-clients>

XMLUtil:用來讀取xmlnode

/**
	 * 獲取全部節點
	 * @param root 根節點
	 * @param map  記錄每一個節點及值
	 */
	@SuppressWarnings("unchecked")
	private static void getNode(Element root, LinkedHashMap<String, String> map) {
		List<Element> list = root.elements();
		Iterator<Element> iterator = list.iterator();
		while (iterator.hasNext()) {
			Element element = iterator.next();
			if (element.elements() != null && element.elements().size() > 0) {
				System.out.println("element:"+element.getName());
				getNode(element, map);

			} else {
				map.put(element.getParent().getName() + "." + element.getName(),
				        element.getTextTrim());
			}
		}

	}
	/**
	 * 讀XML文件指定節點內容
	 * @param xmlName  xml文件名
	 * @param nodeName 指定節點
	 * @return
	 * @throws Exception
	 */
	public static Map<String, String> reader(String xmlName,String nodeName)throws Exception{
		if(StringUtils.isEmpty(xmlName)){
			throw new NullPointerException("xmlName cannot be null!");
		}
		
		LinkedHashMap<String, String> returnValue = new LinkedHashMap<String, String>();
		InputStream in = XMLUtil.class.getClassLoader().getResourceAsStream(xmlName);
		SAXReader reader = new SAXReader();
		Document document = reader.read(in);
		Element root = document.getRootElement();
		if(StringUtils.isNotEmpty(nodeName)){
			root = document.getRootElement().element(nodeName);
		}
		//獲取節點
		getNode(root, returnValue);
		
		if (returnValue.size()>0) {
			for (String key : returnValue.keySet()) {
				System.out.println("key:" + key + " ,value:" + returnValue.get(key));
			}

		}
		return returnValue;
	}
	
	
	/**
	 * 讀XML文件全部內容,並將文件轉成對象
	 * @param xmlName 文件名
	 * @param cls
	 * @return
	 * @throws Exception
	 */
	@SuppressWarnings("unchecked")
    public static <T> T readerXmlToBean(String xmlName ,Class<?>...cls)throws Exception{
		if(StringUtils.isEmpty(xmlName)){
			throw new NullPointerException("xmlName cannot be null!");
		}
		InputStream in = XMLUtil.class.getClassLoader().getResourceAsStream(xmlName);
		JAXBContext context = JAXBContext.newInstance(cls);// 獲取上下文對象  
        Unmarshaller unmarshaller = context.createUnmarshaller();
        T t =  (T)unmarshaller.unmarshal(in);
		return t;
	}

Producer:git

@XmlRootElement(name="producer")  
public class Producer {
	private String id;
	// 主題
	private String topic;
	// 類型,1-queue,2-topic
	private Integer mqType;
	// 持久化方式 :1-非持久,2-持久化
	private Integer deliveryMode;
	// 簽收方式:1-自動簽收,2-客戶端確認,3-自動批量確認,0-事務提交併確認
	private Integer acknowledge;

//省略get set
}

Consumer:spring

@XmlRootElement(name = "consumer")
public class Consumer {
	private String id;
	private String topic;
	private Integer mqType;
	private Class<? extends MessageListener> messageListener;
...
}

MessageUtil:mq消息集中處理類,包括髮送消息,啓動消費監聽等springboot

private static MqConnectionFactory mqFactory = MqConnectionFactory.INSTANCE;
	private static Connection conn = null;
	private static Session session = null;

	public static void init() {
		try {
			// 獲取一個鏈接
			if (conn == null) {
				conn = mqFactory.getConnection();
			}
			conn.start();
			// 自動提交事務
			if (session == null) {
				/*
				 * Session.AUTO_ACKNOWLEDGE 消息自動簽收
				 * Session.CLIENT_ACKNOWLEDGE 客戶端調用acknowledge方法手動簽收
				 * Session.DUPS_OK_ACKNOWLEDGE 不是必須簽收,消息可能會重複發送。在第二次從新傳送消息的時候,消息
				 * 頭的JmsDelivered會被置爲true標示當前消息已經傳送過一次,客戶端須要進行消息的重複處理控制。
				 */
				session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 
	 * @param obj 序列化對象
	 * @param topic
	 * @param isQueue
	 * @throws Exception
	 */
	public static void sendObjectMessage(Serializable obj, String id)
	        throws Exception {
		init();
		Producer p = getProducerById(id);
		MessageProducer producer = getMessageProducer(getDestination(p), p.getDeliveryMode());
		producer.send(session.createObjectMessage(obj));
		destroy(producer);
	}

	private static Producer getProducerById(String id) {
		Producer p = MQUtil.getProducerById(id);
		if (p == null) {
			throw new NullPointerException("according to id:" + id + ", not found produer.");
		}
		return p;
	}

	public static void sendTextMessage(String mes, String id)
	        throws Exception {
		init();
		Producer p = getProducerById(id);
		MessageProducer producer = getMessageProducer(getDestination(p), p.getDeliveryMode());
		producer.send(session.createTextMessage(mes));
		destroy(producer);
	}

	private static MessageProducer getMessageProducer(Destination destination, Integer deliveryMode)
	        throws Exception {
		MessageProducer producer = session.createProducer(destination);
		/**
		 * PERSISTENT(持久性消息):
		 * 這是ActiveMQ的默認傳送模式,此模式保證這些消息只被傳送一次和成功使用一次。對於這些消息,可靠性是優先考慮的因素。
		 * 可靠性的另外一個重要方面是確保持久性消息傳送至目標後,消息服務在向消費者傳送它們以前不會丟失這些消息。這意味着在持久性消息傳送至目標時,
		 * 消息服務將其放入持久性數據存儲。若是消息服務因爲某種緣由致使失敗,
		 * 它能夠恢復此消息並將此消息傳送至相應的消費者。雖然這樣增長了消息傳送的開銷,但卻增長了可靠性。
		 * NON_PERSISTENT(非持久性消息):
		 * 保證這些消息最多被傳送一次。對於這些消息,可靠性並不是主要的考慮因素。
		 * 此模式並不要求持久性的數據存儲,也不保證消息服務因爲某種緣由致使失敗後消息不會丟失。
		 * 
		 */
		producer.setDeliveryMode(deliveryMode);
		return producer;
	}

	private static Destination getDestination(Producer p) throws Exception {
		return getDestination(p.getMqType(), p.getTopic());
	}

	private static Destination getDestination(Consumer c) throws Exception {
		return getDestination(c.getMqType(), c.getTopic());
	}

	private static Destination getDestination(Integer mqType, String topic) throws Exception {
		Destination destination = null;
		if (ActiveMqType.QUEUE == mqType)
			destination = session.createQueue(topic);
		else if (ActiveMqType.TOPIC == mqType)
			destination = session.createTopic(topic);
		else
			throw new IllegalArgumentException("mqType must be 1 or 2.");
		return destination;
	}
	/**
	 * 啓動全部監聽
	 * @param c
	 * @throws Exception
	 */
	public static void startConsumer(Consumer c) throws Exception {
		init();
		MessageConsumer consumer = session.createConsumer(MessageUtil.getDestination(c));
		MessageListener listener = c.getMessageListener().newInstance();
		consumer.setMessageListener(listener);
	}

	private static void destroy(MessageProducer producer) throws JMSException {
		if (producer != null) {
			producer.close();
		}
		if (session != null) {
			session.close();
			session = null;
		}
		if (conn != null) {
			conn.close();
			conn = null;
		}
	}

	public static void destroy(MessageConsumer consumer) throws JMSException {
		if (consumer != null) {
			consumer.close();
			consumer = null;
		}
		if (session != null) {
			session.close();
			session = null;
		}
		if (conn != null) {
			conn.close();
			conn = null;
		}
	}

細節不在贅述,具體代碼已上傳至碼雲:https://gitee.com/savage_xiao/boot.demo/tree/mastersession

有興趣能夠下載下來看一下,其中有包含其餘springboot的研究maven

測試代碼:spring-boot

public static void main(String[] args) {
		try {
			for(int i = 101; i<200;i++){
				MessageUtil.sendTextMessage("hello world!"+","+(i+1), "demo.test");
			}
        } catch (Exception e) {
	        e.printStackTrace();
        }
	}

-----------------------

public static void main(String[] args) {
		try {
			for(int i = 0; i<100;i++){
				MessageUtil.sendTextMessage("hello world2!"+","+(i+1), "demo.test2");
			}
        } catch (Exception e) {
	        e.printStackTrace();
        }
	}

 

測試結果:測試

listener2:hello world2!,1
listener2:hello world2!,2
listener2:hello world2!,3
listener2:hello world2!,4
listener2:hello world2!,5
....
....略
listener2:hello world2!,98
listener2:hello world2!,99
listener2:hello world2!,100
listener:hello world!,102
listener:hello world!,103
...
...略
listener:hello world!,197
listener:hello world!,198
listener:hello world!,199
listener:hello world!,200

ActiveMq如何搭建如何使用請看上一篇:http://www.javashuo.com/article/p-gfwpvgty-dq.html.net

相關文章
相關標籤/搜索