團長大人的學習筆記——RabbitMQ

1、RibbitMQ的基礎介紹

1. 爲何要使用MQ 2. 與其餘MQ的區別java

  • **ActiveMQ:**使用Java開發,遵循JMS規範,使用方便,支持多種協議。可是有丟失消息的風險而且速度較慢
  • **RabbitMQ:**使用Erlang開發(用於解決高併發的問題),能夠解決併發問題。可是隻支持AMQP協議且不能動態擴展

2、RabbitMQ的安裝

1. 安裝Erlang環境(這一步參照博客 www.jianshu.com/p/27197d58e…)c++

  • 安裝阿里的yum源(我在安裝的時候下載速度很慢,因此這邊使用阿里的yum源來安裝erlang)算法

    wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo
    yum-y install make gcc gcc-c++kernel-devel m4 ncurses-devel openssl-devel java-devel  unixODBC-devel
    複製代碼
  • 安裝erlang的yum源shell

    wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
    rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
    rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc
    複製代碼
  • 而後就能夠直接安裝erlang了bash

    yum -y install erlang
    複製代碼
  • 經過下面的命令就能夠查看是否安裝完成了服務器

    erl
    複製代碼

2. 安裝RibbitMQ數據結構

  • 下載Rabbit的yum源(下載下來的時候名字很亂,改個名字)併發

    wget https://bintray.com/rabbitmq/rpm/download_file?file_path=rabbitmq-server%2Fv3.7.x%2Fel%2F7%2Fnoarch%2Frabbitmq-server-3.7.14-1.el7.noarch.rpm
    mv download_file\?file_path\=rabbitmq-server%2Fv3.7.x%2Fel%2F7%2Fnoarch%2Frabbitmq-server-3.7.14-1.el7.noarch.rpm rabbitmq-server-3.7.14-el7.noarch.rpm
    複製代碼
  • 安裝app

    yum -y install rabbitmq-server-3.7.14-el7.noarch.rpm
    複製代碼
    • 若是報有依賴須要解決,就直接使用yum下載這個依賴就行了
  • 啓動服務ide

    rabbitmq-server start
    複製代碼
  • 後臺啓動

    rabbitmq-server -datached
    複製代碼
  • 啓動後使用amqp協議,默認在5672端口

3、RabbitMQ初步使用

1. 搭建管理平臺

  • 初步搭建沒有任何插件,咱們使用下面的命令下載並啓用RabbitMQ的管理地址

    rabbitmq-plugins enable rabbitmq_management
    複製代碼
  • 如今就能夠訪問該節點的15672端口使用guest/guest來登錄管理界面

    • 若是不是在localhost下訪問,咱們還須要修改/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.14/ebin/rabbit.app文件,將{loopback_users, [<<」guest」>>]} 改成{loopback_users, []}

      {default_user, <<"guest">>},
      {default_pass, <<"guest">>},
      {default_user_tags, [administrator]},
      {default_vhost, <<"/">>},
      {default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
      {loopback_users, []},
      {password_hashing_module, rabbit_password_hashing_sha256},
      {server_properties, []},
      複製代碼

      再重啓就OK了

    • Virtural Host 用於區分不一樣業務,每一個VH都是獨立的,互不影響的。不一樣的團隊用不一樣的VH,相互隔離

2. 點對點簡單隊列

點對點簡單隊列:一個生產者投遞消息給隊列,只容許一個消費者進行消費,(若是存在消費者集羣,則會均攤消費,使用取模算法)每一個消息只會消費一次

生產者生產的消息直接投遞給隊列服務器,而後隊列服務器直接推送或消費者自行拉取消息

  • ACK應答模式

    • 自動應答:當消費者收到消息後,不管是否處理,消費者都會自動應答消費。
    • 手動應答:消費者在代碼裏顯式的回覆ACK
  • 導入依賴

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>4.0.2</version>
    </dependency>
    複製代碼
  • 封裝一個連接工具

    public class MQConnectionUtils {
        /** * 建立新的連接 * @return */
        public static Connection connect() throws IOException, TimeoutException {
            //建立連接工廠
            ConnectionFactory connectionFactory = new ConnectionFactory();
    
            //設置連接參數
            connectionFactory.setHost("192.168.3.203");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            return connectionFactory.newConnection();
        }
    }
    複製代碼
  • 生產者

    public class Producer {
        /**隊列名稱*/
        private static final String QUEUE_NAME = "libi_QUEUE";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //創建連接
            Connection connection = MQConnectionUtils.connect();
            //建立通道
            Channel channel = connection.createChannel();
            //建立一個隊列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //建立消息
            String message = "Libi_Message";
            //發送消息
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            //關閉通道和連接
            channel.close();
            connection.close();
            System.out.println("消息投遞成功!");
        }
    }
    複製代碼
  • 消費者

    public class Consumer {
        /**隊列名稱*/
        private static final String QUEUE_NAME = "libi_QUEUE";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //創建連接
            Connection connection = MQConnectionUtils.connect();
            //建立通道
            Channel channel = connection.createChannel();
            //消費者關聯一個隊列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                //使用匿名內部類重寫獲取消息的方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    String msg = new String(body, "UTF-8");
                    System.out.println("活動生產者消息:"+msg);
                }
            };
            //設置應答模式,true表示自動應答,false表示手動應答
            channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
            //關閉通道和連接
            channel.close();
            connection.close();
            System.out.println("消息消費成功!");
        }
    }
    複製代碼

