RabbitMQ 從入門到精通 (一)

1. 初識RabbitMQ

RabbitMQ 是一個開源的消息代理和隊列服務器,用來經過普通協議在徹底不一樣的應用之間共享數據,RabbitMQ是使用 Erlang語言來編寫的,而且RabbitMQ是基於AMQP協議的web

RabbitMQ的優勢:spring

  • 開源、性能優秀、穩定性保障
  • 提供可靠性消息投遞模式(confirm)、返回模式(return)
  • 與SpringAMQP完美的整合、API豐富
  • 集羣模式豐富,表達式配置,HA模式,鏡像隊列模型
  • 保證數據不丟失的前提下作到高可靠性、可用性

RabbitMQ官網springboot

RabbitMQ的總體架構:服務器

 
RabbitMQ的消息流轉:網絡

 

 

2. AMQP

AMQP全稱: Advanced Message Queuing Protocol架構

AMQP翻譯: 高級消息隊列協議spring-boot

AMQP定義: 是具備現代特徵的二進制協議。是一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計性能

 
 

AMQP核心概念:ui

  • Server:又稱Broker,接受客戶端的鏈接,實現AMQP實體服務
  • Connection:鏈接,應用程序與Broker的網絡鏈接
  • Channel:網絡信道,幾乎全部的操做都在Channel中進行,Channel是進行消息讀寫的通道。客戶端可創建多個Channel,每一個Channel表明一個會話任務
  • Message:消息,服務器和應用程序之間傳送的數據,由Properties和Body組成。Properties能夠對消息進行修飾,好比消息的優先級、延遲等高級特性;Body則是消息體的內容
  • Virtual host:虛擬地址,用於進行邏輯隔離,最上層的消息路由。同一個Virtual Host裏面不能有相同名稱的Exchange或Queue
  • Exchange:交換機,接收消息,根據路由鍵轉發消息到綁定的隊列
  • Binding:Exchange和Queue之間的虛擬鏈接,binding中能夠包含routing key
  • Routing key:一個路由規則,虛擬機可用它肯定如何路由一個特定消息
  • Queue:也稱爲Message Queue,消息隊列,保存消息並將它們轉發給消費者

 

 

3.RabbitMQ的極速入門

後臺啓動: ./rabbitmq start &

關閉: ./rabbitmqctl stop

節點狀態: ./rabbitmqctl status

管控臺: http://ip:15672

 

 

RabbitMQ生產消費快速入門:

環境: springboot+jdk1.7+rabbitmq3.6.5 (Maven依賴配置)

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
  </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>
    </dependencies>

 

public class Procuder {
    public static void main(String[] args) throws Exception {
        
        //1.建立一個ConnectionFactory 並進行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2.經過鏈接工廠建立鏈接
        Connection connection = connectionFactory.newConnection();
        
        //3.經過Connection 建立一個 Channel
        Channel channel = connection.createChannel();
    
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * exchange:指定交換機 不指定 則默認 (AMQP default交換機) 經過routingkey進行匹配 
         * props 消息屬性
         * body 消息體
         */
        //4.經過Channel發送數據
        for(int i = 0; i < 5; i++){
          System.out.println("生產消息:" + i);
          String msg = "Hello RabbitMQ" + i;
          channel.basicPublish("", "test", null, msg.getBytes());
        }
        
        
        //5.記得關閉相關的鏈接
        channel.close();
        connection.close();
    }
}

 

