
今天給你們分享的是ActiveMQ,若有不足,敬請指教。html
那麼咱們必須知道ActiveMQ是什麼。java
1、ActiveMQ簡介
1.1 ActiveMQ是什麼
- ActiveMQ是一個消息隊列應用服務器。支持JMS規範。
1.1.1 JMS概述
- 全稱:Java Message Service ,即爲Java消息服務,是一套java消息服務的API標準。(標準即接口)
- 實現了JMS標準的系統,稱之爲JMS Provider。
1.1.2 消息隊列
1.1.2.1 概念
- 消息隊列是在消息的傳輸過程當中保存消息的容器,提供一種不一樣進程或者同一進程不一樣線程直接通信的方式
圖示 |
 |
- Producer:消息生產者,負責產生和發送消息到 Broker;
- Broker:消息處理中心。負責消息存儲、確認、重試等,通常其中會包含多個 queue;
- Consumer:消息消費者,負責從 Broker 中獲取消息,並進行相應處理;
1.2 ActiveMQ能作什麼
- 實現兩個不一樣應用(程序)之間的消息通信。
- 實現同一個應用,不一樣模塊之間的消息通信。
1.3 ActiveMQ下載
1.4 ActiveMQ主要特色
- 支持多語言、多協議客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
- 對Spring的支持,ActiveMQ能夠很容易整合到Spring的系統裏面去。
- 支持高可用、高性能的集羣模式
2、示例
2.1 需求
2.2 配置步驟說明
- 搭建ActiveMQ消息服務器(略)。
- 建立一個java項目。
- 建立消息生產者,發送消息。
- 建立消息消費者,接收消息
2.3 第一部分 建立java項目,導入jar包
圖示 |
 |
2.4 第二部分 建立消息生成者,發送消息
2.4.1 建立MyProducer類,定義sengMsg2MQ方法
package com.xkt.producer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* @author lzx
*
*/
public class MyProducer {
// 定義連接工廠
private ConnectionFactory factory;
// 定義連接
private Connection connection;
// 定義會話
private Session session;
// 定義目的地
private Destination destination;
// 定義消息
private Message message;
// 定義消息生生產者
private MessageProducer producer;
public void sengMsg2MQ(String msg) {
try {
/*
* 一、建立連接工廠
*
* ActiveMQConnectionFactory(userName, password, brokerURL)
*
* userName:用戶名 默認admin password:密碼 默認admin brokerURL:消息服務中心地址
* tcp://0.0.0.0:61616 基於tcp協議
*/
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");
// 2.建立連接
connection = factory.createConnection();
// 開啓連接
connection.start();
/*
* 三、建立會話
*
* createSession(transacted, acknowledgeMode)
*
* transacted:是否使用事物 true|false true 表示使用事物,每次對消息進行讀寫以後,要提交事物。若是使用了事物,則消息確認機制失效
* false 表示不使用事物
*
* acknowledgeMode: 消息確認機制 Session.AUTO_ACKNOWLEDGE -
* 自動確認消息機制,一旦讀取到消息,則消費成功,消息出隊列,避免重複消費 Session.CLIENT_ACKNOWLEDGE -
* 客戶端確認消息機制,手動確認,即消費了消息成功以後,再確認 Session.DUPS_OK_ACKNOWLEDGE -
* 有副本的客戶端確認消息機制。集羣模式下
*
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.建立隊列
destination = session.createQueue("test-mq");
// 5.建立消息對象
message = session.createTextMessage(msg);
// 6.建立消息生產者
producer = session.createProducer(destination);
// 7.發送消息
producer.send(message);
// session.commit();
System.out.println("消息發送成功");
} catch (JMSException e) {
e.printStackTrace();
System.out.println("消息發送失敗");
} finally {
// 回收消息發送者資源
if (null != producer) {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
// 回收會話資源
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
// 回收連接資源
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
2.4.2 建立一個測試類
package com.xkt.test;
import org.junit.Test;
import com.xkt.consumer.Myconsumer;
import com.xkt.producer.MyProducer;
/**
* @author lzx
*
*/
public class MessageTest {
@Test
public void testSend() {
try {
MyProducer producer = new MyProducer();
producer.sengMsg2MQ("測試發送數據");
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.4.3 測試
圖示 |
 |
圖示 |
 |
2.5 第三部分 建立消息消費者,消費消息
2.5.1 建立MyConsumer類
package com.xkt.consumer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* @author lzx
*
*/
public class Myconsumer {
private ConnectionFactory factory;
private Connection connection;
private Session session;
private Destination destination;
private MessageConsumer consumer;
private Message message;
public void receiveFromMq() {
try {
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 建立目的地, 目的地命名即隊列命名, 消息消費者須要經過此命名訪問對應的隊列
destination = session.createQueue("queue");
// 建立消息消費者, 建立的消息消費者與某目的地對應, 即方法參數目的地
consumer = session.createConsumer(destination);
// 六、讀取消息
message = consumer.receive(5000);
// 7.提取文本
if (null != message) {
if (message instanceof TextMessage) {
TextMessage tMsg = (TextMessage) message;
String content = tMsg.getText();
System.out.println("從列表中讀取的是" + content);
}
}
// 在手動確認機制下,消費完消息以後,必須手動確認,讓消費的消息出隊列不然,會出現重複消費的問題。
message.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
System.out.println("讀取失敗");
} finally {
if (null != consumer) {
try {
consumer.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != session) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
2.5.2 修改測試類MessageTest,新增測試方法
@Test
public void testReceive() {
try {
Myconsumer consumer = new Myconsumer();
consumer.receiveFromMq();
} catch (Exception e) {
e.printStackTrace();
}
}
2.5.3 測試
圖示 |
 |
圖示 |
 |
在前面的示例中,咱們發現消費者每次只能消費一條消息。當隊列中有多條消息的時候,咱們須要屢次運行消費者,才能消費完這些消息。很麻煩!!!!如何解決這個問題呢?咱們將在後面的文章中給出。apache
版權說明:歡迎以任何方式進行轉載,但請在轉載後註明出處!服務器