ActiveMQ 部署及發送接收消息java
1、 下載下載地址:http://activemq.apache.org/ 我這裏使用的版本爲當前最新5.8.0。web
下載版本有Windows和Linux兩個版本,且都分爲32位和64位。根據本身須要選擇下載。apache
2、 安裝我這裏下載的爲windows的32位版本(apache-activemq-5.8.0-bin.zip),下載後直接解壓到須要安裝的目錄或在直接解壓到當前目錄也可,解壓完安裝也完成。windows
解壓後目錄如上圖,裏面包含了示例和文檔,及全部的jar包。session
3、 運行
進入到bin目錄(apache-activemq-5.8.0\bin),雙擊activemq.bat,就會運行,運行截圖以下:dom
此時表示ActiveMQ已經在運行了,固然正常生產環境下能夠設置做爲服務在後臺運行,而且隨系統啓動而啓動。tcp
4、 測試ActiveMQ自帶了一套管理系統,訪問http://localhost:8161/admin/,會出現須要輸入用戶名和密碼的頁面以下:ide
默認用戶名和密碼都是admin,進入後則爲主界面:測試
在這個界面上,咱們能夠管理隊列及其餘的一些功能,爲了下面的繼續,咱們在這裏建立一個Queue和一個Topic。url
點擊目錄上的Queues進入建立Queue頁面,輸入Queue名稱,點擊Create後下面就建立了G2Queue的queue隊列。
這裏也能夠不用這樣手工建立,在發送端指定了一個Queue或Topic名字後,會自動建立一個隊列,如上面的choice.queue和FirstQueue都是我測試程序時,程序裏面指定的Queue名稱,自動建立的。
一樣的方式建立一個Topic,以下:
建立一個新的項目,我這裏是建立的webproject名稱爲ActiveMQ,引入ActiveMQ的jar包,整個工程結構以下:
此段代碼從網上直接copy,只是稍做修改:
[java] view plaincopy
import java.util.Random;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SendMessage {
private static final String url = "tcp://localhost:61616";
private static final String QUEUE_NAME = "G2Queue";
public void sendMessage() throwsJMSException {
// JMS 客戶端到JMSProvider 的鏈接
Connection connection = null;
try {
// 鏈接工廠,JMS 用它建立鏈接
// 構造ConnectionFactory實例對象,此處採用ActiveMq的實現jar
ConnectionFactory connectionFactory = newActiveMQConnectionFactory(url);
connection = (Connection)connectionFactory.createConnection();
// 啓動鏈接
connection.start();
//Session:發送或接收消息的線程
// 獲取session
Session session = (Session) connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
// 消息的目的地,消息發送到那個隊列
Destination destination = session.createQueue(QUEUE_NAME);
//MessageProducer:消息發送者(生產者)
// 建立消息發送者
MessageProducer producer =session.createProducer(destination);
// 設置是否持久化
//DeliveryMode.NON_PERSISTENT:不持久化
//DeliveryMode.PERSISTENT:持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
String msg = "";
int i = 0;
do {
msg = "第"+i + "次發送的消息:"+new Random();
TextMessagemessage = session.createTextMessage(msg);
Thread.sleep(1000);
// 發送消息到目的地方
producer.send(message);
System.out.println("發送消息:" +msg);
i++;
} while (i<1000);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
SendMessage sndMsg = newSendMessage();
try {
sndMsg.sendMessage();
} catch (Exception ex) {
System.out.println(ex.toString());
}
}
}
運行結果以下:
6、 接收消息[java] view plaincopy
package cn.g2room.mq.test;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消息接收類
*
* @createTime:Apr 7, 2013 5:11:11 PM
* @author:<a href="mailto:252909344@qq.com">迷蝶</a>
* @version:0.1
*@lastVersion: 0.1
* @updateTime:
*@updateAuthor: <a href="mailto:252909344@qq.com">迷蝶</a>
* @changesSum:
*
*/
public class ReceiveMessage {
privatestatic final String url = "tcp://localhost:61616";
privatestatic final String QUEUE_NAME = "G2Queue";
publicvoid receiveMessage() {
Connectionconnection = null;
try{
try{
ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory(
url);
connection= connectionFactory.createConnection();
}catch (Exception e) {
System.out.println(e.toString());
}
connection.start();
Sessionsession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destinationdestination = session.createQueue(QUEUE_NAME);
//消息接收者,也就是消費者
MessageConsumerconsumer = session.createConsumer(destination);
consumeMessagesAndClose(connection,session, consumer);
}catch (Exception e) {
System.out.println(e.toString());
}
}
/**
* 接收和關閉消息,如遇到消息內容爲close則,關閉鏈接
*
* @param connection JMS 客戶端到JMSProvider 的鏈接
* @param session 發送或接收消息的線程
* @param consumer 消息接收對象
* @throws JMSException
* @auther <ahref="mailto:252909344@qq.com">迷蝶</a>
* Apr 8, 2013 10:31:55 AM
*/
protectedvoid consumeMessagesAndClose(Connection connection,
Sessionsession, MessageConsumer consumer) throws JMSException {
do{
Messagemessage = consumer.receive(1000);
if("close".equals(message)){
consumer.close();
session.close();
connection.close();
}
if(message != null) {
onMessage(message);
}
}while (true);
}
publicvoid onMessage(Message message) {
try{
if(message instanceof TextMessage) {
TextMessagetxtMsg = (TextMessage) message;
Stringmsg = txtMsg.getText();
System.out.println("Received:" + msg);
}
}catch (Exception e) {
e.printStackTrace();
}
}
publicstatic void main(String args[]) {
ReceiveMessagerm = new ReceiveMessage();
rm.receiveMessage();
}
}
運行結果以下: