Rabbitmq 簡單介紹,安裝和go客戶端使用

Rabbitmq 簡單介紹,安裝和go客戶端使用

1,消息隊列介紹

1.1 什麼是消息隊列?

消息隊列(英語:Message queue)是一種進程間通訊或同一進程的不一樣線程間的通訊方式,軟件的貯列用來處理一系列的輸入,一般是來自用戶。消息隊列提供了異步的通訊協議,每個貯列中的紀錄包含詳細說明的數據,包含發生的時間,輸入設備的種類,以及特定的輸入參數,也就是說:消息的發送者和接收者不須要同時與消息隊列互交。消息會保存在隊列中,直到接收者取回它。
消息隊列,通常咱們會簡稱他爲MQ(Message Queue),消息隊列能夠簡單的理解爲:把要傳輸的數據放在隊列中
image.pnghtml

說明:node

  • Producer:消息生產者,負責產生和發送消息到 Broker;
  • Broker:消息處理中心。負責消息存儲、確認、重試等,通常其中會包含多個 queue;
  • Consumer:消息消費者,負責從 Broker 中獲取消息,並進行相應處理;

1.2, 爲何要用消息隊列?

1.2.1 應用結偶

好比在咱們如今公司的業務常見中:
1,給客戶打完電話以後咱們須要根據通話錄音進行打標籤;
2,給客戶打完電話以後咱們須要給他發送短信
3,給客戶打完電話以後咱們須要發送他的通話給機器人,讓機器人自學習
簡單架構圖以下:
image.pnggit

若是沒有消息隊列,在A服務裏面要寫上3個API接口分別對應後面三個服務,忽然有一天這個客戶說我不須要發短信功能了,若是按照上面這種方式,咱們就須要聯繫開發開始刪代碼,而後升級,剛升級好沒幾天,客戶說我有要這個功能,那開發又要升級代碼,這個時候開發集體離職了,(這天天干的徹底是無用功)
可是若是有消息隊列那就徹底不同了,就會變成下面這個樣子:
image.pnggithub

A只須要寫一個接口對接MQ了,後面不論是添加和刪除都對A沒有影響了,刪除直接取消去消息就好了,大大減小了開發人員的工做量web

1.2.2 異步處理

還拿上面那個場景來簡述這個:A是公司的主要業務,打電話業務,BCD爲非主要業務。
假設A調用BCD 接口須要50ms,那等A把全部接口調用完成以後須要150ms,對主業務消耗特別大,若是咱們不用搭理BCD的處理,A直接把他交給消息隊列,有消息隊列去處理BCD,A只要把數據給消息隊列就好了,那麼A的壓力就會很小,也不會影響主要業務流程,提升用戶的docker

1.2.3 流量削峯

打個比方,咱們目前有A B兩個服務,A服務的OPS峯值爲100W,可是B服務的OPS峯值只有10w,這個時候來了90w個請求,A服務能處理過來沒問題,可是這個時候B服務直接就崩潰了
image.png編程

若是這個時候咱們在A和B之間加一個rabbitmq,咱們讓B每次去取9w,這樣B服務就不會掛了,瀏覽器

1.3,消費者怎麼獲得消息隊列的數據?

  • 生產者將數據放到消息隊列中,消息隊列有數據了,主動叫消費者去拿(俗稱push)
  • 消費者不斷去輪訓消息隊列,看看有沒有新的數據,若是有就消費(俗稱pull)

2, RabbitMQ消息隊列

2.1 RabbitMQ的優勢

2.1.1 可靠性

RabbitMQ提供了多種技術可讓你在性能和可靠性之間進行權衡。這些技術包括持久性機制、投遞確認、發佈者證明和高可用性機制。bash

2.1.2 靈活的路由

消息在到達隊列前是經過交換機進行路由的。RabbitMQ爲典型的路由邏輯提供了多種內置交換機類型。若是你有更復雜的路由需求,能夠將這些交換機組合起來使用,你甚至能夠實現本身的交換機類型,而且當作RabbitMQ的插件來使用。服務器

