繼以前用rabbitMQ實現延時隊列,Redis因爲其自身的Zset數據結構,也一樣能夠實現延時的操做java
Zset本質就是Set結構上加了個排序的功能,除了添加數據value以外,還提供另外一屬性score,這一屬性在添加修改元素時候能夠指定,每次指定後,Zset會自動從新按新的值調整順序。能夠理解爲有兩列字段的數據表,一列存value,一列存順序編號。操做中key理解爲zset的名字,那麼對延時隊列又有何用呢?試想若是score表明的是想要執行時間的時間戳,在某個時間將它插入Zset集合中,它變會按照時間戳大小進行排序,也就是對執行時間先後進行排序,這樣的話,起一個死循環線程不斷地進行取第一個key值,若是當前時間戳大於等於該key值的socre就將它取出來進行消費刪除,就能夠達到延時執行的目的, 注意不須要遍歷整個Zset集合,以避免形成性能浪費。redis
Zset的排列效果以下圖:apache
java代碼實現以下:緩存
package cn.chinotan.service.delayQueueRedis; import org.apache.commons.lang3.StringUtils; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Tuple; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * @program: test * @description: redis實現延時隊列 * @author: xingcheng * @create: 2018-08-19 **/ public class AppTest { private static final String ADDR = "127.0.0.1"; private static final int PORT = 6379; private static JedisPool jedisPool = new JedisPool(ADDR, PORT); private static CountDownLatch cdl = new CountDownLatch(10); public static Jedis getJedis() { return jedisPool.getResource(); } /** * 生產者,生成5個訂單 */ public void productionDelayMessage() { for (int i = 0; i < 5; i++) { Calendar instance = Calendar.getInstance(); // 3秒後執行 instance.add(Calendar.SECOND, 3 + i); AppTest.getJedis().zadd("orderId", (instance.getTimeInMillis()) / 1000, StringUtils.join("000000000", i + 1)); System.out.println("生產訂單: " + StringUtils.join("000000000", i + 1) + " 當前時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); System.out.println((3 + i) + "秒後執行"); } } //消費者,取訂單 public static void consumerDelayMessage() { Jedis jedis = AppTest.getJedis(); while (true) { Set<Tuple> order = jedis.zrangeWithScores("orderId", 0, 0); if (order == null || order.isEmpty()) { System.out.println("當前沒有等待的任務"); try { TimeUnit.MICROSECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } continue; } Tuple tuple = (Tuple) order.toArray()[0]; double score = tuple.getScore(); Calendar instance = Calendar.getInstance(); long nowTime = instance.getTimeInMillis() / 1000; if (nowTime >= score) { String element = tuple.getElement(); Long orderId = jedis.zrem("orderId", element); if (orderId > 0) { System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":redis消費了一個任務:消費的訂單OrderId爲" + element); } } } } static class DelayMessage implements Runnable{ @Override public void run() { try { cdl.await(); consumerDelayMessage(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { AppTest appTest = new AppTest(); appTest.productionDelayMessage(); for (int i = 0; i < 10; i++) { new Thread(new DelayMessage()).start(); cdl.countDown(); } } }
實現效果以下:數據結構
生產環境使用注意:併發
因爲這種實現方式簡單,但在生產環境下大可能是多實例部署,因此存在併發問題,即緩存的查找和刪除不具備原子性(zrangeWithScores和zrem操做不是一個命令,不具備原子性),會致使消息的屢次發送問題,這個問題的避免方法以下:app
1.能夠採用單獨一個實例部署解決(不具有高可用特性,容易單機出現故障後消息不能及時發送)分佈式
2.採用redis的lua腳本進行原子操做,即原子操做查找和刪除(實現難度大)ide
所以,延時隊列的實現最好採用rabbitMQ來實現,rabbitMQ自然具有分佈式的特性,能夠很好的用在多服務,多實例環境下,具體的實現參考http://www.javashuo.com/article/p-xqhwqaxt-cr.html性能