Dyno-queues 分佈式延遲隊列 之 基本功能

Dyno-queues 分佈式延遲隊列 之 基本功能

0x00 摘要

本系列咱們會以設計分佈式延遲隊列時重點考慮的模塊爲主線,穿插灌輸一些消息隊列的特性實現方法,經過分析Dyno-queues 分佈式延遲隊列的源碼來具體看看設計實現一個分佈式延遲隊列的方方面面。html

0x01 Dyno-queues分佈式延遲隊列

Dyno-queues 是 Netflix 實現的基於 Dynomite 和 Redis 構建的隊列。java

Dynomite是一種通用的實現,能夠與許多不一樣的 key-value 存儲引擎一塊兒使用。目前它提供了對Redis序列化協議(RESP)和Memcached寫協議的支持。linux

1.1 設計目標

具體設計目標依據業務系統不一樣而不一樣。git

Dyno-queues 的業務背景是:在 Netflix 的平臺上運行着許多的業務流程,這些流程的任務是經過異步編排進行驅動,如今要實現一個分佈式延遲隊列,這個延遲隊列具備以下特色:github

  • 分佈式;
  • 不用外部的鎖機制;
  • 高併發;
  • 至少一次語義交付;
  • 不遵循嚴格的FIFO;
  • 延遲隊列(消息在未來某個時間以前不會從隊列中取出);
  • 優先級;

1.2 選型思路

Netflix 選擇 Dynomite,是由於:redis

  • 其具備性能,多數據中心複製和高可用性的特色;
  • Dynomite提供分片和可插拔的數據存儲引擎,容許在數據需求增長垂直和水平擴展;

Netflix選擇Redis做爲構建隊列的存儲引擎是由於:數據庫

  • Redis架構經過提供構建隊列所需的數據結構很好地支持了隊列設計,同時Redis的性能也很是優秀,具有低延遲的特性;
  • Dynomite在Redis之上提供了高可用性、對等複製以及一致性等特性,用於構建分佈式集羣隊列;

0x02 整體設計

2.1 系統假設

查詢模型:基於Key-Value模型,而不是SQL,即關係模型。存儲對象比較小。apache

ACID屬性:傳統的關係數據庫中,用ACID(A原子性、C一致性、I 隔離性、D持久性)來保證事務,在保證ACID的前提下每每有不好的可用性。Dynamo用弱一致性C來達到高可用,不提供數據隔離 I,只容許單Key更新編程

2.2 高可用

其實全部的高可用,是能夠依賴於RPC和存儲的高可用來實現的。json

  • 先來看RPC的高可用,好比美團的基於MTThrift的RPC框架,阿里的Dubbo等,其自己就具備服務自動發現,負載均衡等功能。
  • 而消息隊列的高可用,只要保證broker接受消息和確認消息的接口是冪等的,而且consumer的幾臺機器處理消息是冪等的,這樣就把消息隊列的可用性,轉交給RPC框架來處理了。

Netflix 選擇 Dynomite,是由於

  • 其具備高性能,多數據中心複製和高可用性的特色;
  • Dynomite 提供分片和可插拔的數據存儲引擎,容許在數據需求增長垂直和水平擴展;

因此 Dyno-queues 的高可用就自動解決了。

2.3 冪等

怎麼保證冪等呢?最簡單的方式莫過於共享存儲。broker多機器共享一個DB或者一個分佈式文件/kv系統,則處理消息天然是冪等的。就算有單點故障,其餘節點能夠馬上頂上。

對於不共享存儲的隊列,如Kafka使用分區加主備模式,就略微麻煩一些。須要保證每個分區內的高可用性,也就是每個分區至少要有一個主備且須要作數據的同步。

Dynomite 使用 redis 集羣這個共享存儲 作了冪等保證。

2.4 承載消息堆積

消息到達服務端後,若是不通過任何處理就到接收者,broker就失去了它的意義。爲了知足咱們錯峯/流控/最終可達等一系列需求,把消息存儲下來,而後選擇時機投遞就顯得是瓜熟蒂落的了。