2.1.3 集羣

在相同局域網內的多個RabbitMQ服務器能夠聚合在一塊兒,做爲一個獨立的邏輯代理來使用

2.1.4 聯合

對於服務器來講,他比集羣須要更多的鬆散和非可靠連接,爲此RabbitMQ提供了聯合模型

2.1.5 高可用的隊列

在同一個集羣裏,隊列能夠被鏡像到多個機器中,以確保當前某些硬件出現故障後,你的消息仍然能夠被使用

2.1.6 多協議

RabbitMQ支持多種消息協議的消息傳遞

2.1.7 普遍的客戶端

只要是你能想到的編程語言幾乎都有與其相適配的RabbitMQ客戶端。

2.1.8 可視化管理工具

RabbitMQ附帶了一個易於使用的可視化管理工具,它能夠幫助你監控消息代理的每個環節。

2.1.9 追蹤

若是你的消息系統有異常行爲,RabbitMQ還提供了追蹤的支持,讓你可以發現問題所在。

2.1.10 插件系統

RabbitMQ附帶了各類各樣的插件來對本身進行擴展。你甚至也能夠寫本身的插件來使用。

2.2 RabbitMQ 的概念模型

全部 MQ 產品從模型抽象上來講都是同樣的過程:
消費者(consumer)訂閱某個隊列。生產者(producer)建立消息,而後發佈到隊列(queue)中,最後將消息發送到監聽的消費者。
image.png

2.3 RabbitMQ基本概念

RabbitMQ基本流程圖:
image.png

2.3.1 Message

消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其餘消息的優先權)、delivery-mode(指出該消息可能須要持久性存儲)等。

2.3.2 Publisher

消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。

2.3.3 Exchange

交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。

2.3.4 Binding

綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。

2.3.5 Queue

消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。

2.3.6 Connection

網絡鏈接,好比一個TCP鏈接。

2.3.7 Channel

信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的TCP鏈接內地虛擬鏈接,AMQP 命令都是經過信道發出去的,不論是發佈消息、訂閱隊列仍是接收消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念,以複用一條 TCP 鏈接。

2.3.8 Consumer

消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。

2.3.9 Virtual Host

虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有本身的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定,RabbitMQ 默認的 vhost 是 / 。

2.3.10 Broker

表示消息隊列服務器實體。

2.4 安裝RabbitMQ

2.4.1 Docker 安裝RabbitMQ

注意獲取鏡像的時候要獲取management版本的,不要獲取last版本的,management版本的才帶有管理界面。
咱們訪問docker鏡像倉庫而後查找RabbitMQ的鏡像
找到咱們合適的鏡像版本
image.png

咱們使用命令拉去

# docker pull rabbitmq:3.8-rc-management
# docker images
REPOSITORY                                                       TAG                 IMAGE ID            CREATED             SIZE
rabbitmq                                                         3.8-rc-management   90cce17c1af8        2 days ago          179MB

啓動RabbitMQ:

docker run -d --hostname my-rabbit --name some-rabbit --net host -e RABBITMQ_DEFAULT_USER=zsf -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3.8-rc-management

參數解釋:

--hostname 指定docker容器內部的名稱
--name。    指定docker容器的名稱
--net host。容器內部使用主機的網絡
-e RABBITMQ_DEFAULT_USER 設置rabbitmq管理界面的用戶
-e RABBITMQ_DEFAULT_PASS 設置rabbitmq管理界面用戶的密碼

查看日誌是否啓動成功:

