延時隊列的應用場景:java
下單後,30分鐘內未付款就自動取消訂單等;
支付後,24小時未評論自動好評;
在咱們實際開發過程當中,應用場景不少...redis
Redis因爲其自身的Zset數據結構,也一樣能夠實現延時的操做。
Zset本質就是Set結構上加了個排序的功能,除了添加數據value以外,還提供另外一屬性score,這一屬性在添加元素時候能夠指定,每次指定score後,Zset會自動從新按新的值調整順序。算法
> ZADD delay_queue 1581309229 taskId_1
(integer) 1
> ZADD delay_queue 1581309129 taskId_2
(integer) 1
> ZADD delay_queue 1581309329 taskId_3
(integer) 1
複製代碼
時間戳
大於等於該key值的socre就將它取出來進行消費刪除,就能夠達到延時執行的目的。 注意不須要遍歷整個Zset集合,以避免形成性能浪費。> ZRANGE delay_queue 0 -1 withscores
1) "taskId_2"
2) "1581309129"
3) "taskId_1"
4) "1581309229"
5) "taskId_3"
6) "1581309329"
複製代碼
Redis Lua
封裝,確保原子性操做。更要注意 Redis Lua
在 Redis Cluster 的僞集羣問題。redisson
,封裝了 DelayedQueue
的實現。源碼邏輯 org/redisson/RedissonDelayedQueue.java
spring
Beanstalkd,一個高性能、輕量級的分佈式內存隊列系統。支持過有9.5 million用戶的Facebook Causes應用。後來開源,如今有PostRank大規模部署和使用,天天處理百萬級任務。docker
yum install beanstalkd
||
docker run -d -p 11300:11300 pig4cloud/beanstalkd
複製代碼
<!--封裝了 官方的 java sdk,只支持 springboot 2.X-->
<dependency>
<groupId>com.pig4cloud.beanstalk</groupId>
<artifactId>beanstalkd-client-spring-boot-starter</artifactId>
<version>0.0.1</version>
</dependency>
複製代碼
@Autowired
private JobProducer producer;
/**
* @param delay 是一個整形數,表示將job放入ready隊列須要等待的秒數
* @param ttr time to run—是一個整形數,表示容許一個worker執行該job的秒數。這個時間將從一個worker 獲取一個job開始計算。
* 若是該worker沒能在<ttr> 秒內刪除、釋放或休眠該job,這個job就會超時,服務端會主動釋放該job。
* 最小ttr爲1。若是客戶端設置了0,服務端會默認將其增長到1。
* @param priority 優先級 0~2**32的整數,最高優先級是0
*/
@Test
public void testSend() {
String taskId = "1";// 業務對象信息
producer.putJob(0, 10, 10, taskId.getBytes());
}
複製代碼
@Component
public class DemoJobConsumer extends AbstractTubeConsumerListener {
@Override
public void work(JobConsumer consumer) {
// 阻塞多少秒獲取一次 Job
Job job = consumer.reserveJob(1000L);
// 消費此Job
consumer.deleteJob(job.getId());
// 執行延時的業務邏輯
String biz = new String(job.getData());
}
}
複製代碼