消息中間件之ActiveMQ的Demo測試

一,消息中間件的簡單介紹:java

消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成mysql

 

二.JMSsql

 

JMSJava Messaging Service)是Java平臺上有關面向消息中間件的技術規範,它便於消息系統中的Java應用程序進行消息交換,而且經過提供標準的產生、發送、接收消息的接口簡化企業應用的開發。數據庫

 

       JMS自己只定義了一系列的接口規範,是一種與廠商無關的 API,用來訪問消息收發系統。它相似於 JDBC(java Database Connectivity):這裏,JDBC 是能夠用來訪問許多不一樣關係數據庫 API,而 JMS 則提供一樣與廠商無關的訪問方法,以訪問消息收發服務。apache

 

  JMS 定義了五種不一樣的消息正文格式:瀏覽器

 

    · TextMessage--一個字符串對象session

 

    · MapMessage--一套名稱-值對maven

 

    · ObjectMessage--一個序列化的 Java 對象tcp

 

    · BytesMessage--一個字節的數據流分佈式

 

    · StreamMessage -- Java 原始值的數據流

 

 

 

  JMS消息傳遞類型

    點對點模式:即一個生產者和一個消費者一一對應;

 

    發佈/訂閱模式:即一個生產者產生消息並進行發送後,能夠由多個消費者進行接收

--------------------------------------------------------

ActiveMQ官方網站下載:http://activemq.apache.org/

ActiveMQ安裝完成並啓動完成以後,在瀏覽器訪問http://IP地址:8161/ 便可進入ActiveMQ管理頁面  u/p:admin/admin

 

 

點對點模式Demo

 

建立Maven工程(jar)

pom.xml引入依賴

 

<dependencies>
  <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.13.4</version>
    </dependency>
</dependencies>

<build>
  <plugins>
    <!-- java編譯插件 -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.2</version>
      <configuration>
        <source>1.7</source>
        <target>1.7</target>
      <encoding>UTF-8</encoding>
      </configuration>
    </plugin>
  </plugins>
</build>

 

 

生產者:

 

 

public class QueueProducer {

 

public static void main(String[] args) throws JMSException {

 

//1.建立鏈接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.93.131:61616");

//2.獲取鏈接
Connection connection = connectionFactory.createConnection();

//3.啓動鏈接
connection.start();

//4.獲取Session(參數1爲是否啓動事務,第二個參數:消息確認模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//5.建立隊列對象
Queue queue = session.createQueue("test-queue");

//6.建立消息生產者
MessageProducer producer = session.createProducer(queue);

//7.建立消息

TextMessage textMessage = session.createTextMessage("AvtiveDemo測試");

//8.發送消息
producer.send(textMessage);

//9.關閉資源

producer.close();


}

 

}

 

 

消費者:

 


public class QueueConsumer {

 

public static void main(String[] args) throws JMSException, IOException {

 

//1.建立鏈接工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.93.131:61616");
//2.獲取鏈接
Connection connection = connectionFactory.createConnection();
//3.啓動鏈接
connection.start();
//4.獲取session (參數1:是否啓動事務,參數2:消息確認模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立隊列對象
Queue queue = session.createQueue("test-queue");
//6.建立消息消費
MessageConsumer consumer = session.createConsumer(queue);

//7.監聽消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
  System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待鍵盤輸入
System.in.read();
//9.關閉資源
consumer.close();
session.close();
connection.close();
}

 

}

 

消費者:

public class QueueConsumer {

public static void main(String[] args) throws JMSException, IOException {

//1.建立鏈接工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.93.131:61616");
//2.獲取鏈接
Connection connection = connectionFactory.createConnection();
//3.啓動鏈接
connection.start();
//4.獲取session (參數1:是否啓動事務,參數2:消息確認模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立隊列對象
Queue queue = session.createQueue("test-queue");
//6.建立消息消費
MessageConsumer consumer = session.createConsumer(queue);

//7.監聽消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待鍵盤輸入
System.in.read();
//9.關閉資源
consumer.close();
session.close();
connection.close();
}

}

 

 

 發佈/訂閱模式

生產者

public static void main(String[] args) throws JMSException {

//1.建立鏈接工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.93.131:61616");
//2.獲取鏈接
Connection connection = connectionFactory.createConnection();
//3.啓動鏈接
connection.start();
//4.獲取session (參數1:是否啓動事務,參數2:消息確認模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立主題對象
Topic topic = session.createTopic("test-topic");
//6.建立消息生產者
MessageProducer producer = session.createProducer(topic);
//7.建立消息
TextMessage textMessage = session.createTextMessage("topic測試");
//8.發送消息
producer.send(textMessage);
//9.關閉資源
producer.close();
session.close();
connection.close();
}

 

消費者:消費者能夠建多個運行接收信息

public static void main(String[] args) throws JMSException, IOException {

//1.建立鏈接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.93.131:61616"); //2.獲取鏈接 Connection connection = connectionFactory.createConnection(); //3.啓動鏈接 connection.start(); //4.獲取session (參數1:是否啓動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立主題對象 //Queue queue = session.createQueue("test-queue"); Topic topic = session.createTopic("test-topic"); //6.建立消息消費 MessageConsumer consumer = session.createConsumer(topic); //7.監聽消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待鍵盤輸入 System.in.read(); //9.關閉資源 consumer.close(); session.close(); connection.close(); }

相關文章
相關標籤/搜索