# docker logs -f some-rabbit
2019-09-12 01:25:40.742 [info] <0.8.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.742 [info] <0.8.0> Feature flags:   [ ] drop_unroutable_metric
2019-09-12 01:25:40.742 [info] <0.8.0> Feature flags:   [ ] empty_basic_get_metric
2019-09-12 01:25:40.742 [info] <0.8.0> Feature flags:   [ ] implicit_default_bindings
2019-09-12 01:25:40.742 [info] <0.8.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.743 [info] <0.8.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.743 [info] <0.8.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.757 [info] <0.261.0> ra: meta data store initialised. 0 record(s) recovered
2019-09-12 01:25:40.757 [info] <0.266.0> WAL: recovering []
2019-09-12 01:25:40.758 [info] <0.270.0>
 Starting RabbitMQ 3.8.0-rc.1 on Erlang 22.0.7
 Copyright (C) 2007-2019 Pivotal Software, Inc.
 Licensed under the MPL.  See https://www.rabbitmq.com/

  ##  ##
  ##  ##      RabbitMQ 3.8.0-rc.1. Copyright (C) 2007-2019 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See https://www.rabbitmq.com/
  ######  ##
  ##########  Logs: <stdout>

              Starting broker...
2019-09-12 01:25:40.758 [info] <0.270.0>
 node           : rabbit@my-rabbit
 home dir       : /var/lib/rabbitmq
 config file(s) : /etc/rabbitmq/rabbitmq.conf
 cookie hash    : mhZwmGv/TzyC2kVekZ7Yvg==
 log(s)         : <stdout>
 database dir   : /var/lib/rabbitmq/mnesia/rabbit@my-rabbit
