RabbitMQ入門與核心概念

著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。

初識RabbitMQ

rabbitMQ是一個開源的消息代理和隊列服務器,用於經過普通協議在徹底不一樣的應用之間共享數據,RabbitMQ是使用Erlang語言來編寫的,而且RabbitMQ是基於AMQP協議的。html

哪些大廠在用RabbitMQ,爲何?

  • 滴滴、美團、頭條、去哪兒、ppmoney
  • 開源、性能優秀,穩定性保障
  • 提供可靠的消息投遞模式(confirm)、返回模式(return)
  • 與SpringAMQP完美的整合,API豐富
  • 集羣模式豐富,表達式配置,HA模式,鏡像隊列模型
  • 保證消息數據不丟失的前提作到高可靠性、可用性

RabbitMQ高性能的緣由

  • Erlang語言最初是用於交換機領域的架構模式,這樣的話RabbitMQ在Broker之間進行數據交互的性能是優秀的。
  • Erlang語言的特色:Erlang有着和原生Socket同樣的延遲

什麼是AMQP高級消息隊列協議?

AMQP定義:是具備現代特徵的二進制協議。是一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計,是一個規範,RabbitMQ就是這個規範的一種實現。node

AMQP協議模型

AMQP核心概念

  • Server:又稱Broker,接受客戶端的鏈接,實現AMQP實體服務
  • Connection:鏈接,應用程序與Broker的網絡鏈接
  • Channel:網絡信道,幾乎全部的操做都在Channel中進行,Channel是進行消息讀寫的通道。客戶端可創建多個Channel,每一個Channel表明一個會話任務。
  • Message:消息,服務器與應用程序之間傳遞的數據,由Properties和Body組成。Properties能夠對消息進行修飾,好比消息的優先級,延遲等高級特性;Body則是消息體內容。
  • Virtual host:虛擬地址,用於進行邏輯隔離,最上層的消息路由。一個Virtual host裏面能夠有若干個Exchange和Queue,同一個Virtual host裏面不能有相同名稱的Exchange和Queue(例如:redis有16個數據庫,也是邏輯隔離的概念)
  • Exchange:交換機,接受消息,根據路由鍵轉發消息到綁定的隊列
  • Binding:Exchange和Queue之間的虛擬鏈接,binding中能夠包含routing key
  • Routing key:一個路由規則,虛擬機能夠用它來肯定如何路由一個特定消息
  • Queue:也稱爲Message Queue,消息隊列,保存消息並將它們轉發給消費者

RabbitMQ的總體架構

生產者只須要關注把消息投遞到指定的exchange便可,消費者監聽指定的隊列就能夠了。經過這個圖能夠發現,生產者不關心我把消息投遞到哪一個隊列,消費者也不關係我這個隊列的消息是從哪一個exchange來的,這二者徹底沒有耦合的狀況。那麼它們是怎麼流轉的,是經過exchange和queue的綁定關係。

消息流轉圖 redis

RabbitMQ的安裝

rabbitMQ之安裝和配置(一)
rabbitmq之配置文件詳解(二)數據庫

安裝完成以後咱們能夠看到有三個與rabbitmq相關的命令 api

服務的啓動:rabbitmq-server start & 
服務的中止:rabbitmqctl stop_app
管理插件: rabbitmq-plugins enable rabbitmq_management
複製代碼

驗證是否啓動成功,出現以下內容則爲成功
使用 rabbitmq-plugins list命令查看全部已經存在的插件
查看管理控制檯頁面

命令行與管控臺的關係

基本操做bash

rabbitmqctl stop_app  #關閉應用
rabbitmqctl start_app  #啓動應用
rabbitmqctl status   #節點狀態
rabbitmqctl add_user username password  #增長用戶
rabbitmqctl list_users   #列出全部用戶
rabbitmqctl delete_user username  #刪除用戶
rabbitmqctl clear_permissions -p vhostpath username  #清除用戶權限
rabbitmqctl list_user_permissions username  #列出用戶權限
rabbitmqctl change_password username newpassword  #修改密碼
# 設置用戶權限
rabbitmqctl set_permissions -p vhostpath username ".*"".*"".*"
rabbitmqctl add_vhost vhostpath  #建立虛擬主機
rabbitmqctl list_vhosts   #列出全部虛擬主機
rabbitmqctl list_permissions -p vhostpath  #列出虛擬主機上全部權限
rabbitmqctl delete_vhost vhostpath  #刪除虛擬主機
rabbitmqctl list_queues   #查看全部隊列信息
rabbitmqctl -p vhostpath purge_queue blue   #清除隊列裏的消息
複製代碼