這個存儲能夠作成不少方式。好比存儲在內存裏,存儲在分佈式KV裏,存儲在磁盤裏,存儲在數據庫裏等等。但歸結起來,主要有持久化和非持久化兩種。

持久化的形式能更大程度地保證消息的可靠性(如斷電等不可抗外力),而且理論上能承載更大限度的消息堆積(外存的空間遠大於內存)。

但並非每種消息都須要持久化存儲。不少消息對於投遞性能的要求大於可靠性的要求,且數量極大(如日誌)。這時候,消息不落地直接暫存內存,嘗試幾回failover,最終投遞出去也何嘗不可。

Dynomite 使用 redis 集羣這個共享存儲 在必定程度上緩解了消息堆積問題。

2.5 存儲子系統

咱們來看看若是須要數據落地的狀況下各類存儲子系統的選擇。理論上,從速度來看,文件系統 > 分佈式KV(持久化)> 分佈式文件系統 > 數據庫,而可靠性卻截然相反。仍是要從支持的業務場景出發做出最合理的選擇。

若是大家的消息隊列是用來支持支付/交易等對可靠性要求很是高,但對性能和量的要求沒有這麼高,並且沒有時間精力專門作文件存儲系統的研究,DB是最好的選擇。

可是DB受制於IOPS,若是要求單broker 5位數以上的QPS性能,基於文件的存儲是比較好的解決方案。總體上能夠採用數據文件 + 索引文件的方式處理。

分佈式KV(如MongoDB,HBase)等,或者持久化的Redis,因爲其編程接口較友好,性能也比較可觀,若是在可靠性要求不是那麼高的場景,也不失爲一個不錯的選擇。

由於 場景是 可靠性要求不那麼高,因此 Dynomite 使用 redis 集羣這個存儲子系統 也是能夠的。

2.6 消費關係解析

下一個重要的事情就是解析發送接收關係,進行正確的消息投遞了。拋開現象看本質,發送接收關係無外乎是單播與廣播的區別。所謂單播,就是點到點;而廣播,是一點對多點。

通常比較通用的設計是支持組間廣播,不一樣的組註冊不一樣的訂閱。組內的不一樣機器,若是註冊一個相同的ID,則單播;若是註冊不一樣的ID(如IP地址+端口),則廣播。

至於廣播關係的維護,通常因爲消息隊列自己都是集羣,因此都維護在公共存儲上,如 config server、zookeeper等。維護廣播關係所要作的事情基本是一致的:

  • 發送關係的維護。
  • 發送關係變動時的通知。

本文後續會介紹如何維護髮送關係

2.7 數據分片

數據分片的邏輯既能夠實如今客戶端,也能夠實如今 Proxy 層,取決於你的架構如何設計。

傳統的數據庫中間件大多將分片邏輯實如今客戶端,經過改寫物理 SQL 訪問不一樣的 MySQL 庫;而在 NewSQL 數據庫倡導的計算存儲分離架構中,一般將分片邏輯實如今計算層,即 Proxy 層,經過無狀態的計算節點轉發用戶請求到正確的存儲節點。

在 Dynomite 之中,隊列根據可用區域進行分片,將數據推送到隊列時,經過輪訓機制肯定分片,這種機制能夠確保全部分片的數據是平衡的,每一個分片都表明Redis中的有序集合,有序集中的 key 是 queueName 和 AVAILABILITY _ZONE 的組合

public class RoundRobinStrategy implements ShardingStrategy {

    private final AtomicInteger nextShardIndex = new AtomicInteger(0);

    /**
     * Get shard based on round robin strategy.
     * @param allShards
     */
    @Override
    public String getNextShard(List<String> allShards, Message message) {
        int index = nextShardIndex.incrementAndGet();
        if (index >= allShards.size()) {
            nextShardIndex.set(0);
            index = 0;
        }
        return allShards.get(index);
    }
}

0x03 Dynomite 特性

