消息中間件——RabbitMQ(六)理解Exchange交換機核心概念!

求關注

理解Exchange交換機核心概念!

前言

來了解RabbitMQ一個重要的概念:Exchange交換機java

1. Exchange概念

  • Exchange:接收消息,並根據路由鍵轉發消息所綁定的隊列。

交換機

藍色框:客戶端發送消息至交換機,經過路由鍵路由至指定的隊列。 黃色框:交換機和隊列經過路由鍵有一個綁定的關係。 綠色框:消費端經過監聽隊列來接收消息。git

2. 交換機屬性

Name:交換機名稱 Type:交換機類型——direct、topic、fanout、headers、sharding(此篇不講) Durability:是否須要持久化,true爲持久化 Auto Delete:當最後一個綁定到Exchange上的隊列刪除後,自動刪除該Exchange Internal:當前Exchange是否用於RabbitMQ內部使用,默認爲false Arguments:擴展參數,用於擴展AMQP協議自定製化使用github

3. Direct Exchange(直連)

  • 全部發送到Direct Exchange的消息被轉發到RouteKey中指定的Queue

注意:Direct模式可使用RabbitMQ自帶的Exchange:default Exchange,因此不須要將Exchange進行任何綁定(binding)操做,消息傳遞時,RouteKey必須徹底匹配纔會被隊列接收,不然該消息會被拋棄。面試

Direct Exchange結構圖

重點:routing key與隊列queues 的key保持一致,便可以路由到對應的queue中。編程

3.1 代碼演示

生產端:服務器

/**
 * 
* @ClassName: Producer4DirectExchange 
* @Description: 生產者
* @author Coder編程
* @date2019年7月19日 下午22:15:52 
*
 */
public class Producer4DirectExchange {

	
	public static void main(String[] args) throws Exception {
		
		//1建立ConnectionFactory
		Connection connection = ConnectionUtils.getConnection();
		//2建立Channel
		Channel channel = connection.createChannel();  
		//3 聲明
		String exchangeName = "test_direct_exchange";
		String routingKey = "test.direct";
		//4 發送
		String msg = "Coder編程 Hello World RabbitMQ 4  Direct Exchange Message ... ";
		channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); 		
	}
}

消費端:微信

/**
 * 
* @ClassName: Consumer4DirectExchange 
* @Description: 消費者
* @author Coder編程
* @date2019年7月19日 下午22:18:52 
*
 */
public class Consumer4DirectExchange {

	public static void main(String[] args) throws Exception {
		
		
		//建立ConnectionFactory
		Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();  
		//聲明
		String exchangeName = "test_direct_exchange";
		String exchangeType = "direct";
		String queueName = "test_direct_queue";
		String routingKey = "test.direct";
		
		//表示聲明瞭一個交換機
		channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
		//表示聲明瞭一個隊列
		channel.queueDeclare(queueName, false, false, false, null);
		//創建一個綁定關係:
		channel.queueBind(queueName, exchangeName, routingKey);
		
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數:隊列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循環獲取消息  
        while(true){  
            //獲取消息,若是沒有消息,這一步將會一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
	}
}

測試結果:app

打印結果

注意須要routingKey保持一致。能夠本身嘗試修改routingkey,是否能收到消息。學習

4. Topic Exchange

  • 全部發送到Topic Exchange的消息被轉發到全部管線RouteKey中指定Topic的Queue上
  • Exchange將RouteKey和某Topic進行模糊匹配,此時隊列須要綁定一個Topic

注意:可使用通配符進行模糊匹配 符號 "#" 匹配一個或多個詞 符號 "" 匹配很少很多一個詞 例如:"log.#" 可以匹配到 "log.info.oa" "log." 只會匹配到 "log.error"測試

Topic Exchange結構圖

在一堆消息中,每一個不一樣的隊列只關心本身須要的消息。

4.1 代碼演示

生產端:

/**
 * 
* @ClassName: Producer4TopicExchange 
* @Description: 生產者
* @author Coder編程
* @date2019年7月19日 下午22:32:41
*
 */
public class Producer4TopicExchange {

	
	public static void main(String[] args) throws Exception {
		
		//1建立ConnectionFactory
		Connection connection = ConnectionUtils.getConnection();
		//2建立Channel
		Channel channel = connection.createChannel();  
		//3聲明
		String exchangeName = "test_topic_exchange";
		String routingKey1 = "user.save";
		String routingKey2 = "user.update";
		String routingKey3 = "user.delete.abc";
		//4發送
		String msg = "Coder編程 Hello World RabbitMQ 4 Topic Exchange Message ...";
		channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
		channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes()); 	
		channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
		channel.close();  
        connection.close();  
	}
}

消費端:

/**
 * 
* @ClassName: Consumer4TopicExchange 
* @Description: 消費者
* @author Coder編程
* @date2019年7月19日 下午22:37:12
*
 */
public class Consumer4TopicExchange {

