Dyno-queues 分佈式延遲隊列 之 輔助功能

Dyno-queues 分佈式延遲隊列 之 輔助功能

0x00 摘要

本系列咱們會以設計分佈式延遲隊列時重點考慮的模塊爲主線,穿插灌輸一些消息隊列的特性實現方法,經過分析Dyno-queues 分佈式延遲隊列的源碼來具體看看設計實現一個分佈式延遲隊列的方方面面。html

0x01 前文回顧

前面兩篇文章介紹了設計思路,消息的產生和消費。本文介紹一些輔助功能,有了這些功能可讓系統更加完善。java

0x2 Ack機制

前面提到,從Redis角度來看,Dyno-queues 對於每一個隊列,維護三組Redis數據結構:linux

  • 包含隊列元素和分數的有序集合;
  • 包含消息內容的Hash集合,其中key爲消息ID;
  • 包含客戶端已經消費但還沒有確認的消息有序集合,Un-ack集合

這裏的第三組數據結構,就是支持咱們的 Ack 機制。redis

2.1 加入Un-ack集合

前面提到,_pop 是消費消息,具體 _pop 的邏輯以下:apache

  • 計算當前時間爲最大分數。
  • 獲取分數在 0 和 最大分數 之間的消息。
  • 將 messageID 添加到 unack 集合中,並從隊列的有序集中刪除這個 messageID
  • 若是上一步成功,則根據messageID從Redis集合中檢索消息。

這就是涉及到 包含客戶端已經消費但還沒有確認的消息有序集合,Un-ack集合json

代碼以下:服務器

private List<Message> _pop(String shard, int messageCount,
                           ConcurrentLinkedQueue<String> prefetchedIdQueue) throws Exception {
    String queueShardName = getQueueShardKey(queueName, shard);
    String unackShardName = getUnackKey(queueName, shard);
    double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();

    // NX option indicates add only if it doesn't exist.
    // https://redis.io/commands/zadd#zadd-options-redis-302-or-greater
    ZAddParams zParams = ZAddParams.zAddParams().nx();

    List<Message> popped = new LinkedList<>();
    for (;popped.size() != messageCount;) {
        String msgId = prefetchedIdQueue.poll();

        //將messageID添加到unack集合中
        long added = quorumConn.zadd(unackShardName, unackScore, msgId, zParams);
        if(added == 0){
            monitor.misses.increment();
            continue;
        }

        long removed = quorumConn.zrem(queueShardName, msgId);
        if (removed == 0) {
            monitor.misses.increment();
            continue;
        }

        String json = quorumConn.hget(messageStoreKey, msgId);
        if (json == null) {
            monitor.misses.increment();
            continue;
        }
        Message msg = om.readValue(json, Message.class);
        popped.add(msg);

        if (popped.size() == messageCount) {
            return popped;
        }
    }
    return popped;
}

此時邏輯以下:網絡

message list



zset  +----------+----------+----------+-----+----------+  _pop (msg id 9)
      |          |          |          |     |          |
      | msg id 1 | msg id 2 | msg id 3 | ... | msg id 9 | +----+
      |          |          |          |     |          |      |
      +---+------+----+-----+----+-----+-----+----+-----+      |
          |           |          |                |            |
          |           |          |                |            |
          v           v          v                v            |
hash  +---+---+   +---+---+   +--+----+        +--+--+         |
      | msg 1 |   | msg 2 |   | msg 3 |        |msg 9|         |
      +-------+   +-------+   +-------+        +-----+         |
                                                               |
                                                               |
                                                               |
                                                               |
                                                               |
                                                               |
                  unack list                                   |
       +------------+-------------+--------------+             |
zset   |            |             |              |             |
       |  msg id 11 |   msg id 12 |   msg id 13  |  <----------+
       |            |             |              |
       +------------+-------------+--------------+

2.2 ACK

用戶當獲得消息以後,須要Ack消息,好比:數據結構

List pushed_msgs = V1Queue.push(payloads);

Message poppedWithPredicate = V1Queue.popMsgWithPredicate("searchable pay*", false);