高級操做服務器

#移除全部數據,要在rabbitmqctl stop_app 以後使用
rabbitmqctl reset 
#[--ram] 指定節點的存儲模式,ram是內存基本存儲
rabbitmqctl join_cluster <clusternode> [--ram]  #組成集羣命令
rabbitmqctl cluster_status   #查看集羣狀態
# 修改集羣節點的存儲形式 disc磁盤 ram內存
rabbitmqctl change_cluster_node_type disc|ram 
# 忘記節點(摘除節點)
rabbitmqctl forget_cluster_node [--offline]  
#修改節點名稱
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2...]
複製代碼

管控臺有的操做,ctl命令行也都有網絡

實例代碼操做

maven 依賴架構

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>
複製代碼

生產者發佈消息app

package com.example.rabbitmq.quickstart;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Procuder {

    public static void main(String[] args) throws Exception {

        // 1 建立一個ConnectionFactory,並進行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 經過鏈接工程建立鏈接
        Connection connection = connectionFactory.newConnection();

        // 3 經過connection建立一個channel
        Channel channel = connection.createChannel();

        for (int i = 0; i < 5; i++) {
            String msg = "hello world!";
            // 1 exchange   2 routing key
            channel.basicPublish("", "test01", null, msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}
複製代碼

消費者訂閱處理消息

package com.example.rabbitmq.quickstart;

import com.rabbitmq.client.*;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 1 建立一個ConnectionFactory,並進行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 經過鏈接工程建立鏈接
        Connection connection = connectionFactory.newConnection();

        // 3 經過connection建立一個channel
        Channel channel = connection.createChannel();
        //4 聲明(建立)隊列
        String queueName = "test01";
        channel.queueDeclare(queueName, true, false, false, null);
        //5 建立消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        // 6 設置channel
        channel.basicConsume(queueName, true, queueingConsumer);

        while (true) {
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消費端消息:" + msg);
            Envelope envelope = delivery.getEnvelope();
        }
    }
}
複製代碼

exchange交換機

交換機屬性

  • Name: 交換機名稱,
  • Type: 交換機類型direct、topic、fanout、headers
  • Durability: 是否須要持久化,true爲持久化
  • Auto Delete:當最後一個綁定到Exchange上的隊列刪除後,自動刪除該Exchange
  • Internal:當前Exchange是否用於RabbitMQ內部使用,默認爲false
  • ArgArgumens:擴展參數,用於擴展AMQP協議自制定化使用

Direct Exchange

全部發送到Direct Exchange的消息被轉發到RouteKey中指定到Queue

⚠️注意:Direct模式可使用RabbitMQ自帶的Exchange:default Exchange,因此不須要將Exchange進行任何綁定(binding)操做,消息傳遞時,RouteKey必須徹底匹配纔會被隊列接收,不然該消息會被拋棄。

示例代碼

package com.bfxy.rabbitmq.api.exchange.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class ConsumerDirectExchange {
	public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        
        Channel channel = connection.createChannel();  
		//4 聲明
		String exchangeName = "test_direct_exchange";
		String exchangeType = "direct";
		String queueName = "test_direct_queue";
		String routingKey = "test.direct";
		
		//表示聲明瞭一個交換機
		channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
		//表示聲明瞭一個隊列
		channel.queueDeclare(queueName, false, false, false, null);
		//創建一個綁定關係:
		channel.queueBind(queueName, exchangeName, routingKey);
		
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數:隊列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循環獲取消息  
        while(true){  
            //獲取消息,若是沒有消息,這一步將會一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
	}
}

複製代碼
package com.bfxy.rabbitmq.api.exchange.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ProducerDirectExchange {
	public static void main(String[] args) throws Exception {
		//1 建立ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.11.76");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		//2 建立Connection
		Connection connection = connectionFactory.newConnection();
		//3 建立Channel
		Channel channel = connection.createChannel();  
		//4 聲明
		String exchangeName = "test_direct_exchange";
		String routingKey = "test.direct111";
		//5 發送
		
		String msg = "Hello World RabbitMQ 4 Direct Exchange Message 111 ... ";
		channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); 		
	}
}
複製代碼

Topic Exchange

