本系列咱們會以設計分佈式延遲隊列時重點考慮的模塊爲主線,穿插灌輸一些消息隊列的特性實現方法,經過分析Dyno-queues 分佈式延遲隊列的源碼來具體看看設計實現一個分佈式延遲隊列的方方面面。html
Dyno-queues 是 Netflix 實現的基於 Dynomite 和 Redis 構建的隊列。java
Dynomite是一種通用的實現,能夠與許多不一樣的 key-value 存儲引擎一塊兒使用。目前它提供了對Redis序列化協議(RESP)和Memcached寫協議的支持。linux
具體設計目標依據業務系統不一樣而不一樣。git
Dyno-queues 的業務背景是:在 Netflix 的平臺上運行着許多的業務流程,這些流程的任務是經過異步編排進行驅動,如今要實現一個分佈式延遲隊列,這個延遲隊列具備以下特色:github
Netflix 選擇 Dynomite,是由於:redis
Netflix選擇Redis做爲構建隊列的存儲引擎是由於:數據庫
查詢模型:基於Key-Value模型,而不是SQL,即關係模型。存儲對象比較小。apache
ACID屬性:傳統的關係數據庫中,用ACID(A原子性、C一致性、I 隔離性、D持久性)來保證事務,在保證ACID的前提下每每有不好的可用性。Dynamo用弱一致性C來達到高可用,不提供數據隔離 I,只容許單Key更新。編程
其實全部的高可用,是能夠依賴於RPC和存儲的高可用來實現的。json
Netflix 選擇 Dynomite,是由於:
因此 Dyno-queues 的高可用就自動解決了。
怎麼保證冪等呢?最簡單的方式莫過於共享存儲。broker多機器共享一個DB或者一個分佈式文件/kv系統,則處理消息天然是冪等的。就算有單點故障,其餘節點能夠馬上頂上。
對於不共享存儲的隊列,如Kafka使用分區加主備模式,就略微麻煩一些。須要保證每個分區內的高可用性,也就是每個分區至少要有一個主備且須要作數據的同步。
Dynomite 使用 redis 集羣這個共享存儲 作了冪等保證。
消息到達服務端後,若是不通過任何處理就到接收者,broker就失去了它的意義。爲了知足咱們錯峯/流控/最終可達等一系列需求,把消息存儲下來,而後選擇時機投遞就顯得是瓜熟蒂落的了。
這個存儲能夠作成不少方式。好比存儲在內存裏,存儲在分佈式KV裏,存儲在磁盤裏,存儲在數據庫裏等等。但歸結起來,主要有持久化和非持久化兩種。
持久化的形式能更大程度地保證消息的可靠性(如斷電等不可抗外力),而且理論上能承載更大限度的消息堆積(外存的空間遠大於內存)。
但並非每種消息都須要持久化存儲。不少消息對於投遞性能的要求大於可靠性的要求,且數量極大(如日誌)。這時候,消息不落地直接暫存內存,嘗試幾回failover,最終投遞出去也何嘗不可。
Dynomite 使用 redis 集羣這個共享存儲 在必定程度上緩解了消息堆積問題。
咱們來看看若是須要數據落地的狀況下各類存儲子系統的選擇。理論上,從速度來看,文件系統 > 分佈式KV(持久化)> 分佈式文件系統 > 數據庫,而可靠性卻截然相反。仍是要從支持的業務場景出發做出最合理的選擇。
若是大家的消息隊列是用來支持支付/交易等對可靠性要求很是高,但對性能和量的要求沒有這麼高,並且沒有時間精力專門作文件存儲系統的研究,DB是最好的選擇。
可是DB受制於IOPS,若是要求單broker 5位數以上的QPS性能,基於文件的存儲是比較好的解決方案。總體上能夠採用數據文件 + 索引文件的方式處理。
分佈式KV(如MongoDB,HBase)等,或者持久化的Redis,因爲其編程接口較友好,性能也比較可觀,若是在可靠性要求不是那麼高的場景,也不失爲一個不錯的選擇。
由於 場景是 可靠性要求不那麼高,因此 Dynomite 使用 redis 集羣這個存儲子系統 也是能夠的。
下一個重要的事情就是解析發送接收關係,進行正確的消息投遞了。拋開現象看本質,發送接收關係無外乎是單播與廣播的區別。所謂單播,就是點到點;而廣播,是一點對多點。
通常比較通用的設計是支持組間廣播,不一樣的組註冊不一樣的訂閱。組內的不一樣機器,若是註冊一個相同的ID,則單播;若是註冊不一樣的ID(如IP地址+端口),則廣播。
至於廣播關係的維護,通常因爲消息隊列自己都是集羣,因此都維護在公共存儲上,如 config server、zookeeper等。維護廣播關係所要作的事情基本是一致的:
本文後續會介紹如何維護髮送關係。
數據分片的邏輯既能夠實如今客戶端,也能夠實如今 Proxy
層,取決於你的架構如何設計。
傳統的數據庫中間件大多將分片邏輯實如今客戶端,經過改寫物理 SQL
訪問不一樣的 MySQL
庫;而在 NewSQL
數據庫倡導的計算存儲分離架構中,一般將分片邏輯實如今計算層,即 Proxy
層,經過無狀態的計算節點轉發用戶請求到正確的存儲節點。
在 Dynomite 之中,隊列根據可用區域進行分片,將數據推送到隊列時,經過輪訓機制肯定分片,這種機制能夠確保全部分片的數據是平衡的,每一個分片都表明Redis中的有序集合,有序集中的 key 是 queueName 和 AVAILABILITY _ZONE 的組合。
public class RoundRobinStrategy implements ShardingStrategy { private final AtomicInteger nextShardIndex = new AtomicInteger(0); /** * Get shard based on round robin strategy. * @param allShards */ @Override public String getNextShard(List<String> allShards, Message message) { int index = nextShardIndex.incrementAndGet(); if (index >= allShards.size()) { nextShardIndex.set(0); index = 0; } return allShards.get(index); } }
Dyno-queues 隊列是在 Dynomite 的JAVA客戶端 Dyno 之上創建的,Dyno 爲持久鏈接提供鏈接池,而且能夠配置爲拓撲感知。關於 Dyno 具體能夠參見前文:
源碼分析] Dynomite 分佈式存儲引擎 之 DynoJedisClient(1)
源碼分析] Dynomite 分佈式存儲引擎 之 DynoJedisClient(2)
Dyno爲應用程序提供特定的本地機架(在AWS中,機架是一個區域,例如 us-east-1a、us-east-1b等),us-east-1a的客戶端將鏈接到相同區域的Dynomite/Redis節點,除非該節點不可用,在這種狀況下該客戶端將進行故障轉移。這個屬性被用於經過區域劃分隊列。
隊列根據可用區域進行分片,將數據推送到隊列時,經過輪訓機制肯定分片,這種機制能夠確保全部分片的數據是平衡的,每一個分片都表明Redis中的有序集合,有序集中的key是queueName和AVAILABILITY _ZONE的組合。
具體機制舉例以下:
public class RoundRobinStrategy implements ShardingStrategy { private final AtomicInteger nextShardIndex = new AtomicInteger(0); /** * Get shard based on round robin strategy. * @param allShards */ @Override public String getNextShard(List<String> allShards, Message message) { int index = nextShardIndex.incrementAndGet(); if (index >= allShards.size()) { nextShardIndex.set(0); index = 0; } return allShards.get(index); } }
在分佈式系統中有個CAP理論,對於P(分區容忍性)而言,是實際存在 從而沒法避免的。由於分佈系統中的處理不是在本機,而是網絡中的許多機器相互通訊,故網絡分區、網絡通訊故障問題沒法避免。所以,只能儘可能地在C 和 A 之間尋求平衡。
對於數據存儲而言,爲了提升可用性(Availability),採用了副本備份,好比對於HDFS,默認每塊數據存三份。某數據塊所在的機器宕機了,就去該數據塊副本所在的機器上讀取(從這能夠看出,數據分佈方式是按「數據塊」爲單位分佈的)
可是問題來了,當須要修改數據時,就須要更新全部的副本數據,這樣才能保證數據的一致性(Consistency)。所以,就須要在 C(Consistency) 和 A(Availability) 之間權衡。
而Quorum機制,就是這樣的一種權衡機制,一種將「讀寫轉化」的模型。
顯然,咱們更想要作到強一致性的這種效果,那麼有哪些方式能夠實現呢,其中最爲簡單直接的就是 WARO,也就是Write All Read one。
WARO 是一種簡單的副本控制協議,當 Client 請求向某副本寫數據時(更新數據),只有當全部的副本都更新成功以後,此次寫操做纔算成功,不然視爲失敗。這樣的話,只須要讀任何一個副本上的數據便可。可是WARO帶來的影響是寫服務的可用性較低,由於只要有一個副本更新失敗,這次寫操做就視爲失敗了。
Quorum 的定義以下:假設有 N 個副本,更新操做 wi 在 W 個副本中更新成功以後,則認爲這次更新操做 wi 成功,把此次成功提交的更新操做對應的數據叫作:「成功提交的數據」。對於讀操做而言,至少須要讀 R 個副本,其中,W+R>N ,即 W 和 R 有重疊,通常,W+R=N+1。
Quorum機制認爲每次寫入的機器數目達到大多數(W)時,就認爲本次寫操做成功了。即Quorum機制可以不須要更新徹底部的數據,但又保證返回給用戶的是有效數據的解決方案。
咱們以 ES 爲例。
咱們在發送任何一個增刪改操做的時候,均可以帶上一個consistency參數,指明咱們想要的寫一致性是什麼。
quorum = int((primary shard+number_of_replicas)/2)+1
若是節點數少於quorum,可能致使querum不齊全,進而致使沒法執行任何寫操做。quorum不齊全時,會進行等待。默認等待時間爲1分鐘,期待活躍的shard數量能夠增長,最後實在不行,就會timeout。
Dynomite 可以將最終一致性(eventual consistency)擴展爲協調一致性(tunable consistency)。
關於QUORUM,Dynomite有以下配置:
由測試獲得的結果,Dynomite能從3,6,12,24一路擴展到48個節點,在DC_ONE和DC_QUORUM模式下,吞吐率都能線性地增加。與此同時,Dynomite在延遲方面只增長了不多的開支,即使在DC_QUORUM模式下,(延遲)也只有幾毫秒。DC_QUORUM模式在延遲和吞吐量方面處於劣勢,可是能爲客戶提供更好的讀寫保證。
對於Dyno-queues來講,則是在實現中有所體現。好比在 RedisQueues 中,有以下成員變量:
private final JedisCommands quorumConn; private final JedisCommands nonQuorumConn;
在構建 RedisQueues 時,就須要註明使用哪種。
而從註釋中咱們可知,
@param quorumConn
Dyno connection with dc_quorum enabled,就是 採用了Quorum的Redis;@param nonQuorumConn
Dyno connection to local Redis,就是本地Redis;生成 RedisQueues 的代碼以下(注意其中註釋):
/** * @param quorumConn Dyno connection with dc_quorum enabled * @param nonQuorumConn Dyno connection to local Redis */ public RedisQueues(JedisCommands quorumConn, JedisCommands nonQuorumConn, String redisKeyPrefix, ShardSupplier shardSupplier, int unackTime, int unackHandlerIntervalInMS, ShardingStrategy shardingStrategy) { this(Clock.systemDefaultZone(), quorumConn, nonQuorumConn, redisKeyPrefix, shardSupplier, unackTime, unackHandlerIntervalInMS, shardingStrategy); }
在有分片時,就從nonQuorumConn(就是本地Redis)提取。
使用nonQuorumConn來預取的緣由是:最終一致性(eventual consistency)。
由於 replication lag,在某一時刻不一樣分片的數據可能不同,因此須要先預取。這就須要使用 nonQuorumConn 來預取,由於本地 redis 的數據纔是正確的。
private Set<String> doPeekIdsFromShardHelper(final String queueShardName, final double peekTillTs, final int offset,final int count) { return nonQuorumConn.zrangeByScore(queueShardName, 0, peekTillTs, offset, count); }
再好比處理沒有 ack 的消息時,先從 nonQuorumConn 讀取信息ID,再從 quorumConn 讀取消息內容。
這就是由於一致性致使的,因此以下:
@Override public void processUnacks() { execute("processUnacks", keyName, () -> { Set<Tuple> unacks = nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0, now, 0, batchSize); for (Tuple unack : unacks) { double score = unack.getScore(); String member = unack.getElement(); String payload = quorumConn.hget(messageStoreKey, member); long added_back = quorumConn.zadd(localQueueShard, score, member); } }); }
再好比從本地提取消息就使用了 nonQuorumConn。
@Override public Message localGet(String messageId) { try { return execute("localGet", messageStoreKey, () -> { String json = nonQuorumConn.hget(messageStoreKey, messageId); Message msg = om.readValue(json, Message.class); return msg; }); } }
再好比 popWithMsgIdHelper 也是先讀取 nonQuorumConn,再從 quorumConn 讀取其餘內容。
public Message popWithMsgIdHelper(String messageId, String targetShard, boolean warnIfNotExists) { try { return execute("popWithMsgId", targetShard, () -> { String queueShardName = getQueueShardKey(queueName, targetShard); double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue(); String unackShardName = getUnackKey(queueName, targetShard); ZAddParams zParams = ZAddParams.zAddParams().nx(); Long exists = nonQuorumConn.zrank(queueShardName, messageId); // If we get back a null type, then the element doesn't exist. if (exists == null) { // We only have a 'warnIfNotExists' check for this call since not all messages are present in // all shards. So we want to avoid a log spam. If any of the following calls return 'null' or '0', // we may have hit an inconsistency (because it's in the queue, but other calls have failed), // so make sure to log those. monitor.misses.increment(); return null; } String json = quorumConn.hget(messageStoreKey, messageId); if (json == null) { monitor.misses.increment(); return null; } long added = quorumConn.zadd(unackShardName, unackScore, messageId, zParams); if (added == 0) { monitor.misses.increment(); return null; } long removed = quorumConn.zrem(queueShardName, messageId); if (removed == 0) { monitor.misses.increment(); return null; } Message msg = om.readValue(json, Message.class); return msg; }); } }
RedisQueues是爲用戶提供的外部接口,從其成員變量能夠看出來其內部機制,好比各類策略。
public class RedisQueues implements Closeable { private final Clock clock; private final JedisCommands quorumConn; private final JedisCommands nonQuorumConn; private final Set<String> allShards; private final String shardName; private final String redisKeyPrefix; private final int unackTime; private final int unackHandlerIntervalInMS; private final ConcurrentHashMap<String, DynoQueue> queues; private final ShardingStrategy shardingStrategy; private final boolean singleRingTopology; }
用戶經過get方法來獲得DynoQueue:DynoQueue V1Queue = queues.get("simpleQueue")
。
public DynoQueue get(String queueName) { String key = queueName.intern(); return queues.computeIfAbsent(key, (keyToCompute) -> new RedisDynoQueue(clock, redisKeyPrefix, queueName, allShards, shardName, unackHandlerIntervalInMS, shardingStrategy, singleRingTopology) .withUnackTime(unackTime) .withNonQuorumConn(nonQuorumConn) .withQuorumConn(quorumConn)); }
咱們看看 Dyno-queues 中幾種數據結構。
一個完整的消息隊列應該定義清楚本身能夠投遞的消息類型,如事務型消息,本地非持久型消息,以及服務端不落地的非可靠消息等。對不一樣的業務場景作不一樣的選擇。
Dyno-queues 只有服務端落地的可靠消息。每一個延時消息必須包括如下參數:
public class Message { private String id; private String payload; private long timeout; private int priority; private String shard; }
Dyno-queues 關於存儲的整體思路是:hash 記錄消息內容, zset 實現按到期時間排序的隊列
,即:
具體邏輯如圖,這裏的虛線指的是二者經過 msg id 來進行邏輯上的管理,物理沒有關聯:
+----------+----------+----------+-----+----------+ | | | | | | zset | msg id 1 | msg id 2 | msg id 3 | ... | msg id n | | | | | | | +---+------+----+-----+----+-----+-----+----+-----+ | | | | | | | | v v v v +---+---+ +---+---+ +--+----+ +--+--+ hash | msg 1 | | msg 2 | | msg 3 | |msg n| +-------+ +-------+ +-------+ +-----+
具體到代碼,則是:
quorumConn.hset(messageStoreKey, message.getId(), json);
double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority;
具體參見以下:
for (Message message : messages) { String json = om.writeValueAsString(message); quorumConn.hset(messageStoreKey, message.getId(), json); double priority = message.getPriority() / 100.0; double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority; String shard = shardingStrategy.getNextShard(allShards, message); String queueShard = getQueueShardKey(queueName, shard); quorumConn.zadd(queueShard, score, message.getId()); }
RedisDynoQueue是 Dyno-queues 延遲隊列的主要實現。
從Redis角度來看,對於每一個隊列,維護三組Redis數據結構:
這三組Redis數據結構在RedisDynoQueue內部其實沒有對應的成員變量,對於RedisDynoQueue 來講,看起來是邏輯概念,而事實上它們存在於Redis的內部存儲中,由Dynomite負責高可用等等。
具體以下:
message list zset +----------+----------+----------+-----+----------+ | | | | | | | msg id 1 | msg id 2 | msg id 3 | ... | msg id 9 | | | | | | | +---+------+----+-----+----+-----+-----+----+-----+ | | | | | | | | v v v v hash +---+---+ +---+---+ +--+----+ +--+--+ | msg 1 | | msg 2 | | msg 3 | |msg 9| +-------+ +-------+ +-------+ +-----+ unack list +------------+-------------+--------------+ zset | | | | | msg id 11 | msg id 12 | msg id 13 | | | | | +------------+-------------+--------------+
RedisDynoQueue 的成員變量能夠分類以下:
ObjectMapper om:用來把消息序列化,寫到redis中;
Clock clock:用覺得分數生成時間戳;
String redisKeyPrefix:每一個queue的用戶會給本身定義key;
String messageStoreKey:對於每一個Redis hash來講,能夠設定本身的field(字段),好比:
this.messageStoreKey = redisKeyPrefix + ".MESSAGE." + queueName; quorumConn.hget(messageStoreKey, messageId)
List
String localQueueShard:本地分區;
ShardingStrategy shardingStrategy:分區策略;
ConcurrentLinkedQueue
Map<String, ConcurrentLinkedQueue
this.unsafePrefetchedIdsAllShardsMap = new HashMap<>(); for (String shard : allShards) { unsafePrefetchedIdsAllShardsMap.put(getQueueShardKey(queueName, shard), new ConcurrentLinkedQueue<>()); }
int retryCount = 2:重試次數;
int unackTime = 60:用以生成ack隊列的分數。
double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue(); long added = quorumConn.zadd(unackShardName, unackScore, messageId, zParams);
ScheduledExecutorService schedulerForUnacksProcessing:用以生成線程,來按期ack
schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1); if (this.singleRingTopology) { schedulerForUnacksProcessing.scheduleAtFixedRate(() -> atomicProcessUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS); } else { schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS); }
QueueMonitor monitor:監控與統計;
具體代碼以下:
public class RedisDynoQueue implements DynoQueue { private final Clock clock; private final String queueName; private final List<String> allShards; private final String shardName; private final String redisKeyPrefix; private final String messageStoreKey; private final String localQueueShard; private volatile int unackTime = 60; private final QueueMonitor monitor; private final ObjectMapper om; private volatile JedisCommands quorumConn; private volatile JedisCommands nonQuorumConn; private final ConcurrentLinkedQueue<String> prefetchedIds; private final Map<String, ConcurrentLinkedQueue<String>> unsafePrefetchedIdsAllShardsMap; private final ScheduledExecutorService schedulerForUnacksProcessing; private final int retryCount = 2; private final ShardingStrategy shardingStrategy; private final boolean singleRingTopology; }
至此,Dyno-queues 基本功能初步分析完畢,咱們下期繼續介紹消息產生,消費。
消息隊列的理解,幾種常見消息隊列對比,新手也能看得懂!----分佈式中間件消息隊列
http://blog.mikebabineau.com/2013/02/09/delay-queues-in-redis/
http://stackoverflow.com/questions/17014584/how-to-create-a-delayed-queue-in-rabbitmq
http://activemq.apache.org/delay-and-schedule-message-delivery.html
源碼分析] Dynomite 分佈式存儲引擎 之 DynoJedisClient(1)
源碼分析] Dynomite 分佈式存儲引擎 之 DynoJedisClient(2)