RabbitMQ---消息隊列

1、什麼是MQ?

MQ全稱爲Message Queue 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。linux

MQ是消費-生產者模型的一個典型的表明,一端往消息隊列中不斷寫入消息,而另外一端則能夠讀取隊列中的消息。這樣發佈者和使用者都不用知道對方的存在。git

隊列

概念:先進先出的一種數據結構。(聯想到棧,棧是先進後出的數據結構)github

消息隊列

消息隊列能夠簡單理解爲:把要傳輸的數據放在隊列中。windows

2、消息隊列是用來幹什麼的?

消息隊列是一種中間件,它是分佈式系統中重要的組件,主要解決應用解耦,異步消息,流量削峯,實現高性能,高可用,可伸縮和最終一致性架構。centos

 外賣系統消息推送服務器

 解耦:系統與系統之間直接對接會存在耦合性太高的狀況,致使代碼在維護的時候會消耗大量的時間和精力。數據結構

異步消息:消費者這從消息隊列中獲取消息後,互不干擾,各自拿到須要的數據後,各作各的。架構

流量削峯:在接收請求的系統以前建立一個消息隊列,每次請求過來都先通過消息隊列,這樣極大的保護了系統,免受請求過多而宕機的危險異步

 

 3、RabbitMQ

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。

rabbitMQ是一款基於AMQP協議的消息中間件,它可以在應用之間提供可靠的消息傳輸。在易用性,擴展性,高可用性上表現優秀。使用消息中間件利於應用之間的解耦,生產者(客戶端)無需知道消費者(服務端)的存在。並且兩端可使用不一樣的語言編寫,大大提供了靈活性。 分佈式

安裝:內含erlang環境包,以及RabbitMQ軟件包 windows版

連接:https://pan.baidu.com/s/1RvC0V8t-HIjqGlDTY6SiCg
提取碼:np9h

windows安裝rabbitmq 直接下載erlang安裝包,和rabbitmq-server安裝包 1.下載erlang的安裝包 http://erlang.org/download/otp_win64_22.2.exe 2.下載rabbitmq-server服務端安裝包 https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.2/rabbitmq-server-3.8.2.exe 3.先安裝erlang,再安裝rabiitmq 4.配置環境變量,加載erlang和rabbitmq 在admin用戶的PATH中,追加erlang和rabbitmq的可執行命令目錄,路徑便可 ############路徑按照本身的來
C:\Program Files\erl10.6\bin; C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.2\sbin 5.從新開啓cmd命令行,從新加載PATH值 6.開啓rabbitmq後臺管理頁面,測試命令是否能夠用 rabbitmq-plugins enable rabbitmq_management 7.此時能夠訪問本地的後臺管理界面 http://127.0.0.1:15672/

8.建立rabbitmq用戶,用於鏈接服務端,建立消息隊列 rabbitmqctl add_user admin admin #建立用戶,密碼 
rabbitmqctl set_user_tags admin administrator            #設置用戶管理員權限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"        #設置用戶容許訪問全部的隊列

9.重啓rabbitmq服務端,用yuanhao用戶登陸 在win中找到服務 -> 找到rabbitmq -> 重啓此服務 10.用你的帳號密碼登陸rabbitmq-server服務端 linux安裝rabbitmq centos系列 yum install rabbitmq-server erlang  -y
RabbitMQ安裝步驟

4、RabbitMQ工做模式

1. 簡單模式

 生產者

### 生產者
import pika # 鏈接RabbitMQ服務端
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 實例化一個對象
channel = connection.channel() # 申明一個隊列
channel.queue_declare(queue='hello') # 將數據放入隊列
channel.basic_publish(exchange='',         #空字符串表示默認爲簡單模式
                      routing_key='hello', #指定隊列名稱
                      body='Hello World!') #放入的數據

print(" [x] Sent 'Hello World!'")

  消費者

### 消費者
import pika # 鏈接RabbitMQ服務端
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 申明一個隊列
channel.queue_declare(queue='hello') # 定義回調函數,當消費者從消息隊列中拿到數據時,會將數據傳給回調函數,並執行。
def callback(ch, method, properties, body): """ 回調函數 """
    print(" [x] Received %r" % body) # 從消息隊列中獲取數據(注意pika版本,這裏pika的版本是1.11.0,前面版本的參數會有不一樣)
