AMQP(Advanced Message Queuing Protocol, 高級消息隊列協議)是一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不一樣產品,不一樣的開發語言等條件的限制。緩存
RabbitMQ是一個實現了AMQP協議標準的開源消息代理和隊列服務器。安全
一、基本概念服務器
在服務器中,三個主要功能模塊鏈接成一個處理鏈完成預期的功能:網絡
1)「exchange」接收發布應用程序發送的消息,並根據必定的規則將這些消息路由到「消息隊列」。多線程
2)「message queue」存儲消息,直到這些消息被消費者安全處理完爲止。app
3)「binding」定義了exchange和message queue之間的關聯,提供路由規則。函數
使用這個模型咱們能夠很容易的模擬出存儲轉發隊列和主題訂閱這些典型的消息中間件概念。ui
上圖中各個模塊的說明以下:spa
Broker: 接收和分發消息的應用,RabbitMQ Server就是Message Broker。操作系統
Virtual host: 出於多租戶和安全因素設計的,把AMQP的基本組件劃分到一個虛擬的分組中,相似於網絡中的namespace概念。當多個不一樣的用戶使用同一個RabbitMQ server提供的服務時,能夠劃分出多個vhost,每一個用戶在本身的vhost建立exchange、queue等。
Connection: publisher、consumer和broker之間的TCP鏈接。斷開鏈接的操做只會在client端進行,Broker不會斷開鏈接,除非出現網絡故障或broker服務出現問題。
Channel: 若是每一次訪問RabbitMQ都創建一個Connection,在消息量大的時候創建TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內部創建的邏輯鏈接,若是應用程序支持多線程,一般每一個thread建立單獨的channel進行通信,AMQP method包含了channel id幫助客戶端和message broker識別channel,因此channel之間是徹底隔離的。Channel做爲輕量級的Connection極大減小了操做系統創建TCP connection的開銷。
Exchange: message到達broker的第一站,根據分發規則,匹配查詢表中的routing key,分發消息到queue中去。經常使用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
Queue: 消息最終被送到這裏等待consumer取走。一個message能夠被同時拷貝到多個queue中。
Binding: exchange和queue之間的虛擬鏈接,binding中能夠包含routing key。Binding信息被保存到exchange中的查詢表中,用於message的分發依據。
二、核心概念
1)Exchange和Binding
交換機Exchange拿到一個消息以後會將它路由給隊列。Exchange使用哪一種方式路由是由Binding規則決定的。
a)直連交換機
根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列。直連交換機用來處理消息的單播路由。
Message中的「routing key」若是和Binding中的「binding key」一致, Direct exchange則將message發到對應的queue中。
b)主題交換機
經過對消息的路由鍵和隊列到交換機的綁定模式之間的匹配,將消息路由給一個或多個隊列。主題交換機用來實現消息的多播路由。
c)扇形交換機
將消息路由給綁定到它身上的全部隊列,且不理會路由鍵。扇形交換機用來處理消息的廣播路由。
2)ACK - 消息確認
默認狀況下,若是Message 已經被某個Consumer正確的接收到了,那麼該Message就會被從queue中移除。固然也可讓同一個Message發送到不少的Consumer。
若是一個queue沒被任何的Consumer Subscribe(訂閱),那麼,若是這個queue有數據到達,那麼這個數據會被cache,不會被丟棄。當有Consumer時,這個數據會被當即發送到這個Consumer,這個數據被Consumer正確收到時,這個數據就被從queue中刪除。
那麼什麼是正確收到呢?經過ack。每一個Message都要被acknowledged(確認,ack)。咱們能夠顯示的在程序中去ack,也能夠自動的ack。若是有數據沒有被ack,那麼:
RabbitMQ Server會把這個信息發送到下一個Consumer。
若是這個app有bug,忘記了ack,那麼RabbitMQ Server不會再發送數據給它,由於Server認爲這個Consumer處理能力有限。
並且ack的機制能夠起到限流的做用(Benefitto throttling):在Consumer處理完成數據後發送ack,甚至在額外的延時後發送ack,將有效的balance Consumer的load。
固然對於實際的例子,好比咱們可能會對某些數據進行merge,好比merge 4s內的數據,而後sleep 4s後再獲取數據。特別是在監聽系統的state,咱們不但願全部的state實時的傳遞上去,而是但願有必定的延時。這樣能夠減小某些IO,並且終端用戶也不會感受到。
3)建立隊列
Consumer和Procuder均可以經過 queue.declare 建立queue。對於某個Channel來講,Consumer不能declare一個queue,卻訂閱其餘的queue。固然也能夠建立私有的queue。這樣只有app自己纔可使用這個queue。queue也能夠自動刪除,被標爲auto-delete的queue在最後一個Consumer unsubscribe後就會被自動刪除。那麼若是是建立一個已經存在的queue呢?那麼不會有任何的影響。須要注意的是沒有任何的影響,也就是說第二次建立若是參數和第一次不同,那麼該操做雖然成功,可是queue的屬性並不會被修改。
那麼誰應該負責建立這個queue呢?是Consumer,仍是Producer?
若是queue不存在,固然Consumer不會獲得任何的Message。可是若是queue不存在,那麼Producer Publish的Message會被丟棄。因此,仍是爲了數據不丟失,Consumer和Producer都try to create the queue!反正無論怎麼樣,這個接口都不會出問題。
queue對load balance的處理是完美的。對於多個Consumer來講,RabbitMQ 使用循環的方式(round-robin)的方式均衡的發送給不一樣的Consumer。
三、RabbitMQ
send.py
# -*- coding:utf-8 -*- import pika from constant import rabbit_config as config from constant import app_name # 權限驗證 credentials = pika.PlainCredentials( config.get('username'), config.get('password') ) # 連接參數 # virtual_host, 在多租戶系統中隔離exchange, queue params = pika.ConnectionParameters( host=config.get('host'), port=config.get('port'), virtual_host=app_name, credentials=credentials ) # 創建連接 connection = pika.BlockingConnection(parameters=params) # 從連接中得到信道 channel = connection.channel() # 聲明交換機 channel.exchange_declare( exchange='exchangeA', exchange_type='direct', passive=False, durable=True, auto_delete=False ) # consumer建立隊列, 若是沒有就建立 # 隊列一旦被建立, 再進行的重複建立會簡單的失效, 因此建議在producer和consumer同時建立隊列, 避免隊列建立失敗 # 建立隊列回調函數, callback. # auto_delete=True, 若是queue失去了最後一個subscriber會自動刪除, 隊列中的message也會失效. # 默認auto_delete=False, 沒有subscriber的隊列會cache message, subscriber出現後將緩存的message發送. channel.queue_declare(queue='standard', auto_delete=True) # delivery_mode=2表示讓消息持久化, 重啓RabbitMQ也不丟失. # 考慮成本, 開啓此功能, 建議把消息存儲到SSD上. props = pika.BasicProperties(content_type='text/plain', delivery_mode=2) # 發佈消息到exchange channel.basic_publish( exchange='exchangeA', routing_key='a_routing_key', body='Hello World!', properties=props ) print(" [x] Sent 'Hello World!'") # 關閉連接 connection.close()
receive.py
# -*- coding:utf-8 -*- import pika from constant import rabbit_config as config from constant import app_name # 權限驗證 credentials = pika.PlainCredentials( config.get('username'), config.get('password') ) # 連接參數 # virtual_host, 在多租戶系統中隔離exchange, queue params = pika.ConnectionParameters( host=config.get('host'), port=config.get('port'), virtual_host=app_name, credentials=credentials ) # 創建連接 connection = pika.BlockingConnection(parameters=params) # 從連接中得到信道 channel = connection.channel() # 聲明交換機, 直連方式, 後面將會建立binding將exchange和queue綁定在一塊兒 channel.exchange_declare( exchange='exchangeA', exchange_type='direct', passive=False, durable=True, auto_delete=False, ) # consumer建立隊列, 若是沒有就建立 # 隊列一旦被建立, 再進行的重複建立會簡單的失效, 因此建議在producer和consumer同時建立隊列, 避免隊列建立失敗 # 建立隊列回調函數, callback. # auto_delete=True, 若是queue失去了最後一個subscriber會自動刪除, 隊列中的message也會失效. # 默認auto_delete=False, 沒有subscriber的隊列會cache message, subscriber出現後將緩存的message發送. channel.queue_declare(queue='standard', auto_delete=True) # 經過binding將隊列queue和交換機exchange綁定 channel.queue_bind( queue='standard', exchange='exchangeA', routing_key='a_routing_key' ) # 處理接收到的消息的回調函數 # method_frame攜帶了投遞標記, header_frame表示AMQP信息頭的對象 def callback(channel, method_frame, header_frame, body): channel.basic_ack(delivery_tag=method_frame.delivery_tag) print(" [x] Received %r" % body) # 訂閱隊列, 咱們設置了不進行ACK, 而把ACK交給了回調函數來完成 channel.basic_consume( callback, queue='standard', no_ack=True, ) try: print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() # 關閉連接 connection.close()