消息隊列工做流程:html
消息隊列通常有三個角色: 隊列服務端 隊列生產者 隊列消費者 - 消息隊列工做流程就如同一個流水線,有產品加工,一個輸送帶,一個打包產品 - 輸送帶就是 不停運轉的消息隊列服務端 - 加工產品的就是 隊列生產者 - 在傳輸帶結尾打包產品的 就是隊列消費者
隊列產品前端
RabbitMQ Erlang編寫的消息隊列產品,企業級消息隊列軟件,支持消息負載均衡,數據持久化等。 ZeroMQ saltstack軟件使用此消息,速度最快。 Redis key-value的系統,也支持隊列數據結構,輕量級消息隊列 Kafka 由Scala編寫,目標是爲處理實時數據提供一個統1、高通量、低等待的平臺
1)程序解耦 容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。 2)冗餘: 消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。 許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除以前,須要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。 3)峯值處理能力: (大白話,就是原本公司業務只須要5臺機器,可是臨時的秒殺活動,5臺機器確定受不了這個壓力,咱們又不可能將總體服務器架構提高到10臺,那在秒殺活動後,機器不就浪費了嗎?所以引入消息隊列) 在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見。 若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。 使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。 4)可恢復性: 系統的一部分組件失效時,不會影響到整個系統。 消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。 5)順序保證: 在大多使用場景下,數據處理的順序都很重要。 大部分消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。(Kafka保證一個Partition內的消息的有序性) 6)緩衝: 有助於控制和優化數據流通過系統的速度,解決生產消息和消費消息的處理速度不一致的狀況。 7)異步通訊: 不少時候,用戶不想也不須要當即處理消息。好比發紅包,發短信等流程。 消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。
場景說明:用戶註冊後,須要發註冊郵件和註冊短信。傳統的作法有兩種 1.串行的方式;2.並行方式python
訂單系統:用戶下單後,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功 庫存系統:訂閱web
下單的消息,採用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操做 假如:在下單時庫存系統不數據庫
能正常使用。也不影響正常下單,由於下單後,訂單系統寫入消息隊列就再也不關心其餘的後續操做了。實現訂單系統安全
與庫存系統的應用解耦服務器
流量削鋒也是消息隊列中的經常使用場景,通常在秒殺或團搶活動中使用普遍。 應用場景:秒殺活動,通常會由於流量網絡
過大,致使流量暴增,應用掛掉。爲解決這個問題,通常須要在應用前端加入消息隊列。 a、能夠控制活動的人數數據結構
b、能夠緩解短期內高流量壓垮應用架構
用戶的請求,服務器接收後,首先寫入消息隊列。假如消息隊列長度超過最大數量,則直接拋棄用戶請求或跳轉到錯
誤頁面。 秒殺業務根據消息隊列中的請求信息,再作後續處理
點對點通信
聊天時通信
rabbitMQ安裝
[root@xujunk ~]#yum install erlang [root@xujunk ~]#yum install rabbitmq-server -y
啓動rabbitmq-server
[root@xujunk ~]#systemctl start rabbitmq-server
配置rabbitmq建立管理用戶以及後臺管理頁面
[root@xujunk ~]#rabbitmqctl add_user xjk 123
給新用戶設置管理員角色
[root@xujunk ~]#rabbitmqctl set_user_tags xjk adminstarator
給當前用戶,設置權限:能夠對全部的隊列,進行可讀可寫操做
#語法:set_permissions [-p <vhostpath>] <user> <conf> <write> <read> [root@xujunk ~]#rabbitmqctl set_permissions -p "/" xjk ".*" ".*" ".*"
添加rabbtimq管理界面
[root@xujunk ~]#rabbitmq-plugins enable rabbitmq_management
web端訪問:
http://192.168.58.131:15672/
登錄rabbitmq服務器輸入帳號密碼
在一個文件夾建立2個py文件,一個消費者,一個表明生產者
import pika # 創建與rabbitmq的鏈接 credentials = pika.PlainCredentials("xjk","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.58.131',credentials=credentials)) channel = connection.channel() channel.queue_declare(queue="水續傳") def callbak(ch,method,properties,body): print("消費者接收到了任務:%r"%body.decode("utf8")) # 有消息來臨,當即執行callbak,沒有消息則夯住,等待消息 # 老百姓開始去郵箱取郵件啦,隊列名字是水許傳 channel.basic_consume("水續傳",callbak) # 開始消費,接收消息 channel.start_consuming() #!!注意:因rabbitmq版本不一樣,channel.basic_consume的參數位置會有所變化,報錯「got multiple values for keyword argument 'queue'」須要根據源碼調整參數位置。
#!/usr/bin/env python import pika # 建立憑證,使用rabbitmq用戶密碼登陸 # 去郵局取郵件,必須得驗證身份 credentials = pika.PlainCredentials("xjk","123") # 新建鏈接,這裏localhost能夠更換爲服務器ip # 找到這個郵局,等於鏈接上服務器 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.58.131',credentials=credentials)) # 建立頻道 # 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個鏈接 channel = connection.channel() # 聲明一個隊列,用於接收消息,隊列名字叫「水許傳」 channel.queue_declare(queue='水續傳') # 注意在rabbitmq中,消息想要發送給隊列,必須通過交換(exchange),初學可使用空字符串交換 (exchange=''),它容許咱們精確的指定發送給哪一個隊列(routing_key=''),參數body值發>送的數據 channel.basic_publish(exchange='', routing_key='水續傳', body='大郎 起來喝藥了') print("已經發送了消息") # 程序退出前,確保刷新網絡緩衝以及消息發送給rabbitmq,須要關閉本次鏈接 connection.close()
ACK機制:
ACK機制用於保證消費者若是拿了隊列的消息,客戶端
處理時出錯了,那麼隊列中仍然還存在這個消息,提供下一位消費者繼續取
官網資料:http://www.rabbitmq.com/tutorials/tutorial-two-python.html
生產者pro_ack.py
#!/usr/bin/env python import pika # 建立憑證,使用rabbitmq用戶密碼登陸 # 去郵局取郵件,必須得驗證身份 credentials = pika.PlainCredentials("xjk","123") # 新建鏈接,這裏localhost能夠更換爲服務器ip # 找到這個郵局,等於鏈接上服務器 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.58.131',credentials=credentials)) # 建立頻道 # 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個鏈接 channel = connection.channel() # 新建一個hello隊列,用於接收消息 # 這個郵箱能夠收發各個班級的郵件,經過 channel.queue_declare(queue='西遊記') # 注意在rabbitmq中,消息想要發送給隊列,必須通過交換(exchange),初學可使用空字 符串交換(exchange=''),它容許咱們精確的指定發送給哪一個隊列(routing_key=''),參數body值發送的數據 channel.basic_publish(exchange='', routing_key='西遊記', body='大師兄,師傅被妖怪抓走了') print("已經發送了消息") # 程序退出前,確保刷新網絡緩衝以及消息發送給rabbitmq,須要關閉本次鏈接 connection.close()
消費者代碼cus_ack
import pika credentials = pika.PlainCredentials("xjk","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.58.131',credentials=credentials)) channel = connection.channel() # 聲明一個隊列(建立一個隊列) channel.queue_declare(queue='西遊記') def callback(ch, method, properties, body): print("消費者接受到了任務: %r" % body.decode("utf-8")) #int('asdfasdf') # 我告訴rabbitmq服務端,我已經取走了消息 # 回覆方式在這,告訴服務端,我正確消費了消息,你能夠標記 清除了 ch.basic_ack(delivery_tag=method.delivery_tag) # 關閉no_ack,表明給與服務端ack回覆,確認給與回覆 channel.basic_consume(on_message_callback=callback,queue='西>遊記',auto_ack=False) channel.start_consuming()
向隊列push:python3 pro_ack.py
從隊列pull:python3 cus_ack.py
注意:reids版本不一樣:
#若是用此方法報錯: channel.basic_consume(callback,queue='西遊記',no_ack=False) #改爲: channel.basic_consume(on_message_callback=callback,queue='西>遊記',auto_ack=False) #緣由版本的問題
顯示效果:
當另外一終端執行:[root@xujunk cs]#python3 cus_ack.py
消息的可靠性是RabbitMQ的一大特點,那麼RabbitMQ是如何保證消息可靠性的呢——消息持久化。 爲了保證RabbitMQ在退出或者crash等異常狀況下數據沒有丟失,須要將queue,exchange和Message都持久化。
生產者:Persist_pro.py
import pika # 有密碼 channel = connection.channel() # 聲明一個隊列(建立一個隊列) # 默認此隊列不支持持久化,若是服務掛掉,數據丟失 # durable=True 開啓持久化,必須新開啓一個隊列,本來的隊列已經不支持持久化了 ''' 實現rabbitmq持久化條件 delivery_mode=2 使用durable=True聲明queue是持久化 ''' channel.queue_declare(queue='python',durable=True) #這 裏實現隊列建立的時候,就是持久化的 channel.basic_publish(exchange='', routing_key='python', # 消息隊列名稱 body='life is short,i use python ', # 支持數據持>久化 properties=pika.BasicProperties( delivery_mode=2,#表明消息是持久的 2 ) ) connection.close()
消費者 :Persist_cus.py
import pika credentials = pika.PlainCredentials("xjk","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.58.131',credentials=credentials)) channel = connection.channel() # 確保隊列持久化 channel.queue_declare(queue='python',durable=True) ''' 必須確保給與服務端消息回覆,表明我已經消費了數據,不然數據 一>直持久化,不會消失 ''' def callback(ch, method, properties, body): print("成功取出了消息 >>: %r" % body.decode("utf-8")) # 模擬代碼報錯 # int('asdfasdf') # 此處報錯,沒有給予回覆,保>證客戶端掛掉,數據不丟失 # 告訴服務端,我已經取走了數據,不然數據一直存在 ch.basic_ack(delivery_tag=method.delivery_tag) # 關閉no_ack,表明給與回覆確認 channel.basic_consume(on_message_callback=callback,queue='python',auto_ack=False) channel.start_consuming()
顯示效果以下,D
表示數據持久化,這樣不管重啓服務器,隊列不會丟失
前面的效果都是一對一發,若是作一個廣播效果可不能夠,這時候就要用到exchange了 。exchange必須精確的知道
收到的消息要發給誰。exchange的類型決定了怎麼處理, 類型有如下幾種:
fanout:exchange將消息發送給和該exchange鏈接的全部queue;也就是所謂的廣播模式;此模式下忽略
routing_key
driect:經過routingKey和exchange決定的那個惟一的queue能夠接收消息,只有routing_key爲「black」時纔將
其發送到隊列queue_name;
topic: 全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息
須要queue和exchange綁定,由於消費者不是和exchange直連的,消費者是連在queue上,queue綁定在exchange 上,消費者只會在queue裏讀取消息
發送端:fanout_send.py
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # 注意:這裏是廣播,不須要聲明queue channel.exchange_declare(exchange='logs',exchange_type='fanout') # 聲明廣播管道 # message = ' '.join(sys.argv[1:]) or "info: Hello World!" message = "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) # 注意此處空,必須有 print(" [x] Sent %r" % message) connection.close()
接收端:fanout_recv.py
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs',exchange_type='fanout') #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 result = channel.queue_declare('',exclusive=True) # 獲取隨機的queue名字 queue_name = result.method.queue print("random queuename:", queue_name) # queue綁定到轉發器上 channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(on_message_callback=callback, queue=queue_name, auto_ack=True) channel.start_consuming()
路由模式,經過routing_key將消息發送給對應的queue; 以下面這句便可設置exchange爲direct模式,只有 routing_key爲「black」時纔將其發送到隊列queue_name;
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key='black')
在上圖中,Q1和Q2能夠綁定同一個key,如綁定routing_key=‘KeySame’,那麼收到routing_key爲KeySame的消息
時將會同時發送給Q1和Q2,退化爲廣播模式;
發送端:direct_send.py
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs',exchange_type='direct')#重要程度級別,這裏默認定義爲 info severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
接收端:direct_recv.py
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs',exchange_type='direct') result = channel.queue_declare('',exclusive=True) queue_name = result.method.queue # 獲取運行腳本全部的參數 severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) # 循環列表去綁定 for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(on_message_callback=callback, queue=queue_name, auto_ack=True) channel.start_consuming()
效果演示:
以前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵
字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。
消費者:key_cus1.py
import pika credentials = pika.PlainCredentials("xjk","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.58.131',credentials=credentials)) channel = connection.channel() # exchange='m1',exchange(祕>書)的名稱 # exchange_type='fanout' , 祕書工做方式將消息發送 給全部的隊列 channel.exchange_declare(exchange='m2',exchange_type='direct') # 隨機生成一個隊列,隊列退出時,刪除這個隊列 result = channel.queue_declare('cus1',exclusive=True) queue_name = result.method.queue#讓exchange和queque進行綁定 ,只要 channel.queue_bind(exchange='m2',queue=queue_name,routing_key='bmw') channel.queue_bind(exchange='m2',queue=queue_name,routing_key='benz') def callback(ch, method, properties, body): print("消費者接受到了任務: %r" % body) channel.basic_consume(on_message_callback=callback,queue=queue_name,auto_ack=True) channel.start_consuming()
消費者:key_cus2.py
import pika credentials = pika.PlainCredentials("xjk","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.58.131',credentials=credentials)) channel = connection.channel() # exchange='m1',exchange(祕>書)的名稱 # exchange_type='fanout' , 祕書工做方式將消息發送 給全部的隊列 channel.exchange_declare(exchange='m2',exchange_type='direct') # 隨機生成一個隊列,隊列退出時,刪除這個隊列 result = channel.queue_declare('cus2',exclusive=True) queue_name = result.method.queue#讓exchange和queque進行綁定 ,只要 channel.queue_bind(exchange='m2',queue=queue_name,routing_key='bmw') def callback(ch, method, properties, body): print("消費者接受到了任務: %r" % body) channel.basic_consume(on_message_callback=callback,queue=queue_name,auto_ack=True) channel.start_consuming()
生產者:key_pub.py
import pika credentials = pika.PlainCredentials("xjk","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.58.131',credentials=credentials)) channel = connection.channel() # 路由模式的交換機會發送給綁定的 key和routing_key匹配的隊列 channel.exchange_declare(exchange='m2',exchange_type='direct') # 發送消息,給有關benz的路由關鍵字 channel.basic_publish(exchange='m2', routing_key='benz', body='benz is good car') connection.close()
#新建用戶 [root@xujunk cs]#rabbitmqctl add_user 用戶名 密碼 #設置權限 [root@xujunk cs]#rabbitmqctl set_user_tags {用戶名} {權限} #查看用戶列表 [root@xujunk cs]#rabbitmqctl list_users #爲用戶受權 添加 Virtual Hosts : rabbitmqctl add_vhost <vhost> #修改用戶密碼 rabbitmqctl change_password 用戶名 密碼 [root@xujunk cs]#rabbitmqctl change_password xm 1234 #刪除 Users : rabbitmqctl delete_user 用戶名 [root@xujunk cs]#rabbitmqctl delete_user xm #使用戶user1具備vhost1這個virtual host中全部資源的配置、寫、讀權限以便管理其中的資源 rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*' [root@xujunk ~]#rabbitmqctl set_permissions -p "/" xjk ".*" ".*" ".*" #查看權限 rabbitmqctl list_user_permissions user1 [root@xujunk cs]#rabbitmqctl list_user_permissions xjk #查看隊列 [root@xujunk cs]#rabbitmqctl list_queues #清空隊列步驟: 1.關閉應用:rabbitmqctl stop_app 2.從新啓動:rabbitmqctl reset 3.此時查看隊列:rabbitmqctl list_queues 查看全部的exchange:rabbitmqctl list_exchanges 查看全部的queue: rabbitmqctl list_queues 查看全部的用戶:rabbitmqctl list_users 查看全部的綁定(exchange和queue的綁定信息):rabbitmqctl list_bindings
AMQP AMQP協議是一個高級抽象層消息通訊協議,RabbitMQ是AMQP協議的實現。它主要包括如下組件: 1.Server(broker): 接受客戶端鏈接,實現AMQP消息隊列和路由功能的進程。 2.Virtual Host:實際上是一個虛擬概念,相似於權限控制組,一個Virtual Host裏面能夠有若干個Exchange和Queue,可是權限控制的最小粒度是Virtual Host 3.Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行爲,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不一樣類型的Exchange路由的行爲是不同的。 4.Message Queue:消息隊列,用於存儲還未被消費者消費的消息。 5.Message: 由Header和Body組成,Header是由生產者添加的各類屬性的集合,包括Message是否被持久化、由哪一個Message Queue接受、優先級是多少等。而Body是真正須要傳輸的APP數據。 6.Binding:Binding聯繫了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding後會生成一張路由表,路由表中存儲着Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header獲得Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,二者的匹配方式由Exchange Type決定。 7.Connection:鏈接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP鏈接。 8.Channel:信道,僅僅建立了客戶端到Broker之間的鏈接後,客戶端仍是不能發送消息的。須要爲每個Connection建立Channel,AMQP協議規定只有經過Channel才能執行AMQP的命令。一個Connection能夠包含多個Channel。之因此須要Channel,是由於TCP鏈接的創建和釋放都是十分昂貴的,若是一個客戶端每個線程都須要與Broker交互,若是每個線程都創建一個TCP鏈接,暫且不考慮TCP鏈接是否浪費,就算操做系統也沒法承受每秒創建如此多的TCP鏈接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,可是建議儘可能共用Connection。 9.Command:AMQP的命令,客戶端經過Command完成與AMQP服務器的交互來實現自身的邏輯。例如在RabbitMQ中,客戶端能夠經過publish命令發送消息,txSelect開啓一個事務,txCommit提交一個事務。