消息隊列通常有三個角色:
隊列服務端
隊列生產者
隊列消費者
消息隊列工做流程就如同一個流水線,有產品加工,一個輸送帶,一個打包產品
輸送帶就是 不停運轉的消息隊列服務端
加工產品的就是 隊列生產者
在傳輸帶結尾打包產品的 就是隊列消費者
RabbitMQ Erlang編寫的消息隊列產品,企業級消息隊列軟件,支持消息負載均衡,數據持久化等。 ZeroMQ saltstack軟件使用此消息,速度最快。 Redis key-value的系統,也支持隊列數據結構,輕量級消息隊列 Kafka 由Scala編寫,目標是爲處理實時數據提供一個統1、高通量、低等待的平臺
1)程序解耦 容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。 2)冗餘: 消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。 許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除以前,須要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。 3)峯值處理能力: (大白話,就是原本公司業務只須要5臺機器,可是臨時的秒殺活動,5臺機器確定受不了這個壓力,咱們又不可能將總體服務器架構提高到10臺,那在秒殺活動後,機器不就浪費了嗎?所以引入消息隊列) 在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見。 若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。 使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。 4)可恢復性: 系統的一部分組件失效時,不會影響到整個系統。 消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。 5)順序保證: 在大多使用場景下,數據處理的順序都很重要。 大部分消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。(Kafka保證一個Partition內的消息的有序性) 6)緩衝: 有助於控制和優化數據流通過系統的速度,解決生產消息和消費消息的處理速度不一致的狀況。 7)異步通訊: 不少時候,用戶不想也不須要當即處理消息。好比發紅包,發短信等流程。 消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。
生活裏的消息隊列,如同郵局的郵箱,
若是沒郵箱的話,
郵件必須找到郵件那我的,遞給他,才玩完成,那這個任務會處理的很麻煩,很慢,效率很低
可是若是有了郵箱,
郵件直接丟給郵箱,用戶只須要去郵箱裏面去找,有沒有郵件,有就拿走,沒有就下次再來,這樣能夠極大的提高郵件收發效率!
rabbitmq是一個消息代理,它接收和轉發消息,能夠理解爲是生活的郵局。
你能夠將郵件放在郵箱裏,你能夠肯定有郵遞員會發送郵件給收件人。
歸納:
rabbitmq是接收,存儲,轉發數據的。
官方教程:http://www.rabbitmq.com/tutorials/tutorial-one-python.html
消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠更復雜,可能包含嵌入對象。html
消息隊列(Message Queue)是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而無論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。前端
想必同窗們都點過外賣,點擊下單後的業務邏輯可能包括:檢查庫存、生成單據、發紅包、短信通知等,若是這些業務同步執行,完成下單率會很是低,如發紅包,短信通知等沒必要要的流程,異步執行便可。python
此時使用MQ,能夠在覈心流程(扣減庫存、生成訂單記錄)等完成後發送消息到MQ,快速結束本次流程。消費者拉取MQ消息時,發現紅包、短信等消息時,再進行處理。web
場景:雙11是購物狂節,用戶下單後,訂單系統須要通知庫存系統,傳統的作法就是訂單系統調用庫存系統的接口正則表達式
這種作法有一個缺點:安全
當庫存系統出現故障時,訂單就會失敗。(這樣馬雲將少賺好多好多錢錢。。。。)服務器
訂單系統和庫存系統高耦合.網絡
引入消息隊列:數據結構
訂單系統:用戶下單後,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。架構
庫存系統:訂閱下單的消息,獲取下單消息,進行庫操做。 就算庫存系統出現故障,消息隊列也能保證消息的可靠投遞,不會致使消息丟失(馬雲這下高興了,鈔票快快的來呀~~).
流量削峯通常在秒殺活動中應用普遍 場景:秒殺活動,通常會由於流量過大,致使應用掛掉,爲了解決這個問題,通常在應用前端加入消息隊列。
做用:
1.能夠控制活動人數,超過此必定閥值的訂單直接丟棄(怪不得我一次秒殺都沒搶到過。。。。。wtf???)
2.能夠緩解短期的高流量壓垮應用(應用程序按本身的最大處理能力獲取訂單)
3.用戶的請求,服務器接收到以後,寫入消息隊列,超過定義的閾值就直接丟棄請求,或者跳轉錯誤頁面。
4.業務系統取出隊列中的消息,再作後續處理。
rabbitmq的安裝使用 1.經過阿里雲的yum源,在epel源中有這個rabbitmq yum install rabbitmq-server erlang -y 2.啓動rabbitmq-server systemctl start rabbitmq-server 若是 rabbitmq-server啓動不了,就改一下 hosts文件,寫入 127.0.0.1 和你本身的主機名 3.開啓後臺管理界面 rabbitmq-plugins enable rabbitmq_management 4.建立rabbitmq的帳號密碼 rabbitmqctl add_user henry 123456 5.設置用戶爲管理員 sudo rabbitmqctl set_user_tags henry administrator 6.設置用戶有權限訪問全部隊列 #語法:
對何種資源具備配置、寫、讀的權限經過正則表達式來匹配,具體命令以下:
rabbitmqctl set_permissions [-p <vhostpath>] <user> <conf> <write> <read> rabbitmqctl set_permissions -p "/" henry ".*" ".*" ".*" 7.重啓rabbitmq服務端,讓用戶生效 systemctl restart rabbitmq-server 8.訪問web管理界面,登陸,查看隊列信息 http://192.168.16.142:15672/#/queues 9.用python操做rabbitmq,實現生產消費者模型 1.安裝pika模塊,模塊版本須要指定,由於代碼參數發生了變化 pip3 install -i https://pypi.douban.com/simple pika==0.13.1
// 新建用戶 rabbitmqctl add_user {用戶名} {密碼} // 設置權限 rabbitmqctl set_user_tags {用戶名} {權限} // 查看用戶列表 rabbitmqctl list_users // 爲用戶受權 添加 Virtual Hosts : rabbitmqctl add_vhost <vhost> // 刪除用戶 rabbitmqctl delete_user Username // 修改用戶的密碼 rabbitmqctl change_password Username Newpassword // 刪除 Virtual Hosts : rabbitmqctl delete_vhost <vhost> // 添加 Users : rabbitmqctl add_user <username> <password> rabbitmqctl set_user_tags <username> <tag> ... rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read> // 刪除 Users : delete_user <username> // 使用戶user1具備vhost1這個virtual host中全部資源的配置、寫、讀權限以便管理其中的資源 rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*' // 查看權限 rabbitmqctl list_user_permissions user1 rabbitmqctl list_permissions -p vhost1 // 清除權限 rabbitmqctl clear_permissions [-p VHostPath] User //清空隊列步驟 rabbitmqctl reset 須要提早關閉應用rabbitmqctl stop_app , 而後再清空隊列,啓動應用 rabbitmqctl start_app 此時查看隊列rabbitmqctl list_queues 查看全部的exchange: rabbitmqctl list_exchanges 查看全部的queue: rabbitmqctl list_queues 查看全部的用戶: rabbitmqctl list_users 查看全部的綁定(exchange和queue的綁定信息): rabbitmqctl list_bindings 查看消息確認信息: rabbitmqctl list_queues name messages_ready messages_unacknowledged 查看RabbitMQ狀態,包括版本號等信息:rabbitmqctl status #開啓web界面rabbitmq rabbitmq-plugins enable rabbitmq_management #訪問web界面 http://server-name:15672/
AMQP AMQP協議是一個高級抽象層消息通訊協議,RabbitMQ是AMQP協議的實現。它主要包括如下組件: 1.Server(broker): 接受客戶端鏈接,實現AMQP消息隊列和路由功能的進程。 2.Virtual Host:實際上是一個虛擬概念,相似於權限控制組,一個Virtual Host裏面能夠有若干個Exchange和Queue,可是權限控制的最小粒度是Virtual Host 3.Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行爲,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不一樣類型的Exchange路由的行爲是不同的。 4.Message Queue:消息隊列,用於存儲還未被消費者消費的消息。 5.Message: 由Header和Body組成,Header是由生產者添加的各類屬性的集合,包括Message是否被持久化、由哪一個Message Queue接受、優先級是多少等。而Body是真正須要傳輸的APP數據。 6.Binding:Binding聯繫了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding後會生成一張路由表,路由表中存儲着Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header獲得Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,二者的匹配方式由Exchange Type決定。 7.Connection:鏈接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP鏈接。 8.Channel:信道,僅僅建立了客戶端到Broker之間的鏈接後,客戶端仍是不能發送消息的。須要爲每個Connection建立Channel,AMQP協議規定只有經過Channel才能執行AMQP的命令。一個Connection能夠包含多個Channel。之因此須要Channel,是由於TCP鏈接的創建和釋放都是十分昂貴的,若是一個客戶端每個線程都須要與Broker交互,若是每個線程都創建一個TCP鏈接,暫且不考慮TCP鏈接是否浪費,就算操做系統也沒法承受每秒創建如此多的TCP鏈接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,可是建議儘可能共用Connection。 9.Command:AMQP的命令,客戶端經過Command完成與AMQP服務器的交互來實現自身的邏輯。例如在RabbitMQ中,客戶端能夠經過publish命令發送消息,txSelect開啓一個事務,txCommit提交一個事務。
P 是生產者
C 是消費者
中間hello是消息隊列
能夠有多個P、多個C
P發送消息給hello隊列,C消費者從隊列中獲取消息,默認輪詢方式
生產者send.py
咱們的第一個程序send.py將向隊列發送一條消息。咱們須要作的第一件事是創建與RabbitMQ服務器的鏈接。
#!/usr/bin/env python3 import pika # 建立憑證,使用rabbitmq用戶密碼登陸 # 去郵局取郵件,必須得驗證身份 credentials = pika.PlainCredentials("henry","123456") # 新建鏈接,這裏localhost能夠更換爲服務器ip # 找到這個郵局,等於鏈接上服務器 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials)) # 建立頻道 # 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個鏈接 channel = connection.channel() # 聲明一個隊列,用於接收消息,隊列名字叫「水許傳」 channel.queue_declare(queue='水許傳') # 注意在rabbitmq中,消息想要發送給隊列,必須通過交換(exchange),初學可使用空字符串交換(exchange=''),它容許咱們精確的指定發送給哪一個隊列(routing_key=''),參數body值發送的數據 channel.basic_publish(exchange='', routing_key='水許傳', body='武大郎出攤賣燒餅了') print("已經發送了消息") # 程序退出前,確保刷新網絡緩衝以及消息發送給rabbitmq,須要關閉本次鏈接 connection.close()
能夠同時存在多個接受者,等待接收隊列的消息,默認是輪訓方式分配消息
接受者receive.py,能夠運行屢次,運行多個消費者
import pika # 創建與rabbitmq的鏈接 credentials = pika.PlainCredentials("henry","123456") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials)) channel = connection.channel() channel.queue_declare(queue="水許傳") def callbak(ch,method,properties,body): print("消費者接收到了數據:%r"%body.decode("utf8")) # 有消息來臨,當即執行callbak,沒有消息則夯住,等待消息 # 老百姓開始去郵箱取郵件啦,隊列名字是水許傳 channel.basic_consume(callbak,queue="水許傳",no_ack=True) # 開始消費,接收消息 channel.start_consuming()
默認狀況下,生產者發送數據給隊列,消費者取出消息後,數據將被清除。
特殊狀況,若是消費者處理過程當中,出現錯誤,數據處理沒有完成,那麼這段數據將從隊列丟失
爲了保證消息發送的可靠性,不丟失消息,使消息持久化了。同時爲了防止接收端在處理消息時down掉,只有在消息處理完成後才發送ack消息。
不確認機制也就是說每次消費者接收到數據後,無論是否處理完畢,rabbitmq-server都會把這個消息標記完成,從隊列中刪除
沒有確認機制的消息隊列是數據不安全的
ACK機制用於保證消費者若是拿了隊列的消息,客戶端處理時出錯了,那麼隊列中仍然還存在這個消息,提供下一位消費者繼續取
機制流程:
1.生產者無須變更,發送消息 2.消費者若是no_ack=True啊,數據消費後若是出錯就會丟失 反之no_ack=False,數據消費若是出錯,數據也不會丟失 3.ack機制在消費者代碼中演示
只負責發送數據便可,無須變更
#!/usr/bin/env python3 import pika # 建立憑證,使用rabbitmq用戶密碼登陸 # 去郵局取郵件,必須得驗證身份 credentials = pika.PlainCredentials("henry","123456") # 新建鏈接,這裏localhost能夠更換爲服務器ip # 找到這個郵局,等於鏈接上服務器 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials)) # 建立頻道 # 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個鏈接 channel = connection.channel() # 新建一個hello隊列,用於接收消息 # 這個郵箱能夠收發各個班級的郵件,經過 channel.queue_declare(queue='西遊記') # 注意在rabbitmq中,消息想要發送給隊列,必須通過交換(exchange),初學可使用空字符串交換(exchange=''),它容許咱們精確的指定發送給哪一個隊列(routing_key=''),參數body值發送的數據 channel.basic_publish(exchange='', routing_key='西遊記', body='大師兄,師傅被蔡許坤抓走了') print("已經發送了消息") # 程序退出前,確保刷新網絡緩衝以及消息發送給rabbitmq,須要關閉本次鏈接 connection.close()
給與ack回覆
拿到消息必須給rabbitmq服務端回覆ack信息,不然消息不會被刪除,防止客戶端出錯,數據丟失
import pika credentials = pika.PlainCredentials("selfju","cxk") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials)) channel = connection.channel() # 聲明一個隊列(建立一個隊列) channel.queue_declare(queue='西遊記') def callback(ch, method, properties, body): print("消費者接受到了任務: %r" % body.decode("utf-8")) # int('asdfasdf') 模擬處理消息的時候發生了錯誤 # 我告訴rabbitmq服務端,我已經取走了消息 # 回覆方式在這 ch.basic_ack(delivery_tag=method.delivery_tag) # 關閉no_ack,表明給與服務端ack回覆,確認給與回覆 channel.basic_consume(callback,queue='西遊記',no_ack=False) channel.start_consuming()
演示 1.執行生產者,向隊列寫入數據,產生一個新隊列queue 2.重啓服務端,隊列丟失 3.開啓生產者數據持久化後,重啓rabbitmq,隊列不丟失 4.依舊能夠讀取數據
消息的可靠性是RabbitMQ的一大特點,那麼RabbitMQ是如何保證消息可靠性的呢——消息持久化。 爲了保證RabbitMQ在退出或者crash等異常狀況下數據沒有丟失,須要將queue,exchange和Message都持久化。
生產者:
import pika # 有密碼 credentials = pika.PlainCredentials("selfju","cxk") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials)) channel = connection.channel() # 聲明一個隊列(建立一個隊列) # 默認此隊列不支持持久化,若是服務掛掉,數據丟失 # durable=True 開啓持久化,必須新開啓一個隊列,本來的隊列已經不支持持久化了 ''' 實現rabbitmq持久化條件 delivery_mode=2 使用durable=True聲明queue是持久化 ''' channel.queue_declare(queue='LOL',durable=True) channel.basic_publish(exchange='', routing_key='LOL', # 消息隊列名稱 body='我用雙手成就你的夢想', # 支持數據持久化 properties=pika.BasicProperties( delivery_mode=2,#表明消息是持久的 2 ) ) connection.close()
消費者:
import pika credentials = pika.PlainCredentials("selfju","cxk") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials)) channel = connection.channel() # 確保隊列持久化 channel.queue_declare(queue='LOL',durable=True) ''' 必須確保給與服務端消息回覆,表明我已經消費了數據,不然數據一直持久化,不會消失 ''' def callback(ch, method, properties, body): print("消費者接受到了任務: %r" % body.decode("utf-8")) # 模擬代碼報錯 # int('asdfasdf') # 此處報錯,沒有給予回覆,保證客戶端掛掉,數據不丟失 # 告訴服務端,我已經取走了數據,不然數據一直存在 ch.basic_ack(delivery_tag=method.delivery_tag) # 關閉no_ack,表明給與回覆確認 channel.basic_consume(callback,queue='LOL',no_ack=False) channel.start_consuming()
沒有持久化的消息隊列中的數據會消失;
消息隊列持久化後即便生產者的服務器掛掉,重啓後消息隊列中的數據也不會消失;