RabbitMQ,想說愛你不容易(附詳細安裝教程)

前言

本文講述的只是主要是 RabbitMQ 的入門知識,學習本文主要能夠掌握如下知識點:html

  • MQ 的發展史
  • AMQP 協議
  • Rabbit MQ 的安裝
  • Rabbit MQ 在 Java API 中的使用
  • RabbitMQ 與 SpringBoot 的集成

MQ 的誕生歷史

大部分技術的剛產生時適用範圍都是特定的。好比互聯網的產生,剛開始出現的通訊協議各個產商之間是沒法兼容的,隨着歷史的發展,產生了業內的通訊標準tcp/ip協議,而MQ也是同樣,第一款 MQ 類軟件是由一個在美國印度人 Vivek Ranadive 創辦的一家公司 Teknekron,並實現了世界上第一個消息中間件 The Information Bus(TIB)java

隨着第一款MQ 類軟件TIB的誕生,各大廠商馬上跟進,百花爭鳴,涌現了一批MQ類軟件,好比IBM開發的IBM Wesphere,微軟開發的MSMQ等等,可是正由於標準不統一,這就給咱們使用者帶來了很大的不便,每次切換MQ時都須要重複去實現不一樣的協議和不一樣API的調用。web

2001年,Java語言的老東家Sun公司發佈了一個JMS規範,其在各大產商的MQ上進行了統一封裝,使用者若是須要使用不一樣的MQ只須要選擇不一樣的驅動就能夠了(和咱們使用數據庫驅動一個道理)。JMS規範雖然統一了標準,可是JMS規範卻有一個很大的缺陷就是它是和Java語言進行綁定的,因此依然沒有從根本上解決問題。spring

2004年,AMQP規範出現了,真正作到了跨語言和跨平臺,自此MQ迎來了發展的黃金時代。數據庫

2007年,Rabbit公司基於AMQP規範開發出了一款消息隊列RabbmitMQ。很快的,RabbitMQ就獲得了你們的喜好,被用戶普遍使用。編程

什麼是 MQ

MQ即:Message Queue,稱之爲消息隊列或者消息中間件。MQ的本質是:使用高效可靠的消息傳遞機制來進行與平臺無關的數據傳遞,並基於數據通訊來進行分佈式系統的集成。也就是說MQ主要是用來解決消息的通訊問題,其主要有如下三個特色:vim

  • 一、MQ是一個獨立運行的服務。經過生產者來發送消息,使用消費者來接收消費。
  • 二、內部採用了隊列來進行消息存儲,通常採用的均是先進先出(FIFO)隊列。
  • 三、具備發佈訂閱的模型,消費者能夠根據須要來獲取本身想要的消息。

爲何須要 MQ

Java語言爲例,JDK自己就提供了許多不一樣類型的隊列,那麼爲何還須要用MQ呢?這是由於:api

  • 一、跨語言。各大編程語言內部實現的隊列是和語言綁定的,並且是單機的,在分佈式環境下沒法很好的工做,因此咱們須要能夠單獨部署不依賴於語言的MQ
  • 二、異步解耦。消息隊列能夠實現異步通訊,這樣發送消息方只須要關心消息是否發送成功,而接受消息方只須要關心怎麼處理隊列中的消息,實現了消費和生產者的解耦。
  • 三、流量削峯。由於消息隊列是先進先出,因此若是把須要消費的消息放進隊列,那麼消費者就能夠避免被瞬間大流量擊垮,而是能夠從容的根據本身的能力從隊列中取出消息進行消費。

RabbitMQ

RabbitMQ 中的 Rabbit 是兔子的意思,就是形容跑的和兔子同樣快。其是一款輕量級的,支持多種消息傳遞協議的高可用的消息隊列。RabbitMQ 是由 Erlang 語言編寫的,而 Erlang 語言就是一款天生適合高併發的語言。安全

RabbitMQ 的優點和特性

RabbitMQ 做爲一款很是流行的消息中間件,其有着很是豐富的特性和優點:springboot

  • 高可靠性:RabbitMQ 提供了持久化、發送應答、發佈確認等功能來保證其可靠性。
  • 靈活的路由:經過不一樣的交換機(Exchange)來實現了消息的靈活路由。
  • 集羣與擴展性:多個節點能夠組成一個邏輯上的服務器,支持負載。
  • 高可用性:經過鏡像隊列實現了隊列中數據的複製,保證了在極端狀況下部分節點出現 crash 整個集羣仍然可用。
  • 支持多種協議:RabbitMQ 最初是爲了支持 AMQP 協議而開發的,因此 AMQP 是其核心協議,可是其也支持其餘如:STOMPMOTTHTTP 等協議。
  • 支持多客戶端:RabbitMQ 幾乎支持全部經常使用語言客戶端,如:JavaPythonRubyGo 等。
  • 豐富的插件系統:支持各類豐富的插件擴展,同時也支持自定義插件。好比最經常使用的 RabbitMQ 後臺管理系統就是以插件的形式實現的。

