RabbitMQ基礎

一. RabbitMQ簡介

  • 1 . RabbitMQ是一個有Erlang開發的AMQP(Advanced Message Queue)的開源實現java

  • 2 . RabbitMQ的官網:http://www.rabbitmq.comspring

  • 3 . RabbitMQ是一款消息組件,其中必定包含生產者,消費者,消息組件。RabbitMQ中有三個重要組成部分vim

    • a . Exchange:交換空間瀏覽器

    • b . Queue:數據隊列bash

    • c . RoutingKey:隊列路由(若是全部的隊列的RoutingKey都同樣,則屬於廣播小,若是不同,則屬於點對點消息)ide

  • 4 . RabbitMQ中的幾個核心概念spring-boot

    • a . Broker:消息隊列的服務主機測試

    • b . Exchange:消息交換機,用於分發消息到隊列ui

    • c . Queue:消息隊列的載體,每一個消息都會被投入到一個或多個隊列es5

    • e . Binding:將Exchange與Queue按照RoutingKey規則進行綁定

    • f . RoutingKey:路由Key,Exchange根據RoutingKey進行消息分發

    • g . Vhost:虛擬主機,一個Broker能夠有多個Vhost,用於實現用戶(權限)的分離

    • h . Producer:消息生產者

    • i . Consumer:消息消費者

    • j . Channel:消息通道,每一個Channel表明一個會話任務

二. 環境搭建

  • 1 . 安裝Erlang開發環境

    • a . 在這裏安裝Erlang時遇到的坑較多,我的不推薦下載erlang源碼進行解壓縮編譯安裝,由於依賴的庫較多(gcc,libncurses5-dev,.eg):

    • 創建erlang目錄mkdir -p /usr/local/erlang

    • 進入源碼目錄 cd /user/local/src/otp_src_19.3

    • 編譯配置 ./configure --prefix=/usr/local/erlang

    • 編譯安裝 make && make install

    • 配置環境變量

    vim /etc/profile
    export ERLANG_HOME=/usr/local/erlang
    export PATH=$PATH:$ERLANG_HOME/bin:
    source /etc/profile

    • b . 本人使用apt-get安裝erlang語言環境

    • apt-get install erlang 或者apt-get install erlang-nox

    • c . 測試erlang

    • 輸入erl 表示進入erlang環境

    • 輸入halt().退出

  • 2 . 安裝RabbitMQ

    • a . 根據官網介紹進行安裝
      clipboard.png

      • 相關命令

        echo 'deb http://www.rabbitmq.com/debian/ testing main' |
             sudo tee /etc/apt/sources.list.d/rabbitmq.list
        wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc |
             sudo apt-key add -
        sudo apt-get update
        sudo apt-get install rabbitmq-server
    • b . 後臺啓動RabbitMQrabbitmq-server start > /dev/null 2>&1 &

    • c . 開啓管理頁面插件rabbitmq-plugins enable rabbitmq_management

    • d . 添加新用戶rabbitmqctl add_user evans 123123(建立一個用戶名爲evans,密碼爲123123的用戶)

    • e . 將新用戶設爲管理員rabbitmqctl set_user_tags evans administrator

    • f . 打開瀏覽器輸入訪問地址http://192.168.1.1:15672訪問RabbitMQ管理頁面
      clipboard.pngclipboard.png

    • g . 查看RabbitMQ狀態rabbitmqctl status,關閉RabbitMQrabbitmqctl stop

    • h . 設置用戶虛擬主機,不然程序沒法鏈接Queueclipboard.pngclipboard.png