3.1 可用分區和機架

Dyno-queues 隊列是在 Dynomite 的JAVA客戶端 Dyno 之上創建的,Dyno 爲持久鏈接提供鏈接池,而且能夠配置爲拓撲感知。關於 Dyno 具體能夠參見前文:

源碼分析] Dynomite 分佈式存儲引擎 之 DynoJedisClient(1)

源碼分析] Dynomite 分佈式存儲引擎 之 DynoJedisClient(2)

3.1.1 機架

Dyno爲應用程序提供特定的本地機架(在AWS中,機架是一個區域,例如 us-east-1a、us-east-1b等),us-east-1a的客戶端將鏈接到相同區域的Dynomite/Redis節點,除非該節點不可用,在這種狀況下該客戶端將進行故障轉移。這個屬性被用於經過區域劃分隊列。

3.1.2 分片

隊列根據可用區域進行分片,將數據推送到隊列時,經過輪訓機制肯定分片,這種機制能夠確保全部分片的數據是平衡的,每一個分片都表明Redis中的有序集合,有序集中的key是queueName和AVAILABILITY _ZONE的組合。

具體機制舉例以下:

public class RoundRobinStrategy implements ShardingStrategy {

    private final AtomicInteger nextShardIndex = new AtomicInteger(0);

    /**
     * Get shard based on round robin strategy.
     * @param allShards
     */
    @Override
    public String getNextShard(List<String> allShards, Message message) {
        int index = nextShardIndex.incrementAndGet();
        if (index >= allShards.size()) {
            nextShardIndex.set(0);
            index = 0;
        }
        return allShards.get(index);
    }
}

3.2 Quorum

在分佈式系統中有個CAP理論,對於P(分區容忍性)而言,是實際存在 從而沒法避免的。由於分佈系統中的處理不是在本機,而是網絡中的許多機器相互通訊,故網絡分區、網絡通訊故障問題沒法避免。所以,只能儘可能地在C 和 A 之間尋求平衡。

對於數據存儲而言,爲了提升可用性(Availability),採用了副本備份,好比對於HDFS,默認每塊數據存三份。某數據塊所在的機器宕機了,就去該數據塊副本所在的機器上讀取(從這能夠看出,數據分佈方式是按「數據塊」爲單位分佈的)

可是問題來了,當須要修改數據時,就須要更新全部的副本數據,這樣才能保證數據的一致性(Consistency)。所以,就須要在 C(Consistency) 和 A(Availability) 之間權衡。

Quorum機制,就是這樣的一種權衡機制,一種將「讀寫轉化」的模型

3.2.1 數據一致性

  • 強一致性:在任意時刻,從任意不一樣副本取出的值都是同樣的。
  • 弱一致性:有時泛指最終一致性,是指在任意時刻,可能因爲網絡延遲或者設備異常等緣由,不一樣副本中的值可能會不同,但通過一段時間後,最終會變成同樣。

顯然,咱們更想要作到強一致性的這種效果,那麼有哪些方式能夠實現呢,其中最爲簡單直接的就是 WARO,也就是Write All Read one。

3.2.1.1 WARO 協議

WARO 是一種簡單的副本控制協議,當 Client 請求向某副本寫數據時(更新數據),只有當全部的副本都更新成功以後,此次寫操做纔算成功,不然視爲失敗。這樣的話,只須要讀任何一個副本上的數據便可。可是WARO帶來的影響是寫服務的可用性較低,由於只要有一個副本更新失敗,這次寫操做就視爲失敗了。

3.2.1.2 Quorum機制

Quorum 的定義以下:假設有 N 個副本,更新操做 wi 在 W 個副本中更新成功以後,則認爲這次更新操做 wi 成功,把此次成功提交的更新操做對應的數據叫作:「成功提交的數據」。對於讀操做而言,至少須要讀 R 個副本,其中,W+R>N ,即 W 和 R 有重疊,通常,W+R=N+1。

  • N = 存儲數據副本的數量;
  • W = 更新成功所需的副本;
  • R = 一次數據對象讀取要訪問的副本的數量;