全部發送到Topic Exchange的消息會被轉發到全部關心該RouteKey中指定Topic的Queue上

Exchange將RouteKey和某Topic進行模糊匹配,此時隊列須要綁定一個Topic

⚠️注意:可使用通配符進行模糊匹配

符號 "#" 匹配一個或多個詞   
符號 "*" 匹配很少很多一個詞
例如:"log.#" 可以匹配到 "log.info.oa"
      "log.*" 只會匹配到 "log.error"
複製代碼

示例代碼

package com.bfxy.rabbitmq.api.exchange.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class ConsumerTopicExchange {
	public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        
        Channel channel = connection.createChannel();  
		//4 聲明
		String exchangeName = "test_topic_exchange";
		String exchangeType = "topic";
		String queueName = "test_topic_queue";
		//String routingKey = "user.*";
		String routingKey = "user.*";
		// 1 聲明交換機 
		channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
		// 2 聲明隊列
		channel.queueDeclare(queueName, false, false, false, null);
		// 3 創建交換機和隊列的綁定關係:
		channel.queueBind(queueName, exchangeName, routingKey);
		
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數:隊列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循環獲取消息  
        while(true){  
            //獲取消息,若是沒有消息,這一步將會一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
	}
}
複製代碼
package com.bfxy.rabbitmq.api.exchange.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ProducerTopicExchange {
	public static void main(String[] args) throws Exception {
		
		//1 建立ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.11.76");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		//2 建立Connection
		Connection connection = connectionFactory.newConnection();
		//3 建立Channel
		Channel channel = connection.createChannel();  
		//4 聲明
		String exchangeName = "test_topic_exchange";
		String routingKey1 = "user.save";
		String routingKey2 = "user.update";
		String routingKey3 = "user.delete.abc";
		//5 發送
		String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
		channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
		channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes()); 	
		channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
		channel.close();  
        connection.close();  
	}
}

複製代碼

Fanout Exchange

  • 不處理路由鍵,只須要簡單到將隊列綁定到交換機上
  • 發送到交換機的消息都會被轉發到與該交換機綁定到全部隊列上
  • Fanout交換機轉發消息上最快的

示例代碼

package com.bfxy.rabbitmq.api.exchange.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class ConsumerFanoutExchange {
	public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        
        Channel channel = connection.createChannel();  
		//4 聲明
		String exchangeName = "test_fanout_exchange";
		String exchangeType = "fanout";
		String queueName = "test_fanout_queue";
		String routingKey = "";	//不設置路由鍵
		channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
		channel.queueDeclare(queueName, false, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);
		
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數:隊列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer); 
        //循環獲取消息  
        while(true){  
            //獲取消息,若是沒有消息,這一步將會一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
	}
}
複製代碼
package com.bfxy.rabbitmq.api.exchange.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ProducerFanoutExchange {
	public static void main(String[] args) throws Exception {
		//1 建立ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.11.76");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		//2 建立Connection
		Connection connection = connectionFactory.newConnection();
		//3 建立Channel
		Channel channel = connection.createChannel();  
		//4 聲明
		String exchangeName = "test_fanout_exchange";
		//5 發送
		for(int i = 0; i < 10; i ++) {
			String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
			channel.basicPublish(exchangeName, "", null , msg.getBytes()); 			
		}
		channel.close();  
        connection.close();  
	}
}
複製代碼

基礎概念講解

Binding-綁定

  • Exchange和Exchange、Queue之間的鏈接關係
  • Binding能夠包含RoutingKey或者參數

Queue-消息隊列

  • 消息隊列,實際存儲消息數據
  • Durability:是否持久化,Durable:是,Transient:否
  • Auto Delete:如選yes,表明當最後一個監聽被移除以後,該Queue會自動被刪除

Message-消息

  • 服務器與應用程序之間傳遞當數據
  • 本質上就是一段數據,由propertie和Payload(Body)組成
  • 經常使用屬性:delivery mode、headers(自定義屬性)、content_type、content_enconding、priority、correlation_id、reply_to、expiration、message_id、timestamp、type、user_id、app_id、cluster_id

Virtual host-虛擬主機

  • 虛擬地址,用於進行邏輯隔離,最上層當消息路由
  • 一個Virtual host裏面能夠由若干個Exchange和Queue
  • 同一個Virtual host裏面不能有相同名稱的Exchange或Queue

更多內容閱讀:
rabbitmq核心概念總結
中文文檔

相關文章
相關標籤/搜索