參考文檔:
時間軸:
- 2019/10/31 安裝centOS7虛擬機
- 2019/11/04 linux下安裝rabbitMQ服務
- 2019/11/05 win下給安裝php擴展amqp
- 2019/11/12 win下php鏈接linus下rabbitMQ及生產者產生消息
- 2019/11/13 解決頁面訪問消費者超時問題
- 後續以爲仍是看不懂博客舉的例子和文檔,在B站看了下牧碼人入門視頻(基礎概念,實例較水)
問題:
- 2019/11/05 php安裝amqp擴展,Apache涼了, 擴展包下錯了,PHP的architecture
- 2019/11/12 Windows 下 PHP AMQP 連接錯誤 報錯信息:Fatal error: Uncaught AMQPConnectionException: Socket error: could not connect to host. in E:\PHPTutorial\WWW\index.php:11 Stack trace: #0 E:\PHPTutorial\WWW\index.php(11): AMQPConnection->connect() #1 {main} thrown in E:\PHPTutorial\WWW\index.php on line 11(解決:網頁使用15672端口,客戶端鏈接使用5672端口, 5672端口未開啓)
- 2019/11/12 Deprecated: Function AMQPExchange::declare() is deprecated in E:\PHPTutorial\WWW\index.php on line 31 Exchange Status:1(解決:declare() 方法以及棄用,變爲declareExchange())
- 2019/11/13 消費者取數據時,響應時間很長,響應出現500錯誤 , 可是消息被領走
- 錯誤信息: Internal Server Error
The server encountered an internal error or misconfiguration and was unable to complete your request.
Please contact the server administrator at admin@php.cn to inform them of the time this error occurred, and the actions you performed just before this error.
More information about this error may be available in the server error log.
- mq後臺裏隊列有個消費者列表,在裏面的話通常都能消費https://bbs.csdn.net/topics/392410948?list=9168275
- 緣由:頁面使用Apache訪問,Apache有響應時間
- 解決:請求時經過cmd 進入php文件夾, php.exe 文件路徑 ,來執行消費者文件
- 這裏須要注意的是這個方法:$queue->consume,queue對象有兩個方法可用於取消息:consume和get。前者是阻塞的,無消息時會被掛起,適合循環中使用;後者則是非阻塞的,取消息時有則取,無則返回false。就是說用了consume以後,會同步阻塞,該程序常駐內存,不能用nginx,apache調用。
涉及基礎:
1.消息隊列解決哪些問題
a.異步處理 : 提升系統執行速度
b.應用解耦 : 分佈式(eg:超買超賣,解耦下降粘合度)
c.流量削鋒 : 高併發(eg:搶紅包,秒殺MQ進行過濾)
d.日誌處理 : 大數據(eg:統計廣告位數據,異步將數據日誌存入文件->日誌收集 flume單線程 td-agent單線程 logstash可開多進程->消息中間件->進行流計算 實時統計 || 歸檔後離線計算後數據清洗導入mysql後可出報表)
...
2.virtual hosts 管理
a.即vhosts,至關於mysql的db
b.通常以/開頭
c.對用戶進行受權
3.Overview->Ports and contents
協議
|
監聽端口
|
備註
|
http
|
15672
|
自身服務器頁面訪問端口
瀏覽器與rabbitMQ
服務器通訊使用http協議
|
amqp
|
5672
|
通訊協議,與rabbitMQ服務器通訊使用的協議
|
clustering
|
25672
|
支持集羣
|
1.簡單隊列
P----Queue----C
a.三個對象: 生產者,隊列,消費者 11對應
b.發佈消息: 連接mq服務->建立通道->聲明隊列->經過channel發佈消息到隊列
c.消費消息: 連接mq服務->建立通道->定義隊列的消費者channel->監聽隊列
d.不足: 耦合性高, 生產者
一一對應消費者; 不能實現多個消費者消費隊列中的消息; 隊列名須要同時變動
2.工做隊列
|----C1
P----Queue----|----C2
|----C3
a.模型: 一個生產者對應多個消費者
d.出現緣由: 簡單隊列11對應; 實際開發中,生產者發送消息絕不費力, 而消費者通常是要跟業務相 結合的, 消費者接受到消息後須要處理,可能須要花費時間, 隊列就可能擠壓不少消息
c.生產者: 連接mq->建立通道->聲明隊列->發佈消息->關閉通道
d.消費者: 連接mq->建立通道->通道聲明隊列->定義消費者接收消息
e.消費者1和2處理消息均分,1偶數2奇數數據,這種現象叫
輪詢分發,無論誰忙誰閒都不會多給消息,任務消息老是你一個我一個
3.公平分發
|----C1
P----Queue----|----C2
|----C3
a.使用公平分發必須關閉自動應答(ack)改爲
手動反饋
b.生產者: 每一個消費者發送確認消息以前,消息隊列不發送下一個消息到消費者,一次處理一個消息; 限制發送給同一個消費者不得超過一條消息
c.消費者2處理比消費者1多(能者多勞)
4.消息應答(autoAck布爾值,Message Acknowledge)
a.消息應答默認是打開的,默認爲false
b. 爲true時,自動確認模式, MQ將消息分發給消費者,該條消息就會將被從內存中消除
這種狀況下,
若是殺死正在執行的消費者,就會丟失正在處理的消息!
c. 爲false時,手動模式,若是有一個消費者掛掉,就會交付給其餘消費者.rabbitMQ支持消息應答,告訴rabbitMQ這個消息已經處理,能夠刪除了,rabbitMQ將刪除內存中的消息,
若是MQ服務器掛了,消息仍會丟失!
5.消息持久化(durable布爾型)
a.已定義過的隊列修改durable屬性,是不能夠的;儘管代碼正確,也不會運行成功
b.
rabbitMQ不容許對已存在的隊列進行從新定義不一樣的參數信息
6.訂閱模式
|----Queue1----C1
P----x----|
|----Queue2----C2
a.模型解讀:
一個生產者,多個消費者;
每個消費者都有本身的隊列;
生產者沒直接把消息發送至隊列,而是發送到了交換機(轉發器exchange);
每一個隊列都要綁定帶交換機上;
生產者發送的消息通過交換機到達隊列,實現一個消息被多個消費者消費
b.應用場景:
註冊後須要發郵件和短信等操做
商品修改後搜索引擎及前臺都須要變動
c.生產者: 聲明交換機->發佈消息到交換機->關閉通道->關閉鏈接
交換機未綁定隊列時,發送消息到隊列會致使消息丟失,由於交換機沒有存儲的能力, 在rabbitMQ中只有隊列有存儲能力
d.消費者: 鏈接->建立channel->聲明隊列->綁定隊列到交換機->定義消費者
7.Exchange(交換機 轉發器)
a.匿名轉發,exchange爲空(""), 上述訂閱模式
b.Fanout(不處理路由鍵): 消息通過交換機,與其交換機綁定的隊列都能收到消息
c.Direct(處理路由鍵): 發送消息時帶一個路由key,若綁定隊列key與路由key相匹配,則將消息轉指相應隊列
8.路由模式
type=direct
| |---error---|---Queue1----C1
P----x--------|
|----info---|
|---error---|---Queue2----C2
|-warning-|
a.生產者:連接->建立通道->聲明交換機->發佈消息->關閉通道->關閉鏈接
b.消費者:連接->建立通道->聲明隊列->綁定交換機->定義消費者
9.話題模式
type=topic
| |---*.orange.*---|---Queue1----C1
P----x--------|
|----*.*.rabbit---|
|------lazy.#-----|---Queue2----C2
a.
將路由和某個模式匹配,通配符
b.井號匹配一個或多個; 星號匹配一個
c.例如: Goods.# =(Goods.insert/delete)
d.場景: 生產者管理商品(增刪改查)
10.消息確認機制(事務+confirm)
a.rabbitMQ中能夠經過持久化數據解決rabbitMQ服務器異常的數據丟失問題
b.背景:
生產者將消息發送出去後,消息到底有沒有到達rabbitMQ服務,默認狀況下是不知道的
c.兩種方式肯定: AMQP實現了事務機制;Confirm模式
11.事務機制(AMQP自己協議自帶):
a.txSelect(將當前channel設置成transaction模式);txCommit(用於提交事務);txRollback(回滾事務)
b.實現: 使用try catch拋出異常及事務機制實現確保消息發送至rabbitMQ服務器
c.不足: 協議通訊次數太多,會
下降消息的吞吐量且耗時,每一個信息都要提交一次至關於一次請求
12.Confirm模式(事務模式隊列不能重複設置)
a.生產者端confirm模式的實現原理: 生產者將信道設置成confirm模式,一旦信道進入confirm模式,全部在
該信道上發佈的消息都會被指派一個惟一的ID(從1開始),一旦消息被投遞到全部匹配的隊列以後,broker就會發送一個確認給生產者(包含消息的惟一ID),這就使生產者知道消息已經正確到達目的隊列了,若是消息和隊列時可持久化的,那麼確認消息會將消息寫入磁盤以後發出,broker回傳給生產者的確認消息中deliver-tag域包含了確認消息的序列號,此外broker也能夠設置basic.ack的multiple域,表示到這個序列號以前的全部消息已經獲得了處理
b.confirm模式的最大好處在於其是異步的;
c.開啓confirm模式: channel.confirmSelect();
d.編程模式:
普通 發一條 waitForConfirms(),等待服務端確認,串行方法;
批量 發一批 waitForConfirms();
異步 提供一個回調方法
Channel對象提供的ConfirmListener()回調方法只包含deliveryTag(當前Channel發出的消息序號),咱們須要本身爲每一個Channel維護一個unconfirm的消息序號集合,每發佈一條數據,集合中元素加1,每回調一次handleAck方法,unconfirm集合刪掉相應的一條(multiple=false)或多條(multiple=true)記錄.從程序運行效率上看,這個unconfirm集合最好採用有序集合SortedSet存儲結構
生產者:連接->建立通道->聲明隊列->(通道添加監聽)
聲明未確認的消息標識集合->發佈成功後回調handleAck()函數,回執有問題執行handleNack()函數->關閉通道->關閉鏈接