Quorum機制認爲每次寫入的機器數目達到大多數(W)時,就認爲本次寫操做成功了。即Quorum機制可以不須要更新徹底部的數據,但又保證返回給用戶的是有效數據的解決方案。

3.2.2 ES 的quorum

咱們以 ES 爲例。

3.2.2.1 寫一致性

咱們在發送任何一個增刪改操做的時候,均可以帶上一個consistency參數,指明咱們想要的寫一致性是什麼。

  • one:要求寫操做只要primay shard是active可用的,就能夠執行;
  • all:要求寫操做必須全部的shard和replica都是active,才能夠執行;
  • quorum(默認):全部shard中必須是大部分是可用的(一半及以上),才能夠執行;
3.2.2.2 quorum機制

quorum = int((primary shard+number_of_replicas)/2)+1

若是節點數少於quorum,可能致使querum不齊全,進而致使沒法執行任何寫操做。quorum不齊全時,會進行等待。默認等待時間爲1分鐘,期待活躍的shard數量能夠增長,最後實在不行,就會timeout。

3.3 DC_QUORUM

3.3.1 配置

Dynomite 可以將最終一致性(eventual consistency)擴展爲協調一致性(tunable consistency)。

關於QUORUM,Dynomite有以下配置:

  • DC_ONE 本節點讀寫入完成及請求完成,其餘的rack異步寫入。使用DC_ONE模式,讀寫行爲在local Availability Zone(AZ)下是同步的;
  • DC_QUORUM 同步寫入到指定個數的rack中,其餘的節點異步寫入。使用DC_QUORUM模式,本地區域特定數量結點下的操做是同步的。
  • DC_SAFE_QUORUM 和DC_QUORUM相似,不過這個參數讀寫都要在指定個數的rack中成功而且數據校驗同步,纔算請求成功,否則會報錯。

由測試獲得的結果,Dynomite能從3,6,12,24一路擴展到48個節點,在DC_ONE和DC_QUORUM模式下,吞吐率都能線性地增加。與此同時,Dynomite在延遲方面只增長了不多的開支,即使在DC_QUORUM模式下,(延遲)也只有幾毫秒。DC_QUORUM模式在延遲和吞吐量方面處於劣勢,可是能爲客戶提供更好的讀寫保證。

3.3.2 實現

對於Dyno-queues來講,則是在實現中有所體現。好比在 RedisQueues 中,有以下成員變量:

private final JedisCommands quorumConn;

private final JedisCommands nonQuorumConn;

在構建 RedisQueues 時,就須要註明使用哪種。

而從註釋中咱們可知,

  • @param quorumConn Dyno connection with dc_quorum enabled,就是 採用了Quorum的Redis;
  • @param nonQuorumConn Dyno connection to local Redis,就是本地Redis;

生成 RedisQueues 的代碼以下(注意其中註釋):

/**
 * @param quorumConn Dyno connection with dc_quorum enabled
 * @param nonQuorumConn    Dyno connection to local Redis
 */
public RedisQueues(JedisCommands quorumConn, JedisCommands nonQuorumConn, String redisKeyPrefix, ShardSupplier shardSupplier, int unackTime, int unackHandlerIntervalInMS, ShardingStrategy shardingStrategy) {
    
    this(Clock.systemDefaultZone(), quorumConn, nonQuorumConn, redisKeyPrefix, shardSupplier, unackTime, unackHandlerIntervalInMS, shardingStrategy);
    
}

3.3.3 使用

在有分片時,就從nonQuorumConn(就是本地Redis)提取。

使用nonQuorumConn來預取的緣由是:最終一致性(eventual consistency)。

由於 replication lag,在某一時刻不一樣分片的數據可能不同,因此須要先預取。這就須要使用 nonQuorumConn 來預取,由於本地 redis 的數據纔是正確的