AMQP 模型

AMQP 全稱是:Advanced Message Queuing Protocol。RabitMQ 最核心的協議就是基於 AMQP 模型的 AMQP 協議,AMQP 模型目前最新的版本是 1.0 版本,可是目前官方推薦使用者的最佳版本還是基於 0.9.1 版本的 AMQP 模型,0.9.1 版本在 RabbitMQ官網中也將其稱之爲 AMQP 0-9-1模型。

AMQP 0-9-1(高級消息隊列協議)是一種消息傳遞協議,它容許符合標準的客戶端應用程序與符合標準的消息傳遞中間件代理進行通訊。消息傳遞代理(Broker)從發佈者(Publisher,即發佈消息的應用程序,也稱爲生產者:Producter)接收消息,並將其路由到使用者(消費者:Consumer,即處理消息的應用程序)。

AMQP 0-9-1 模型的核心思想爲:消息被髮布到交換處,一般被比做郵局或郵箱。而後,交換機使用稱爲綁定的規則將消息副本分發到隊列。而後,代理將消息傳遞給訂閱了隊列的使用者,或者使用者根據須要從隊列中獲取/提取消息。

下圖就是一個 AMQP 模型簡圖,理解了這幅圖,那麼就基本理解了 RabbitMQ 的工做模式。

Producer 和 Consumer

Producer 即生產者,通常指的是應用程序客戶端,生產者會產生消息發送給 RabbitMQ,等待消費者進行處理。

Consumer 即消費者,消費者會從特定的隊列中取出消息,進行消費。當消息傳遞給消費者時,消費者會自動通知 BrokerBroker 只有在收到關於該消息的通知時纔會從隊列中徹底刪除該消息。

Connection:我是一個 TCP 長鏈接

生產者發送消息和消費者接收消息以前都必需要和 Broker 創建一個 tcp 長鏈接,才能進行通訊。

Channel:我是被虛構出來的

消息隊列的做用之一就是用來作削峯,因此消息隊列在高併發場景可能會有大量的生產者和消費者,那麼假如每個生產者在發送消息時或者每個消費者在消費消息時都須要不斷的建立和銷燬 tcp 鏈接,那麼這對 Broker 會是一個很大的消耗,爲了下降這個 tcp 鏈接的建立頻率,AMQP 模型引入了 Channel(通道或者信道)。

Channel 是一個虛擬的的鏈接,能夠被認爲是「輕量級的鏈接,其共享同一個 tcp 鏈接」。在同一個 tcp 長鏈接裏面能夠經過建立和銷燬不一樣的 Channel 來減小了建立和銷燬 tcp 鏈接的頻率,從而大大減小了資源的消耗。

客戶端(生產者/消費者)執行的每一個協議操做都發生在通道上。特定 Channel 上的通訊徹底獨立於另外一個 Channel 上的通訊,所以每一個協議方法還攜帶一個Channel ID(又稱通道號)。

Channel 只存在於鏈接的上下文中,不會獨立存在,因此當一個 tcp 鏈接被關閉時,其中全部 Channel 也都會被關閉。

Channel 是線程不安全的,因此對於使用多個線程/進程進行處理的應用程序,須要爲每一個線程/進程建立一個 Channel,而不是共享同一個 Channel

Broker:我只是一個普通的代理商

Broker 直接翻譯成中文就是:中介/代理,因此若是咱們使用的是 RabbitMQ,那麼這個 Broker 就是指的 RabbitMQ 服務端。

Exchange:我只是作一個消息的映射

Echange 即交換機,由於要實現生產者和消費者的多對多關係,因此只有一個隊列是沒法知足要求的,那麼若是有多個隊列,每次咱們發送的消息應該存儲到哪裏呢?交換機就是起到了中間角色的做用,咱們發送消息到交換機上,而後經過交換機發送到對應的隊列,交換機和隊列之間須要提早綁定好對應關係,這樣消息就到了各自指定的隊列內,而後消費者就能夠直接從各自負責的隊列內取出消息進行消費。