	public static void main(String[] args) throws Exception {
		
		
		//建立ConnectionFactory
		Connection connection = ConnectionUtils.getConnection();
        
        Channel channel = connection.createChannel();  
		// 聲明
		String exchangeName = "test_topic_exchange";
		String exchangeType = "topic";
		String queueName = "test_topic_queue";
		//String routingKey = "user.*";
		String routingKey = "user.*";
		// 1 聲明交換機 
		channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
		// 2 聲明隊列
		channel.queueDeclare(queueName, false, false, false, null);
		// 3 創建交換機和隊列的綁定關係:
		channel.queueBind(queueName, exchangeName, routingKey);
		
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數:隊列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循環獲取消息  
        while(true){  
            //獲取消息,若是沒有消息,這一步將會一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
	}
}

測試結果:

打印結果

注意一個問題:須要進行解綁

5. Fanout Exchange

  • 不處理路由鍵,只須要簡單的將隊裏綁定到交換機上
  • 發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上
  • Fanout交換機轉發消息是最快的

Fanout Exchange結構圖

5.1 代碼演示

生產端:

/**
 * 
* @ClassName: Producer4FanoutExchange 
* @Description: 生產者
* @author Coder編程
* @date2019年7月19日 下午23:01:16
*
 */
public class Producer4FanoutExchange {

	
	public static void main(String[] args) throws Exception {
		
		//1建立ConnectionFactory
		Connection connection = ConnectionUtils.getConnection();
		//2 建立Channel
		Channel channel = connection.createChannel();  
		//3 聲明
		String exchangeName = "test_fanout_exchange";
		//4 發送
		for(int i = 0; i < 10; i ++) {
			String msg = "Coder 編程  Hello World RabbitMQ 4 FANOUT Exchange Message ...";
			channel.basicPublish(exchangeName, "", null , msg.getBytes()); 			
		}
		channel.close();  
        connection.close();  
	}
	
}

消費端:

/**
 * 
* @ClassName: Consumer4FanoutExchange 
* @Description: 消費者
* @author Coder編程
* @date2019年7月19日 下午23:21:18
*
 */
public class Consumer4FanoutExchange {

	public static void main(String[] args) throws Exception {
		
		//建立ConnectionFactory
		Connection connection = ConnectionUtils.getConnection();
        
        Channel channel = connection.createChannel();  
		// 聲明
		String exchangeName = "test_fanout_exchange";
		String exchangeType = "fanout";
		String queueName = "test_fanout_queue";
		String routingKey = "";	//不設置路由鍵
		channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
		channel.queueDeclare(queueName, false, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);
		
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數:隊列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer); 
        //循環獲取消息  
        while(true){  
            //獲取消息,若是沒有消息,這一步將會一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
	}
}

測試結果:

打印結果


6. 其餘

6.1 Bingding —— 綁定

  • Exchange和Exchange、Queue之間的鏈接關係
  • Bingding能夠包含RoutingKey或者參數

6.2 Queue——消息隊列

  • 消息隊列,實際存儲消息數據
  • Durability:是否持久化,Durable:是 ,Transient:否
  • Auto delete:如選yes,表明當最後一個監聽被移除以後,該Queue會自動被刪除。

6.3 Message——消息

  • 服務器與應用程序之間傳送的數據
  • 本質上就是一段數據,由Properties和Payload(Body)組成
  • 經常使用屬性:delivery mode、headers(自定義屬性)

6.4 其餘屬性

content_type、content_encoding、priority

correlation_id、reply_to、expiration、message_id

timestamp、type、user_id、app_id、cluster_id

6.5 Virtual Host虛擬主機

  • 虛擬地址,用於進行邏輯隔離,最上層的消息路由
  • 一個Virtual Host裏面能夠有若干個Exchange和Queue
  • 同一個Virtual Host裏面不能有相同名稱的Exchange或Queue

7. 總結

RabbitMQ的概念、安裝與使用、管控臺操做、結合RabbitMQ的特性、Exchange、Queue、Binding 、RoutingKey、Message進行覈銷API的講解,經過本章的學習,但願你們對RabbitMQ有一個初步的認識。

文末

歡迎關注我的微信公衆號:Coder編程 獲取最新原創技術文章和免費學習資料,更有大量精品思惟導圖、面試資料、PMP備考資料等你來領,方便你隨時隨地學習技術知識! 新建了一個qq羣:315211365,歡迎你們進羣交流一塊兒學習。謝謝了!也能夠介紹給身邊有須要的朋友。

文章收錄至 Github: https://github.com/CoderMerlin/coder-programming Gitee: https://gitee.com/573059382/coder-programming 歡迎關注並star~ 微信公衆號

參考文章:

《RabbitMQ消息中間件精講》

推薦文章:

消息中間件——RabbitMQ(三)理解RabbitMQ核心概念和AMQP協議!

消息中間件——RabbitMQ(四)命令行與管控臺的基本操做!

消息中間件——RabbitMQ(五)快速入門生產者與消費者,SpringBoot整合RabbitMQ!

相關文章
相關標籤/搜索