private Set<String> doPeekIdsFromShardHelper(final String queueShardName, final double peekTillTs, final int offset,final int count) {
    return nonQuorumConn.zrangeByScore(queueShardName, 0, peekTillTs, offset, count);
}

再好比處理沒有 ack 的消息時,先從 nonQuorumConn 讀取信息ID,再從 quorumConn 讀取消息內容。

這就是由於一致性致使的,因此以下:

@Override
public void processUnacks() {
        execute("processUnacks", keyName, () -> {
            
            Set<Tuple> unacks = nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0, now, 0, batchSize);
            
            for (Tuple unack : unacks) {
                double score = unack.getScore();
                String member = unack.getElement();
                String payload = quorumConn.hget(messageStoreKey, member);
                long added_back = quorumConn.zadd(localQueueShard, score, member);
            }
        });
}

再好比從本地提取消息就使用了 nonQuorumConn。

@Override
public Message localGet(String messageId) {
    try {

        return execute("localGet", messageStoreKey, () -> {
            String json = nonQuorumConn.hget(messageStoreKey, messageId);

            Message msg = om.readValue(json, Message.class);
            return msg;
        });

    } 
}

再好比 popWithMsgIdHelper 也是先讀取 nonQuorumConn,再從 quorumConn 讀取其餘內容。

public Message popWithMsgIdHelper(String messageId, String targetShard, boolean warnIfNotExists) {

    try {
        return execute("popWithMsgId", targetShard, () -> {

            String queueShardName = getQueueShardKey(queueName, targetShard);
            double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();
            String unackShardName = getUnackKey(queueName, targetShard);

            ZAddParams zParams = ZAddParams.zAddParams().nx();

            Long exists = nonQuorumConn.zrank(queueShardName, messageId);
            // If we get back a null type, then the element doesn't exist.
            if (exists == null) {
                // We only have a 'warnIfNotExists' check for this call since not all messages are present in
                // all shards. So we want to avoid a log spam. If any of the following calls return 'null' or '0',
                // we may have hit an inconsistency (because it's in the queue, but other calls have failed),
                // so make sure to log those.
                monitor.misses.increment();
                return null;
            }

            String json = quorumConn.hget(messageStoreKey, messageId);
            if (json == null) {
                monitor.misses.increment();
                return null;
            }

            long added = quorumConn.zadd(unackShardName, unackScore, messageId, zParams);
            if (added == 0) {
                monitor.misses.increment();
                return null;
            }

            long removed = quorumConn.zrem(queueShardName, messageId);
            if (removed == 0) {
                monitor.misses.increment();
                return null;
            }

            Message msg = om.readValue(json, Message.class);
            return msg;
        });
    } 
}

0x04 外層封裝

RedisQueues是爲用戶提供的外部接口,從其成員變量能夠看出來其內部機制,好比各類策略。

public class RedisQueues implements Closeable {

    private final Clock clock;

    private final JedisCommands quorumConn;

    private final JedisCommands nonQuorumConn;

    private final Set<String> allShards;

    private final String shardName;

    private final String redisKeyPrefix;

    private final int unackTime;

    private final int unackHandlerIntervalInMS;

    private final ConcurrentHashMap<String, DynoQueue> queues;

    private final ShardingStrategy shardingStrategy;

    private final boolean singleRingTopology;
}

用戶經過get方法來獲得DynoQueue:DynoQueue V1Queue = queues.get("simpleQueue")

public DynoQueue get(String queueName) {

    String key = queueName.intern();

    return queues.computeIfAbsent(key, (keyToCompute) -> new RedisDynoQueue(clock, redisKeyPrefix, queueName, allShards, shardName, unackHandlerIntervalInMS, shardingStrategy, singleRingTopology)
            .withUnackTime(unackTime)
            .withNonQuorumConn(nonQuorumConn)
            .withQuorumConn(quorumConn));
}

0x05 數據結構

咱們看看 Dyno-queues 中幾種數據結構。

5.1 消息結構

