交換機(exchange): redis
聲明交換機:數據庫
Name服務器
Durability (消息代理重啓後,交換機是否還存在)網絡
Auto-delete (當全部與之綁定的消息隊列都完成了對此交換機的使用後,刪掉它)app
Arguments(依賴代理自己)異步
交換機狀態: 持久(durable)、暫存(transient) async
交換機類型:編碼
直連交換機(direct exchange): (empty string) and amq.direct/名字綁定到同名的隊列代理
默認交換機(=direct exchange): empty string/名字爲""的直連交換機生命週期
扇型交換機(fanout exchange): amq.fanout/交換機會將消息的拷貝分別發送給這全部的N個隊列
主題交換機(topic exchange): amq.topic/路由鍵和隊列到交換機的綁定模式之間的匹配,將消息路由給一個或多個隊列。
頭交換機(headers exchange): amq.match/headers頭
隊列(queue):
聲明隊列:
Name
Durable(消息代理重啓後,隊列依舊存在)
Exclusive(只被一個鏈接(connection)使用,並且當鏈接關閉後隊列即被刪除)
Auto-delete(當最後一個消費者退訂後即被刪除)
Arguments(一些消息代理用他來完成相似與TTL的某些額外功能)
隊列狀態:
持久化隊列(Durable queues):broker重啓的時候,它依舊存在
暫存隊列(Transient queues):
綁定(Binding): 交換機(exchange)將消息(message)路由給隊列(queue)所需遵循的規則
若是AMQP的消息沒法路由到隊列(例如,發送到的交換機沒有綁定隊列),消息會被就地銷燬或者返還給發佈者。
*消息確認:
confirm模式:
將信道設置成confirm模式(發送方確認模式),則全部在信道上發佈的消息都會被指派一個惟一的ID。
一旦消息被投遞到目的隊列後,或者消息被寫入磁盤後(可持久化的消息),信道會發送一個確認給生產者(包含消息惟一ID)。
若是RabbitMQ發生內部錯誤從而致使消息丟失,會發送一條nack(not acknowledged,未確認)消息。
發送方確認模式是異步的,生產者應用程序在等待確認的同時,能夠繼續發送消息。當確認消息到達生產者應用程序,生產者應用程序的回調方法就會被觸發來處理確認消息。
避免重複:
在消息生產時,MQ內部針對每條生產者發送的消息生成一個inner-msg-id,做爲去重的依據(消息投遞失敗並重傳),避免重複的消息進入隊列;
在消息消費時,要求消息體中必需要有一個bizId(對於同一業務全局惟一,如支付ID、訂單ID、帖子ID等)做爲去重的依據,避免同一條消息被重複消費。
(1)當拒絕某條消息時,應用能夠告訴消息代理如何處理這條消息——銷燬它或者從新放入隊列。
(2)當此隊列只有一個消費者時,請確認不要因爲拒絕消息而且選擇了從新放入隊列的行爲而引發消息在同一個消費者身上無限循環的狀況發生。
消息(最大64b):
Content type(內容類型)
Content encoding(內容編碼)
Routing key(路由鍵)
Delivery mode (persistent or not)
投遞模式(持久化 或 非持久化)
Message priority(消息優先權)
Message publishing timestamp(消息發佈的時間戳)
Expiration period(消息有效期)
Publisher application id(發佈應用的ID)
虛擬主機(virtual hosts):
爲了在一個單獨的代理上實現多個隔離的環境,當鏈接被創建的時候,AMQP客戶端來指定使用哪一個虛擬主機
use API:
Basic:
basic.publish(): 發送消息
basic.ack(): 對一條或多條消息發佈成功與否進行確認
basic.cancel():
basic.consume():
basic.deliver():
basic.get(): 直接訪問隊列
basic.nack(): 被拒絕的是否從新入隊列
basic.qos(): Quality of Service服務質量,告訴RabbitMQ不要同時給一個消費者推送多於N個消息,即一旦有N個消息尚未ack,則該consumer將block掉,直到有消息ack
basic.recover(): 重入隊列,是否發送給同一個消費者
basic.recover-async():
basic.reject(): 被拒絕的是否從新入隊列
basic.return():
Exchange:
exchange.bind(): 此方法爲RabbitMQ特有的AMQP擴展將兩個交換機進行綁定。
*exchange.declare(): 驗證交換機是否存在,不存在則新建,存在則使用。
exchange.delete(): 此方法用於刪除交換機。當一個交換機被刪除後,與其綁定的全部隊列都會被清除。
exchange.unbind(): 此方法爲RabbitMQ特有的AMQP擴展解除兩個交換機之間的綁定關係。
Queue:
queue.bind(): 將隊列綁定到交換機。
*queue.declare(): 聲明隊列,若是隊列不存在建立,存在則使用。
queue.delete(): 刪除隊列。若是服務器設置了死信隊列(dead-letter queue),當隊列被某個刪除時,
任何依存於此隊列的消息都會被髮送到死信隊列中,隊列上的全部消費者都會被清除掉。
queue.purge(): 清空隊列。此方法會將隊列中的全部不處於等待 確認回執(acknowledgment)狀態的消息所有移除。
queue.unbind(): 解除隊列與交換機的綁定。
*生命週期: 這個擴展決定了一條消息從發佈到被服務器丟棄的生存時間。此方法中設置生存時間的參數爲 x-message-ttl。
*過時時間: 隊列能夠在聲明時指定租約時限。租約時限指的是若是隊列一直未被使用,多久以後服務器會將其自動刪除。租約時限由此方法的x-expires參數指定。
Tx(事務):
tx.commit() ➔ commit-ok: 提交事務。
tx.rollback() ➔ rollback-ok: 回滾事務。
tx.select() ➔ select-ok: 選擇標準事務模式。
消息丟失、重複:
發送階段:
(1)事務批量回滾。
(2)開啓消息確認confirm模式,同步可重發固定次數,超過次數手動處理,異步能夠用db記錄發送數據,若爲nack時重發並記錄重發次數。
隊列階段:
防止機器掛掉,將交換機、隊列、消息所有持久化到磁盤。
消費階段:
(1)若消息發送到mq後因爲網絡波動,發送者未收到確認,可能觸發重試機制致使隊列存在消息重複。
(2)消息重複:
a.消息源多條: 採用消息惟一識別碼,db數據庫建惟一主鍵、使用redis原子操做、db記錄消息處理狀態。
b.消費時,已將消息消費將來得及返回ack時出現異常,改成手動ack,try異常後查詢數據庫訂單狀態是否改變。