channel.basic_consume(queue='hello', #指定從哪一個消息隊列中拿
                      auto_ack=True, # 默認(自動)應答,消息隊列收到消費者的迴應後會銷燬數據記錄。(no_ack=True)
                      on_message_callback=callback) # 設置回調函數 (callback)


print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() # 啓動監聽隊列(相似於線程的啓動p.start()) 

注意:爲何要在生產者和消費者中都申明同一個隊列?由於生產環境中咱們並不能肯定是哪一個程序先啓動的,若是隻在生產者中申明瞭隊列,而消費者中沒有,那麼當消費者先啓動是會由於找不到對應的隊列而報錯,致使程序終止。

 因此爲了防止意外,二者都進行申明,而且當隊列存在時,再次申明不會報錯也不會作任何操做。

2.參數

應答參數:auto_ack

 做用:應答參數應用於消費者程序中,是告訴消息隊列,我已經取到了這個數據,當消息隊列獲得這個迴應後,它會銷燬這條數據的記錄。

   手動應答的應用場景:當消費者程序意外終止時,消息隊列還會保留該條數據,消費者程序再次啓動時,又能夠拿到數據。

  若是爲自動應答,那麼消費者程序會在拿到數據的第一時間給隊列發消息告訴隊列拿到了數據,隊列也會銷燬該數據記錄,當回調函數執行過程時,程序發生意外終止時,數據就會丟失。

  若是爲手動應答,那麼消費者程序會在回調函數執行 ch.basic_ack(delivery_tag=method.delivery_tag)語句時給隊列發送迴應,隊列纔會銷燬該數據記錄,當回調函數執行過程時,程序發生意外終止時,迴應語句並不會執行,消息隊列也就收不到迴應,那麼數據記錄也不會在第一時間銷燬,當消費者程序再次啓動時又能夠拿到這條數據。

   默認(自動)應答:

auto_ack=True

代碼:

channel.basic_consume(queue='hello', #指定從哪一個消息隊列中拿
                      auto_ack=True, # 默認(自動)應答 
                      on_message_callback=callback) # 設置回調函數

 手動應答:

auto_ack=False ch.basic_ack(delivery_tag=method.delivery_tag)
代碼:
# 定義回調函數,當消費者從消息隊列中拿到數據時,會將數據傳給回調函數,並執行。
def callback(ch, method, properties, body): """ 回調函數 """
    print(" [x] Received %r" % body)  ch.basic_ack(delivery_tag=method.delivery_tag)  # 手動應答

# 從消息隊列中獲取數據(注意pika版本,這裏pika的版本是1.11.0,前面版本的參數會有不一樣)
channel.basic_consume(queue='hello', #指定從哪一個消息隊列中拿
                      auto_ack=False, # 手動應答
                      on_message_callback=callback) # 設置回調函數

持久化參數

做用:將消息隊列中的數據寫入磁盤。

爲何使用持久化存儲?消息隊列中的數據通常都存儲在內存中,而當服務器意外斷電時,內存中的數據就會丟失,因此爲了保護數據,開啓持久化存儲。

生產者代碼:

......
#
聲明queue channel.queue_declare(queue='hello2', durable=True) # 若聲明過,則換一個名字 channel.basic_publish(exchange='', routing_key='hello2', body='Hello World!', properties=pika.BasicProperties(delivery_mode=2,) # 注意:在生產者程序中須要加這個參數,消費者模式中不須要,由於這個方法中沒有這個關鍵字參數 )
......

消費者代碼:

# 申明一個隊列
channel.queue_declare(queue='hello4',durable=True) ...... # 從消息隊列中獲取數據(注意pika版本,這裏pika的版本是1.11.0,前面版本的參數會有不一樣)
channel.basic_consume(queue='hello4',  # 指定從哪一個消息隊列中拿
                      auto_ack=False,  # 默認(自動)應答,消息隊列收到消費者的迴應後會銷燬數據記錄。(no_ack=True)
                      on_message_callback=callback)  # 設置回調函數 (callback)
 ......

分發參數

有兩個消費者同時監聽一個的隊列。其中一個線程sleep2秒,另外一個消費者線程sleep1秒,可是處理的消息是同樣多。這種方式叫輪詢分發(round-robin)無論誰忙,都不會多給消息,老是你一個我一個。想要作到公平分發(fair dispatch),必須關閉自動應答ack,改爲手動應答。使用basicQos(perfetch=1)限制每次只發送不超過1條消息到同一個消費者,消費者必須手動反饋告知隊列,纔會發送下一個。