2019-09-12 01:25:40.768 [info] <0.270.0> Running boot step pre_boot defined by app rabbit
2019-09-12 01:25:40.768 [info] <0.270.0> Running boot step rabbit_core_metrics defined by app rabbit
2019-09-12 01:25:40.769 [info] <0.270.0> Running boot step rabbit_alarm defined by app rabbit
2019-09-12 01:25:40.771 [info] <0.276.0> Memory high watermark set to 11998 MiB (12581668454 bytes) of 29997 MiB (31454171136 bytes) total
2019-09-12 01:25:40.774 [info] <0.278.0> Enabling free disk space monitoring
2019-09-12 01:25:40.774 [info] <0.278.0> Disk free limit set to 50MB
2019-09-12 01:25:40.776 [info] <0.270.0> Running boot step code_server_cache defined by app rabbit
2019-09-12 01:25:40.776 [info] <0.270.0> Running boot step file_handle_cache defined by app rabbit
2019-09-12 01:25:40.776 [info] <0.281.0> Limiting to approx 1048476 file handles (943626 sockets)
2019-09-12 01:25:40.776 [info] <0.282.0> FHC read buffering:  OFF
2019-09-12 01:25:40.776 [info] <0.282.0> FHC write buffering: ON
2019-09-12 01:25:40.777 [info] <0.270.0> Running boot step worker_pool defined by app rabbit
2019-09-12 01:25:40.777 [info] <0.271.0> Will use 4 processes for default worker pool
2019-09-12 01:25:40.777 [info] <0.271.0> Starting worker pool 'worker_pool' with 4 processes in it
2019-09-12 01:25:40.777 [info] <0.270.0> Running boot step database defined by app rabbit
2019-09-12 01:25:40.777 [info] <0.270.0> Node database directory at /var/lib/rabbitmq/mnesia/rabbit@my-rabbit is empty. Assuming we need to join an existing cluster or initialise from scratch...
2019-09-12 01:25:40.777 [info] <0.270.0> Configured peer discovery backend: rabbit_peer_discovery_classic_config
2019-09-12 01:25:40.777 [info] <0.270.0> Will try to lock with peer discovery backend rabbit_peer_discovery_classic_config
2019-09-12 01:25:40.777 [info] <0.270.0> Peer discovery backend does not support locking, falling back to randomized delay
2019-09-12 01:25:40.777 [info] <0.270.0> Peer discovery backend rabbit_peer_discovery_classic_config does not support registration, skipping randomized startup delay.
2019-09-12 01:25:40.777 [info] <0.270.0> All discovered existing cluster peers:
2019-09-12 01:25:40.777 [info] <0.270.0> Discovered no peer nodes to cluster with
2019-09-12 01:25:40.779 [info] <0.43.0> Application mnesia exited with reason: stopped
2019-09-12 01:25:40.807 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:40.825 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:40.825 [info] <0.270.0> Feature flag `drop_unroutable_metric`: supported, attempt to enable...
2019-09-12 01:25:40.825 [info] <0.270.0> Feature flag `drop_unroutable_metric`: mark as enabled=state_changing
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags:   [~] drop_unroutable_metric
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags:   [ ] empty_basic_get_metric
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags:   [ ] implicit_default_bindings
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.834 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.841 [info] <0.270.0> Feature flag `drop_unroutable_metric`: mark as enabled=true
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags:   [ ] empty_basic_get_metric
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags:   [ ] implicit_default_bindings
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.850 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.857 [info] <0.270.0> Feature flag `empty_basic_get_metric`: supported, attempt to enable...
2019-09-12 01:25:40.857 [info] <0.270.0> Feature flag `empty_basic_get_metric`: mark as enabled=state_changing
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags:   [~] empty_basic_get_metric
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags:   [ ] implicit_default_bindings
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.866 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.873 [info] <0.270.0> Feature flag `empty_basic_get_metric`: mark as enabled=true
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags:   [ ] implicit_default_bindings
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.883 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.890 [info] <0.270.0> Feature flag `implicit_default_bindings`: supported, attempt to enable...
2019-09-12 01:25:40.890 [info] <0.270.0> Feature flag `implicit_default_bindings`: mark as enabled=state_changing
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags:   [~] implicit_default_bindings
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.899 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.906 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 0 retries left
2019-09-12 01:25:40.906 [info] <0.270.0> Feature flag `implicit_default_bindings`: mark as enabled=true
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags:   [x] implicit_default_bindings
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags:   [ ] quorum_queue
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.916 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.922 [info] <0.270.0> Feature flag `quorum_queue`: supported, attempt to enable...
2019-09-12 01:25:40.922 [info] <0.270.0> Feature flag `quorum_queue`: mark as enabled=state_changing
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags:   [x] implicit_default_bindings
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags:   [~] quorum_queue
2019-09-12 01:25:40.931 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.932 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.938 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:40.939 [info] <0.270.0> Feature flag `quorum_queue`:   migrating Mnesia table rabbit_queue...
2019-09-12 01:25:40.943 [info] <0.270.0> Feature flag `quorum_queue`:   migrating Mnesia table rabbit_durable_queue...
2019-09-12 01:25:40.947 [info] <0.270.0> Feature flag `quorum_queue`:   Mnesia tables migration done
2019-09-12 01:25:40.947 [info] <0.270.0> Feature flag `quorum_queue`: mark as enabled=true
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags:   [x] implicit_default_bindings
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags:   [x] quorum_queue
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags:   [ ] virtual_host_metadata
2019-09-12 01:25:40.956 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.963 [info] <0.270.0> Feature flag `virtual_host_metadata`: supported, attempt to enable...
2019-09-12 01:25:40.963 [info] <0.270.0> Feature flag `virtual_host_metadata`: mark as enabled=state_changing
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags:   [x] implicit_default_bindings
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags:   [x] quorum_queue
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags:   [~] virtual_host_metadata
2019-09-12 01:25:40.972 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.979 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:40.983 [info] <0.270.0> Feature flag `virtual_host_metadata`: mark as enabled=true
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags: list of feature flags found:
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags:   [x] drop_unroutable_metric
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags:   [x] empty_basic_get_metric
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags:   [x] implicit_default_bindings
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags:   [x] quorum_queue
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags:   [x] virtual_host_metadata
2019-09-12 01:25:40.992 [info] <0.270.0> Feature flags: feature flag states written to disk: yes
2019-09-12 01:25:40.999 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:41.018 [info] <0.270.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
2019-09-12 01:25:41.018 [info] <0.270.0> Peer discovery backend rabbit_peer_discovery_classic_config does not support registration, skipping registration.
2019-09-12 01:25:41.018 [info] <0.270.0> Running boot step database_sync defined by app rabbit
2019-09-12 01:25:41.018 [info] <0.270.0> Running boot step feature_flags defined by app rabbit
2019-09-12 01:25:41.018 [info] <0.270.0> Running boot step codec_correctness_check defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step external_infrastructure defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_registry defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_auth_mechanism_cr_demo defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_queue_location_random defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_event defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_auth_mechanism_amqplain defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_auth_mechanism_plain defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_exchange_type_direct defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_exchange_type_fanout defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_exchange_type_headers defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_exchange_type_topic defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_mirror_queue_mode_all defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_mirror_queue_mode_exactly defined by app rabbit
2019-09-12 01:25:41.019 [info] <0.270.0> Running boot step rabbit_mirror_queue_mode_nodes defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step rabbit_priority_queue defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Priority queues enabled, real BQ is rabbit_variable_queue
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step rabbit_queue_location_client_local defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step rabbit_queue_location_min_masters defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step kernel_ready defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step rabbit_sysmon_minder defined by app rabbit
2019-09-12 01:25:41.020 [info] <0.270.0> Running boot step rabbit_epmd_monitor defined by app rabbit
2019-09-12 01:25:41.047 [info] <0.270.0> Running boot step guid_generator defined by app rabbit
2019-09-12 01:25:41.194 [info] <0.270.0> Running boot step rabbit_node_monitor defined by app rabbit
2019-09-12 01:25:41.194 [info] <0.518.0> Starting rabbit_node_monitor
2019-09-12 01:25:41.194 [info] <0.270.0> Running boot step delegate_sup defined by app rabbit
2019-09-12 01:25:41.195 [info] <0.270.0> Running boot step rabbit_memory_monitor defined by app rabbit
2019-09-12 01:25:41.195 [info] <0.270.0> Running boot step core_initialized defined by app rabbit
2019-09-12 01:25:41.195 [info] <0.270.0> Running boot step upgrade_queues defined by app rabbit
2019-09-12 01:25:41.213 [info] <0.270.0> message_store upgrades: 1 to apply
2019-09-12 01:25:41.213 [info] <0.270.0> message_store upgrades: Applying rabbit_variable_queue:move_messages_to_vhost_store
2019-09-12 01:25:41.213 [info] <0.270.0> message_store upgrades: No durable queues found. Skipping message store migration
2019-09-12 01:25:41.213 [info] <0.270.0> message_store upgrades: Removing the old message store data
2019-09-12 01:25:41.213 [info] <0.270.0> message_store upgrades: All upgrades applied successfully
2019-09-12 01:25:41.231 [info] <0.270.0> Running boot step rabbit_connection_tracking defined by app rabbit
2019-09-12 01:25:41.231 [info] <0.270.0> Running boot step rabbit_connection_tracking_handler defined by app rabbit
2019-09-12 01:25:41.231 [info] <0.270.0> Running boot step rabbit_exchange_parameters defined by app rabbit
2019-09-12 01:25:41.231 [info] <0.270.0> Running boot step rabbit_mirror_queue_misc defined by app rabbit
2019-09-12 01:25:41.231 [info] <0.270.0> Running boot step rabbit_policies defined by app rabbit
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_policy defined by app rabbit
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_queue_location_validator defined by app rabbit
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_quorum_memory_manager defined by app rabbit
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_vhost_limit defined by app rabbit
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_mgmt_reset_handler defined by app rabbitmq_management
2019-09-12 01:25:41.232 [info] <0.270.0> Running boot step rabbit_mgmt_db_handler defined by app rabbitmq_management_agent
2019-09-12 01:25:41.232 [info] <0.270.0> Management plugin: using rates mode 'basic'
2019-09-12 01:25:41.233 [info] <0.270.0> Running boot step recovery defined by app rabbit
2019-09-12 01:25:41.233 [info] <0.270.0> Running boot step load_definitions defined by app rabbitmq_management
2019-09-12 01:25:41.233 [info] <0.270.0> Running boot step empty_db_check defined by app rabbit
2019-09-12 01:25:41.233 [info] <0.270.0> Adding vhost '/' (description: 'Default virtual host')
2019-09-12 01:25:41.238 [info] <0.559.0> Making sure data directory '/var/lib/rabbitmq/mnesia/rabbit@my-rabbit/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L' for vhost '/' exists
2019-09-12 01:25:41.241 [info] <0.559.0> Starting message stores for vhost '/'
2019-09-12 01:25:41.241 [info] <0.563.0> Message store "628WB79CIFDYO9LJI6DKMI09L/msg_store_transient": using rabbit_msg_store_ets_index to provide index
2019-09-12 01:25:41.241 [info] <0.559.0> Started message store of type transient for vhost '/'
2019-09-12 01:25:41.242 [info] <0.566.0> Message store "628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent": using rabbit_msg_store_ets_index to provide index
2019-09-12 01:25:41.242 [warning] <0.566.0> Message store "628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent": rebuilding indices from scratch
2019-09-12 01:25:41.242 [info] <0.559.0> Started message store of type persistent for vhost '/'
2019-09-12 01:25:41.243 [info] <0.270.0> Creating user 'zsf'
2019-09-12 01:25:41.244 [info] <0.270.0> Setting user tags for user 'zsf' to [administrator]
2019-09-12 01:25:41.244 [info] <0.270.0> Setting permissions for 'zsf' in '/' to '.*', '.*', '.*'
2019-09-12 01:25:41.245 [info] <0.270.0> Running boot step rabbit_looking_glass defined by app rabbit
2019-09-12 01:25:41.245 [info] <0.270.0> Running boot step rabbit_core_metrics_gc defined by app rabbit
2019-09-12 01:25:41.245 [info] <0.270.0> Running boot step background_gc defined by app rabbit
2019-09-12 01:25:41.245 [info] <0.270.0> Running boot step connection_tracking defined by app rabbit
2019-09-12 01:25:41.246 [info] <0.270.0> Setting up a table for connection tracking on this node: 'tracked_connection_on_node_rabbit@my-rabbit'
2019-09-12 01:25:41.247 [info] <0.270.0> Setting up a table for per-vhost connection counting on this node: 'tracked_connection_per_vhost_on_node_rabbit@my-rabbit'
2019-09-12 01:25:41.247 [info] <0.270.0> Running boot step routing_ready defined by app rabbit
2019-09-12 01:25:41.247 [info] <0.270.0> Running boot step pre_flight defined by app rabbit
2019-09-12 01:25:41.247 [info] <0.270.0> Running boot step notify_cluster defined by app rabbit
2019-09-12 01:25:41.247 [info] <0.270.0> Running boot step networking defined by app rabbit
2019-09-12 01:25:41.295 [info] <0.612.0> started TCP listener on [::]:5672
2019-09-12 01:25:41.318 [info] <0.270.0> Running boot step cluster_name defined by app rabbit
2019-09-12 01:25:41.318 [info] <0.270.0> Running boot step direct_client defined by app rabbit
2019-09-12 01:25:41.359 [info] <0.662.0> Management plugin: HTTP (non-TLS) listener started on port 15672
2019-09-12 01:25:41.359 [info] <0.768.0> Statistics database started.
2019-09-12 01:25:41.359 [info] <0.767.0> Starting worker pool 'management_worker_pool' with 3 processes in it
 completed with 3 plugins.