V1Queue.ack(poppedWithPredicate.getId());

Ack的邏輯是:架構

  • 從unack集合中刪除messageID。
  • 由於此時已是ack了,因此此消息就完全沒有意義了,因此從Message有效集合中刪除messageID。

代碼以下:

@Override
public boolean ack(String messageId) {
    try {
        return execute("ack", "(a shard in) " + queueName, () -> {

            for (String shard : allShards) {
                String unackShardKey = getUnackKey(queueName, shard);
                Long removed = quorumConn.zrem(unackShardKey, messageId);
                if (removed > 0) {
                    quorumConn.hdel(messageStoreKey, messageId);
                    return true;
                }
            }
            return false;
        });
    } 
}

private String getUnackKey(String queueName, String shard) {
		return redisKeyPrefix + ".UNACK." + queueName + "." + shard;
}

具體以下:

message list



zset  +----------+----------+----------+------
      |          |          |          |     |
      | msg id 1 | msg id 2 | msg id 3 | ... |
      |          |          |          |     |
      +---+------+----+-----+----+-----+-----+
          |           |          |
          |           |          |
          v           v          v                         delete
hash  +---+---+   +---+---+   +--+----+        +-----+
      | msg 1 |   | msg 2 |   | msg 3 |        |msg 9|    <----+  ACK(msg id 9)
      +-------+   +-------+   +-------+        +-----+                  +
                                                                        |
                                                                        |
                                                                        |
                                                                        |
                                                                        |
                                                                        |
                  unack list                                            |
       +------------+-------------+--------------+-------------+  delete|
zset   |            |             |              |             |        |
       |  msg id 11 |   msg id 12 |   msg id 13  |   msg id 9  |  <-----+
       |            |             |              |             |
       +------------+-------------+--------------+-------------+

2.3 處理Un-ACK的消息

後臺進程會定時作檢測,即 監視 UNACK 集合中的消息,這些消息在給定時間內未被客戶端確認(每一個隊列可配置)。這些消息將移回到隊列中。

2.3.1 定時任務

定時任務是以下代碼來啓動:

schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);

if (this.singleRingTopology) {
    schedulerForUnacksProcessing.scheduleAtFixedRate(() -> atomicProcessUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
} else {
    schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
}

2.3.2 Un-ACK

以下代碼,就是把未確認消息退回到隊列中。

@Override
public void processUnacks() {
    try {

        long queueDepth = size();
        monitor.queueDepth.record(queueDepth);

        String keyName = getUnackKey(queueName, shardName);
        
        execute("processUnacks", keyName, () -> {

            int batchSize = 1_000;
            String unackShardName = getUnackKey(queueName, shardName);

            double now = Long.valueOf(clock.millis()).doubleValue();
            int num_moved_back = 0;
            int num_stale = 0;

            Set<Tuple> unacks = nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0, now, 0, batchSize);

            for (Tuple unack : unacks) {

                double score = unack.getScore();
                String member = unack.getElement();

                String payload = quorumConn.hget(messageStoreKey, member);
                if (payload == null) {
                    quorumConn.zrem(unackShardName, member);
                    ++num_stale;
                    continue;
                }

                long added_back = quorumConn.zadd(localQueueShard, score, member);
                long removed_from_unack = quorumConn.zrem(unackShardName, member);
                if (added_back > 0 && removed_from_unack > 0) ++num_moved_back;
            }
            return null;
        });

    } 
}

此時邏輯以下:

message list



           zset  +----------+----------+----------+-----+
                 |          |          |          |     |
+------------->  | msg id 1 | msg id 2 | msg id 3 | ... |
|                |          |          |          |     |
|                +---+------+----+-----+----+-----+-----+
|                    |           |          |
|                    |           |          |
|                    v           v          v
|          hash  +---+---+   +---+---+   +--+----+
|                | msg 1 |   | msg 2 |   | msg 3 |
|                +-------+   +-------+   +-------+
|
|
|
|                           unack list
|                +------------+-------------+--------------+
|         zset   |            |             |              |
|                |  msg id 11 |   msg id 12 |   msg id 13  |
+-------------+  |            |             |              |
  msg id 11      +-------+----+-------------+--------------+
                         ^
                         |  msg id 11
                         |
                 +-------+---------+
                 |                 |
                 | ScheduledThread |
                 |                 |
                 +-----------------+

