RabbitMQ安裝及基礎

參考文檔:
  1. 【rabbitmq-完整安裝版】安裝https://blog.csdn.net/csdn_azuo/article/details/100552423
  2. php的amqp擴展 安裝(windows) rabbitmq學習篇https://blog.csdn.net/weiwenjuan0923/article/details/79986951
時間軸:
  1. 2019/10/31 安裝centOS7虛擬機
  2. 2019/11/04 linux下安裝rabbitMQ服務
  3. 2019/11/05 win下給安裝php擴展amqp
  4. 2019/11/12 win下php鏈接linus下rabbitMQ及生產者產生消息
  5. 2019/11/13 解決頁面訪問消費者超時問題
  6. 後續以爲仍是看不懂博客舉的例子和文檔,在B站看了下牧碼人入門視頻(基礎概念,實例較水)
 
 
問題:
  1. 2019/11/05 php安裝amqp擴展,Apache涼了, 擴展包下錯了,PHP的architecture
Architecture
x86
  1. 2019/11/12 Windows 下 PHP AMQP 連接錯誤 報錯信息:Fatal error: Uncaught AMQPConnectionException: Socket error: could not connect to host. in E:\PHPTutorial\WWW\index.php:11 Stack trace: #0 E:\PHPTutorial\WWW\index.php(11): AMQPConnection->connect() #1 {main} thrown in E:\PHPTutorial\WWW\index.php on line 11(解決:網頁使用15672端口,客戶端鏈接使用5672端口, 5672端口未開啓)
  1. 2019/11/12 Deprecated: Function AMQPExchange::declare() is deprecated in E:\PHPTutorial\WWW\index.php on line 31 Exchange Status:1(解決:declare() 方法以及棄用,變爲declareExchange())
  2. 2019/11/13 消費者取數據時,響應時間很長,響應出現500錯誤 , 可是消息被領走
    1. 錯誤信息: Internal Server Error
The server encountered an internal error or misconfiguration and was unable to complete your request.
Please contact the server administrator at admin@php.cn to inform them of the time this error occurred, and the actions you performed just before this error.
More information about this error may be available in the server error log.
    1. mq後臺裏隊列有個消費者列表,在裏面的話通常都能消費https://bbs.csdn.net/topics/392410948?list=9168275
    2. 緣由:頁面使用Apache訪問,Apache有響應時間
    3. 解決:請求時經過cmd 進入php文件夾, php.exe 文件路徑 ,來執行消費者文件
    4. 這裏須要注意的是這個方法:$queue->consume,queue對象有兩個方法可用於取消息:consume和get。前者是阻塞的,無消息時會被掛起,適合循環中使用;後者則是非阻塞的,取消息時有則取,無則返回false。就是說用了consume以後,會同步阻塞,該程序常駐內存,不能用nginx,apache調用。 
 
涉及基礎:
1.消息隊列解決哪些問題
a.異步處理 : 提升系統執行速度
b.應用解耦 : 分佈式(eg:超買超賣,解耦下降粘合度)
c.流量削鋒 : 高併發(eg:搶紅包,秒殺MQ進行過濾)
d.日誌處理 : 大數據(eg:統計廣告位數據,異步將數據日誌存入文件->日誌收集 flume單線程 td-agent單線程 logstash可開多進程->消息中間件->進行流計算 實時統計 || 歸檔後離線計算後數據清洗導入mysql後可出報表)
...
2.virtual hosts 管理
a.即vhosts,至關於mysql的db
b.通常以/開頭
c.對用戶進行受權
3.Overview->Ports and contents
協議
監聽端口
備註
http
15672
自身服務器頁面訪問端口
瀏覽器與rabbitMQ
服務器通訊使用http協議
amqp
5672
通訊協議,與rabbitMQ服務器通訊使用的協議
clustering
25672
支持集羣
 
1.簡單隊列
P----Queue----C
a.三個對象: 生產者,隊列,消費者 11對應
b.發佈消息: 連接mq服務->建立通道->聲明隊列->經過channel發佈消息到隊列
c.消費消息: 連接mq服務->建立通道->定義隊列的消費者channel->監聽隊列
d.不足: 耦合性高, 生產者 一一對應消費者; 不能實現多個消費者消費隊列中的消息; 隊列名須要同時變動
 
2.工做隊列
|----C1
P----Queue----|----C2
|----C3
a.模型: 一個生產者對應多個消費者
d.出現緣由: 簡單隊列11對應; 實際開發中,生產者發送消息絕不費力, 而消費者通常是要跟業務相 結合的, 消費者接受到消息後須要處理,可能須要花費時間, 隊列就可能擠壓不少消息
c.生產者: 連接mq->建立通道->聲明隊列->發佈消息->關閉通道
d.消費者: 連接mq->建立通道->通道聲明隊列->定義消費者接收消息
e.消費者1和2處理消息均分,1偶數2奇數數據,這種現象叫 輪詢分發,無論誰忙誰閒都不會多給消息,任務消息老是你一個我一個
 
3.公平分發
|----C1
P----Queue----|----C2
|----C3
a.使用公平分發必須關閉自動應答(ack)改爲 手動反饋
b.生產者: 每一個消費者發送確認消息以前,消息隊列不發送下一個消息到消費者,一次處理一個消息; 限制發送給同一個消費者不得超過一條消息
c.消費者2處理比消費者1多(能者多勞)
 