public class Consumer {
    public static void main(String[] args) throws Exception{
                //1.建立一個ConnectionFactory 並進行配置
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("192.168.244.11");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setHandshakeTimeout(20000);
                //2.經過鏈接工廠建立鏈接
                Connection connection = connectionFactory.newConnection();
                
                //3.經過Connection 建立一個 Channel
                Channel channel = connection.createChannel();
                
                //4. 聲明建立一個隊列
                String queueName = "test";
                /**
                 * durable 是否持久化
                 * exclusive 獨佔的  至關於加了一把鎖
                 */
                channel.queueDeclare(queueName,true,false,false,null);
                
                //5.建立消費者
                QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
                
                //6.設置channel
                /**
                 * ACK: 當一條消息從生產端發到消費端,消費端接收到消息後會立刻回送一個ACK信息給broker,告訴它這條消息收到了
                 * autoack: 
                 * true  自動簽收 當消費者一收到消息就表示消費者收到了消息,消費者收到了消息就會當即從隊列中刪除。
                 * false 手動簽收 當消費者收到消息在合適的時候來顯示的進行確認,說我已經接收到了該消息了,RabbitMQ能夠從隊列中刪除該消息了
                 * 
                 */
                channel.basicConsume(queueName, true, queueingConsumer);
                
                //7.獲取消息
                while(true){
                    Delivery delivery = queueingConsumer.nextDelivery();
                    String msg = new String(delivery.getBody());
                    System.err.println("消費端:" + msg);
                    //Envelope envelope = delivery.getEnvelope();
                }
    }
}

 

4. Exchange(交換機)詳解

Exchange: 接收消息,並根據路由鍵轉發消息所綁定的隊列

 

交換機屬性:

  • Name: 交換機名稱
  • Type: 交換機類型 diect、topic、fanout、headers
  • Durability: 是否須要持久化,true爲持久化
  • AutoDelete: 當最後一個綁定到Exchange的隊列刪除後,自動刪除該Exchange
  • Internal: 當前Exchange是否用於RabbitMQ內部使用,默認爲false (百分之99的狀況默認爲false 除非對Erlang語言較瞭解,作一些擴展)
  • Arguments: 擴展參數, 用於擴展AMQP協議可自定化使用

 

4.1 Direct Exchange

全部發送到Direct Exchange的消息被轉發到RouteKey指定的Queue

注意:Direct模式可使用RabbitMQ自帶的Exchange: default Exchange,因此不須要將Exchange進行任何綁定(binding)操做,消息傳遞時,RoutingKey必須徹底匹配纔會被隊列接收,不然該消息會被拋棄

 

public class ProducerDirectExchange {
    public static void main(String[] args) throws Exception {
        //1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2.建立Connection
        Connection connection = connectionFactory.newConnection();
        //3.建立Channel
        Channel channel = connection.createChannel();
        //4.聲明
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
        //5.發送
        String msg = "Hello World RabbitMQ4 Direct Exchange Message";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
    }
}

 

public class ConsumerDirectExchange {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        
        Connection connection = connectionFactory.newConnection();
        
