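In this series we follow the modules that matter most when designing a distributed delay queue, mixing in how various message-queue features are implemented along the way, and we analyze the source code of Dyno-queues, a distributed delay queue, to look concretely at every aspect of designing and implementing one.
The previous two articles covered the overall design and how messages are produced and consumed. This article covers auxiliary features that round out the system.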
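As mentioned earlier, from Redis's point of view Dyno-queues maintains three groups of Redis data structures for each queue:

1. A sorted set containing the queue's message IDs and their scores.
2. A hash holding the message payloads, keyed by message ID.
3. A sorted set containing the messages that clients have consumed but not yet acknowledged (the Un-ack set).

The third structure is the one that supports the Ack mechanism.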
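As mentioned earlier, `_pop` consumes messages. For each message ID taken from the prefetch queue, `_pop`:

1. adds the ID to the unack sorted set with `ZADD ... NX`, scored at the current time plus `unackTime`;
2. removes the ID from the queue shard with `ZREM`;
3. reads the payload from the message hash with `HGET` and deserializes it.

If any step does not take effect (for example, the ID is already in the unack set), the ID is counted as a miss and skipped.

This is where the Un-ack set comes in: the sorted set of messages that the client has consumed but not yet acknowledged.

The code is as follows: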
```java
private List<Message> _pop(String shard, int messageCount,
                           ConcurrentLinkedQueue<String> prefetchedIdQueue) throws Exception {
    String queueShardName = getQueueShardKey(queueName, shard);
    String unackShardName = getUnackKey(queueName, shard);
    double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();

    // NX option indicates add only if it doesn't exist.
    // https://redis.io/commands/zadd#zadd-options-redis-302-or-greater
    ZAddParams zParams = ZAddParams.zAddParams().nx();

    List<Message> popped = new LinkedList<>();
    for (; popped.size() != messageCount; ) {
        String msgId = prefetchedIdQueue.poll();

        // Add the message ID to the unack set
        long added = quorumConn.zadd(unackShardName, unackScore, msgId, zParams);
        if (added == 0) {
            monitor.misses.increment();
            continue;
        }

        long removed = quorumConn.zrem(queueShardName, msgId);
        if (removed == 0) {
            monitor.misses.increment();
            continue;
        }

        String json = quorumConn.hget(messageStoreKey, msgId);
        if (json == null) {
            monitor.misses.increment();
            continue;
        }
        Message msg = om.readValue(json, Message.class);
        popped.add(msg);

        if (popped.size() == messageCount) {
            return popped;
        }
    }
    return popped;
}
```
The state at this point is as follows:
```
                 +----------+----------+----------+-----+----------+
message list     | msg id 1 | msg id 2 | msg id 3 | ... | msg id 9 |---+  _pop (msg id 9)
zset             +----+-----+----+-----+----+-----+-----+----+-----+   |
                      |          |          |                |         |
                      v          v          v                v         |
hash              +-------+  +-------+  +-------+        +-------+     |
                  | msg 1 |  | msg 2 |  | msg 3 |        | msg 9 |     |
                  +-------+  +-------+  +-------+        +-------+     |
                                                                       |
                 +-----------+-----------+-----------+                 |
unack list       | msg id 11 | msg id 12 | msg id 13 | <---------------+
zset             +-----------+-----------+-----------+
```
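To make the order of operations concrete, here is a minimal sketch that replays the same three steps against plain Java maps instead of Redis. The class `InMemoryPopSketch` and its fields are hypothetical: each sorted set is modelled as an ID-to-score map, the hash as an ID-to-payload map, and `putIfAbsent` stands in for `ZADD NX`. Unlike the loop above, the sketch also stops when the prefetch queue runs dry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory stand-in for one queue shard; not Dyno-queues code.
class InMemoryPopSketch {
    final Map<String, Double> queueShard = new HashMap<>();   // message list zset: id -> score
    final Map<String, Double> unackShard = new HashMap<>();   // unack zset: id -> ack deadline
    final Map<String, String> messageStore = new HashMap<>(); // hash: id -> payload
    int misses = 0;

    List<String> pop(Queue<String> prefetchedIds, int messageCount, long nowMillis, long unackTime) {
        double unackScore = nowMillis + unackTime;
        List<String> popped = new ArrayList<>();
        // Stop when enough messages were popped or the prefetch queue is empty.
        while (popped.size() < messageCount && !prefetchedIds.isEmpty()) {
            String id = prefetchedIds.poll();
            // 1. ZADD NX: claim the id only if it is not already in the unack set.
            if (unackShard.putIfAbsent(id, unackScore) != null) {
                misses++;
                continue;
            }
            // 2. ZREM: take the id out of the queue shard.
            if (queueShard.remove(id) == null) {
                misses++;
                continue;
            }
            // 3. HGET: read the payload.
            String payload = messageStore.get(id);
            if (payload == null) {
                misses++;
                continue;
            }
            popped.add(payload);
        }
        return popped;
    }
}
```

Popping the same ID twice shows the NX guard at work: the second attempt finds the ID already in the unack set and is counted as a miss instead of being delivered again.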
After obtaining a message, the user must ack it, for example:
```java
List<String> pushed_msgs = V1Queue.push(payloads);
Message poppedWithPredicate = V1Queue.popMsgWithPredicate("searchable pay*", false);
V1Queue.ack(poppedWithPredicate.getId());
```
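The Ack logic is: for each shard, remove the message ID from that shard's unack set; if it was present, delete the payload from the message hash and return `true`. If no shard's unack set contains the ID, `ack` returns `false`.

The code is as follows: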
```java
@Override
public boolean ack(String messageId) {
    return execute("ack", "(a shard in) " + queueName, () -> {
        for (String shard : allShards) {
            String unackShardKey = getUnackKey(queueName, shard);
            Long removed = quorumConn.zrem(unackShardKey, messageId);
            if (removed > 0) {
                quorumConn.hdel(messageStoreKey, messageId);
                return true;
            }
        }
        return false;
    });
}

private String getUnackKey(String queueName, String shard) {
    return redisKeyPrefix + ".UNACK." + queueName + "." + shard;
}
```
The effect is illustrated below:
```
                 +----------+----------+----------+-----+
message list     | msg id 1 | msg id 2 | msg id 3 | ... |
zset             +----+-----+----+-----+----+-----+-----+
                      |          |          |
                      v          v          v
hash              +-------+  +-------+  +-------+        +-------+   delete
                  | msg 1 |  | msg 2 |  | msg 3 |        | msg 9 | <--------+---- ACK(msg id 9)
                  +-------+  +-------+  +-------+        +-------+          |
                                                                            |
                 +-----------+-----------+-----------+-----------+          |  delete
unack list       | msg id 11 | msg id 12 | msg id 13 | msg id 9  | <--------+
zset             +-----------+-----------+-----------+-----------+
```
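The same ack logic can be sketched over in-memory maps. `InMemoryAckSketch` is hypothetical: each shard's unack set is an ID-to-score map, and the message hash is one shared map, mirroring the single `messageStoreKey` used above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of ack(); not Dyno-queues code.
class InMemoryAckSketch {
    final Map<String, Map<String, Double>> unackByShard = new HashMap<>(); // shard -> unack zset
    final Map<String, String> messageStore = new HashMap<>();              // shared hash

    boolean ack(List<String> allShards, String messageId) {
        for (String shard : allShards) {
            Map<String, Double> unack = unackByShard.get(shard);
            // ZREM from this shard's unack set; delete the payload only if it was there.
            if (unack != null && unack.remove(messageId) != null) {
                messageStore.remove(messageId); // HDEL
                return true;
            }
        }
        return false;
    }
}
```

Because the payload is deleted only when an unack entry was actually removed, acking the same ID twice is harmless: the second call simply returns `false`.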
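A background task periodically monitors the UNACK set for messages that the client has not acknowledged within the given time (configurable per queue), and moves those messages back to the queue.

The scheduled task is started by the following code: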
```java
schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);

if (this.singleRingTopology) {
    schedulerForUnacksProcessing.scheduleAtFixedRate(() -> atomicProcessUnacks(),
            unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
} else {
    schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(),
            unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
}
```
The following code moves unacknowledged messages whose ack deadline has passed back to the queue:
```java
@Override
public void processUnacks() {
    long queueDepth = size();
    monitor.queueDepth.record(queueDepth);

    String keyName = getUnackKey(queueName, shardName);
    execute("processUnacks", keyName, () -> {
        int batchSize = 1_000;
        String unackShardName = getUnackKey(queueName, shardName);

        double now = Long.valueOf(clock.millis()).doubleValue();
        int num_moved_back = 0;
        int num_stale = 0;

        // Unack entries whose score (the ack deadline) is <= now
        Set<Tuple> unacks = nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0, now, 0, batchSize);
        for (Tuple unack : unacks) {
            double score = unack.getScore();
            String member = unack.getElement();

            String payload = quorumConn.hget(messageStoreKey, member);
            if (payload == null) {
                // The payload is already gone: just drop the dangling ID
                quorumConn.zrem(unackShardName, member);
                ++num_stale;
                continue;
            }

            long added_back = quorumConn.zadd(localQueueShard, score, member);
            long removed_from_unack = quorumConn.zrem(unackShardName, member);
            if (added_back > 0 && removed_from_unack > 0) ++num_moved_back;
        }
        return null;
    });
}
```
The flow at this point is as follows:
```
                 +----------+----------+----------+-----+
message list     | msg id 1 | msg id 2 | msg id 3 | ... | <-------------------+
zset             +----+-----+----+-----+----+-----+-----+                     |
                      |          |          |                                |
                      v          v          v                                |
hash              +-------+  +-------+  +-------+                            |  msg id 11
                  | msg 1 |  | msg 2 |  | msg 3 |                            |
                  +-------+  +-------+  +-------+                            |
                                                                             |
                 +-----------+-----------+-----------+                       |
unack list       | msg id 11 | msg id 12 | msg id 13 |                       |
zset             +-----------+-----------+-----------+                       |
                       |                                                     |
                       |  msg id 11                                          |
                       v                                                     |
              +-----------------+                                            |
              | ScheduledThread | -------------------------------------------+
              +-----------------+
```
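The sweep can be sketched in memory as well. This is a model under stated assumptions, not Dyno-queues code: `InMemoryUnackSweepSketch` is hypothetical, sorted sets are ID-to-score maps, and the sweep is a plain method call rather than a task registered with `scheduleAtFixedRate`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical in-memory model of processUnacks() for one shard.
class InMemoryUnackSweepSketch {
    final Map<String, Double> queueShard = new HashMap<>();
    final Map<String, Double> unackShard = new HashMap<>();
    final Map<String, String> messageStore = new HashMap<>();

    /** Returns how many messages were moved back to the queue. */
    int processUnacks(long nowMillis, int batchSize) {
        // Like ZRANGEBYSCORE unack 0 now LIMIT 0 batchSize: ids whose ack deadline has passed.
        List<String> expired = unackShard.entrySet().stream()
                .filter(e -> e.getValue() <= nowMillis)
                .sorted(Map.Entry.comparingByValue())
                .limit(batchSize)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        int movedBack = 0;
        for (String id : expired) {
            double score = unackShard.get(id);
            if (!messageStore.containsKey(id)) { // payload gone: drop the dangling id
                unackShard.remove(id);
                continue;
            }
            // Plain ZADD (no NX) overwrites the score; it counts as "added" only if the id was new.
            boolean added = queueShard.put(id, score) == null;
            boolean removed = unackShard.remove(id) != null;
            if (added && removed) movedBack++;
        }
        return movedBack;
    }
}
```

Note that the message goes back into the queue with its unack score, which by construction is already in the past, so it is immediately eligible for the next pop.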
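To prevent duplicate consumption, the system relies on the following checks, visible in the `_pop` code above:

- the ID is added to the unack set with `ZADD NX`, so if another consumer has already claimed it, the add returns 0 and the ID is skipped;
- the return value of `ZREM` on the queue shard is checked too, so an ID that another consumer has already taken out of the queue is skipped.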
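Message loss is the other concern. On the producer side, data sent to the MQ may be lost on the way, for example because of network problems.

For example, the following is a plain insert that lacks the necessary guarantees: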
```java
List<String> pushed_msgs = V1Queue.push(payloads);
```
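On the broker side, the MQ itself may lose data. For that case you must enable the MQ's persistence, so that messages are written to disk once received; even if the MQ crashes, it reloads the stored data after recovering, and data is generally not lost.

Dyno-queues provides `ensure`. It checks every shard's queue set and unack set for the message ID and pushes the message only if the ID appears in none of them; otherwise it returns `false`.

In short, as its Javadoc puts it: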
Enqueues 'message' if it doesn't exist in any of the shards or unack sets.
```java
@Override
public boolean ensure(Message message) {
    return execute("ensure", "(a shard in) " + queueName, () -> {
        String messageId = message.getId();
        for (String shard : allShards) {
            String queueShard = getQueueShardKey(queueName, shard);
            Double score = quorumConn.zscore(queueShard, messageId);
            if (score != null) {
                return false;
            }
            String unackShardKey = getUnackKey(queueName, shard);
            score = quorumConn.zscore(unackShardKey, messageId);
            if (score != null) {
                return false;
            }
        }
        push(Collections.singletonList(message));
        return true;
    });
}
```
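The effect of `ensure` can be seen with a small in-memory model. `InMemoryEnsureSketch` is hypothetical, and the real `push` call is replaced by writing the payload and adding the ID to one caller-chosen shard.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of ensure(); not Dyno-queues code.
class InMemoryEnsureSketch {
    final Map<String, Map<String, Double>> queueByShard = new HashMap<>();
    final Map<String, Map<String, Double>> unackByShard = new HashMap<>();
    final Map<String, String> messageStore = new HashMap<>();

    boolean ensure(List<String> allShards, String targetShard, String id, String payload, double score) {
        for (String shard : allShards) {
            // ZSCORE on the queue set, then on the unack set, of every shard.
            if (queueByShard.getOrDefault(shard, Map.of()).containsKey(id)) return false;
            if (unackByShard.getOrDefault(shard, Map.of()).containsKey(id)) return false;
        }
        // Stand-in for push(): store the payload and queue the id on one shard.
        messageStore.put(id, payload);
        queueByShard.computeIfAbsent(targetShard, k -> new HashMap<>()).put(id, score);
        return true;
    }
}
```

Calling it twice for the same ID enqueues the message only once. As in the original, the existence checks and the push are separate steps, so two clients calling `ensure` concurrently for the same ID could in principle both pass the checks.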
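For stale messages, Dyno-queues finds them in one pass and returns them to the caller to handle; the expiry time is set by the user through parameters.

`findStaleMessages` does this with a Lua script. For each shard it fetches up to 10 message IDs whose score is at most the current time; the script then reports an ID as stale if it is missing from the message hash or from the queue set, and is not in the unack set either.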
```java
@Override
public List<Message> findStaleMessages() {
    return execute("findStaleMessages", localQueueShard, () -> {
        List<Message> stale_msgs = new ArrayList<>();

        int batchSize = 10;
        double now = Long.valueOf(clock.millis()).doubleValue();
        long num_stale = 0;

        for (String shard : allShards) {
            String queueShardName = getQueueShardKey(queueName, shard);
            // Up to batchSize IDs whose score is <= now
            Set<String> elems = nonQuorumConn.zrangeByScore(queueShardName, 0, now, 0, batchSize);

            if (elems.size() == 0) {
                continue;
            }

            String findStaleMsgsScript = "local hkey=KEYS[1]\n" +
                    "local queue_shard=ARGV[1]\n" +
                    "local unack_shard=ARGV[2]\n" +
                    "local num_msgs=ARGV[3]\n" +
                    "\n" +
                    "local stale_msgs={}\n" +
                    "local num_stale_idx = 1\n" +
                    "for i=0,num_msgs-1 do\n" +
                    "  local msg_id=ARGV[4+i]\n" +
                    "\n" +
                    "  local exists_hash = redis.call('hget', hkey, msg_id)\n" +
                    "  local exists_queue = redis.call('zscore', queue_shard, msg_id)\n" +
                    "  local exists_unack = redis.call('zscore', unack_shard, msg_id)\n" +
                    "\n" +
                    "  if (exists_hash and exists_queue) then\n" +
                    "  elseif (not (exists_unack)) then\n" +
                    "    stale_msgs[num_stale_idx] = msg_id\n" +
                    "    num_stale_idx = num_stale_idx + 1\n" +
                    "  end\n" +
                    "end\n" +
                    "\n" +
                    "return stale_msgs\n";

            String unackKey = getUnackKey(queueName, shard);
            ImmutableList.Builder builder = ImmutableList.builder();
            builder.add(queueShardName);
            builder.add(unackKey);
            builder.add(Integer.toString(elems.size()));
            for (String msg : elems) {
                builder.add(msg);
            }

            ArrayList<String> stale_msg_ids = (ArrayList) ((DynoJedisClient) quorumConn).eval(
                    findStaleMsgsScript, Collections.singletonList(messageStoreKey), builder.build());
            num_stale = stale_msg_ids.size();

            for (String m : stale_msg_ids) {
                Message msg = new Message();
                msg.setId(m);
                stale_msgs.add(msg);
            }
        }
        return stale_msgs;
    });
}
```
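The per-ID test inside `findStaleMsgsScript` is easy to misread because of its empty `then` branch. Here is a hypothetical Java transcription of just that condition; `StaleCheckSketch` is not part of Dyno-queues, and set membership is passed in directly instead of being queried from Redis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical transcription of the stale test in findStaleMsgsScript.
class StaleCheckSketch {
    // Lua: if (exists_hash and exists_queue) then <nothing>
    //      elseif (not exists_unack) then <stale>
    static boolean isStale(boolean existsHash, boolean existsQueue, boolean existsUnack) {
        if (existsHash && existsQueue) {
            return false; // healthy: payload and queue entry both present
        }
        return !existsUnack; // broken entry that is not in flight either
    }

    static List<String> findStale(List<String> dueIds, Set<String> hash, Set<String> queue, Set<String> unack) {
        List<String> stale = new ArrayList<>();
        for (String id : dueIds) {
            if (isStale(hash.contains(id), queue.contains(id), unack.contains(id))) {
                stale.add(id);
            }
        }
        return stale;
    }
}
```

For example, an ID that sits in the queue set but has lost its payload is reported, while the same ID is left alone if it is also in the unack set, since a consumer may still be processing it.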
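Dyno-queues supports deleting messages: the application can delete a given message at any time.

Deletion removes the ID from both the unack set and the regular queue set, and deletes the payload from the message hash once the ID has been found in a queue set.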
```java
@Override
public boolean remove(String messageId) {
    return execute("remove", "(a shard in) " + queueName, () -> {
        for (String shard : allShards) {
            String unackShardKey = getUnackKey(queueName, shard);
            quorumConn.zrem(unackShardKey, messageId);

            String queueShardKey = getQueueShardKey(queueName, shard);
            Long removed = quorumConn.zrem(queueShardKey, messageId);

            if (removed > 0) {
                // Ignoring return value since we just want to get rid of it.
                Long msgRemoved = quorumConn.hdel(messageStoreKey, messageId);
                return true;
            }
        }
        return false;
    });
}
```
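Transcribing `remove` to in-memory maps makes one consequence of its control flow visible. `InMemoryRemoveSketch` is hypothetical; the steps mirror the code above one for one.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of remove(); not Dyno-queues code.
class InMemoryRemoveSketch {
    final Map<String, Map<String, Double>> queueByShard = new HashMap<>();
    final Map<String, Map<String, Double>> unackByShard = new HashMap<>();
    final Map<String, String> messageStore = new HashMap<>();

    boolean remove(List<String> allShards, String id) {
        for (String shard : allShards) {
            Map<String, Double> unack = unackByShard.get(shard);
            if (unack != null) unack.remove(id);               // ZREM unack, result ignored
            Map<String, Double> queue = queueByShard.get(shard);
            if (queue != null && queue.remove(id) != null) {   // ZREM queue
                messageStore.remove(id);                       // HDEL only on this path
                return true;
            }
        }
        return false;
    }
}
```

For a message that has been popped but not yet acked, the ID lives only in the unack set: `remove` clears that entry but returns `false` and leaves the payload in the hash.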
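Dyno-queues uses Lua scripts for batch processing, which raises throughput.

Why does Redis support Lua scripts?

Redis offers a rich command set; the official site documents more than 200 commands. But some use cases need several commands to execute atomically as one extended operation, which native commands alone cannot achieve.

For such scenarios Redis supports Lua scripts: a client sends a Lua script to the server, which runs the custom logic and returns the script's result. The Redis server executes the script atomically on its single thread, so no other request can interrupt it while it runs.

Using scripts has the following benefits:

- Less network overhead: several commands travel to the server in one request instead of one round trip each.
- Atomicity: the script runs as a whole, with no other command interleaved.
- Reuse: a script sent to the server can be cached there (see `SCRIPT LOAD` and `EVALSHA`) and invoked again by other clients.

The code is below; as you can see, it pops a whole batch of messages with a single Lua script call: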
```java
// TODO: Do code cleanup/consolidation
private List<Message> atomicBulkPopHelper(int messageCount,
        ConcurrentLinkedQueue<String> prefetchedIdQueue, boolean localShardOnly) throws IOException {

    double now = Long.valueOf(clock.millis() + 1).doubleValue();
    double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();

    // The script requires the scores as whole numbers
    NumberFormat fmt = NumberFormat.getIntegerInstance();
    fmt.setGroupingUsed(false);
    String nowScoreString = fmt.format(now);
    String unackScoreString = fmt.format(unackScore);

    List<String> messageIds = new ArrayList<>();
    for (int i = 0; i < messageCount; ++i) {
        messageIds.add(prefetchedIdQueue.poll());
    }

    // Note: in Lua only nil and false are falsy, so 'if (zadd_ret)' also passes when ZADD NX returns 0.
    String atomicBulkPopScriptLocalOnly = "local hkey=KEYS[1]\n" +
            "local num_msgs=ARGV[1]\n" +
            "local peek_until=ARGV[2]\n" +
            "local unack_score=ARGV[3]\n" +
            "local queue_shard_name=ARGV[4]\n" +
            "local unack_shard_name=ARGV[5]\n" +
            "local msg_start_idx = 6\n" +
            "local idx = 1\n" +
            "local return_vals={}\n" +
            "for i=0,num_msgs-1 do\n" +
            "  local message_id=ARGV[msg_start_idx + i]\n" +
            "  local exists = redis.call('zscore', queue_shard_name, message_id)\n" +
            "  if (exists) then\n" +
            "    if (exists <=peek_until) then\n" +
            "      local value = redis.call('hget', hkey, message_id)\n" +
            "      if (value) then\n" +
            "        local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', unack_score, message_id)\n" +
            "        if (zadd_ret) then\n" +
            "          redis.call('zrem', queue_shard_name, message_id)\n" +
            "          return_vals[idx]=value\n" +
            "          idx=idx+1\n" +
            "        end\n" +
            "      end\n" +
            "    end\n" +
            "  else\n" +
            "    return {}\n" +
            "  end\n" +
            "end\n" +
            "return return_vals";

    String atomicBulkPopScript = "local hkey=KEYS[1]\n" +
            "local num_msgs=ARGV[1]\n" +
            "local num_shards=ARGV[2]\n" +
            "local peek_until=ARGV[3]\n" +
            "local unack_score=ARGV[4]\n" +
            "local shard_start_idx = 5\n" +
            "local msg_start_idx = 5 + (num_shards * 2)\n" +
            "local out_idx = 1\n" +
            "local return_vals={}\n" +
            "for i=0,num_msgs-1 do\n" +
            "  local found_msg=false\n" +
            "  local message_id=ARGV[msg_start_idx + i]\n" +
            "  for j=0,num_shards-1 do\n" +
            "    local queue_shard_name=ARGV[shard_start_idx + (j*2)]\n" +
            "    local unack_shard_name=ARGV[shard_start_idx + (j*2) + 1]\n" +
            "    local exists = redis.call('zscore', queue_shard_name, message_id)\n" +
            "    if (exists) then\n" +
            "      found_msg=true\n" +
            "      if (exists <=peek_until) then\n" +
            "        local value = redis.call('hget', hkey, message_id)\n" +
            "        if (value) then\n" +
            "          local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', unack_score, message_id)\n" +
            "          if (zadd_ret) then\n" +
            "            redis.call('zrem', queue_shard_name, message_id)\n" +
            "            return_vals[out_idx]=value\n" +
            "            out_idx=out_idx+1\n" +
            "            break\n" +
            "          end\n" +
            "        end\n" +
            "      end\n" +
            "    end\n" +
            "  end\n" +
            "  if (found_msg == false) then\n" +
            "    return {}\n" +
            "  end\n" +
            "end\n" +
            "return return_vals";

    List<Message> payloads = new ArrayList<>();
    if (localShardOnly) {
        String unackShardName = getUnackKey(queueName, shardName);

        ImmutableList.Builder builder = ImmutableList.builder();
        builder.add(Integer.toString(messageCount));
        builder.add(nowScoreString);
        builder.add(unackScoreString);
        builder.add(localQueueShard);
        builder.add(unackShardName);
        for (int i = 0; i < messageCount; ++i) {
            builder.add(messageIds.get(i));
        }

        List<String> jsonPayloads;
        // Cast from 'JedisCommands' to 'DynoJedisClient' here since the former does not expose 'eval()'.
        jsonPayloads = (List) ((DynoJedisClient) quorumConn).eval(atomicBulkPopScriptLocalOnly,
                Collections.singletonList(messageStoreKey), builder.build());

        for (String p : jsonPayloads) {
            Message msg = om.readValue(p, Message.class);
            payloads.add(msg);
        }
    } else {
        ImmutableList.Builder builder = ImmutableList.builder();
        builder.add(Integer.toString(messageCount));
        builder.add(Integer.toString(allShards.size()));
        builder.add(nowScoreString);
        builder.add(unackScoreString);
        for (String shard : allShards) {
            String queueShard = getQueueShardKey(queueName, shard);
            String unackShardName = getUnackKey(queueName, shard);
            builder.add(queueShard);
            builder.add(unackShardName);
        }
        for (int i = 0; i < messageCount; ++i) {
            builder.add(messageIds.get(i));
        }

        List<String> jsonPayloads;
        // Cast from 'JedisCommands' to 'DynoJedisClient' here since the former does not expose 'eval()'.
        jsonPayloads = (List) ((DynoJedisClient) quorumConn).eval(atomicBulkPopScript,
                Collections.singletonList(messageStoreKey), builder.build());

        for (String p : jsonPayloads) {
            Message msg = om.readValue(p, Message.class);
            payloads.add(msg);
        }
    }
    return payloads;
}
```
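The index arithmetic in `atomicBulkPopScript` depends on the exact order in which the Java side fills `ARGV`. The hypothetical helper below builds the list in the same order as the multi-shard branch, plus a 1-based accessor that mimics Lua's `ARGV[i]`, so the layout can be checked without a Redis server.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper reproducing the ARGV order of the multi-shard branch.
class BulkPopArgvSketch {
    static List<String> buildArgv(List<String> messageIds, List<String[]> shardPairs,
                                  String peekUntil, String unackScore) {
        List<String> argv = new ArrayList<>();
        argv.add(Integer.toString(messageIds.size())); // ARGV[1] num_msgs
        argv.add(Integer.toString(shardPairs.size())); // ARGV[2] num_shards
        argv.add(peekUntil);                           // ARGV[3] peek_until
        argv.add(unackScore);                          // ARGV[4] unack_score
        for (String[] pair : shardPairs) {             // ARGV[5..]: queue shard, unack shard, ...
            argv.add(pair[0]);
            argv.add(pair[1]);
        }
        argv.addAll(messageIds);                       // ARGV[5 + 2*num_shards ..]: message ids
        return argv;
    }

    // Lua tables are 1-based, so ARGV[i] corresponds to argv.get(i - 1).
    static String luaArgv(List<String> argv, int luaIndex) {
        return argv.get(luaIndex - 1);
    }
}
```

With two shards, `msg_start_idx = 5 + 2 * 2 = 9`, which is exactly where the first message ID lands; shard `j` (0-based) occupies `ARGV[5 + 2j]` and `ARGV[5 + 2j + 1]`.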
The latest version is V2, which has three classes. Let's look at what each one does.
QueueBuilder
MultiRedisQueue
RedisPipelineQueue
`QueueBuilder` is simply a wrapper that gives callers one unified API for building a queue.
```java
public class QueueBuilder {
    private Clock clock;
    private String queueName;
    private String redisKeyPrefix;
    private int unackTime;
    private String currentShard;
    private ShardSupplier shardSupplier;
    private HostSupplier hs;
    private EurekaClient eurekaClient;
    private String applicationName;
    private Collection<Host> hosts;
    private JedisPoolConfig redisPoolConfig;
    private DynoJedisClient dynoQuorumClient;
    private DynoJedisClient dynoNonQuorumClient;
}
```
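`MultiRedisQueue` also exists for performance. Internally it holds several `RedisPipelineQueue` instances, each representing one shard, and distributes writes across them in round-robin order.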
```java
/**
 * MultiRedisQueue exposes a single queue using multiple redis queues. Each RedisQueue is a shard.
 * When pushing elements to the queue, does a round robin to push the message to one of the shards.
 * When polling, the message is polled from the current shard (shardName) the instance is associated with.
 */
public class MultiRedisQueue implements DynoQueue {
    private List<String> shards;
    private String name;
    private Map<String, RedisPipelineQueue> queues = new HashMap<>();
    private RedisPipelineQueue me;
}
```
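A round-robin shard choice of the kind the Javadoc describes can be sketched with an atomic counter. `RoundRobinShardPicker` is a hypothetical illustration of the technique, not `MultiRedisQueue`'s actual code.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin shard selector; not taken from MultiRedisQueue.
class RoundRobinShardPicker {
    private final List<String> shards;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinShardPicker(List<String> shards) {
        this.shards = List.copyOf(shards);
    }

    String nextShard() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int i = Math.floorMod(next.getAndIncrement(), shards.size());
        return shards.get(i);
    }
}
```

The atomic counter lets several producer threads share one picker without locking, while each push still lands on the next shard in turn.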
`RedisPipelineQueue` uses Redis pipelines to raise throughput. Its Javadoc describes it as:

Queue implementation that uses Redis pipelines that improves the throughput under heavy load.
```java
public class RedisPipelineQueue implements DynoQueue {
    private final Logger logger = LoggerFactory.getLogger(RedisPipelineQueue.class);
    private final Clock clock;
    private final String queueName;
    private final String shardName;
    private final String messageStoreKeyPrefix;
    private final String myQueueShard;
    private final String unackShardKeyPrefix;
    private final int unackTime;
    private final QueueMonitor monitor;
    private final ObjectMapper om;
    private final RedisConnection connPool;
    private volatile RedisConnection nonQuorumPool;
    private final ScheduledExecutorService schedulerForUnacksProcessing;
    private final HashPartitioner partitioner = new Murmur3HashPartitioner();
    private final int maxHashBuckets = 32;
    private final int longPollWaitIntervalInMillis = 10;
}
```