0x03 防止重複消費

對於防止重複消費,系統作了以下努力:

  • 每一個節點(上圖中的N1...Nn)與可用性區域具備關聯性,而且與該區域中的redis服務器進行通訊。
  • Dynomite / Redis節點一次只能提供一個請求,Dynomite能夠容許數千個併發鏈接,可是請求是由Redis中的單個線程處理,這確保了當發出兩個併發調用從隊列輪詢元素時,是由Redis服務器順序執行,從而避免任何本地或分佈式鎖。
  • 在發生故障轉移的狀況下,確保沒有兩個客戶端鏈接從隊列中獲取相同的消息。

0x04 防止消息丟失

4.1 消息丟失的可能

4.1.1 生產者弄丟了數據

生產者將數據發送到 MQ 的時候,可能數據就在半路給搞丟了,由於網絡問題啥的,都有可能。

好比,以下就是簡單的插入,缺乏必要的保證。

List pushed_msgs = V1Queue.push(payloads);

4.1.2 MQ 弄丟了數據

這種狀況就是 MQ 本身弄丟了數據,這個你必須開啓MQ 的持久化,就是消息寫入以後會持久化到磁盤,哪怕是 MQ 本身掛了,恢復以後會自動讀取以前存儲的數據,通常數據不會丟。

4.2 Dyno-queues 保證

Dyno-queues 使用ensure來確認消息徹底寫入到全部分區

簡單來講,就是:

  1. 對於全部分區,逐一進行:"寫數據(就是message id),讀出寫入的數據" 這樣的操做。若是有一個分區寫出錯,就返回失敗。
  2. 若是把 message id 都已經寫入到全部的分區,再寫入消息內容。

Enqueues 'message' if it doesn't exist in any of the shards or unack sets.

@Override
public boolean ensure(Message message) {
    return execute("ensure", "(a shard in) " + queueName, () -> {

        String messageId = message.getId();
        for (String shard : allShards) {

            String queueShard = getQueueShardKey(queueName, shard);
            Double score = quorumConn.zscore(queueShard, messageId);
            if (score != null) {
                return false;
            }
            String unackShardKey = getUnackKey(queueName, shard);
            score = quorumConn.zscore(unackShardKey, messageId);
            if (score != null) {
                return false;
            }
            
        }
        push(Collections.singletonList(message));
        return true;
    });
}

0x05 過時消息

針對過時消息,Dyno-queues 的處理方式是一次性找出過時消息給用戶處理,其中過時時間由用戶在參數中設定。

因此 findStaleMessages 就是利用 lua 腳本找出過時消息。

@Override
public List<Message> findStaleMessages() {
    return execute("findStaleMessages", localQueueShard, () -> {

        List<Message> stale_msgs = new ArrayList<>();

        int batchSize = 10;

        double now = Long.valueOf(clock.millis()).doubleValue();
        long num_stale = 0;

        for (String shard : allShards) {
            String queueShardName = getQueueShardKey(queueName, shard);
            
            Set<String> elems = nonQuorumConn.zrangeByScore(queueShardName, 0, now, 0, batchSize);

            if (elems.size() == 0) {
                continue;
            }

            String findStaleMsgsScript = "local hkey=KEYS[1]\n" +
                    "local queue_shard=ARGV[1]\n" +
                    "local unack_shard=ARGV[2]\n" +
                    "local num_msgs=ARGV[3]\n" +
                    "\n" +
                    "local stale_msgs={}\n" +
                    "local num_stale_idx = 1\n" +
                    "for i=0,num_msgs-1 do\n" +
                    "  local msg_id=ARGV[4+i]\n" +
                    "\n" +
                    "  local exists_hash = redis.call('hget', hkey, msg_id)\n" +
                    "  local exists_queue = redis.call('zscore', queue_shard, msg_id)\n" +
                    "  local exists_unack = redis.call('zscore', unack_shard, msg_id)\n" +
                    "\n" +
                    "  if (exists_hash and exists_queue) then\n" +
                    "  elseif (not (exists_unack)) then\n" +
                    "    stale_msgs[num_stale_idx] = msg_id\n" +
                    "    num_stale_idx = num_stale_idx + 1\n" +
                    "  end\n" +
                    "end\n" +
                    "\n" +
                    "return stale_msgs\n";

            String unackKey = getUnackKey(queueName, shard);
            ImmutableList.Builder builder = ImmutableList.builder();
            builder.add(queueShardName);
            builder.add(unackKey);
            builder.add(Integer.toString(elems.size()));
            for (String msg : elems) {
                builder.add(msg);
            }

            ArrayList<String> stale_msg_ids = (ArrayList) ((DynoJedisClient)quorumConn).eval(findStaleMsgsScript, Collections.singletonList(messageStoreKey), builder.build());
            num_stale = stale_msg_ids.size();

            for (String m : stale_msg_ids) {
                Message msg = new Message();
                msg.setId(m);
                stale_msgs.add(msg);
            }
        }

        return stale_msgs;
    });
}