        Channel channel = connection.createChannel();
        //聲明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";
        //表示聲明瞭一個交換機
        channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);
        //表示聲明瞭一個隊列
        channel.queueDeclare(queueName,false,false,false,null);
        //創建一個綁定關係
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數:隊列名稱,是否自動ACK,Consumer
        channel.basicConsume(queueName, true, consumer);
        
        //循環獲取消息
        while(true){
            //獲取消息,若是沒有消息,這一步將會一直阻塞
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

 

4.2 Topic Exchange

全部發送到Topic Exchange的消息被轉發到全部關心RouteKey中指定Topic的Queue上

Exchange將RouteKey和某Topic進行模糊匹配,此時隊列須要綁定一個Topic

注意:可使用通配符進行匹配

符號 # 匹配一個或多個詞

符號 * 匹配很少很多一個詞

例如: "log.#" 可以匹配到 「log.info.oa」

​ "log.*" 只會匹配到 "log.err"

public class ProducerTopicExchange {
    public static void main(String[] args) throws Exception {
        //1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);

        //2.建立Connection
        Connection connection = connectionFactory.newConnection();
        //3.建立Channel
        Channel channel = connection.createChannel();
        //4.聲明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        //5.發送
        String msg = "Hello World RabbitMQ4 Direct Exchange Message";
        channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
    }
}

 

public class ConsumerTopicExchange {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        
        Connection connection = connectionFactory.newConnection();
        
        Channel channel = connection.createChannel();
        //聲明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        String routingKey = "user.#";
        //表示聲明瞭一個交換機
        channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);
        //表示聲明瞭一個隊列
        channel.queueDeclare(queueName,false,false,false,null);
        //創建一個綁定關係
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數:隊列名稱,是否自動ACK,Consumer
        channel.basicConsume(queueName, true, consumer);
        
        //循環獲取消息
        while(true){
            //獲取消息,若是沒有消息,這一步將會一直阻塞
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

 

4.3 Fanout Exchange

不處理路由鍵,只須要簡單的將隊列綁定到交換機上
發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上
因此Fanout交換機轉發消息是最快的

 

public class ProducerFanoutExchange {
    public static void main(String[] args) throws Exception {
        //1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);

        //2.建立Connection
        Connection connection = connectionFactory.newConnection();
        //3.建立Channel
        Channel channel = connection.createChannel();
        //4.聲明
        String exchangeName = "test_fanout_exchange";
        //5.發送
        for(int i = 0; i < 10 ; i++){
            String msg = "Hello World RabbitMQ4 Direct Exchange Message";
            channel.basicPublish(exchangeName, "", null, msg.getBytes());
        }
        channel.close();
        connection.close();
    }
}

 

public class ConsumerFanoutExchange {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        
        Connection connection = connectionFactory.newConnection();
        
        Channel channel = connection.createChannel();
        //聲明
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_topic_queue";
        //無需指定路由key 
        String routingKey = "";
        //表示聲明瞭一個交換機
        channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);
        //表示聲明瞭一個隊列
        channel.queueDeclare(queueName,false,false,false,null);
        //創建一個綁定關係
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數:隊列名稱,是否自動ACK,Consumer
        channel.basicConsume(queueName, true, consumer);
        
        //循環獲取消息
        while(true){
            //獲取消息,若是沒有消息,這一步將會一直阻塞
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

 

5. Message 消息

服務器與應用程序之間傳遞的數據,本質上就是一段數據,由Properties和Body組成

經常使用屬性:delivery mode、headers (自定義屬性)

其餘屬性:content_type、content_encoding、priority、expiration

消息的properties屬性用法示例:

public class Procuder {
    public static void main(String[] args) throws Exception {
        
        //1.建立一個ConnectionFactory 並進行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2.經過鏈接工廠建立鏈接
        Connection connection = connectionFactory.newConnection();
        
        //3.經過Connection 建立一個 Channel
        Channel channel = connection.createChannel();
    
        Map<String,Object> headers = new HashMap<>();
        headers.put("my1", "111");
        headers.put("my2", "222");
        
        //10秒不消費 消息過時移除消息隊列
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .contentEncoding("utf-8")
                .expiration("10000")
                .headers(headers)
                .build();
        
        //4.經過Channel發送數據
        for(int i = 0; i < 5; i++){
          System.out.println("生產消息:" + i);
          String msg = "Hello RabbitMQ" + i;
          channel.basicPublish("", "test", properties, msg.getBytes());
        }
        
        
        //5.記得關閉相關的鏈接
        channel.close();
        connection.close();
    }
}

 

public class Consumer {
    public static void main(String[] args) throws Exception{
                //1.建立一個ConnectionFactory 並進行配置
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("192.168.244.11");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setHandshakeTimeout(20000);
                //2.經過鏈接工廠建立鏈接
                Connection connection = connectionFactory.newConnection();
                
                //3.經過Connection 建立一個 Channel
                Channel channel = connection.createChannel();
                
                //4. 聲明建立一個隊列
                String queueName = "test";
                channel.queueDeclare(queueName,true,false,false,null);
                
                //5.建立消費者
                QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
                
                //6.設置channel
                channel.basicConsume(queueName, true, queueingConsumer);
                
                //7.獲取消息
                while(true){
                    Delivery delivery = queueingConsumer.nextDelivery();
                    String msg = new String(delivery.getBody());
                    System.err.println("消費端:" + msg);
                    
                    Map<String, Object> headers = delivery.getProperties().getHeaders();
                    System.err.println("headers value:" + headers.get("my1"));
                }
    }
}
相關文章
相關標籤/搜索