4.消息應答(autoAck布爾值,Message Acknowledge)
a.消息應答默認是打開的,默認爲false
b. 爲true時,自動確認模式, MQ將消息分發給消費者,該條消息就會將被從內存中消除
這種狀況下, 若是殺死正在執行的消費者,就會丟失正在處理的消息!
c. 爲false時,手動模式,若是有一個消費者掛掉,就會交付給其餘消費者.rabbitMQ支持消息應答,告訴rabbitMQ這個消息已經處理,能夠刪除了,rabbitMQ將刪除內存中的消息, 若是MQ服務器掛了,消息仍會丟失!
 
5.消息持久化(durable布爾型)
a.已定義過的隊列修改durable屬性,是不能夠的;儘管代碼正確,也不會運行成功
b. rabbitMQ不容許對已存在的隊列進行從新定義不一樣的參數信息
 
6.訂閱模式
    |----Queue1----C1
P----x----|
|----Queue2----C2
a.模型解讀:
一個生產者,多個消費者;
每個消費者都有本身的隊列;
生產者沒直接把消息發送至隊列,而是發送到了交換機(轉發器exchange);
每一個隊列都要綁定帶交換機上;
生產者發送的消息通過交換機到達隊列,實現一個消息被多個消費者消費
b.應用場景:
註冊後須要發郵件和短信等操做
商品修改後搜索引擎及前臺都須要變動
c.生產者: 聲明交換機->發佈消息到交換機->關閉通道->關閉鏈接
交換機未綁定隊列時,發送消息到隊列會致使消息丟失,由於交換機沒有存儲的能力, 在rabbitMQ中只有隊列有存儲能力
d.消費者: 鏈接->建立channel->聲明隊列->綁定隊列到交換機->定義消費者
 
7.Exchange(交換機 轉發器)
a.匿名轉發,exchange爲空(""), 上述訂閱模式
b.Fanout(不處理路由鍵): 消息通過交換機,與其交換機綁定的隊列都能收到消息
c.Direct(處理路由鍵): 發送消息時帶一個路由key,若綁定隊列key與路由key相匹配,則將消息轉指相應隊列
 
8.路由模式
type=direct
| |---error---|---Queue1----C1
P----x--------|
|----info---|
|---error---|---Queue2----C2
|-warning-|
a.生產者:連接->建立通道->聲明交換機->發佈消息->關閉通道->關閉鏈接
b.消費者:連接->建立通道->聲明隊列->綁定交換機->定義消費者
 
9.話題模式
type=topic
| |---*.orange.*---|---Queue1----C1
P----x--------|
|----*.*.rabbit---|
|------lazy.#-----|---Queue2----C2
a. 將路由和某個模式匹配,通配符
b.井號匹配一個或多個; 星號匹配一個
c.例如: Goods.# =(Goods.insert/delete)
d.場景: 生產者管理商品(增刪改查)
 
10.消息確認機制(事務+confirm)
a.rabbitMQ中能夠經過持久化數據解決rabbitMQ服務器異常的數據丟失問題
b.背景: 生產者將消息發送出去後,消息到底有沒有到達rabbitMQ服務,默認狀況下是不知道的
c.兩種方式肯定: AMQP實現了事務機制;Confirm模式
 
11.事務機制(AMQP自己協議自帶):
a.txSelect(將當前channel設置成transaction模式);txCommit(用於提交事務);txRollback(回滾事務)
b.實現: 使用try catch拋出異常及事務機制實現確保消息發送至rabbitMQ服務器
c.不足: 協議通訊次數太多,會 下降消息的吞吐量且耗時,每一個信息都要提交一次至關於一次請求
 
12.Confirm模式(事務模式隊列不能重複設置)
a.生產者端confirm模式的實現原理: 生產者將信道設置成confirm模式,一旦信道進入confirm模式,全部在 該信道上發佈的消息都會被指派一個惟一的ID(從1開始),一旦消息被投遞到全部匹配的隊列以後,broker就會發送一個確認給生產者(包含消息的惟一ID),這就使生產者知道消息已經正確到達目的隊列了,若是消息和隊列時可持久化的,那麼確認消息會將消息寫入磁盤以後發出,broker回傳給生產者的確認消息中deliver-tag域包含了確認消息的序列號,此外broker也能夠設置basic.ack的multiple域,表示到這個序列號以前的全部消息已經獲得了處理
b.confirm模式的最大好處在於其是異步的;
c.開啓confirm模式: channel.confirmSelect();
d.編程模式:
普通 發一條 waitForConfirms(),等待服務端確認,串行方法;
批量 發一批 waitForConfirms();
異步 提供一個回調方法
Channel對象提供的ConfirmListener()回調方法只包含deliveryTag(當前Channel發出的消息序號),咱們須要本身爲每一個Channel維護一個unconfirm的消息序號集合,每發佈一條數據,集合中元素加1,每回調一次handleAck方法,unconfirm集合刪掉相應的一條(multiple=false)或多條(multiple=true)記錄.從程序運行效率上看,這個unconfirm集合最好採用有序集合SortedSet存儲結構
生產者:連接->建立通道->聲明隊列->(通道添加監聽) 聲明未確認的消息標識集合->發佈成功後回調handleAck()函數,回執有問題執行handleNack()函數->關閉通道->關閉鏈接
相關文章
相關標籤/搜索