Queue:我纔是真正存儲消息的地方

消息發送到 Broker 以後,經過交換機的映射,存儲到指定的 Queue裏面。

VHost:我只是一個命名空間而已

VHost 相似於命名空間,主要做用就是用來隔離數據的,好比咱們由不少個業務系統都須要用到 RabbitMQ,若是是一臺服務器徹底能夠知足要求,那就不必安裝多個 RabbitMQ 了,這時候就能夠定義不一樣的 VHost ,不一樣的 VHost 就能夠實現各個業務系統間的數據隔離。

RabbitMQ 的安裝

RabbitMQ 是用 Erlang 語言開發的,因此在安裝 RabbitMQ 以前,須要先安裝 ErlangRabbitMQErlang 之間有版本對應關係,這個須要注意,本文以 Erlang 21.3RabbitMQ3.8.4 爲例進行安裝 。

  1. 安裝 Erlang
yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget //提早安裝一些依賴,我的電腦依賴不一樣,可根據實際狀況選擇未安裝的依賴進行安裝
wget http://erlang.org/download/otp_src_21.3.tar.gz # 下載(也能夠下載好傳到服務器)
tar -xvf otp_src_21.3.tar.gz //解壓
mkdir erlang //在指定目錄,如/usr/local下建立erlang目錄
cd otp_src_21.3 //切換到解壓後的目錄
./configure --prefix=/usr/local/erlang //編譯(路徑根據實際狀況選擇)
make && make install //安裝
  1. 配置 Erlang 環境變量:
vim /etc/profile  //編輯環境變量文件(CentOS系統默認環境變量文件,其餘系統可能不同)
export PATH=$PATH:/usr/local/erlang/bin //在末尾加入環境變量配置(路徑根據實際狀況選擇)
source /etc/profile //實時生效
  1. 輸入 erl 驗證 Erlang 是否安裝成功。若是出現以下顯示版本號的界面則說明安裝成功(能夠輸入 halt(). 命令進行退出):

  1. 安裝 RabbitMQ
wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.8.4/rabbitmq-server-generic-unix-3.8.4.tar.xz  //下載RabbitMQ
xz -d rabbitmq-server-generic-unix-3.8.4.tar.xz //解壓
tar -xvf rabbitmq-server-generic-unix-3.8.4.tar //解壓
  1. 一樣的,這裏須要進行環境變量配置:
vim /etc/profile  //編輯環境變量文件(CentOS系統默認環境變量文件,其餘系統可能不同)
export PATH=$PATH:/usr/local/rabbitmq_server-3.8.4/sbin //在末尾加入環境變量配置(路徑根據實際狀況選擇)
source /etc/profile //實時生效
  1. 啓動 RabbitMQ ,默認端口爲 6752
/usr/local/rabbitmq_server-3.8.4/sbin/./rabbitmq-server -detached  //在後臺啓動。根據本身實際路徑選擇,或者也能夠選擇service或者systemctl等命令啓動
  1. 若是沒有報錯則說明啓動成功,啓動以後默認會建立一個 guest/guest 帳戶,只能本地鏈接,因此還須要再從新建立一個用戶,並給新用戶受權(固然,咱們也能夠直接給 guest 用戶受權):
./rabbitmqctl add_user admin 123456   //建立用戶admin
./rabbitmqctl set_user_tags admin administrator  //添加標籤
./rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"  //受權
  1. RabbitMQ 默認還提供了可視化管理界面,須要手動開啓一下,默認端口爲 15672
./rabbitmq-plugins enable rabbitmq_management //啓動後臺管理系統插件(禁止的話換成disable便可)
  1. 開啓插件以後,能夠經過訪問:http://ip:15672/ 訪問後臺管理系統,並進行一些參數設置,帳號密碼就是上面添加的 admin/123456

安裝過程常見錯誤

安裝過程當中可能會出現以下圖所示錯誤:

  1. odbc:ODBC library - link check failed:

    解決方法:執行命令 yum install unixODBC.x86_64 unixODBC-devel.x86_64 進行安裝。

  2. wx:wxWidgets not found, wx will NOT be usable:

    解決方法:這個屬於 APPLOICATION INFORMATION ,能夠不處理。

  3. fakefop to generate placeholder PDF files,documentation: fop is missing.Using fakefop to generate placeholder PDF files:

    解決方法:執行命令 yum install fop.noarch 進行安裝。

利用 Java API 實現一個生產者和消費者