二. Java基本操做

    • 1 . 在管理界面中增長一個新的Queue
      clipboard.png

      • a . Name:隊列名稱

      • b . Durability:持久化選項:Durable(持久化保存),Transient(即時保存),持久化保存在RabbitMQ宕機或者重啓後,未消費的消息仍然存在,即時保存在RabbitMQ宕機或者重啓後不存在

      • c . Auto delete:自動刪除

    • 2 . 引入RabbitMQ的Repository

      <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>4.1.0</version>
      </dependency>
    • 3 . 消息生產者MessageProducer.java

      package com.evans.rabbitmq;
      
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;
      
      /**
       * Created by Evans 
       */
      public class MessageProducer {
          //隊列名稱
          private static final String QUEUE_NAME = "first";
          //主機IP
          private static final String HOST="127.0.0.1";
          //端口
          private static final Integer PORT=5672;
          //用戶名
          private static final String USERNAME="evans";
          //密碼
          private static final String PASSWORD="evans";
      
          public static void main(String[] args) throws Exception {
              //建立工廠類
              ConnectionFactory factory = new ConnectionFactory();
              //設置參數
              factory.setHost(HOST);
              factory.setPort(PORT);
              factory.setUsername(USERNAME);
              factory.setPassword(PASSWORD);
              //建立鏈接
              Connection connection =factory.newConnection();
              //建立Channel
              Channel channel=connection.createChannel();
              //聲明Queue
              /*
               * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
               * 隊列名稱,是否持久保存,是否爲專用的隊列,是否容許自動刪除,配置參數
               * 此處的配置與RabbitMQ管理界面的配置一致
               */
              channel.queueDeclare(QUEUE_NAME,true,false,true,null);
              Long start = System.currentTimeMillis();
              for (int i=0;i<100;i++){
                  //發佈消息
                  /*
                   * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
                   * exchange名稱,RoutingKey,消息參數(消息頭等)(持久化時須要設置),消息體
                   * MessageProperties有4中針對不一樣場景能夠進行選擇
                   */
                  channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,("Message:"+i).getBytes());
              }
              Long end = System.currentTimeMillis();
              System.out.println("System cost :"+(end-start));
              channel.close();
              connection.close();
          }
      }
    • 4 . 運行MessageProduce的Main方法,在管理界面會出現詳細的監控數據,此時消息已經成功發送至RabbitMQ的隊列中clipboard.pngclipboard.png

    • 5 . 消息消費者MessageConsumer.java

      package com.evans.rabbitmq;
      
      import com.rabbitmq.client.*;
      
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;
      
      /**
       * Created by Evans on 2017/7/15.
       */
      public class MessageConsumer {
      
          //隊列名稱
          private static final String QUEUE_NAME = "first";
          //主機IP
          private static final String HOST="10.0.0.37";
          //端口
          private static final Integer PORT=5672;
          //用戶名
          private static final String USERNAME="evans";
          //密碼
          private static final String PASSWORD="evans";
      
          public static void main(String[] args) throws IOException, TimeoutException {
              //建立工廠類
              ConnectionFactory factory = new ConnectionFactory();
              //設置參數
              factory.setHost(HOST);
              factory.setPort(PORT);
              factory.setUsername(USERNAME);
              factory.setPassword(PASSWORD);
              //建立鏈接
              Connection connection =factory.newConnection();
              //建立Channel
              Channel channel=connection.createChannel();
              //聲明Queue
              /*
               * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
               * 隊列名稱,是否持久保存,是否爲專用的隊列,是否容許自動刪除,配置參數
               * 此處的配置與RabbitMQ管理界面的配置一致
               */
              channel.queueDeclare(QUEUE_NAME,true,false,true,null);
              //這裏須要複寫handleDelivery方法進行消息自定義處理
              Consumer consumer = new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      String message = new String(body);
                      System.out.println("Consume Get Message : "+message);
                  }
              };
              channel.basicConsume(QUEUE_NAME,consumer);
          }
      }
    • 6 . 運行MessageConsumer的Main方法,會進行消息消費處理,此時控制檯會輸出消費的消息,此時完成了消息的生產與消費的基本操做,當存在多個消費者的處理同一個隊列時,RabbitMQ會自動進行均衡負載處理,多個消費者共同來處理消息

      Consume Get Message : Message:0
      Consume Get Message : Message:1
      Consume Get Message : Message:2
      ...
      Consume Get Message : Message:99
    • 7 . RabbitMQ虛擬主機

      • a . 能夠在管理界面的admin-vhost下設置多個虛擬主機clipboard.pngclipboard.png

      • b . 在程序中經過設置factory參數進行虛擬主機的指定factory.setVirtualHost("firstHost")

    • 8 . Exchange工做模式:topic、direct、fanout

      • a . 廣播模式(fanout):一條消息被全部的消費者進行處理

        ① .將消費者與生產者中的`channel.queueDeclare()`方法替換爲`channel.exchangeDeclare(EXCHANGE_NAME, "fanout")`方法進行Exchange的指定,channel.basicPublish()方法須要指定exchange
        ② .此時再次運行生產者和多個消費者,則一個消息會被多個消費者進行消費處理
      • b . 直連模式(direct):一跳消息根據RoutingKey進行生產者與消費者的匹配,從而達到指定生產者的消息被指定消費者進行處理

        ① .將生產者中的`channel.queueDeclare()`方法替換爲`channel.exchangeDeclare(EXCHANGE_NAME, "direct")`方法進行Exchange的指定,channel.basicPublish()方法須要指定exchange和RoutingKey("mykey")
        ② .將消費者中的`channel.queueDeclare()`方法替換爲
        // 定義EXCHANGE的聲明String
        channel.exchangeDeclare(EXCHANGE_NAME, "direct") ;
        // 經過通道獲取一個隊列名稱                         
        String queueName= channel.queueDeclare().getQueue() ;
        // 進行綁定處理
        channel.queueBind(queueName, EXCHANGE_NAME, "mykey") ;
        ③ .此時RoutingKey做爲惟一標記,這樣就能夠將消息推送到指定的消費者進行處理
      • c . 主題模式(topic):一條消息被全部的消費者進行處理

        ① .將生產者中的`channel.queueDeclare()`方法替換爲`channel.exchangeDeclare(EXCHANGE_NAME, "topic") `方法進行Exchange的指定,channel.basicPublish()方法須要指定exchange和RoutingKey("mykey-01")
        ② .將消費者中的`channel.queueDeclare()`方法替換爲
        // 定義EXCHANGE的聲明String
        channel.exchangeDeclare(EXCHANGE_NAME, "topic") ;
        // 經過通道獲取一個隊列名稱                         
        String queueName= channel.queueDeclare().getQueue() ;
        // 進行綁定處理
        channel.queueBind(queueName, EXCHANGE_NAME, "mykey-01");
        ③ .此時主題模式即爲廣播模式與直連模式的混合使用。

    三. RabbitMQ整合Spring

    • 1 . 引入srping-rabbit的Repository

      <dependency>
          <groupId>org.springframework.amqp</groupId>
          <artifactId>spring-rabbit</artifactId>
          <version>1.7.3.RELEASE</version>
      </dependency>
    • 2 . 創建rabbitmq.properties,對RabbitMQ的屬性參數進行設置

      # RabbitMQ的主機IP
      mq.rabbit.host=192.168.68.211
      # RabbitMQ的端口
      mq.rabbit.port=5672
      # RabbitMQ的VHost
      mq.rabbit.vhost=hello
      # RabbitMQ的exchange名稱
      mq.rabbit.exchange=spring.rabbit
      # 用戶名
      mq.rabbit.username=evans
      # 密碼
      mq.rabbit.password=evans
    • 3 . 生產者XML(需增長xmlns:rabbit命名空間)

      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:context="http://www.springframework.org/schema/context"
             xmlns:rabbit="http://www.springframework.org/schema/rabbit"
             xsi:schemaLocation="http://www.springframework.org/schema/beans
              http://www.springframework.org/schema/beans/spring-beans-4.3.xsdhttp://www.springframework.org/schema/context 
              http://www.springframework.org/schema/context/spring-context-4.3.xsdhttp://www.springframework.org/schema/rabbit 
              http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd">
        <context:component-scan base-package="com.evans.rabbitmq"/>
        <!--定義rabbitmq配置的相關屬性文件信息-->
        <context:property-placeholderlocation="classpath:rabbitmq.properties"/>
        <!--若是要想進行RabbiMQ的操做管理,則首先必定要準備出一個鏈接工廠類-->
        <rabbit:connection-factoryid="connectionFactory" host="${mq.rabbit.host}" port="${mq.rabbit.port}" username="${mq.rabbit.username}" password="${mq.rabbit.password}" virtual-host="${mq.rabbit.vhost}"/>
        <!--全部的鏈接工廠要求被RabbitMQ所管理-->
        <rabbit:adminconnection-factory="connectionFactory"/>
        <!--建立一個隊列信息-->
        <rabbit:queueid="myQueue" durable="true" auto-delete="true" exclusive="false" name="queue.first"/>
        <!--下面實現一個直連的操做模式-->
        <rabbit:direct-exchangeid="mq-direct" name="${mq.rabbit.exchange}" durable="true"a uto-delete="true">
          <rabbit:bindings>
            <!--如今要求綁定到指定的隊列之中-->
            <rabbit:bindingqueue="myQueue" key="key01"/>
          </rabbit:bindings>
        </rabbit:direct-exchange>
        <!--全部整合的消息系統都會有一個模版-->
        <rabbit:templateid="amqpTemplate" exchange="${mq.rabbit.exchange}" connection-factory="connectionFactory"/>
      </beans>
    • 4 . 消費者XML(需增長xmlns:rabbit命名空間)

      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:context="http://www.springframework.org/schema/context"
             xmlns:rabbit="http://www.springframework.org/schema/rabbit"
             xsi:schemaLocation="http://www.springframework.org/schema/beans
              http://www.springframework.org/schema/beans/spring-beans-4.3.xsdhttp://www.springframework.org/schema/context
               http://www.springframework.org/schema/context/spring-context-4.3.xsdhttp://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd">
        <!--定義rabbitmq配置的相關屬性文件信息-->
        <context:property-placeholderlocation="classpath:rabbitmq.properties"/>
        <!--若是要想進行RabbiMQ的操做管理,則首先必定要準備出一個鏈接工廠類-->
        <rabbit:connection-factoryid="connectionFactory" host="${mq.rabbit.host}" port="${mq.rabbit.port}" username="${mq.rabbit.username}" password="${mq.rabbit.password}" virtual-host="${mq.rabbit.vhost}"/>
        <!--全部的鏈接工廠要求被RabbitMQ所管理-->
        <rabbit:adminconnection-factory="connectionFactory"/>
        <!--建立一個隊列信息-->
        <rabbit:queueid="myQueue" durable="true" auto-delete="true" exclusive="false" name="queue.first"/>
        <!--下面實現一個直連的操做模式-->
        <rabbit:direct-exchangeid="mq-direct" name="${mq.rabbit.exchange}" durable="true" auto-delete="true">
          <rabbit:bindings>
            <!--如今要求綁定到指定的隊列之中-->
            <rabbit:bindingqueue="myQueue" key="key01"/>
          </rabbit:bindings>
        </rabbit:direct-exchange>
        <!--定義具體的消費處理類-->
        <beanid="messageConsumer" class="cn.evans.rabbitmq.MessageConsumer"/>
        <!--啓動消費監聽程序-->
        <rabbit:listener-containerconnection-factory="connectionFactory">
          <rabbit:listenerref="messageConsumer"queues="myQueue"/>
        </rabbit:listener-container>
      </beans>
    • 5 . 生產者

      • a . 定義消息Service

        package com.evans.rabbitmq;
        
        /**
         * Created by Evans 
         */
        public interface MessageService {
            /**
             * 發送消息
             * @param message
             */
            public void sendMessage(String message);
        }
      • b . 定義MessageService的實現類

        package com.evans.rabbitmq;
        
        import org.springframework.amqp.core.AmqpTemplate;
        
        import javax.annotation.Resource;
        
        /**
         * Created by Evans
         */
        public class MessageServiceImpl implements MessageService {
            
            @Resource
            private AmqpTemplate template;
            
            @Override
            public void sendMessage(String message) {
                template.convertAndSend("key01",message);
            }
        }
    • 5 . 消費者

      • a .消費者須要實現MessageListener接口

      • b .消息處理類

        package com.evans.rabbitmq;
        
        import org.springframework.amqp.core.Message;
        import org.springframework.amqp.core.MessageListener;
        
        /**
         * Created by Evans 
         */
        public class MessageConsumer implements MessageListener {
            
            @Override
            public void onMessage(Message message) {
                System.out.println("Consumer Message: "+ message);    
            }
        }

    四. RabbitMQ整合SpringBoot

    • 1 . 引入SpringBoot的RabbitMQ腳手架

      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
    • 2 . 配置Application.yml

      spring:
        rabbitmq:
          host: 10.0.0.37
          port: 5672
          username: evans
          password: evans
    • 3 . 配置類

      package com.evans.rabbitmq;
      
      import org.springframework.amqp.core.Queue;
      import org.springframework.context.annotation.Bean;
      
      import org.springframework.context.annotation.Configuration;
      
      /**
       * Created by Evans 
       */
      @Configuration
      public class RabbitConfigure {
          @Bean
          public Queue firstQueue(){
              return new Queue("firstQueue");
          }
      }
    • 4 . 生產者

      package com.evans.rabbitmq;
      
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.stereotype.Component;
      
      import javax.annotation.Resource;
      import java.time.LocalDateTime;
      
      /**
       * Created by Evans
       */
      @Component
      public class MessageProducer {
      
          @Resource
          private RabbitTemplate rabbitTemplate;
          
          public void send(){
              LocalDateTime current =LocalDateTime.now();
              System.out.println("Send Message : "+current);
              rabbitTemplate.convertAndSend("firstQueue","Send Message"+ current);
          }
      }
    • 5 . 消費者

      package com.evans.rabbitmq;
      
      import org.springframework.amqp.rabbit.annotation.RabbitHandler;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;
      
      /**
       * Created by Evans 
       */
      @Component
      @RabbitListener(queues = "firstQueue")
      public class MessageConsumer {
      
          @RabbitHandler
          public void consumer(String message){
              System.out.println("Consumer Message : "+message);
          }
      }
    • 6 . FanoutExchange配置

      @Configuration
      public class FanoutConfiguration {
      
          @Bean
          public Queue fanoutFirstQueue() {
              return new Queue("fanout.first");
          }
      
          @Bean
          public Queue fanoutSecondQueue() {
              return new Queue("fanout.second");
          }
      
          @Bean
          public Queue fanoutThirdQueue() {
              return new Queue("fanout.third");
          }
      
          @Bean
          public FanoutExchange fanoutExchange() {
              return new FanoutExchange("fanoutExchange");
          }
      
          @Bean
          public Binding bindingExchangeFanoutFirst(Queue fanoutFirstQueue, FanoutExchange fanoutExchange) {
              return BindingBuilder.bind(fanoutFirstQueue).to(fanoutExchange);
          }
      
          @Bean
          public Binding bindingExchangeFanoutSecond(Queue fanoutSecondQueue, FanoutExchange fanoutExchange) {
              return BindingBuilder.bind(fanoutSecondQueue).to(fanoutExchange);
          }
      
          @Bean
          public Binding bindingExchangeFanoutThird(Queue fanoutThirdQueue, FanoutExchange fanoutExchange) {
              return BindingBuilder.bind(fanoutThirdQueue).to(fanoutExchange);
          }
      
      }
    • 7 . TopicExchange配置

      @Configuration
      public class TopicConfiguration {
      
          @Bean
          public Queue topicFirstQueue() {
              return new Queue("topic.first");
          }
      
          @Bean
          public Queue topicAnyQueue() {
              return new Queue("topic.any");
          }
      
          @Bean
          public TopicExchange topicExchange() {
              return new TopicExchange("topicExchange");
          }
      
          @Bean
          public Binding bindingExchangeTopicFirst(Queue topicFirstQueue, TopicExchange topicExchange) {
              return BindingBuilder.bind(topicFirstQueue).to(topicExchange).with("topic.first");
          }
      
          @Bean
          public Binding bindingExchangeTopicAny(Queue topicAnyQueue, TopicExchange topicExchange) {
              return BindingBuilder.bind(topicAnyQueue).to(topicExchange).with("topic.#");
          }
      
      }
    相關文章
    相關標籤/搜索