ActiveMQ是Apache的一款開源消息總線,主要用來做消息的分發。
首先需要下載MQ,進行啓動。
然後在控制檯建立隊列,初始用戶名密碼admin/admin。
然後可以寫生產者、消費者進行測試了。由於activemq支持spring,因此有兩種不同的寫法:
方法一:建立factory, connection, session, destination, producer, consumer
方法二:通過配置文件進行建立(未嘗試)。
最初在其作用的理解上有一些偏差,實際上是,在發送端引入MQ的jar包,向指定的MQ服務器發送信息,MQ會自動將其添加到消息隊列中,用控制檯可以比較清晰地看到隊列情況:http://localhost:8161/admin/
在接收端循環掃描要接收的隊列,當讀取到信息時進行接收處理。
需要注意的是,mq支持持久化,可將消息持久化到本地文件、數據庫。
另一個需要注意的地方是,建立會話session時,第一個參數爲true時(即啓用事務),需要調用session.commit()向服務器確認消息的接收,否則服務器認爲沒有成功接收。引用一下其他同學的話:
createSession(paramA,paramB);
paramA 取值有 : true or false 表示是否支持事務
paramB 取值有:Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE,SESSION_TRANSACTED
createSession(paramA,paramB);
paramA是設置事務的,paramB設置acknowledgment mode
paramA設置爲false時:paramB的值可爲Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。
paramA設置爲true時:paramB的值忽略, acknowledgment mode被jms服務器設置爲SESSION_TRANSACTED 。
Session.AUTO_ACKNOWLEDGE爲自動確認,客戶端發送和接收消息不需要做額外的工作。
Session.CLIENT_ACKNOWLEDGE爲客戶端確認。客戶端接收到消息後,必須調用javax.jms.Message的acknowledge方法,jms服務器才會刪除消息。
DUPS_OK_ACKNOWLEDGE爲允許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;而且允許重複確認。在需要考慮資源使用時,這種模式非常有效。
附代碼
接收端:
package com.receiver;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.Properties;
import javax.jms.*;
import org.apache.activemq.*;

/**
 * Queue consumer for an ActiveMQ broker.
 *
 * <p>The constructor reads {@code ../config/connection.properties} (keys
 * {@code name}, {@code password}, {@code brokerURL}, {@code queueName}),
 * opens and starts a connection, creates a non-transacted
 * {@code AUTO_ACKNOWLEDGE} session and a consumer on the configured queue.
 * If any of those steps fails, the stack trace is printed and the
 * remaining fields are left {@code null}.
 */
public class MessageReceiver implements IMessageReceiver {

    /** Classpath-relative location of the connection settings. */
    private static final String CONFIG_RESOURCE = "../config/connection.properties";

    public ActiveMQConnectionFactory connectionFactory = null;
    public Connection connection = null;
    public Session session = null;
    public Destination destination = null;
    public MessageConsumer consumer = null;