2019-09-12 01:25:41.439 [info] <0.8.0> Server startup complete; 3 plugins started.
 * rabbitmq_management
 * rabbitmq_management_agent
 * rabbitmq_web_dispatch

查看端口是否啓動徹底:

# netstat -ntalp | grep 5672
tcp        0      0 0.0.0.0:25672           0.0.0.0:*               LISTEN      6423/beam.smp
tcp        0      0 0.0.0.0:15672           0.0.0.0:*               LISTEN      6423/beam.smp
tcp6       0      0 :::5672                 :::*                    LISTEN      6423/beam.smp
//15672 web界面管理接口
//5672  業務端口,客戶端操做鏈接的端口

瀏覽器訪問管理界面
image.png
-w1091

2.5 RabbitMQ使用中的一些概念

在上面的RabbitMQ的基本流程圖裏面咱們能夠看到,RabbitMQ的總體工做流程是,生產者產生數據交給RabbitMQ,而後RabbitMQ經過Exchange更具規則來選擇綁定到那個隊列(Queues)中,而後消費者在到對應的隊列裏面去取數據,咱們下面就來說解下Exchange的四種類型

2.5.1 Exchange的四種類型

2.5.1.1 direct精準匹配

image.png

消息中的路由鍵(routing key)若是和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名徹底匹配,若是一個隊列綁定到交換機要求路由鍵爲「dog」,則只轉發 routing key 標記爲「dog」的消息,不會轉發「dog.puppy」,也不會轉發「dog.guard」等等。它是徹底匹配、單播的模式。

