RabbitMQ是基於AMQP的一款消息管理系統。AMQP(Advanced Message Queuing Protocol),是一個提供消息服務的應用層標準高級消息隊列協議,其中RabbitMQ就是基於這種協議的一種實現。算法
常見mq:spring
RabbitMq有5種經常使用的消息模型網絡
這是最簡單的消息模型,以下圖:
app
生產者將消息發送到隊列,消費者從隊列中獲取消息,隊列是存儲消息的緩衝區。負載均衡
再演示代碼以前,咱們先建立一個工程rabbitmq-demo,並編寫一個工具類,用於提供與mq服務建立鏈接異步
public class ConnectionUtil { /** * 創建與RabbitMQ的鏈接 * @return * @throws Exception */ public static Connection getConnection() throws Exception { //定義鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務地址 factory.setHost("192.168.18.130"); //端口 factory.setPort(5672); //設置帳號信息,用戶名、密碼、vhost factory.setUsername("admin"); factory.setPassword("admin"); // 經過工程獲取鏈接 Connection connection = factory.newConnection(); return connection; } }
接下來是生產者發送消息,其過程包括:1.與mq服務創建鏈接,2.創建通道,3.聲明隊列(有相同隊列則不建立,沒有則建立),4.發送消息,代碼以下:分佈式
public class Send { private static final String QUEUE_NAME = "basic_queue"; public static void main(String[] args) throws Exception { //消息發送端與mq服務建立鏈接 Connection connection = ConnectionUtil.getConnection(); //創建通道 Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "hello world"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("生產者已發送:" + message); channel.close(); connection.close(); } }
消費者在接收消息的過程須要經歷以下幾個步驟: 1.與mqfuwu創建鏈接,2.創建通道,3.聲明隊列,4,接收消息,代碼以下:ide
public class Consumer1 { private static final String QUEUE_NAME = "basic_queue"; public static void main(String[] args) throws Exception { //消息消費者與mq服務創建鏈接 Connection connection = ConnectionUtil.getConnection(); //創建通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println("消費者1接收到消息:" + msg); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
消息的接收與消費使用都須要在一個匿名內部類DefaultConsumer中完成spring-boot
注意:隊列須要提早聲明,若是未聲明就使用隊列,則會報錯。若是不清楚生產者和消費者誰先聲明,爲了保證不報錯,生產者和消費者都聲明隊列,隊列的建立會保證冪等性,也就是說生產者和消費者都聲明同一個隊列,則只會建立一個隊列工具
在基本消息模型中,一個生產者對應一個消費者,而實際生產過程當中,每每消息生產會發送不少條消息,若是消費者只有一個的話效率就會很低,所以rabbitmq有另一種消息模型,這種模型下,一個生產發送消息到隊列,容許有多個消費者接收消息,可是一條消息只會被一個消費者獲取。
與基本消息模型基本一致,這裏測試循環發佈20條消息:
public class Send { private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 循環發佈任務 for (int i = 1; i <= 20; i++) { // 消息內容 String message = "task .. " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("生產者發送消息:" + message); Thread.sleep(500); } channel.close(); connection.close(); } }
public class Consumer1 { private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("消費者1接收到消息:" + msg); try { Thread.sleep(50);//模擬消費耗時 } catch (InterruptedException e) { e.printStackTrace(); } } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
public class Consumer2 { private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { String msg = new String(body); System.out.println("消費者2接收到消息:" + msg); try { Thread.sleep(50);//模擬消費耗時 } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
此時有兩個消費者監聽同一個隊列,當兩個消費者都工做時,生成者發送消息,就會按照負載均衡算法分配給不一樣消費者,以下圖:
在以前的模型中,一條消息只能被一個消費者獲取,而在訂閱模式中,能夠實現一條消息被多個消費者獲取。在這種模型下,消息傳遞過程當中比以前多了一個exchange交換機,生產者不是直接發送消息到隊列,而是先發送給交換機,經由交換機分配到不一樣的隊列,而每一個消費者都有本身的隊列:
解讀:
一、1個生產者,多個消費者
二、每個消費者都有本身的一個隊列
三、生產者沒有將消息直接發送到隊列,而是發送到了交換機
四、每一個隊列都要綁定到交換機
五、生產者發送的消息,通過交換機到達隊列,實現一個消息被多個消費者獲取的目的
X(exchange)交換機的類型有如下幾種:
Fanout:廣播,交換機將消息發送到全部與之綁定的隊列中去 Direct:定向,交換機按照指定的Routing Key發送到匹配的隊列中去 Topics:通配符,與Direct大體相同,不一樣在於Routing Key能夠根據通配符進行匹配
注意:在發佈訂閱模型中,生產者只負責發消息到交換機,至於消息該怎麼發,以及發送到哪一個隊列,生產者都不負責。通常由消費者建立隊列,而且綁定到交換機
在廣播模式下,消息發送的流程以下:
public class Send { private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange,指定類型爲fanout channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String message = "hello world"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println("生產者發送消息:" + message); channel.close(); connection.close(); } }
public class Consumer1 { private static final String QUEUE_NAME = "fanout_queue_1"; private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //消費者聲明本身的隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 聲明exchange,指定類型爲direct channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //消費者將隊列與交換機進行綁定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("消費者1獲取到消息:" + msg); } }); } }
其餘消費者只需修改QUEUE_NAME便可
注意:exchange與隊列同樣都須要提早聲明,若是未聲明就使用交換機,則會報錯。若是不清楚生產者和消費者誰先聲明,爲了保證不報錯,生產者和消費者都聲明交換機,一樣的,交換機的建立也會保證冪等性。
在fanout模型中,生產者發佈消息,全部消費者均可以獲取全部消息。在路由模式(Direct)中,能夠實現不一樣的消息被不一樣的隊列消費,在Direct模式下,交換機再也不將消息發送給全部綁定的隊列,而是根據Routing Key將消息發送到指定的隊列,隊列在與交換機綁定時會設定一個Routing Key,而生產者發送的消息時也須要攜帶一個Routing Key。
如圖所示,消費者C1的隊列與交換機綁定時設置的Routing Key是「error」, 而C2的隊列與交換機綁定時設置的Routing Key包括三個:「info」,「error」,「warning」,假如生產者發送一條消息到交換機,並設置消息的Routing Key爲「info」,那麼交換機只會將消息發送給C2的隊列。
public class Send { private static final String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange,指定類型爲direct channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String message = "新增一個訂單"; //生產者發送消息時,設置消息的Routing Key:"insert" channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes()); System.out.println("生產者發送消息:" + message); channel.close(); connection.close(); } }
public class Consumer1 { private static final String QUEUE_NAME = "direct_queue_1"; private static final String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //消費者聲明本身的隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //消費者將隊列與交換機進行綁定,而且設置Routing Key:"insert" channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert"); channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("消費者1獲取到消息:" + msg); } }); } }
其餘消費者須要修改隊列名QUEUE_NAME和Routing Key,上述生成者發送的消息,消費者1是能夠獲取到的
Topic類型的Exchange與Direct相比,都是能夠根據RoutingKey把消息路由到不一樣的隊列。只不過Topic類型Exchange可讓隊列在綁定Routing key 的時候使用通配符
Routingkey 通常都是有一個或多個單詞組成,多個單詞之間以」.」分割,例如: item.insert
通配符規則:
#:匹配一個或多個詞 *:匹配很少很多剛好1個詞
舉例:
audit.#:可以匹配audit.irs.corporate 或者 audit.irs audit.*:只能匹配audit.irs
Topics生產者代碼與Direct大體相同,只不過子聲明交換機時,將類型設爲BuiltinExchangeType.TOPIC(topic),
消費者代碼也與Direct大體相同,也是在聲明交換機時設置類型爲topic,代碼再也不演示
Spring AMQP是對AMQP的一種封裝,目的是可以讓咱們更簡便的使用消息隊列,下面介紹一下Spring AMQP在Spring boot中的使用方法
添加AMQP的啓動器:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
在application.yml中添加RabbitMQ的地址:
spring: rabbitmq: host: 192.168.18.130 username: admin password: admin
消費者須要定義一個類,類中定義監聽隊列的方法
@Component public class Listener { @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "spring.test.queue", durable = "false"), exchange = @Exchange(value = "spring.test.exchange", type = ExchangeTypes.DIRECT), key = "insert" ) ) public void listen(String msg){ System.out.println("消費者接受到消息:" + msg); } }
註解:
@Component:保證監聽類被spring掃描到
@RabbitListener:
@RabbitListener包含不少內容,在發佈訂閱模式中,咱們可使用其中的「QueueBinding[] bindings」,其中QueueBinding底層以下:
其中Queue表示隊列,Exchange表示交換機,key表示Routing Key
@RabbitListener( bindings = @QueueBinding( value = @Queue(value = "spring.test.queue", durable = "false"), exchange = @Exchange(value = "spring.test.exchange", type = ExchangeTypes.DIRECT), key = "insert" ) )
@Queue會建立隊列
@Exchange會建立交換機
@QueueBinding會綁定隊列和交換機
能夠經過註解引入AmqpTemplate:
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAMQPTest { @Resource private AmqpTemplate template; @Test public void testSendMsg() throws InterruptedException { String message = "hello spring"; template.convertAndSend("spring.test.exchange", "insert", message); System.out.println("生產者發送消息:" + message); Thread.sleep(10000);//等待10s,讓測試方法延遲結束,防止消費者將來得及獲取消息 } }
RabbitMQ有一個ACK機制,消費者在接收到消息後會向mq服務發送回執ACK,告知消息已被接收。這種ACK分爲兩種狀況:
若是消費者沒有發送ACK,則消息會一直保留在隊列中,等待下次接收。但這裏存在一個問題,就是一旦消費者發送了ACK,若是消費者後面宕機,則消息會丟失。所以自動ACK不能保證消費者在接收到消息以後可以正常完成業務功能,所以須要在消息被充分利用以後,手動ACK確認
自動ACK,basicConsume方法中將autoAck參數設爲true便可:
手動ack,在匿名內部類中,手動發送ACK:
固然,若是設置了手動ack,但又不手動發送ACK確認,消息會一直停留在隊列中,可能形成消息的重複獲取
消息確認機制(ACK)可以保證消費者不丟失消息,但假如消費者在獲取消息以前mq服務宕機,則消息也會丟失,所以要保證消息在服務端不丟失,則須要將消息進行持久化。隊列、交換機、消息都要持久化。
生成者在發送消息過程當中也可能出現錯誤或者網絡延遲燈故障,致使消息未成功發送到交換機或者隊列,或重複發送消息,爲了解決這個問題,rabbitmq中有多個解決辦法:
用事務將消息發送代碼包圍起來:
以下所示,在發送代碼前執行channel.confirmSelect(),若是消息未正常發送,就會進入if代碼塊,能夠進行重發也能夠對失敗消息進行記錄
顧名思義,就是生產者發送消息後不用等待服務端回饋發送狀態,能夠繼續執行後面的代碼,對於失敗消息重發進行異步處理:
生產者確認機制,確保消息正確發送,若是發送失敗會有錯誤回執,從而觸發重試
spring: rabbitmq: publisher-confirms: true