RabbitMQ是如何運轉的?

原文: RabbitMQ是如何運轉的?

前言

  以前已經介紹了RabbitMQ交換機模型的相關簡單概念,都是做爲此篇的基礎鋪墊,若是對此篇不懂的能夠先看我上一篇的介紹認識RabbitMQ交換機模型,或者聯繫評論,分享《RabbitMQ實戰指南》電子書給你們,裏面雖然有些許錯誤,但整體仍是很棒的一本書!html

  本文主要介紹RabbitMQ的消息是怎麼產生和經過它是怎麼接收消息的(RabbitMQ如何運轉)、Connection和Channel概念、RabbitMQ的簡單部署、Java代碼簡單實踐三個部分java

  

 


 

 

1、RabbitMQ的運轉流程

  一、生產者流程  

      1) 生產者鏈接到RabbitMQ Broker,創建Connection,開啓信道Channel(Connection與Channel概念下面會介紹)node

      2) 生產者聲明一個交換器,設置相關屬性。shell

      3) 生產者聲明一個隊列並設置相關屬性json

      4) 生產者經過路由鍵將交換器和隊列綁定起來網絡

      5) 生產者發送消息到RabbitMQ Broker,包括路由鍵、交換器信息等app

      6) 相應的交換器根據路由鍵查找匹配的隊列async

      7) 若是找到則消息存入相應隊列中maven

      8) 若是沒找到則根據配置的屬性丟棄或者回退給生產者ide

      9) 關閉信道

      10)關閉鏈接

  二、消費者流程

      1) 消費者鏈接到RabbitMQ Broker,創建Connection,開啓Channel

      2) 消費者向RabbitMQ Broker請求消費相應隊列中消息,可能會設置相應的回調函數。

      3) 等待RabbitMQ Broker迴應並投遞相應隊列中的消息,消費者接收消息。

      4) 消費者確認ack接收到的消息。

      5) RabbitMQ從隊列中刪除相應已經被確認的消息。

      6) 關閉信道。

      7) 關閉鏈接

 

    其實,最主要最很差理解的也就是Connection與Channel這兩個概念,若是隻是光看這些流程會至關不理解,爲何先創建Connection再創建Channel,這兩個又是什麼區別?因此再往下就是介紹Connection與Channel了!

 

2、Connection與Channel概念

  

  一、 Connection:實際就是一條TCP鏈接,TCP一旦創建起來,客戶端緊接着能夠建立AMQP信道。

  二、 Channel:每一個Channel都有惟一的ID,都是創建在Connection上的虛擬鏈接,RabbitMQ處理每條AMQP指令都是經過信道完成的

       

                                (結合兩張圖,更好理解Connection與Channel兩個概念)

  

  三、單TCP複用鏈接與多信道的優點

      1)爲何TCP鏈接只有一條,而每一個生產者都會建立一條惟一的信道呢?想象下,實際狀況,會有不少的生產者生產消息,多個消費者消費消息,那麼就不得不建立多個線程,創建多個TCP鏈接。多個TCP鏈接的創建必然會對操做系統性能消耗較高,也不方便管理。從而選擇一種相似於NIO(非阻塞I/O, Non-blocking I/O)技術是頗有必要的,多信道的在TCP基礎上的創建就是這麼實現的。

      2)每一個線程都有本身的一個信道,複用了Connection的TCP鏈接,信道之間相互獨立,相互保持神祕,節約TCP鏈接資源,固然自己信道的流量很大的話,也能夠建立多個適當的Connection的TCP鏈接,須要根據具體業務狀況制定

   

3、RabbitMQ部署

  主要以Linux CentOS 7舉例部署,

一、準備Erlang環境

安裝運行RabbitMQ以前,先安裝Erlang環境,由於RabbitMQ是relang語言寫的。下載http://www.erlang.org/downloads獲得otp_src_21.2.tar.gz包

 1)解壓到/opt/erlang目錄下,./configure配置生成make make install

[root@hidden]# tar xvf otp_src_21.2.tar.gz 
[root@hidden]# cd otp_src_21.2 
[root@hidden otp_src_21.2]#./configure --prefix=/opt/er1ang

 若是安裝過程出現"No curses library functions found",則須要安裝ncurses

[root@hidden otp_src_21.2]# yum install ncurses-devel

 2)編譯安裝make & make install

[root@hidden otp_src_21.2]# make & make install

 3)修改/etc/profile文件,增長以下語句

ERLANG_HOME=/opt/erlang
export PATH=$PATH:$ERLANG_HOME/bin
export ERLANG_HOME

 4)執行/etc/profile配置文件

[root@hidden otp_src_21.2]# source /etc/profile

 5)測試是否安裝成功

[root@hidden otp_src_21.2]#erl

若是出現以下語句,則說明安裝成功

Erlang/OTP 21 [erts-10.2] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe]

Eshell V10.2  (abort with ^G)
1>     

 二、安裝RabbiMQ

 RabbitMQ安裝比Erlang安裝簡單不少,下載generic壓縮包:rabbitmq-server-generic-unix-3.7.11.tar.xz

 1)解壓壓縮到到Erlang同目錄/opt下

[root@hidden]# tar zvxf rabbitmq-server-generic-unix-3.7.11.tar.xz
[root@hidden]# cd /opt
[root@hidden]# mv rabbitmq-server-generic-unix-3.7.11.tar.xz rabbitmq

 2)修改/etc/profile文件,增長以下語句