0x6 消息刪除

Dyno-queues 支持消息刪除:業務使用方能夠隨時刪除指定消息。

具體刪除是 從 unack隊列 和 正常隊列中刪除。

@Override
public boolean remove(String messageId) {
		return execute("remove", "(a shard in) " + queueName, () -> {

            for (String shard : allShards) {

                String unackShardKey = getUnackKey(queueName, shard);
                quorumConn.zrem(unackShardKey, messageId);

                String queueShardKey = getQueueShardKey(queueName, shard);
                Long removed = quorumConn.zrem(queueShardKey, messageId);

                if (removed > 0) {
                    // Ignoring return value since we just want to get rid of it.
                    Long msgRemoved = quorumConn.hdel(messageStoreKey, messageId);
                    return true;
                }
            }
            return false;
        });
}

0x07 批量處理以增長吞吐

Dyno-queues 利用lua腳原本進行批量處理,這樣能夠增長吞吐。

7.1 Lua腳本

Redis中爲何引入Lua腳本?

Redis提供了很是豐富的指令集,官網上提供了200多個命令。可是某些特定領域,須要擴充若干指令原子性執行時,僅使用原生命令便沒法完成。

Redis 爲這樣的用戶場景提供了 lua 腳本支持,用戶能夠向服務器發送 lua 腳原本執行自定義動做,獲取腳本的響應數據。Redis 服務器會單線程原子性執行 lua 腳本,保證 lua 腳本在處理的過程當中不會被任意其它請求打斷。

使用腳本的好處以下:

  • 減小網絡開銷。能夠將多個請求經過腳本的形式一次發送,減小網絡時延。
  • 原子操做。Redis會將整個腳本做爲一個總體執行,中間不會被其餘請求插入。所以在腳本運行過程當中無需擔憂會出現競態條件,無需使用事務。
  • 複用。客戶端發送的腳本會永久存在redis中,這樣其餘客戶端能夠複用這一腳本,而不須要使用代碼完成相同的邏輯。

7.2 實現

具體代碼以下,能夠看到就是採用了lua腳本一次性寫入:

// TODO: Do code cleanup/consolidation
private List<Message> atomicBulkPopHelper(int messageCount,
                      ConcurrentLinkedQueue<String> prefetchedIdQueue, boolean localShardOnly) throws IOException {

    double now = Long.valueOf(clock.millis() + 1).doubleValue();
    double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();

    // The script requires the scores as whole numbers
    NumberFormat fmt = NumberFormat.getIntegerInstance();
    fmt.setGroupingUsed(false);
    String nowScoreString = fmt.format(now);
    String unackScoreString = fmt.format(unackScore);

    List<String> messageIds = new ArrayList<>();
    for (int i = 0; i < messageCount; ++i) {
        messageIds.add(prefetchedIdQueue.poll());
    }

    String atomicBulkPopScriptLocalOnly="local hkey=KEYS[1]\n" +
            "local num_msgs=ARGV[1]\n" +
            "local peek_until=ARGV[2]\n" +
            "local unack_score=ARGV[3]\n" +
            "local queue_shard_name=ARGV[4]\n" +
            "local unack_shard_name=ARGV[5]\n" +
            "local msg_start_idx = 6\n" +
            "local idx = 1\n" +
            "local return_vals={}\n" +
            "for i=0,num_msgs-1 do\n" +
            "  local message_id=ARGV[msg_start_idx + i]\n" +
            "  local exists = redis.call('zscore', queue_shard_name, message_id)\n" +
            "  if (exists) then\n" +
            "    if (exists <=peek_until) then\n" +
            "      local value = redis.call('hget', hkey, message_id)\n" +
            "      if (value) then\n" +
            "        local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', unack_score, message_id)\n" +
            "        if (zadd_ret) then\n" +
            "          redis.call('zrem', queue_shard_name, message_id)\n" +
            "          return_vals[idx]=value\n" +
            "          idx=idx+1\n" +
            "        end\n" +
            "      end\n" +
            "    end\n" +
            "  else\n" +
            "    return {}\n" +
            "  end\n" +
            "end\n" +
            "return return_vals";

    String atomicBulkPopScript="local hkey=KEYS[1]\n" +
            "local num_msgs=ARGV[1]\n" +
            "local num_shards=ARGV[2]\n" +
            "local peek_until=ARGV[3]\n" +
            "local unack_score=ARGV[4]\n" +
            "local shard_start_idx = 5\n" +
            "local msg_start_idx = 5 + (num_shards * 2)\n" +
            "local out_idx = 1\n" +
            "local return_vals={}\n" +
            "for i=0,num_msgs-1 do\n" +
            "  local found_msg=false\n" +
            "  local message_id=ARGV[msg_start_idx + i]\n" +
            "  for j=0,num_shards-1 do\n" +
            "    local queue_shard_name=ARGV[shard_start_idx + (j*2)]\n" +
            "    local unack_shard_name=ARGV[shard_start_idx + (j*2) + 1]\n" +
            "    local exists = redis.call('zscore', queue_shard_name, message_id)\n" +
            "    if (exists) then\n" +
            "      found_msg=true\n" +
            "      if (exists <=peek_until) then\n" +
            "        local value = redis.call('hget', hkey, message_id)\n" +
            "        if (value) then\n" +
            "          local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', unack_score, message_id)\n" +
            "          if (zadd_ret) then\n" +
            "            redis.call('zrem', queue_shard_name, message_id)\n" +
            "            return_vals[out_idx]=value\n" +
            "            out_idx=out_idx+1\n" +
            "            break\n" +
            "          end\n" +
            "        end\n" +
            "      end\n" +
            "    end\n" +
            "  end\n" +
            "  if (found_msg == false) then\n" +
            "    return {}\n" +
            "  end\n" +
            "end\n" +
            "return return_vals";

    List<Message> payloads = new ArrayList<>();
    if (localShardOnly) {
        String unackShardName = getUnackKey(queueName, shardName);

        ImmutableList.Builder builder = ImmutableList.builder();
        builder.add(Integer.toString(messageCount));
        builder.add(nowScoreString);
        builder.add(unackScoreString);
        builder.add(localQueueShard);
        builder.add(unackShardName);
        for (int i = 0; i < messageCount; ++i) {
            builder.add(messageIds.get(i));
        }

        List<String> jsonPayloads;
        // Cast from 'JedisCommands' to 'DynoJedisClient' here since the former does not expose 'eval()'.
        jsonPayloads = (List) ((DynoJedisClient) quorumConn).eval(atomicBulkPopScriptLocalOnly,
                Collections.singletonList(messageStoreKey), builder.build());

        for (String p : jsonPayloads) {
            Message msg = om.readValue(p, Message.class);
            payloads.add(msg);
        }
    } else {
        ImmutableList.Builder builder = ImmutableList.builder();
        builder.add(Integer.toString(messageCount));
        builder.add(Integer.toString(allShards.size()));
        builder.add(nowScoreString);
        builder.add(unackScoreString);
        for (String shard : allShards) {
            String queueShard = getQueueShardKey(queueName, shard);
            String unackShardName = getUnackKey(queueName, shard);
            builder.add(queueShard);
            builder.add(unackShardName);
        }
        for (int i = 0; i < messageCount; ++i) {
            builder.add(messageIds.get(i));
        }

        List<String> jsonPayloads;
        // Cast from 'JedisCommands' to 'DynoJedisClient' here since the former does not expose 'eval()'.
        jsonPayloads = (List) ((DynoJedisClient) quorumConn).eval(atomicBulkPopScript,
                Collections.singletonList(messageStoreKey), builder.build());

        for (String p : jsonPayloads) {
            Message msg = om.readValue(p, Message.class);
            payloads.add(msg);
        }
    }

    return payloads;
}