3. 公平隊列(須要本身在代碼裏實現)

  • 均攤消費的缺點:當消費者處理消息的能力不一致時,若是仍是均攤處理信息,則會形成資源浪費(對消費慢點節點不公平),須要實現"能者多勞"

  • 公平隊列的實現思路(BaseQos方法):當有n個消費者在上一條消息尚未處理完成時(尚未發送ACK),消息隊列就不會發送下一條消息給它,給另一個消費者。在生產者經過以下代碼開啓Qos

    channel.basicQos(n);
    複製代碼
  • 若是這時消費者在代碼裏忘記應答了,那麼就會陷入阻塞

4.發佈訂閱模式

生產者投遞消息給交換機,交換機根據路由策略轉發到不一樣的隊列服務器中,隊列服務器再給消費者進行消費

  • 交換機策略

    • Direct:直接交換機,一種帶路由功能的交換機,一個隊列會和一個交換機綁定,除此以外再綁定一個routing_key,當消息被髮送的時候,須要指定一個binding_key,這個消息被送達交換機的時候,就會被這個交換機送到指定的隊列裏面去。一樣的一個binding_key也是支持應用到多個隊列中的。

      就是說直接交換機能夠更具生產者的routing_key和消費者的binding_key進行匹配,只有同樣纔會轉發這個消息

    • Fanout:扇形交換機,它所能作的事情很是簡單———廣播消息。扇形交換機會把能接收到的消息所有發送給綁定在本身身上的隊列。由於廣播不須要「思考」,因此扇形交換機處理消息的速度也是全部的交換機類型裏面最快的。

    • Topic:主題交換機,發送到主題交換機上的消息須要攜帶指定規則的routing_key,主題交換機會根據這個規則將數據發送到對應的(多個)隊列上。

      主題交換機的routing_key須要有必定的規則,交換機和隊列的binding_key須要採用*.#.*.....的格式,每一個部分用.分開,其中:

      • *表示一個單詞
      • #表示任意數量(零個或多個)單詞。

      假設有一條消息的routing_keyfast.rabbit.white,那麼帶有這樣binding_key的幾個隊列都會接收這條消息:

    • Handler:首都交換機,首部交換機是忽略routing_key的一種路由方式。路由器和交換機路由的規則是經過Headers信息來交換的,這個有點像HTTPHeaders。將一個交換機聲明成首部交換機,綁定一個隊列的時候,定義一個Hash的數據結構,消息發送的時候,會攜帶一組hash數據結構的信息,當Hash的內容匹配上的時候,消息就會被寫入隊列。

  • 生產者(在發送消息的時候傳入exchange的參數)

    /** * 使用Fanout類型的交換機,交換器轉給發所有的隊列 */
    public class Producer {
        //交換機名稱
        static final String EXCHANGE_NAME = "fanout_destination";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = MQConnectionUtils.connect();
            Channel channel = connection.createChannel();
    
            //綁定交換機
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            String msg = "my_fanout_meg";
            //發送消息(路由策略爲空串)
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
    
            channel.close();
            connection.close();
        }
    }
    複製代碼
  • 消費者

    public class EmailConsumer {
        private static String QUEUE_NAME = "Email_Queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = MQConnectionUtils.connect();
            Channel channel = connection.createChannel();
    
            //消費者聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //消費者綁定交換機
            channel.queueBind(QUEUE_NAME, Producer.EXCHANGE_NAME,"");
            //監聽消息
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("郵件消費者:" + msg);
                }
            };
    
            channel.basicConsume(QUEUE_NAME, defaultConsumer);
        }
    }
    複製代碼

    消費者的chanel和connection沒有關閉,能夠多啓動幾個,就會發現全部的消費者均可以收到生產者傳入的信息

相關文章
相關標籤/搜索