接下來用 Java 原生的 API 來實現一個簡單的生產者和消費者:

  1. pom.xml 文件引入RabbitMQ 客戶端依賴:
<dependency>
     <groupId>com.rabbitmq</groupId>
     <artifactId>amqp-client</artifactId>
     <version>5.6.0</version>
  </dependency>
  1. 新建一個消費者 TestRabbitConsumer 類:
package com.lonelyWolf.rabbitmq;

import com.rabbitmq.client.*;
import java.io.IOException;

public class TestRabbitConsumer {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://admin:123456@ip:5672");
        Connection conn = factory.newConnection();//創建鏈接
       
        Channel channel = conn.createChannel(); //建立消息通道

        channel.queueDeclare("TEST_QUEUE", false, false, false, null);//聲明隊列
        System.out.println("正在等待接收消息...");

        Consumer consumer = new DefaultConsumer(channel) {//建立消費者
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                System.out.println("收到消息: " + new String(body, "UTF-8") + ",當前消息ID爲:" + properties.getMessageId());
                System.out.println("收到自定義屬性:"+ properties.getHeaders().get("name"));
            }
        };
        channel.basicConsume("TEST_QUEUE", true, consumer);//消費以後,回調給consumer
    }
}
  1. 新建一個生產者 TestRabbitProducter類:
package com.lonelyWolf.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class TestRabbitProducter {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://admin:123456@ip:5672");
        Connection conn = factory.newConnection();// 創建鏈接
        
        Channel channel = conn.createChannel();//建立消息通道
        Map<String, Object> headers = new HashMap<String, Object>(1);
        headers.put("name", "雙子孤狼");//能夠自定義一些自定義的參數和消息一塊兒發送過去

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .contentEncoding("UTF-8") //編碼
                .headers(headers) //自定義的屬性
                .messageId(String.valueOf(UUID.randomUUID()))//消息id
                .build();

        String msg = "Hello, RabbitMQ";//須要發送的消息
        channel.queueDeclare("TEST_QUEUE", false, false, false, null); //聲明隊列
        channel.basicPublish("", "TEST_QUEUE", properties, msg.getBytes());//發送消息
        channel.close();
        conn.close();
    }
}
  1. 先啓動消費者,啓動以後消費者就會保持和 RabbitMQ 的鏈接,等待消息;而後再運行生產者,消息發送以後,消費者就能夠收到消息:

利用SpringBoot 實現一個生產者和消費者

接下來再看看 SpringBoot 怎麼與 RabbitMQ 集成並實現一個簡單的生產者和消費者:

  1. 引入依賴(我這邊 SpringBoot 用的是 2.4.0 版本,因此若是用的低版本這個版本號也須要修改):
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.4.0</version>
</dependency>
  1. 新增如下配置文件:
spring:
    rabbitmq:
        host: ip
        port: 5672
        username: admin
        password: 123456
  1. 新建一個配置文件 RabbitConfig 類,建立一個隊列:
package com.lonely.wolf.rabbit.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    
    @Bean("simpleRabbitQueue")
    public Queue getFirstQueue(){
        Queue queue = new Queue("SIMPLE_QUEUE");
        return queue;
    }
}
  1. 新建一個消費者 SimpleConsumer 類(注意這裏監聽的名字要和上面定義的保持一致):
package com.lonely.wolf.rabbit.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@RabbitListener(queues = "SIMPLE_QUEUE")
@Component
public class SimpleConsumer {
    @RabbitHandler
    public void process(String msg){
        System.out.println("收到消息:" + msg);
    }
}
  1. 新建一個消息發送者 HelloRabbitController 類(發送消息的隊列名要和消費者監聽的隊列名一致,不然沒法收到消息),運行以後調用對應接口,消費者類 SimpleConsumer 就能夠收到消息:
package com.lonely.wolf.rabbit.controller;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/hello")
public class HelloRabbitController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value="/send")
    public String clearVipInfo(@RequestParam(value = "msg",defaultValue = "no message") String msg){
        rabbitTemplate.convertAndSend("SIMPLE_QUEUE",msg);
        return "succ";
    }
}

總結

本文主要簡單講述了 MQ 的發展歷史,並介紹了爲何要使用 MQMQ 能解決什麼問題,緊接着重點介紹了 AMQP 0.9.1 模型。掌握了 AMQP 模型就基本掌握了 RabbitMQ 的工做原理,最後咱們經過 JAVA APISpringBoot 兩個例子介紹瞭如何使用 RabbitMQ

相關文章
相關標籤/搜索