一個完整的消息隊列應該定義清楚本身能夠投遞的消息類型,如事務型消息,本地非持久型消息,以及服務端不落地的非可靠消息等。對不一樣的業務場景作不一樣的選擇。

Dyno-queues 只有服務端落地的可靠消息。每一個延時消息必須包括如下參數:

  • id:惟一標示;
  • payload:消息過時以後發送mq的body,提供給消費這作具體的消息處理;
  • timeout:延時發送時間;
  • priority:優先級,與timeout一塊兒決定消息如何發佈,即同一 timeout 時間的消息中,哪一個優先使用。
  • shard:分區;
public class Message {
    private String id;
    private String payload;
    private long timeout;
    private int priority;
    private String shard;
}

5.2 存儲結構

Dyno-queues 關於存儲的整體思路是:hash 記錄消息內容, zset 實現按到期時間排序的隊列,即:

  • 利用hash 記錄消息內容;
    • 使用hset存儲消息;
    • 使用hget提取消息;
  • 經過Redis中的zset來實現一個延遲隊列,主要利用它的score屬性,Redis經過score來爲集合中的成員進行從小到大的排序;
    • 使用zadd key score1 value1命令生產消息;
    • 使用zrem消費消息;

具體邏輯如圖,這裏的虛線指的是二者經過 msg id 來進行邏輯上的管理,物理沒有關聯:

+----------+----------+----------+-----+----------+
        |          |          |          |     |          |
 zset   | msg id 1 | msg id 2 | msg id 3 | ... | msg id n |
        |          |          |          |     |          |
        +---+------+----+-----+----+-----+-----+----+-----+
            |           |          |                |
            |           |          |                |
            v           v          v                v
        +---+---+   +---+---+   +--+----+        +--+--+
hash    | msg 1 |   | msg 2 |   | msg 3 |        |msg n|
        +-------+   +-------+   +-------+        +-----+

具體到代碼,則是:

  • Message 的id做爲key,Message總體被打包成json String做爲value:quorumConn.hset(messageStoreKey, message.getId(), json);
  • 用Message 的超時時間,優先級以及當前時間戳構建出zset的分數:double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority;

具體參見以下:

for (Message message : messages) {
    String json = om.writeValueAsString(message);
    quorumConn.hset(messageStoreKey, message.getId(), json);
    double priority = message.getPriority() / 100.0;
    double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority;
    String shard = shardingStrategy.getNextShard(allShards, message);
    String queueShard = getQueueShardKey(queueName, shard);
    quorumConn.zadd(queueShard, score, message.getId());
}

0x06 隊列

RedisDynoQueue是 Dyno-queues 延遲隊列的主要實現。

6.1 Redis相關

從Redis角度來看,對於每一個隊列,維護三組Redis數據結構:

  • 包含隊列元素和分數的有序集合 zset;
  • 包含消息內容的Hash集合,其中key爲消息ID;
  • 包含客戶端已經消費但還沒有確認的消息有序集合,Un-ack集合 zset;

這三組Redis數據結構在RedisDynoQueue內部其實沒有對應的成員變量,對於RedisDynoQueue 來講,看起來是邏輯概念,而事實上它們存在於Redis的內部存儲中,由Dynomite負責高可用等等

具體以下:

message list



zset  +----------+----------+----------+-----+----------+
      |          |          |          |     |          |
      | msg id 1 | msg id 2 | msg id 3 | ... | msg id 9 |
      |          |          |          |     |          |
      +---+------+----+-----+----+-----+-----+----+-----+
          |           |          |                |
          |           |          |                |
          v           v          v                v
hash  +---+---+   +---+---+   +--+----+        +--+--+
      | msg 1 |   | msg 2 |   | msg 3 |        |msg 9|
      +-------+   +-------+   +-------+        +-----+



                  unack list
       +------------+-------------+--------------+
zset   |            |             |              |
       |  msg id 11 |   msg id 12 |   msg id 13  |
       |            |             |              |
       +------------+-------------+--------------+

6.2 成員變量

RedisDynoQueue 的成員變量能夠分類以下:

6.2.1 整體

  • String queueName:本Queue名字;
  • String shardName:分區名字;

6.2.2 Redis鏈接相關

  • JedisCommands quorumConn:採用 quorum 的鏈接;
  • JedisCommands nonQuorumConn:非Quorum的鏈接;

6.2.3 Redis操做相關

  • ObjectMapper om:用來把消息序列化,寫到redis中;

  • Clock clock:用覺得分數生成時間戳;

  • String redisKeyPrefix:每一個queue的用戶會給本身定義key;

  • String messageStoreKey:對於每一個Redis hash來講,能夠設定本身的field(字段),好比:

    this.messageStoreKey = redisKeyPrefix + ".MESSAGE." + queueName;
    
    quorumConn.hget(messageStoreKey, messageId)
  • List allShards:全部分區;

  • String localQueueShard:本地分區;

  • ShardingStrategy shardingStrategy:分區策略;

  • ConcurrentLinkedQueue prefetchedIds:Prefetch message IDs from the local shard;本地分區優先的消息;

  • Map<String, ConcurrentLinkedQueue > unsafePrefetchedIdsAllShardsMap;

    this.unsafePrefetchedIdsAllShardsMap = new HashMap<>();
    
    for (String shard : allShards) {
        unsafePrefetchedIdsAllShardsMap.put(getQueueShardKey(queueName, shard), new ConcurrentLinkedQueue<>());
    }
  • int retryCount = 2:重試次數;

6.2.4 Ack相關

  • int unackTime = 60:用以生成ack隊列的分數。

    double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();
    
    long added = quorumConn.zadd(unackShardName, unackScore, messageId, zParams);
  • ScheduledExecutorService schedulerForUnacksProcessing:用以生成線程,來按期ack

schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);

if (this.singleRingTopology) {
    schedulerForUnacksProcessing.scheduleAtFixedRate(() -> atomicProcessUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
} else {
    schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
}
  • boolean singleRingTopology:

6.2.5 監控與統計

QueueMonitor monitor:監控與統計;

6.2.6 具體定義

具體代碼以下:

public class RedisDynoQueue implements DynoQueue {
    private final Clock clock;

    private final String queueName;

    private final List<String> allShards;

    private final String shardName;

    private final String redisKeyPrefix;

    private final String messageStoreKey;

    private final String localQueueShard;

    private volatile int unackTime = 60;

    private final QueueMonitor monitor;

    private final ObjectMapper om;

    private volatile JedisCommands quorumConn;

    private volatile JedisCommands nonQuorumConn;

    private final ConcurrentLinkedQueue<String> prefetchedIds;

    private final Map<String, ConcurrentLinkedQueue<String>> unsafePrefetchedIdsAllShardsMap;

    private final ScheduledExecutorService schedulerForUnacksProcessing;

    private final int retryCount = 2;

    private final ShardingStrategy shardingStrategy;

    private final boolean singleRingTopology;
}

至此,Dyno-queues 基本功能初步分析完畢,咱們下期繼續介紹消息產生,消費。

0xFF 參考

乾貨分享 | 如何從零開始設計一個消息隊列

消息隊列的理解,幾種常見消息隊列對比,新手也能看得懂!----分佈式中間件消息隊列

消息隊列設計精要

有贊延遲隊列設計

基於Dynomite的分佈式延遲隊列

http://blog.mikebabineau.com/2013/02/09/delay-queues-in-redis/

http://stackoverflow.com/questions/17014584/how-to-create-a-delayed-queue-in-rabbitmq

http://activemq.apache.org/delay-and-schedule-message-delivery.html

源碼分析] Dynomite 分佈式存儲引擎 之 DynoJedisClient(1)

源碼分析] Dynomite 分佈式存儲引擎 之 DynoJedisClient(2)

原創 Amazon Dynamo系統架構

Netlix Dynomite性能基準測試,基於AWS和Redis

爲何分佈式必定要有延時任務?

相關文章
相關標籤/搜索