消息隊列(MQ)

消息隊列主要解決問題python

主要解決應用耦合,異步處理,流量削鋒等問題服務器

消息隊列應用場景併發

1.應用耦合:多應用間經過消息隊列對同一消息進行處理,避免調用接口失敗致使整個過程失敗異步

2.異步處理:多應用對消息隊列中同一消息進行處理,應用間併發處理消息, 相比串行處理,減小處理時間函數

3.限流削鋒:普遍應用於秒殺或搶購活動中,避免流量過大致使應用系統掛 掉的狀況spa

4.消息驅動系統:系統分爲消息隊列、消息生產者、消息消費者,生產者 負責產生消息,消費者(可能有多個)負責對消息進行處理code

經常使用的消息隊列cdn

kafka、RabbitMQ、RocketMQ、ActiveMQ、ZeroMQ、MetaMQ,Redis也可實現隊列功能中間件

消息隊列的兩種模式blog

1.點對點模式:一個生產者對應一個消費者,消息一旦被消費就不會存在於隊列中;生產者和消費者彼此不依賴;接收者在成功接收消息以後需向隊列應答成功,以便消息隊列刪除當前接收的消息

2.發佈/訂閱模式:發佈者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者

特色:1.每一個消息能夠有多個訂閱者2.發佈者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者以後,才能消費發佈者的消息3.爲了消費消息,訂閱者須要提早訂閱該角色主題,並保持在線運行

python中使用RabbitMQ

1.生產者

#-*- coding:utf-8 -*-
import pika
import sys
username = 'admin'   #指定遠程rabbitmq的用戶名密碼
pwd = 'admin'
user_pwd = pika.PlainCredentials(username, pwd)
s_conn = pika.BlockingConnection(pika.ConnectionParameters('10.5.190.22', credentials=user_pwd))  #建立鏈接
chan = s_conn.channel()  #在鏈接上建立一個頻道


chan.queue_declare(queue='uploadFileQueue',durable=True)   #聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另外一方能正常運行
chan.queue_bind(exchange='uploadExchange',     # 綁定隊列
                queue="uploadFileQueue",
                routing_key="upload_key")

chan.basic_publish(exchange='uploadExchange',     #交換機 #uploadExchange upload_key
                   routing_key='upload_key',   #路由鍵,寫明將消息發往哪一個隊列,本例是將消息發往隊列hello
                   body = 'hello'
                    
print("[生產者] send 'hello world")

# s_conn.close()#當生產者發送完消息後,可選擇關閉鏈接
複製代碼

2.消費者

#-*- coding:utf-8 -*-

import pika
username = 'admin'#指定遠程rabbitmq的用戶名密碼
pwd = 'admin'
user_pwd = pika.PlainCredentials(username, pwd)
s_conn = pika.BlockingConnection(pika.ConnectionParameters('10.5.190.22', credentials=user_pwd))#建立鏈接
chan = s_conn.channel()#在鏈接上建立一個頻道

chan.queue_declare(queue='uploadFileCallBackQueue',durable=True) #聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另外一方能正常運行
chan.queue_bind(exchange='uploadCallBackExchange',
                queue="uploadFileCallBackQueue",
                routing_key="upload_callback_key")

def callback(ch,method,properties,body): # 定義一個回調函數,用來接收生產者發送的消息
    print(type(body))
    print(body)
    print(body.decode("utf-8"))
    print("[消費者] recv %s" % body)

chan.basic_consume(callback,      # 調用回調函數,從隊列裏取消息
                   queue='uploadFileCallBackQueue', # 指定取消息的隊列名
                   no_ack=True)   # 取完一條消息後,不給生產者發送確認消息,默認是False的,即 默認給rabbitmq發送一個收到消息的確認,通常默認便可
print('[消費者] waiting for msg .')
chan.start_consuming()            # 開始循環取消息
複製代碼

備註:當消費的消息進行處理以後,須要將處理後的消息發送到對應的隊列中,能夠在消費者處理消息完成時候調用生產者的函數進行發送消息

RabbitMQ的瞭解

基於AMQP協議,支持多種語言,用來實現系統與系統之間,程序與程序之間進行通訊的中間件,總體來看是一個異步的過程,由生產者(Publish)來生產消息,這個消息會被先放到一個容器中,當知足必定條件時,這個消息會被消費者(Subscribe )拿走去消費。這個容器就是隊列。生產者和消費者之間遵照的協議就是AMQP協議。其次還能夠對消費者設置一個優先級(Priority),以及對消費者的請求進行限流,對負載進行有效均衡。

AMQP的核心是Producer(消息生產者)、Broker(消息隊列的服務器實體)、Consumer(消息消費者)

Producer/Consumer概念比較好理解,無非就是一個生產者建立一個信息去由消費者去進行相關的邏輯處理。

roker消息隊列的服務器,一個Broker能夠包含多個VirtualHost(虛擬主機),主要起到了一個隔離的做用。 而一個VirtualHost又包括如下三部分

Exchange(交換機):由它按照某些規則 去決定消息最終路由到哪一個隊列。

Binding:綁定,它的做用就是把 Exchange 和 Queue 按照路由規則綁定起來。若是沒有bind,消息會直接被丟掉。

Queue:存儲消息的地方,每一個消息都會被投入到一個或多個隊列。。

相關文章
相關標籤/搜索