0x08 V2

最新版本是 V2,有三個類,咱們看看具體是什麼做用。

  • QueueBuilder

  • MultiRedisQueue

  • RedisPipelineQueue

8.1 QueueBuilder

就是封裝,對外統一提供API。

public class QueueBuilder {

    private Clock clock;

    private String queueName;

    private String redisKeyPrefix;

    private int unackTime;

    private String currentShard;

    private ShardSupplier shardSupplier;

    private HostSupplier hs;

    private EurekaClient eurekaClient;

    private String applicationName;

    private Collection<Host> hosts;

    private JedisPoolConfig redisPoolConfig;

    private DynoJedisClient dynoQuorumClient;

    private DynoJedisClient dynoNonQuorumClient;
}

8.2 MultiRedisQueue

該類也是爲了提升速度,其內部包括多個RedisPipelineQueue,每一個queue表明一個分區,利用 round robin 方式寫入。

/**
 * MultiRedisQueue exposes a single queue using multiple redis queues.  Each RedisQueue is a shard.
 * When pushing elements to the queue, does a round robin to push the message to one of the shards.
 * When polling, the message is polled from the current shard (shardName) the instance is associated with.
 */
public class MultiRedisQueue implements DynoQueue {
    private List<String> shards;
    private String name;
    private Map<String, RedisPipelineQueue> queues = new HashMap<>();
    private RedisPipelineQueue me;
}

8.3 RedisPipelineQueue

這個類就是使用pipeline來提高吞吐。

Queue implementation that uses Redis pipelines that improves the throughput under heavy load.。

public class RedisPipelineQueue implements DynoQueue {

    private final Logger logger = LoggerFactory.getLogger(RedisPipelineQueue.class);

    private final Clock clock;

    private final String queueName;

    private final String shardName;

    private final String messageStoreKeyPrefix;

    private final String myQueueShard;

    private final String unackShardKeyPrefix;

    private final int unackTime;

    private final QueueMonitor monitor;

    private final ObjectMapper om;

    private final RedisConnection connPool;

    private volatile RedisConnection nonQuorumPool;

    private final ScheduledExecutorService schedulerForUnacksProcessing;

    private final HashPartitioner partitioner = new Murmur3HashPartitioner();

    private final int maxHashBuckets = 32;

    private final int longPollWaitIntervalInMillis = 10;
}

0xFF 參考

乾貨分享 | 如何從零開始設計一個消息隊列

消息隊列的理解,幾種常見消息隊列對比,新手也能看得懂!----分佈式中間件消息隊列

消息隊列設計精要

有贊延遲隊列設計

基於Dynomite的分佈式延遲隊列

http://blog.mikebabineau.com/2013/02/09/delay-queues-in-redis/

http://stackoverflow.com/questions/17014584/how-to-create-a-delayed-queue-in-rabbitmq

http://activemq.apache.org/delay-and-schedule-message-delivery.html

源碼分析] Dynomite 分佈式存儲引擎 之 DynoJedisClient(1)

源碼分析] Dynomite 分佈式存儲引擎 之 DynoJedisClient(2)

原創 Amazon Dynamo系統架構

Netlix Dynomite性能基準測試,基於AWS和Redis

爲何分佈式必定要有延時任務?

相關文章
相關標籤/搜索