# 設置在消費者程序中

# 聲明一個隊列
..... # 將分發模式調整爲公平模式(誰先執行完,給誰發送數據)
channel.basic_qos(prefetch_count=1) # 回調函數
.......

3. 完整的簡單模式代碼

 生產者:

### 生產者
import pika # 鏈接RabbitMQ服務端
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 實例化一個對象
channel = connection.channel() # 申明一個隊列
channel.queue_declare(queue='hello4', durable=True)  # durable 開啓持久化存儲

# 將數據放入隊列
channel.basic_publish(exchange='',  # 空字符串表示默認爲簡單模式
                      routing_key='hello4',  # 指定隊列名稱
                      body='Hello World!',  # 放入的數據
                      properties=pika.BasicProperties(delivery_mode=2,) ) print(" [x] Sent 'Hello World!'")

消費者代碼:

### 消費者
import pika # 鏈接RabbitMQ服務端
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 申明一個隊列
channel.queue_declare(queue='hello4',durable=True) # 將分發模式調整爲公平模式(誰先執行完,給誰發送數據)
channel.basic_qos(prefetch_count=1) # 定義回調函數,當消費者從消息隊列中拿到數據時,會將數據傳給回調函數,並執行。
def callback(ch, method, properties, body): """ 回調函數 """
    print(" [x] Received %r" % body) # 手動應答,當
 ch.basic_ack(delivery_tag=method.delivery_tag) # 從消息隊列中獲取數據(注意pika版本,這裏pika的版本是1.11.0,前面版本的參數會有不一樣)
channel.basic_consume(queue='hello4',  # 指定從哪一個消息隊列中拿
                      auto_ack=False,  # 默認(自動)應答,消息隊列收到消費者的迴應後會銷燬數據記錄。(no_ack=True)
                      on_message_callback=callback ) # 設置回調函數 (callback)

print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() # 啓動監聽隊列(相似於線程的啓動p.start())

 3. 交換機模式

原理圖:

 

 

 一、發佈訂閱

  發佈訂閱和簡單的消息隊列區別:發佈訂閱會將消息(複製不少份)發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,而發佈者發佈消息時,會將消息放置在全部相關隊列中。

生產者代碼:

# 生產者
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() 
# 申明交換機的名字和類型,類型的名字是固定的 channel.exchange_declare(exchange
='logs', exchange_type='fanout') message = "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()

消費者代碼:

# 消費者
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs',exchange_type='fanout') 
# 聲明(隨機名稱)消息隊列,由於每一個消費者都有本身的消息隊列,因此名字隨意,但不能重複 result
= channel.queue_declare("",exclusive=True) queue_name = result.method.queue
# 綁定交換機 channel.queue_bind(exchange
='logs',queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=callback) channel.start_consuming()

二、交換機之關鍵字

生產者:與發佈訂閱的不一樣在於生產者發送數據給交換機時,會賦予這條數據關鍵字,而交換機在分配數據時會根據消費者要接受的的數據的關鍵字去分配,這樣消費者能夠拿到本身想要的數據。

代碼:

# 生產者
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs2', exchange_type='direct')  # 關鍵字模式
 message = "info: Hello Yuan!" channel.basic_publish(exchange='logs2', routing_key='info', # 關鍵字模式
                      body=message) print(" [x] Sent %r" % message) connection.close()

三、交換機之通配符

# 表明多個單詞

* 表明一個單詞

與關鍵字的區別:生產者發送數據給交換機時,會賦予這條數據關鍵字,而交換機在分配數據時會根據消費者要接受的的數據的可以匹配到的關鍵字去分配,這樣消費者也能夠拿到本身想要的數據。

生產者代碼:

# 生產者
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs3', exchange_type='topic') message = "info: Hello ERU!" channel.basic_publish(exchange='logs3', routing_key='europe.weather', # 關鍵字 body=message) print(" [x] Sent %r" % message) connection.close()

消費者代碼:

# 消費者

import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs3', exchange_type='topic') result = channel.queue_declare("",exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs3', queue=queue_name, routing_key="#.news") print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=callback) channel.start_consuming()
相關文章
相關標籤/搜索