    /**
     * Initialises factory, connection, session, destination and consumer
     * from the properties resource.
     */
    public MessageReceiver() {
        try {
            Properties properties = new Properties();
            // Close the properties stream once it has been read.
            try (InputStream inProperties =
                    MessageReceiver.class.getResourceAsStream(CONFIG_RESOURCE)) {
                if (inProperties == null) {
                    // getResourceAsStream returns null for a missing resource;
                    // report that explicitly instead of failing inside load().
                    throw new FileNotFoundException(
                            "Resource not found on classpath: " + CONFIG_RESOURCE);
                }
                properties.load(inProperties);
            }

            // Create the factory.
            connectionFactory = new ActiveMQConnectionFactory(
                    properties.getProperty("name"),
                    properties.getProperty("password"),
                    properties.getProperty("brokerURL"));

            // Create and start the connection.
            connection = connectionFactory.createConnection();
            connection.start();

            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            // Message destination (queue name comes from the properties file).
            destination = session.createQueue(properties.getProperty("queueName"));

            // Message consumer bound to that queue.
            consumer = session.createConsumer(destination);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Polls {@code consumer} in a loop. Each received text message is printed
     * to standard output and written to {@code D:/test.txt} (the file is
     * re-created, not appended to, for every message). The loop pauses one
     * second between successful iterations and ends only when the thread is
     * interrupted.
     *
     * @param consumer the consumer to receive from
     */
    public void ReceiveMessage(MessageConsumer consumer) {
        int i = 0;
        while (true) {
            try {
                Message received = consumer.receive(RECEIVE_TIME);
                if (received instanceof TextMessage) {
                    String line = "queue1 " + ((TextMessage) received).getText() + " " + i;
                    System.out.println(line);
                    // try-with-resources closes the file even if println/close fails.
                    try (FileOutputStream out = new FileOutputStream("D:/test.txt");
                            PrintStream p = new PrintStream(out)) {
                        p.println(line);
                    }
                } else if (received != null) {
                    // A blind cast would throw ClassCastException and end the loop.
                    System.err.println("Skipping non-text message: " + received);
                }
                Thread.sleep(1000);
                i++;
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling; continuing would
                // make every later sleep() throw immediately.
                Thread.currentThread().interrupt();
                return;
            } catch (IOException e) {
                // Also covers FileNotFoundException from opening the file.
                e.printStackTrace();
            }
        }
    }

    /**
     * Closes {@code connection} if it is not {@code null}; a failure to close
     * is printed and otherwise ignored.
     *
     * @param connection the connection to close, may be {@code null}
     */
    public void CloseConnection(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    public ConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public Connection getConnection() {
        return connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }

    public Destination getDestination() {
        return destination;
    }

    public void setDestination(Destination destination) {
        this.destination = destination;
    }

    public MessageConsumer getConsumer() {
        return consumer;
    }

    public void setConsumer(MessageConsumer consumer) {
        this.consumer = consumer;
    }
}
package com.receiver;

import javax.jms.JMSException;
import javax.jms.TextMessage;

/**
 * Command-line entry point for the receiving side: builds a
 * {@link MessageReceiver}, runs its receive loop and closes the connection.
 */
public class ReceiveMain {

    /**
     * @param args unused
     * @throws JMSException declared for callers; not thrown by the visible calls
     */
    public static void main(String[] args) throws JMSException {
        MessageReceiver messageReceiver = new MessageReceiver();
        try {
            messageReceiver.ReceiveMessage(messageReceiver.getConsumer());
        } finally {
            // Close the connection however the receive call ends, including
            // when it exits with an unchecked exception.
            messageReceiver.CloseConnection(messageReceiver.getConnection());
        }
    }
}
發送端:
package com.sender;

import java.io.FileNotFoundException;
import java.io.InputStream;
import java.util.Properties;
import javax.jms.*;
import org.apache.activemq.*;

/**
 * Queue producer for an ActiveMQ broker.
 *
 * <p>The constructor reads {@code ../config/connection.properties} (keys
 * {@code name}, {@code password}, {@code brokerURL}, {@code queueName}),
 * opens and starts a connection, creates a non-transacted
 * {@code AUTO_ACKNOWLEDGE} session and a non-persistent producer on the
 * configured queue. If any of those steps fails, the stack trace is printed
 * and the remaining fields are left {@code null}.
 */
public class MessageSender implements IMessageSender {

    /** Classpath-relative location of the connection settings. */
    private static final String CONFIG_RESOURCE = "../config/connection.properties";

    public ActiveMQConnectionFactory connectionFactory = null;
    public Connection connection = null;
    public Session session = null;
    public Destination destination = null;
    public MessageProducer producer = null;

    /**
     * Initialises factory, connection, session, destination and producer
     * from the properties resource.
     */
    public MessageSender() {
        try {
            Properties properties = new Properties();
            // Close the properties stream once it has been read.
            try (InputStream inProperties =
                    MessageSender.class.getResourceAsStream(CONFIG_RESOURCE)) {
                if (inProperties == null) {
                    // getResourceAsStream returns null for a missing resource;
                    // report that explicitly instead of failing inside load().
                    throw new FileNotFoundException(
                            "Resource not found on classpath: " + CONFIG_RESOURCE);
                }
                properties.load(inProperties);
            }

            // Create the factory.
            connectionFactory = new ActiveMQConnectionFactory(
                    properties.getProperty("name"),
                    properties.getProperty("password"),
                    properties.getProperty("brokerURL"));

            // Create and start the connection.
            connection = connectionFactory.createConnection();
            connection.start();

            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            // Message destination (queue name comes from the properties file).
            destination = session.createQueue(properties.getProperty("queueName"));

            // Message producer bound to that queue.
            producer = session.createProducer(destination);

            // Messages are sent non-persistent.
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds the text message {@code "hello world! " + i}.
     *
     * @param session session used to create the message
     * @param i       sequence number appended to the text
     * @return the new message, or {@code null} if the session failed to create it
     */
    @Override
    public TextMessage CreateMessage(Session session, int i) {
        String strMessage = "hello world! " + i;
        TextMessage message = null;
        try {
            message = session.createTextMessage(strMessage);
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return message;
    }

    /**
     * Sends {@code message} through {@code producer}. A {@code null} message
     * (which {@link #CreateMessage} returns on failure) is reported and
     * skipped; a send failure is printed and otherwise ignored.
     *
     * @param message  message to send, may be {@code null}
     * @param producer producer to send with
     */
    public void SendMessage(TextMessage message, MessageProducer producer) {
        if (message == null) {
            // Do not hand a failed CreateMessage result to the producer.
            System.err.println("SendMessage skipped: message is null");
            return;
        }
        try {
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes {@code connection} if it is not {@code null}; a failure to close
     * is printed and otherwise ignored.
     *
     * @param connection the connection to close, may be {@code null}
     */
    public void CloseConnection(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    public ConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public Connection getConnection() {
        return connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }

    public Destination getDestination() {
        return destination;
    }

    public void setDestination(Destination destination) {
        this.destination = destination;
    }

    public MessageProducer getProducer() {
        return producer;
    }

    public void setProducer(MessageProducer producer) {
        this.producer = producer;
    }
}
package com.sender;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Command-line entry point for the sending side: sends ten text messages,
 * one per second, and records each send on standard output and in
 * {@code D:/test.txt}.
 */
public class SendMain {

    /**
     * @param args unused
     * @throws JMSException if querying or committing the session fails
     */
    public static void main(String[] args) throws JMSException {
        MessageSender messageSender = new MessageSender();
        try {
            for (int i = 0; i < 10; i++) {
                TextMessage textMessage =
                        messageSender.CreateMessage(messageSender.getSession(), i);
                messageSender.SendMessage(textMessage, messageSender.getProducer());

                String status = "send message sucess! : " + i;
                System.out.println(status);
                // The file is re-created (not appended to) on every iteration;
                // try-with-resources closes it even if the write fails.
                try (FileOutputStream out = new FileOutputStream("D:/test.txt");
                        PrintStream p = new PrintStream(out)) {
                    p.println(status);
                } catch (IOException e) {
                    // Also covers FileNotFoundException from opening the file.
                    e.printStackTrace();
                }

                // commit() is only legal on a transacted session; JMS specifies
                // IllegalStateException otherwise, which would abort the loop
                // on the first message.
                Session session = messageSender.getSession();
                if (session.getTransacted()) {
                    session.commit();
                }

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop sending.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            // Close the connection even when the loop exits with an exception.
            messageSender.CloseConnection(messageSender.getConnection());
        }
    }
}