export PATH=$PATH:/opt/rabbitmq/sbin
export RABBITMQ_HOME=/opt/rabbitmq

 3)執行profile文件,使其生效

[root@hidden otp_src_21.2]# source /etc/profile

 4)修改運行爲守護進程模式

[root@hidden otp_src_21.2]# rabbitmq-server -detached

 5)測試是否安裝成功,出現Status of node rabbit@.........以下語句則說明安裝成功

[root@hidden rabbitmq]# rabbitmqctl status

Status of node rabbit@iz2ze49fh77zgs1rzxo0l7z ...
[{pid,11462},
{running_applications,
[{rabbit,"RabbitMQ","3.7.11"},
{mnesia,"MNESIA CXC 138 12","4.15.5"},
{os_mon,"CPO CXC 138 46","2.4.7"},
{sysmon_handler,"Rate-limiting system_monitor event handler","1.1.0"},
{rabbit_common,
"Modules shared by rabbitmq-server and rabbitmq-erlang-client",
"3.7.11"},
{ranch,"Socket acceptor pool for TCP protocols.","1.7.1"},
{ssl,"Erlang/OTP SSL application","9.1"},
{public_key,"Public key infrastructure","1.6.4"},
{asn1,"The Erlang ASN1 compiler version 5.0.8","5.0.8"},
{inets,"INETS CXC 138 49","7.0.3"},
{recon,"Diagnostic tools for production use","2.3.6"},
{xmerl,"XML parser","1.3.18"},
{jsx,"a streaming, evented json parsing toolkit","2.9.0"},

..........

 三、新增用戶與受權

RabbitMQ默認狀況下用戶和密碼都爲「guest」,但只能經過默認的本地網絡localhost訪問,網絡訪問受限,因此須要再單獨新增用戶授予權限

 1)新增root用戶

 新增用戶名爲root,密碼爲root

[root@hidden rabbitmq]# rabbitmqctl add_user root root

 2)受權root用戶到默認vhost可配置、可讀、可寫權限

[root@hidden rabbitmq]# rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

 3)設置root爲管理員角色

[root@hidden rabbitmq]# rabbitmqctl set_user_tags root administrator

4、Java代碼實踐

首先maven下載jar包:

<!-- rabbitmq-->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.6.0</version>
    </dependency>

一、生產者類

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author jian
 * @date 2019/2/14
 * @description RabbitMQ測試: 消息服務端
 *
 */
public class RabbitProducer {
    // 路由鍵
    private static final String ROUTING_KEY = "routingkey_demo";
    // 交換機名稱
    private static final String EXCHANGE_NAME = "exchange_demo";
    // 隊列名稱
    private static final String QUEUE_NAME = "queue_demo";
    // RabbitMQ地址
    private static final String IP_ADDRESS = "xxx.xxx.xxx.xxx";
    // RabbitMQ默認端口5672
    private static final int PORT = 5672;

    public static void publicMeesage () {
        // 1)經過鏈接工廠創建複用TCP鏈接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(IP_ADDRESS);
        connectionFactory.setPort(PORT);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        try {
            Connection connection = connectionFactory.newConnection();
            // 2)創建多信道
            Channel channel = connection.createChannel();
            // 3)聲明交換器:建立一個direct、持久化、非自動刪除的交換器
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
            // 4)聲明隊列:建立一個持久化、非排他的、非自動刪除的隊列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 5)將交換器與隊列經過路由鍵綁定
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            // 6) 發送持久化消息
            String message = "hello world!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("producer published message: " + message);
            // 7)關閉信道
            channel.close();
            // 8)關閉鏈接
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

 

二、消費者類 

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author jian
 * @date 2019/2/14
 * @description RabbitMQ測試:消費者消費消息
 */
public class RabbitConsumer {

    // 隊列名稱
    private static final String QUEUE_NAME = "queue_demo";
    // RabbitMQ地址
    private static final String IP_ADDRESS = "xxx.xxx.xxx.xxx";
    // RabbitMQ默認端口5672
    private static final int PORT = 5672;

    public static void recevieMessage() {
        Address[] addresses = new Address[]{
                new Address(IP_ADDRESS, PORT)
        };
        // 1)經過鏈接工廠創建複用TCP鏈接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        try {
            // 2)創建鏈接:此處與生產者創建創建鏈接是不一樣的
            Connection connection = connectionFactory.newConnection(addresses);
            // 3) 建立channel信道
            Channel channel = connection.createChannel();
            // 設置客戶端最多接收未被ack消息的個數
            channel.basicQos(64);
            // 4)消費者向RabbitMQ Broker請求消費相應隊列中消息: 有消息就會執行回調函數handleDelivery
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("consumer received message: " + new String(body, "UTF-8"));
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            // 5)消費者確認ack接收 到的消息:自動回覆隊列應答
            channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
            // 等待回調函數執行完畢
            TimeUnit.SECONDS.sleep(5);
            // 6) 關閉信道
            channel.close();
            // 7) 關閉鏈接
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

三、測試類

public class RabbitMQTest {

    public static void main(String[] args) {
        RabbitProducer.publicMeesage();
        RabbitConsumer.recevieMessage();
    }
}

四、測試結果

producer published message: hello world!
consumer received message: hello world!
相關文章
相關標籤/搜索