在客戶端與服務器進行通信時.客戶端調用後,必須等待服務對象完成處理返回結果才能繼續執行。java
客戶與服務器對象的生命週期緊密耦合,客戶進程和服務對象進程都都必須正常運行;若是因爲服務對象崩潰或者網絡故障致使用戶的請求不可達,客戶會受到異常web
點對點通訊: 客戶的一次調用只發送給某個單獨的目標對象。spring
面向消息的中間件(MessageOrlented MiddlewareMOM)較好的解決了以上問
題。發送者將消息發送給消息服務器,消息服務器將消感存放在若千隊列中,在合適
的時候再將消息轉發給接收者。數據庫
這種模式下,發送和接收是異步的,發送者無需等
待; 兩者的生命週期未必相同: 發送消息的時候接收者不必定運行,接收消息的時候
發送者也不必定運行;一對多通訊: 對於一個消息能夠有多個接收者。apache
JMS是java的消息服務,JMS的客戶端之間能夠經過JMS服務進行異步的消息傳輸。windows
○ Point-to-Point(P2P) --- 點對點瀏覽器
○ Publish/Subscribe(Pub/Sub)--- 發佈訂閱springboot
即:點對點和發佈訂閱模型服務器
P2P網絡
若是你但願發送的每一個消息都應該被成功處理的話,那麼你須要P2P模式。
A用戶與B用戶發送消息
Pub/Sub模式圖
涉及到的概念 :
主題(Topic)
發佈者(Publisher)
訂閱者(Subscriber)
客戶端將消息發送到主題。多個發佈者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。
Pub/Sub的特色
每一個消息能夠有多個消費者
發佈者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者以後,才能消費發佈者的消息,並且爲了消費消息,訂閱者必須保持運行的狀態。
爲了緩和這樣嚴格的時間相關性,JMS容許訂閱者建立一個可持久化的訂閱。這樣,即便訂閱者沒有被激活(運行),它也能接收到發佈者的消息。
若是你但願發送的消息能夠不被作任何處理、或者被一個消息者處理、或者能夠被多個消費者處理的話,那麼能夠採用Pub/Sub模型
消息的消費
在JMS中,消息的產生和消息是異步的。對於消費來講,JMS的消息者能夠經過兩種方式來消費消息。
○ 同步
訂閱者或接收者調用receive方法來接收消息,receive方法在可以接收到消息以前(或超時以前)將一直阻塞
○ 異步
訂閱者或接收者能夠註冊爲一個消息監聽器。當消息到達以後,系統自動調用監聽器的onMessage方法。
用戶註冊、訂單修改庫存、日誌存儲
畫圖演示
是使用Erlang編寫的一個開源的消息隊列,自己支持不少的協議:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它變的很是重量級,更適合於企業級的開發。同時實現了一個經紀人(Broker)構架,這意味着消息在發送給客戶端時先在中心隊列排隊。對路由(Routing),負載均衡(Load balance)或者數據持久化都有很好的支持。
是一個Key-Value的NoSQL數據庫,開發維護很活躍,雖然它是一個Key-Value數據庫存儲系統,但它自己支持MQ功能,因此徹底能夠當作一個輕量級的隊列服務來使用。對於RabbitMQ和Redis的入隊和出隊操做,各執行100萬次,每10萬次記錄一次執行時間。測試數據分爲128Bytes、512Bytes、1K和10K四個不一樣大小的數據。實驗代表:入隊時,當數據比較小時Redis的性能要高於RabbitMQ,而若是數據大小超過了10K,Redis則慢的沒法忍受;出隊時,不管數據大小,Redis都表現出很是好的性能,而RabbitMQ的出隊性能則遠低於Redis。
|
入隊 |
出隊 |
||||||
|
128B |
512B |
1K |
10K |
128B |
512B |
1K |
10K |
Redis |
16088 |
15961 |
17094 |
25 |
15955 |
20449 |
18098 |
9355 |
RabbitMQ |
10627 |
9916 |
9370 |
2366 |
3219 |
3174 |
2982 |
1588 |
號稱最快的消息隊列系統,尤爲針對大吞吐量的需求場景。ZMQ可以實現RabbitMQ不擅長的高級/複雜的隊列,可是開發人員須要本身組合多種技術框架,技術上的複雜度是對這MQ可以應用成功的挑戰。ZeroMQ具備一個獨特的非中間件的模式,你不須要安裝和運行一個消息服務器或中間件,由於你的應用程序將扮演了這個服務角色。你只須要簡單的引用ZeroMQ程序庫,可使用NuGet安裝,而後你就能夠愉快的在應用程序之間發送消息了。可是ZeroMQ僅提供非持久性的隊列,也就是說若是down機,數據將會丟失。其中,Twitter的Storm中使用ZeroMQ做爲數據流的傳輸。
是Apache下的一個子項目。 相似於ZeroMQ,它可以以代理人和點對點的技術實現隊列。同時相似於RabbitMQ,它少許代碼就能夠高效地實現高級應用場景。RabbitMQ、ZeroMQ、ActiveMQ均支持經常使用的多種語言客戶端 C++、Java、.Net,、Python、 Php、 Ruby等。
Kafka是Apache下的一個子項目,是一個高性能跨語言分佈式Publish/Subscribe消息隊列系統,而Jafka是在Kafka之上孵化而來的,即Kafka的一個升級版。具備如下特性:快速持久化,能夠在O(1)的系統開銷下進行消息持久化;高吞吐,在一臺普通的服務器上既能夠達到10W/s的吞吐速率;徹底的分佈式系統,Broker、Producer、Consumer都原生自動支持分佈式,自動實現複雜均衡;支持Hadoop數據並行加載,對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka經過Hadoop的並行加載機制來統一了在線和離線的消息處理,這一點也是本課題所研究系統所看重的。Apache Kafka相對於ActiveMQ是一個很是輕量級的消息系統,除了性能很是好以外,仍是一個工做良好的分佈式系統。
其餘一些隊列列表HornetQ、Apache Qpid、Sparrow、Starling、Kestrel、Beanstalkd、Amazon SQS就再也不一一分析。
ActiveMQ部署其實很簡單,和全部Java同樣,要跑java程序就必須先安裝JDK並配置好環境變量,這個很簡單。
而後解壓下載的apache-activemq-5.10-20140603.133406-78-bin.zip壓縮包到一個目錄,獲得解壓後的目錄結構以下圖:
進入bin目錄,發現有win32和win64兩個文件夾,這2個文件夾分別對應windows32位和windows64位操做系統的啓動腳本。
個人實驗環境是windowsXP,就進入win32目錄,會看到以下目錄結構。
其中activemq.bat即是啓動腳本,雙擊啓動。
ActiveMQ默認啓動到8161端口,啓動完了後在瀏覽器地址欄輸入:http://localhost:8161/admin要求輸入用戶名密碼,默認用戶名密碼爲admin、admin,這個用戶名密碼是在conf/users.properties中配置的。輸入用戶名密碼後即可看到以下圖的ActiveMQ控制檯界面了。
Number Of Consumers 消費者 這個是消費者端的消費者數量
Number Of Pending Messages 等待消費的消息 這個是當前未出隊列的數量。能夠理解爲總接收數-總出隊列數
Messages Enqueued 進入隊列的消息 進入隊列的總數量,包括出隊列的。 這個數量只增不減
Messages Dequeued 出了隊列的消息 能夠理解爲是消費這消費掉的數量
這個要分兩種狀況理解
在queues裏它和進入隊列的總數量相等(由於一個消息只會被成功消費一次),若是暫時不等是由於消費者還沒來得及消費。
在 topics裏 它由於多消費者從而致使數量會比入隊列數高。
簡單的理解上面的意思就是
當有一個消息進入這個隊列時,等待消費的消息是1,進入隊列的消息是1。
當消息消費後,等待消費的消息是0,進入隊列的消息是1,出隊列的消息是1。
在來一條消息時,等待消費的消息是1,進入隊列的消息就是2。
沒有消費者時 Pending Messages 和 入隊列數量同樣
有消費者消費的時候 Pedding會減小 出隊列會增長
到最後 就是 入隊列和出隊列的數量同樣多
以此類推,進入隊列的消息和出隊列的消息是池子,等待消費的消息是水流。
使用ActiveMQ完成點對點(p2p)通信模式
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency>
package com.hongmoshui; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Producter { public static void main(String[] args) throws JMSException { // ConnectionFactory :鏈接工廠,JMS // 用它建立鏈接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); // JMS 客戶端到JMS Provider 的鏈接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一個發送或接收消息的線程 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息發送給誰. // 獲取session注意參數值my-queue是Query的名字 Destination destination = session.createQueue("my-queue"); // MessageProducer:消息生產者 MessageProducer producer = session.createProducer(destination); // 設置不持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 發送一條消息 for (int i = 1; i <= 5; i++) { sendMsg(session, producer, i); } connection.close(); } /** * 在指定的會話上,經過指定的消息生產者發出一條消息 * * @param session 消息會話 * @param producer 消息生產者 */ public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException { // 建立一條文本消息 TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i); // 經過消息生產者發出消息 producer.send(message); } }
package com.hongmoshui; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JmsReceiver { public static void main(String[] args) throws JMSException { // ConnectionFactory :鏈接工廠,JMS // 用它建立鏈接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); // JMS 客戶端到JMS Provider 的鏈接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一個發送或接收消息的線程 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息發送給誰. // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置 Destination destination = session.createQueue("my-queue"); // 消費者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); while (true) { TextMessage message = (TextMessage) consumer.receive(); if (null != message) { System.out.println("收到消息:" + message.getText()); } else break; } session.close(); connection.close(); } }
注:activeMQ的管理臺端口號默認爲8161,瀏覽器輸入http://127.0.0.1:8161,帳號:admin,密碼:admin
Number Of Consumers 消費者 這個是消費者端的消費者數量
Number Of Pending Messages 等待消費的消息 這個是當前未出隊列的數量。能夠理解爲總接收數-總出隊列數
Messages Enqueued 進入隊列的消息 進入隊列的總數量,包括出隊列的。 這個數量只增不減
Messages Dequeued 出了隊列的消息 能夠理解爲是消費這消費掉的數量
ActiveMQ消息簽收機制:
客戶端成功接收一條消息的標誌是一條消息被簽收,成功應答。
消息的簽收情形分兩種:
一、帶事務的session
若是session帶有事務,而且事務成功提交,則消息被自動簽收。若是事務回滾,則消息會被再次傳送。
二、不帶事務的session
不帶事務的session的簽收方式,取決於session的配置。
Activemq支持一下三種模式:
Session.AUTO_ACKNOWLEDGE 消息自動簽收
Session.CLIENT_ACKNOWLEDGE 客戶端調用acknowledge方法手動簽收
textMessage.acknowledge();//手動簽收
Session.DUPS_OK_ACKNOWLEDGE 不是必須簽收,消息可能會重複發送。在第二次從新傳送消息的時候,消息
只有在被確認以後,才認爲已經被成功地消費了。消息的成功消費一般包含三個階段:客戶接收消息、客戶處理消息和消息被確認。 在事務性會話中,當一個事務被提交的時候,確認自動發生。在非事務性會話中,消息什麼時候被確認取決於建立會話時的應答模式(acknowledgement mode)。
package com.hongmoshui; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class TOPSend { private static String BROKERURL = "tcp://127.0.0.1:61616"; private static String TOPIC = "my-topic"; public static void main(String[] args) throws JMSException { start(); } static public void start() throws JMSException { System.out.println("生產者已經啓動...."); // 建立ActiveMQConnectionFactory // 會話工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL); Connection connection = activeMQConnectionFactory.createConnection(); // 啓動JMS 鏈接 connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(null); producer.setDeliveryMode(DeliveryMode.PERSISTENT); send(producer, session); System.out.println("發送成功!"); connection.close(); } static public void send(MessageProducer producer, Session session) throws JMSException { for (int i = 1; i <= 5; i++) { System.out.println("我是消息" + i); TextMessage textMessage = session.createTextMessage("我是消息" + i); Destination destination = session.createTopic(TOPIC); producer.send(destination, textMessage); } } }
package com.hongmoshui; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class TOPReceiver { private static String BROKERURL = "tcp://127.0.0.1:61616"; private static String TOPIC = "my-topic"; public static void main(String[] args) throws JMSException { start(); } static public void start() throws JMSException { System.out.println("消費點啓動..."); // 建立ActiveMQConnectionFactory // 會話工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL); Connection connection = activeMQConnectionFactory.createConnection(); // 啓動JMS 鏈接 connection.start(); // 不開消息啓事物,消息主要發送消費者,則表示消息已經簽收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立一個隊列 Topic topic = session.createTopic(TOPIC); MessageConsumer consumer = session.createConsumer(topic); // consumer.setMessageListener(new // MsgListener()); while (true) { TextMessage textMessage = (TextMessage) consumer.receive(); if (textMessage != null) { System.out.println("接受到消息:" + textMessage.getText()); // textMessage.acknowledge();// // 手動簽收 // session.commit(); } else { break; } } connection.close(); } }
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.4.RELEASE</version> <relativePath /> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- spring boot web支持:mvc,aop... --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
spring: activemq: broker-url: tcp://127.0.0.1:61616 user: admin password: admin queue: springboot-queue server: port: 8080
package com.hongmoshui.config; import javax.jms.Queue; import org.apache.activemq.command.ActiveMQQueue; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class QueueConfig { @Value("${queue}") private String queue; @Bean public Queue logQueue() { return new ActiveMQQueue(queue); } }
package com.hongmoshui.producer; import javax.jms.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Component @EnableScheduling public class Producer { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; @Scheduled(fixedDelay = 5000) public void send() { jmsMessagingTemplate.convertAndSend(queue, "測試消息隊列" + System.currentTimeMillis()); } }
package com.hongmoshui; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableScheduling public class MQProducerStartApp { public static void main(String[] args) { SpringApplication.run(MQProducerStartApp.class, args); } }
spring: activemq: broker-url: tcp://127.0.0.1:61616 user: admin password: admin queue: springboot-queue server: port: 8081
package com.hongmoshui.consumer; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class Consumer { @JmsListener(destination = "${queue}") public void receive(String msg) { System.out.println("監聽器收到msg:" + msg); } }
package com.hongmoshui; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class MQConSumerStartApp { public static void main(String[] args) { SpringApplication.run(MQConSumerStartApp.class, args); } }
產生緣由:網絡延遲傳輸中,會形成進行MQ重試中,在重試過程當中,可能會形成重複消費。
解決辦法:
1.使用全局MessageID 判斷消費方使用同一個,解決冪等性。
2.使用JMS可靠消息機制