2.5.1.2 fanout 廣播

image.png

每一個發到 fanout 類型交換器的消息都會分到全部綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每一個發送到交換器的消息都會被轉發到與該交換器綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。fanout 類型轉發消息是最快的。

2.5.1.3 topic 正則匹配

image.png
topic 交換器經過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列須要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分紅單詞,這些單詞之間用點隔開。它一樣也會識別兩個通配符:符號「#」和符號「」。#匹配0個或多個單詞,匹配很少很多一個單詞。

2.5.1.4 headers 根據頭部匹配(幾乎不用)

2.6 使用go客戶端操做rabbitMQ

2.6.1 生產者代碼

package send

import (
    "log"
    "rabbitmq/src/github.com/streadway/amqp" //根據實際狀況來定
    "strconv"
    "time"
)

//建立一個返回錯誤打印日誌的函數
func FailOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}
func Send() {
    //打開一個鏈接
    conn, err := amqp.Dial("amqp://zsf:123456@172.16.1.194:5672")
    FailOnError(err, "failed to connect to RabbitMQ")
    defer conn.Close()

    //打開一個通道
    ch, err := conn.Channel()
    FailOnError(err, "failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "test number",
        false,
        false,
        false,
        false,
        nil,
    )
    FailOnError(err, "Failed to declare a queue")

    for i := 0; i < 60; i++ {
        body := "hello, ZhangShouFu  " + strconv.Itoa(i)
        err = ch.Publish(
            "",
            q.Name,
            false,
            false,
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(body),
            })
        FailOnError(err, "Failed to publish a message")
        time.Sleep(1 * time.Second)
    }

}

