ActiveMQ (一)


 今天給你們分享的是ActiveMQ,若有不足,敬請指教。html

 那麼咱們必須知道ActiveMQ是什麼。java

1、ActiveMQ簡介

1.1 ActiveMQ是什麼

  • ActiveMQ是一個消息隊列應用服務器。支持JMS規範。

1.1.1 JMS概述

  • 全稱:Java Message Service ,即爲Java消息服務,是一套java消息服務的API標準。(標準即接口)
  • 實現了JMS標準的系統,稱之爲JMS Provider。

1.1.2 消息隊列

1.1.2.1 概念

  • 消息隊列是在消息的傳輸過程當中保存消息的容器,提供一種不一樣進程或者同一進程不一樣線程直接通信的方式
圖示
  1. Producer:消息生產者,負責產生和發送消息到 Broker;
  2. Broker:消息處理中心。負責消息存儲、確認、重試等,通常其中會包含多個 queue;
  3. Consumer:消息消費者,負責從 Broker 中獲取消息,並進行相應處理;

1.2 ActiveMQ能作什麼

  • 實現兩個不一樣應用(程序)之間的消息通信。
  • 實現同一個應用,不一樣模塊之間的消息通信。

1.3 ActiveMQ下載

1.4 ActiveMQ主要特色

  1. 支持多語言、多協議客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 對Spring的支持,ActiveMQ能夠很容易整合到Spring的系統裏面去。
  3. 支持高可用、高性能的集羣模式

2、示例

2.1 需求

  • 使用ActiveMQ實現消息隊列模型

2.2 配置步驟說明

  1. 搭建ActiveMQ消息服務器(略)。
  2. 建立一個java項目。
  3. 建立消息生產者,發送消息。
  4. 建立消息消費者,接收消息

2.3 第一部分 建立java項目,導入jar包

圖示

2.4 第二部分 建立消息生成者,發送消息

2.4.1 建立MyProducer類,定義sengMsg2MQ方法

package com.xkt.producer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author lzx
 *
 */
public class MyProducer {

	// 定義連接工廠
	private ConnectionFactory factory;
	// 定義連接
	private Connection connection;
	// 定義會話
	private Session session;
	// 定義目的地
	private Destination destination;
	// 定義消息
	private Message message;
	// 定義消息生生產者
	private MessageProducer producer;

	public void sengMsg2MQ(String msg) {

		try {
			/*
			 * 一、建立連接工廠
			 * 
			 * ActiveMQConnectionFactory(userName, password, brokerURL)
			 * 
			 * userName:用戶名 默認admin password:密碼 默認admin brokerURL:消息服務中心地址
			 * tcp://0.0.0.0:61616 基於tcp協議
			 */
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");

			// 2.建立連接
			connection = factory.createConnection();
			// 開啓連接
			connection.start();

			/*
			 * 三、建立會話
			 * 
			 * createSession(transacted, acknowledgeMode)
			 * 
			 * transacted:是否使用事物 true|false true 表示使用事物,每次對消息進行讀寫以後,要提交事物。若是使用了事物,則消息確認機制失效
			 * false 表示不使用事物
			 * 
			 * acknowledgeMode: 消息確認機制 Session.AUTO_ACKNOWLEDGE -
			 * 自動確認消息機制,一旦讀取到消息,則消費成功,消息出隊列,避免重複消費 Session.CLIENT_ACKNOWLEDGE -
			 * 客戶端確認消息機制,手動確認,即消費了消息成功以後,再確認 Session.DUPS_OK_ACKNOWLEDGE -
			 * 有副本的客戶端確認消息機制。集羣模式下
			 * 
			 */
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			// 4.建立隊列
			destination = session.createQueue("test-mq");

			// 5.建立消息對象
			message = session.createTextMessage(msg);

			// 6.建立消息生產者
			producer = session.createProducer(destination);

			// 7.發送消息
			producer.send(message);

			// session.commit();

			System.out.println("消息發送成功");
		} catch (JMSException e) {
			e.printStackTrace();
			System.out.println("消息發送失敗");
		} finally {
			// 回收消息發送者資源
			if (null != producer) {
				try {
					producer.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}

			// 回收會話資源
			if (null != session) {
				try {
					session.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}

			// 回收連接資源
			if (null != connection) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

}

2.4.2 建立一個測試類

package com.xkt.test;

import org.junit.Test;

import com.xkt.consumer.Myconsumer;
import com.xkt.producer.MyProducer;

/**
 * @author lzx
 *
 */
public class MessageTest {

	@Test
	public void testSend() {

		try {
			MyProducer producer = new MyProducer();
			producer.sengMsg2MQ("測試發送數據");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

2.4.3 測試

圖示
  • 查看ActiveMQ管理控制界面
圖示

2.5 第三部分 建立消息消費者,消費消息

2.5.1 建立MyConsumer類

package com.xkt.consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author lzx
 *
 */
public class Myconsumer {

	private ConnectionFactory factory;

	private Connection connection;

	private Session session;

	private Destination destination;

	private MessageConsumer consumer;

	private Message message;

	public void receiveFromMq() {

		try {
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");

			connection = factory.createConnection();
			connection.start();

			session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

			// 建立目的地, 目的地命名即隊列命名, 消息消費者須要經過此命名訪問對應的隊列
			destination = session.createQueue("queue");

			// 建立消息消費者, 建立的消息消費者與某目的地對應, 即方法參數目的地
			consumer = session.createConsumer(destination);

			// 六、讀取消息
			message = consumer.receive(5000);

			// 7.提取文本
			if (null != message) {
				if (message instanceof TextMessage) {
					TextMessage tMsg = (TextMessage) message;
					String content = tMsg.getText();
					System.out.println("從列表中讀取的是" + content);
				}
			}
			// 在手動確認機制下,消費完消息以後,必須手動確認,讓消費的消息出隊列不然,會出現重複消費的問題。
			message.acknowledge();

		} catch (JMSException e) {
			e.printStackTrace();
			System.out.println("讀取失敗");
		} finally {
			if (null != consumer) {
				try {
					consumer.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != session) {
				try {
					session.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != connection) {
				try {
					connection.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}

	}

}

2.5.2 修改測試類MessageTest,新增測試方法

@Test
	public void testReceive() {

		try {
			Myconsumer consumer = new Myconsumer();
			consumer.receiveFromMq();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

2.5.3 測試

圖示
  • 查看ActiveMQ管理控制界面
圖示

  在前面的示例中,咱們發現消費者每次只能消費一條消息。當隊列中有多條消息的時候,咱們須要屢次運行消費者,才能消費完這些消息。很麻煩!!!!如何解決這個問題呢?咱們將在後面的文章中給出。apache

版權說明:歡迎以任何方式進行轉載,但請在轉載後註明出處!服務器

相關文章
相關標籤/搜索