消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠很複雜,能夠包含嵌入對象。node
消息隊列是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中不用考慮哪一個消費者來取數據,消息使用者只管從MQ 中取消息而不用管是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。python
消息隊列是一種應用間的異步協做機制,那麼何時使用 MQ呢後端
一常見的訂單系統爲例,用戶點擊下單按鈕以後的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發短信通知。在業務發展出力這些邏輯可能放在一塊兒同步執行,隨着業務的發展訂單量增加,須要提高系統服務的性能,這時能夠將一些不須要當即生效的操做拆分出來異步執行,好比發紅包、發短信、郵件通知等。這種場景下能夠用 MQ,在下單的主流程(好比扣減庫存、生成相應單據)完成以後發送一條消息到 MQ 讓主流程快速完成,而由另外的單獨線程拉取 MQ 的消息(或者由 MQ 推送消息),當發現 MQ中有發紅包或其餘的消息時,執行相應的業務邏輯。安全
RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。服務器
AMQP:Advanced Message Queue Protocol,高級消息隊列協議。它是應用層協議的一個開放標準,爲面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開發語言等條件的限制。網絡
RabbitMQ 最初起源於金融系統,用與在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現很好。具體特色以下:併發
可靠性(Reliability)app
RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。框架
靈活的路由(Flexible Routing)運維
在消息進入隊列以前,經過 Exchange 來路由消息。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,能夠將多個 Exchange 綁定在一塊兒,也能夠經過插件實現本身的 Exchange。
消息集羣(Clustering)
多個 RabbitMQ 服務器能夠組成一個集羣,造成一個邏輯 Broker。
高可用(Highly Available Queues)
隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍可使用。
多種協議(Multi-Protocol)
RabbitMQ 支持多種消息隊列協議,如 STOMP、MQTT 等。
多語言客戶端(Many Clients)
RabbitMQ 幾乎支持全部經常使用語言,好比 Python、Java、.NET等。
管理界面(Management UI)
RabbitMQ 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息 Broker 的許多方面。
跟蹤機制(Tracing)
如消息異常,RabbitMQ 提供了消息追蹤機制,使用者能夠找出發生了什麼。
插件機制(Plugin System)
RabbitMQ 提供了許多插件,能夠擴展插件,也能夠本身編寫插件。
全部 MQ 產品從模型抽象上來講都是同樣的過程:
消費者(consumer)訂閱某個隊列。生產者(producer)建立消息,而後發佈到隊列(queue)中,最後將消息發送到監聽的消費之。
上面只是最簡單抽象的描述,具體到 RabbitMQ 則有更詳細的概念須要解釋。上面介紹過 RabbitMQ 是 AMQP 協議的一個開源實現,因此其內部實際上也是 AMQP 中的基本概念:
Message
消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括 routing-key(路由鍵)、priority(相對於其餘消息的優先權)、delivery-mode(指出該消息可能須要持久性存儲)等。。
Publisher
消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。
Exchange
交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
Binding
綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。
Queue
消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息能夠投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。
Connection
網絡鏈接,好比一個 TCP 鏈接。
Channel
信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的 TCP 鏈接內的虛擬鏈接,AMQP 命令都是經過信道發出去的,無論是發佈消息、訂閱隊列仍是接收消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念,以複用一條 TCP 鏈接。
Consumer
消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有本身的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定,RabbitMQ 是默認的 vhost。
Broker
表示消息隊列服務器實體。
AMQP 中消息的路由過程增長了 Exchange 和 Binding 的角色。生產者把消息發佈到 Exchange 上,消息最終到達隊列並被消費者接受,而 Binding 決定交換器的消息應該發送到哪一個隊列中。
Exchange 分發消息時根據類型的不一樣分發策略有區別,目前共四種類型:direct、fanout、topic、headers。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器徹底一致,但性能差不少,目前幾乎用不到了。
消息中的路由鍵(routing key)若是和 Binding 中的 bind key 一致,交換器就將消息發送到相應的隊列中。路由鍵與隊列名徹底匹配,若是一個隊列綁定到交換機要求路由鍵爲‘’dog‘’,則只轉發 routing key 標記爲‘’dog‘’的消息,不會轉發‘’dog.puppy‘’等。這是徹底匹配、單薄的模式。
每一個發到 fanout 類型交換器的消息都會分到全部綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單地將隊列綁定到交換器上,每一個發送到交換器的消息都會被轉發到與該交換器綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到一份複製的消息。fanout 類型轉發消息是最快的。
topic 交換器經過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列須要綁定到一個模式上。它將路由鍵和綁定鍵的字符串分紅單詞,這些單詞之間用點隔開。它一樣會識別兩個通配符:符號#
和符號*
。#
匹配0個或多個單詞,*
匹配一個單詞。
找到安裝後的 RabbitMQ 所在目錄下的 sbin 目錄,能夠看到該目錄下有7個以 rabbitmq 開頭的可執行文件,直接執行 rabbitmq-server 便可,下面
啓動會看到一些啓動過程和最後的 completed with 6 plugins,這說明啓動的時候默認加載了6個插件。
後臺啓動
若是想讓RabbitMQ 以守護程序的方式在後臺運行,能夠在啓動的時候加上-detached 參數:
rabbitmq-server -detached
查詢服務器狀態
sbin 目錄下有個很重要的文件叫 rabbitmqctl,它提供了 RabbitMQ 管理須要的幾乎一站式解決方案,絕大部分的運維命令它均可以提供。
查詢 RabbitMQ 服務器的狀態信息能夠用參數 status
rabbitmqctl status
該命令將輸出服務器不少信息,好比 RabbitMQ 和 Erlang的版本、os 名稱、內存等信息。
關閉 RabbitMQ 節點
在 Erlang 中有兩個概念:節點和應用程序。節點就是 Erlang 虛擬機的每一個實例,而多個 Erlang 應用程序能夠運行在同一個節點之上。節點之間能夠進行本地通訊(無論他們是否是運行在同一臺服務器之上)。好比一個運行在節點 A 上的應用程序能夠調用節點 B 上應用程序的方法,就好像調用本地函數同樣。若是應用程序因爲某些緣由崩潰,Erlang 節點會自動嘗試重啓應用程序。
若是要關閉整個 RabbitMQ 節點可使用參數 stop
rabbitmqctl stop
它會和本地節點通訊並指示其乾淨的關閉,也能夠指定關閉不一樣的節點,包括遠程節點,只須要傳入產生參數-n
rabbitmqctl -n rabbit@server.example.com stop
-n node默認 node 名稱是 rabbit@server,若是你的主機名是 server.example.com,那麼 node 名稱就是 rabbit@server.example.com。
關閉 RabbitMQ 應用程序
若是隻想關閉應用程序,同時保持 Erlang 節點運行則可使用stop_app
rabbitmqctl stop_app
啓動 RabbitMQ 節點
rabbitmqctl start_app
重置RabbitMQ 節點
rabbitmqctl reset
查看已聲明的隊列
rabbitmqctl list_queues
查看交換器
rabbitmqctl list_exchanges
該命令還能夠附加參數,好比列出交換器的名稱、類型、是否持久化、是否自動刪除:
rabbitmqctl list_exchanges name type durable auto_delete
查看綁定
rabbitmqctl list_bindings
實際案例:在用戶進行註冊的時候須要進行郵件發送和驗證碼發送,在沒有使用消息隊列以前,這些操做是同步進行的,在用戶量較少的狀況下,是不會影響用戶體驗的,可是一旦用戶量大了起來,仍是使用同步的話會極大地影響用戶體驗,在這種狀況下就可使用消息隊列。
當用戶註冊的時候先直接返回註冊成功頁面,以後將須要發送郵件和驗證碼的任務存在隊列裏面,以後由服務器來消費隊列裏面的任務已達到應用解耦的目的。
在淘寶雙十一或者春運時的12306的用戶訪問量是特別大的,這種狀況下若是全部的請求直接打到服務器上,服務器會直接掛掉。在這種狀況下,可使用動態擴容服務器的數量來處理高併發,可是並非全部時間都須要不少服務器進行處理,爲了成本的考慮能夠採用消息隊列進行處理併發,就是全部的請求並非直接請求服務器,而是先把用戶的請求存儲在消息隊列裏面,以後服務器在慢慢地進行處理請求。
durable=True
,表明隊列持久化properties=pika.BasicProperties(delivery_mode=2)
auto_ack=False, ch.basic_ack(delivery_tag=method.delivery_tag)
# -*- coding: utf-8 -*- # __author__: MUSIBII # __email__ : shaozuanzuan@gmail.com # __file__ : producer.py # __time__ : 2019-04-03 16:56 import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) channel = connection.channel() # 聲明隊列 channel.queue_declare(queue='test', durable=True) channel.basic_publish(exchange='', routing_key='test', body='Hello World!', properties=pika.BasicProperties( delivery_mode=1, # make message persistent )) print('[x] sent "hello world!"') connection.close()
# -*- coding: utf-8 -*- # __author__: MUSIBII # __email__ : shaozuanzuan@gmail.com # __file__ : consumer.py # __time__ : 2019-04-03 16:56 import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) channel = connection.channel() channel.queue_declare(queue='test', durable=True) def callback(ch, method, properties, body): ''' :param ch: 信道 :param method: :param properties: :param body: message :return: ''' print(ch, method, properties, body) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume( queue='test', on_message_callback=callback, auto_ack=False ) channel.start_consuming()
# -*- coding: utf-8 -*- # __author__: MUSIBII # __email__ : shaozuanzuan@gmail.com # __file__ : producer.py # __time__ : 2019-04-03 18:36 import sys import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = ''.join(sys.argv[1:]) or 'info: hello world!' channel.basic_publish(exchange='logs', routing_key='', body=message) print('[x] sent %r' % message) connection.close()
# -*- coding: utf-8 -*- # __author__: MUSIBII # __email__ : shaozuanzuan@gmail.com # __file__ : consumer.py # __time__ : 2019-04-03 18:41 import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare('', exclusive=True) # exclusive 排他性,惟一性 queue_name = result.method.queue print('queue_name:', queue_name) channel.queue_bind(exchange='logs', queue=queue_name) print('[*] waiting for logs. To exit press CTRL+c') def callback(ch, method, properties, body): print('[x] %r' % body) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True ) channel.start_consuming()
# -*- coding: utf-8 -*- # __author__: MUSIBII # __email__ : shaozuanzuan@gmail.com # __file__ : producer.py # __time__ : 2019-04-03 18:49 import sys import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') log_levels = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish( exchange='direct_logs', routing_key=log_levels, body=message ) print('[x] Sent %r:%r' % (log_levels, message)) connection.close()
# -*- coding: utf-8 -*- # __author__: MUSIBII # __email__ : shaozuanzuan@gmail.com # __file__ : consumer.py # __time__ : 2019-04-03 18:53 import sys import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare('', exclusive=True) queue_name = result.method.queue log_levels = sys.argv[1:] if not log_levels: sys.stderr.write('Usage: %s [info] [warning] [error]\n' % sys.argv[0]) for severity in log_levels: channel.queue_bind( exchange='direct_logs', queue=queue_name, routing_key=severity ) print('[*] Waiting for logs. To exit press CTRL+c') def callback(ch, method, properties, body): print('[x] %r:%r' % (method.routing_key, body)) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True ) channel.start_consuming()
# -*- coding: utf-8 -*- # __author__: MUSIBII # __email__ : shaozuanzuan@gmail.com # __file__ : producer.py # __time__ : 2019-04-03 19:07 import sys import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish( exchange='topic_logs', routing_key=routing_key, body=message ) print('[x] Sent %r:%r' % (routing_key, message)) connection.close()
# -*- coding: utf-8 -*- # __author__: MUSIBII # __email__ : shaozuanzuan@gmail.com # __file__ : consumer.py # __time__ : 2019-04-03 19:11 import sys import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') result = channel.queue_declare('', exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write('Usage: %s [binding_key]...\n' % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind( exchange='topic_logs', queue=queue_name, routing_key=binding_key ) print('[*] Waiting for logs. To exit press CTRL+c') def callback(ch, method, properties, body): print('[x %r:%r' % (method.routing_key, body)) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True ) channel.start_consuming()
進行消息傳遞的時候先啓動 consumer,這樣當生產者發送消息的時候能再消費者後端看到消息記錄。接着運行 producer,發佈一條消息,在 consumer 的控制檯能看到接收的消息。
RabbitMQ 最優秀的功能之一就是內建集羣,這個功能設計的目的是容許消費者和生產者在節點崩潰的狀況下繼續運行,以及經過添加更多的節點來線性擴展消息通訊吞吐量。RabbitMQ 內部利用 Erlang 提供的分佈式通訊框架 OTP 來知足上述需求,使客戶端在失去一個 RabbitMQ 節點鏈接的狀況下,依舊可以從新鏈接到集羣中的任何其餘節點繼續生產、消費。
RabbitMQ 會始終記錄如下四種類型的內部元數據:
隊列元數據
包括隊列名稱和它們的屬性,好比是否可持久化、是否自動刪除
交換器元數據
交換器名稱、類型、屬性
綁定元數據
內部是一張表格記錄如何將消息路由到隊列
vhost 元數據
爲 vhost 內部的隊列、交換器、綁定提供命名空間和安全屬性。
在單一節點中,RabbitMQ 會將全部這些信息存儲在內存中,同時將標記爲可持久化的隊列、交換器、綁定存儲在硬盤上。存到硬盤上能夠確保隊列和交換器在節點重啓後可以重建。而在集羣模式下一樣也提供兩種選擇:存到硬盤上(獨立節點的默認設置),存到內存中。
若是在集羣中建立隊列,集羣只會在單個節點而不是全部節點上建立完整的隊列信息(元數據、狀態、內容)。結果是隻有隊列的全部者節點知道有關隊列的全部信息,所以當集羣節點崩潰時,該節點的的隊列和綁定就消失了,而且任何匹配該隊列的綁定的新消息也會丟失。
RabbitMQ 集羣中能夠共享 user、vhost、exchange等,全部的數據和狀態都是必須在全部節點上覆制的,例外就是上面所說的消息隊列。RabbitMQ 節點能夠動態的加入到集羣中。
當在集羣中聲明隊列、交換器、綁定的時候,這些操做會直到全部集羣節點都成功提交元數據變動後才返回。集羣中有內存節點和磁盤節點兩種類型,內存節點雖然不寫入磁盤,可是它的執行比磁盤節點要好。內存節點能夠提供出色的性能,磁盤節點能保障配置信息在節點重啓後仍然可用,那集羣中如何平衡這二者呢?
RabbitMQ 只要求集羣中至少有一個磁盤節點,全部其餘節點能夠是內存節點,當節點加入火離開集羣時,它們必需要將該變動通知到至少一個磁盤節點。若是隻有一個磁盤節點,恰好又是該節點崩潰了,那麼集羣能夠繼續路由消息,但不能建立隊列、建立交換器、建立綁定、添加用戶、更改權限、添加或刪除集羣節點。換句話說集羣中的惟一磁盤節點崩潰的話,集羣仍然能夠運行,但知道該節點恢復,不然沒法更改任何東西。
若是是在一臺機器上同時啓動多個 RabbitMQ 節點來組建集羣的話,只用上面介紹的方式啓動第2、第三個節點將會由於節點名稱和端口衝突致使啓動失敗。因此在每次調用 rabbitmq-server 命令前,設置環境變量 RABBITMQ_NODENAME 和 RABBITMQ_NODE_PORT 來明確指定惟一的節點名稱和端口。下面的例子端口號從5672開始,每一個新啓動的節點都加1,節點也分別命名爲test_rabbit_一、test_rabbit_二、test_rabbit_3。
啓動第一個節點:
RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached
啓動第二個節點:
RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached
啓動第2個節點前建議將 RabbitMQ 默認激活的插件關掉,不然會存在使用了某個插件的端口號衝突,致使節點啓動不成功。
如今第2個節點和第1個節點都是獨立節點,它們並不知道其餘節點的存在。集羣中除第一個節點外後加入的節點須要獲取集羣中的元數據,因此要先中止 Erlang 節點上運行的 RabbitMQ 應用程序,並重置該節點元數據,再加入而且獲取集羣的元數據,最後從新啓動 RabbitMQ 應用程序。
中止第二個節點的應用程序:
rabbitmqctl -n test_rabbit_2 stop_app
重置第二個節點元數據:
rabbitmqctl -n test_rabbit_2 reset
第二個節點加入到第一個節點組成的集羣:
rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost
啓動第二個節點的應用程序
rabbitmqctl -n test_rabbit_2 start_app
第三個節點的配置相似
RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 rabbitmq-server -detached rabbitmqctl -n test_rabbit_3 stop_app rabbitmqctl -n test_rabbit_3 reset rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost rabbitmqctl -n test_rabbit_3 start_app
中止某個指定的節點,好比中止第二個節點
RABBITMQ_NODENAME=test_rabbit_2 rabbitmqctl stop
查看節點三的集羣狀態
rabbitmqctl -n test_rabbit_3 cluster_status