前面的文章已經寫過MQ的相關概念,這裏再也不贅述。java
ActiveMQ是Apache下的開源項目,徹底支持JMS1.1和J2EE1.4規範的JMS Provider實現。web
解壓後的文件夾結構:數據庫 |
直接雙擊這個「wrapper.exe」便可apache |
以後能夠在瀏覽器輸入http://localhost:8161/瀏覽器 |
點擊Manage ActiveMQ broker,會彈出身份驗證,輸入admin,admin便可session |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.linkedbear</groupId> <artifactId>ActiveMQ-Demo</artifactId> <version>0.0.1-SNAPSHOT</version> <properties> <activemq.version>5.15.4</activemq.version> </properties> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- ActiveMQ --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>${activemq.version}</version> </dependency> <!-- 熱部署 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
以前的文章中寫過,JMS的消息傳遞有兩種模式,前面的RocketMQ中只寫了一對一模式,本篇文章將會編寫兩種模式。app
/** * 生產者Controller * @Title ProducerQueueController * @author LinkedBear * @Time 2018年8月3日 下午4:52:49 */ @Controller public class ProducerQueueController { @RequestMapping("/queueProduceMessage") @ResponseBody public Map<String, Object> queueProduceMessage() throws Exception { //JMS的使用比較相似於JDBC與Hibernate //1. 建立一個鏈接工廠(相似於JDBC中的註冊驅動),須要傳入TCP協議的ActiveMQ服務地址 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2. 建立鏈接(相似於DriverManager.getConnection) Connection connection = connectionFactory.createConnection(); //3. 開啓鏈接(ActiveMQ建立的鏈接是須要手動開啓的) connection.start(); //注意不是open。。。 //4. 獲取session(相似於Hibernate中的session,都是用會話來進行操做) //裏面有兩個參數,參數1爲是否開啓事務,參數2爲消息確認模式 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5. 建立一對一的消息隊列 Queue queue = session.createQueue("test_queue"); //6. 建立一條消息 String text = "test queue message" + Math.random(); TextMessage message = session.createTextMessage(text); //7. 消息須要發送方,要建立消息發送方(生產者),並綁定到某個消息隊列上 MessageProducer producer = session.createProducer(queue); //8. 發送消息 producer.send(message); //9. 關閉鏈接 producer.close(); session.close(); connection.close(); //------顯示發送的消息到視圖上------ Map<String, Object> map = new HashMap<>(); map.put("message", text); return map; } }
/** * 消費者Controller * @Title ConsumerQueueController * @author LinkedBear * @Time 2018年8月3日 下午4:52:56 */ @Controller public class ConsumerQueueController { @RequestMapping("/queueGetMessage1") public void queueGetMessage1() throws Exception { //1. 建立一個鏈接工廠,須要傳入TCP協議的ActiveMQ服務地址 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2. 建立鏈接 Connection connection = connectionFactory.createConnection(); //3. 開啓鏈接 connection.start(); //注意不是open。。。 //4. 獲取session //裏面有兩個參數,參數1爲是否開啓事務,參數2爲消息確認模式 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5. 建立一對一的消息隊列 Queue queue = session.createQueue("test_queue"); //------------前5步都是相同的,如下爲不一樣---------------- //6. 建立消費者 MessageConsumer consumer = session.createConsumer(queue); //7. 使用監聽器監聽隊列中的消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { String text = textMessage.getText(); System.out.println("收到消息:" + text); } catch (JMSException e) { e.printStackTrace(); } } }); //因爲設置監聽器後不能立刻結束方法,要在這裏加一個等待點 System.in.read(); //8. 關閉鏈接 consumer.close(); session.close(); connection.close(); } @RequestMapping("/queueGetMessage2") public void queueGetMessage2() throws Exception //(徹底相同,再也不重複) }
先執行兩個消息的消費者框架 |
執行http://localhost:8080/queueProduceMessage: 可是隻收到一條消息 |
/** * 生產者Controller * @Title ProducerTopicController * @author LinkedBear * @Time 2018年8月3日 下午4:52:49 */ @Controller public class ProducerTopicController { @RequestMapping("/topicProduceMessage") @ResponseBody public Map<String, Object> topicProduceMessage() throws Exception { //JMS的使用比較相似於JDBC與Hibernate //1. 建立一個鏈接工廠(相似於JDBC中的註冊驅動),須要傳入TCP協議的ActiveMQ服務地址 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2. 建立鏈接(相似於DriverManager.getConnection) Connection connection = connectionFactory.createConnection(); //3. 開啓鏈接(ActiveMQ建立的鏈接是須要手動開啓的) connection.start(); //注意不是open。。。 //4. 獲取session(相似於Hibernate中的session,都是用會話來進行操做) //裏面有兩個參數,參數1爲是否開啓事務,參數2爲消息確認模式 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5. 建立一對多的消息廣播 Topic topic = session.createTopic("test_topic"); //6. 建立一條消息 String text = "test topic message" + Math.random(); TextMessage message = session.createTextMessage(text); //7. 消息須要發送方,要建立消息發送方(生產者),並廣播到某個消息廣播端上 MessageProducer producer = session.createProducer(topic); //8. 發送消息 producer.send(message); //9. 關閉鏈接 producer.close(); session.close(); connection.close(); //------顯示發送的消息到視圖上------ Map<String, Object> map = new HashMap<>(); map.put("message", text); return map; } }
/** * 消費者Controller * @Title ConsumerTopicController * @author LinkedBear * @Time 2018年8月3日 下午4:52:56 */ @Controller public class ConsumerTopicController { @RequestMapping("/topicGetMessage") public void topicGetMessage() throws Exception { //1. 建立一個鏈接工廠,須要傳入TCP協議的ActiveMQ服務地址 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2. 建立鏈接 Connection connection = connectionFactory.createConnection(); //3. 開啓鏈接 connection.start(); //注意不是open。。。 //4. 獲取session //裏面有兩個參數,參數1爲是否開啓事務,參數2爲消息確認模式 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5. 建立一對多的消息廣播 Topic topic = session.createTopic("test_topic"); //------------前5步都是相同的,如下爲不一樣---------------- //6. 建立消費者 MessageConsumer consumer = session.createConsumer(topic); //7. 使用監聽器監聽隊列中的消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { String text = textMessage.getText(); System.out.println("收到消息:" + text); } catch (JMSException e) { e.printStackTrace(); } } }); //因爲設置監聽器後不能立刻結束方法,要在這裏加一個等待點 System.in.read(); //8. 關閉鏈接 consumer.close(); session.close(); connection.close(); } @RequestMapping("/topicGetMessage2") public void topicGetMessage2() throws Exception //(徹底相同,再也不重複) }
先執行兩個消息的消費者 |
執行http://localhost:8080/topicProduceMessage: 此次收到了兩條消息 |
從這兩種消息中間件的編寫過程來看,兩種產品的區別是比較大的,下面就這兩種產品進行多方面對比。
參考文章:http://www.javashuo.com/article/p-gqzlvfbx-p.html
比較項 |
RocketMQ |
ActiveMQ |
語言支持 |
只支持Java |
多語言,Java爲主 |
可用性 |
分佈式 |
主從 |
JMS規範 |
經常使用的使用方式沒有遵循JMS |
嚴格遵循JMS規範 |
消息持久化 |
硬盤 |
內存,硬盤,數據庫 |
部署方式 |
獨立部署 |
獨立部署、嵌入應用,能夠與Spring很好的整合 |
社區活躍 |
活躍 |
不很活躍 |