2.6.2 消費者代碼

package receive

import (
    "log"
    "rabbitmq/src/github.com/streadway/amqp"
    "rabbitmq/src/send"
    "time"
)

//建立一個返回錯誤打印日誌的函數

func Receive() {
    conn, err := amqp.Dial("amqp://zsf:123456@172.16.1.194:5672/")
    send.FailOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    send.FailOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "test number", // name
        false,         // durable
        false,         // delete when usused
        false,         // exclusive
        false,         // no-wait
        nil,           // arguments
    )
    send.FailOnError(err, "Failed to declare a queue")
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    send.FailOnError(err, "Failed to register a consumer")

    forever := make(chan bool)
    for i := 0; i < 60; i++ {
        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
            }
        }()
        time.Sleep(5 * time.Second)
    }
    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

2.6.3 主函數

package main

import (
    "rabbitmq/src/receive"
    "rabbitmq/src/send"
)

func main() {
    send.Send()
    receive.Receive()
}

運行主函數以後,到rabbitmq管理界面能看到咱們剛纔建立的隊列
image.png

參考:
https://www.rabbitmq.com/
https://rabbitmq.mr-ping.com/installation/Installing_on_Debian_Ubuntu.html
https://www.jianshu.com/p/79ca08116d57

相關文章
相關標籤/搜索