【面試必備】聊聊高性能延時隊列應用

延時隊列的應用場景:java

下單後,30分鐘內未付款就自動取消訂單等;
支付後,24小時未評論自動好評;
在咱們實際開發過程當中,應用場景不少...redis

基於Redis Zset 實現

實現原理

Redis因爲其自身的Zset數據結構,也一樣能夠實現延時的操做。
Zset本質就是Set結構上加了個排序的功能,除了添加數據value以外,還提供另外一屬性score,這一屬性在添加元素時候能夠指定,每次指定score後,Zset會自動從新按新的值調整順序算法

  1. 若是score表明的是想要執行時間的時間戳,在某個時間將它插入Zset集合中,它會按照時間戳大小進行排序,也就是對執行時間先後進行排序。
> ZADD delay_queue 1581309229  taskId_1
(integer) 1
> ZADD delay_queue 1581309129  taskId_2
(integer) 1
> ZADD delay_queue 1581309329  taskId_3
(integer) 1
複製代碼
  1. 不斷地進行取第一個key值,若是當前時間戳大於等於該key值的socre就將它取出來進行消費刪除,就能夠達到延時執行的目的。 注意不須要遍歷整個Zset集合,以避免形成性能浪費。
> ZRANGE delay_queue 0 -1 withscores
1) "taskId_2"
2) "1581309129"
3) "taskId_1"
4) "1581309229"
5) "taskId_3"
6) "1581309329"
複製代碼

使用注意

  • 遍歷邏輯,刪除邏輯,注意使用 Redis Lua 封裝,確保原子性操做。更要注意 Redis Lua 在 Redis Cluster 的僞集羣問題。
  • 如果JAVA 語言能夠直接使用 redisson,封裝了 DelayedQueue 的實現。

源碼邏輯 org/redisson/RedissonDelayedQueue.javaspring

Beanstalkd 消息隊列

Beanstalkd,一個高性能、輕量級的分佈式內存隊列系統。支持過有9.5 million用戶的Facebook Causes應用。後來開源,如今有PostRank大規模部署和使用,天天處理百萬級任務。docker

部署使用

  • Linux 安裝 || docker 部署
yum install beanstalkd

||

docker run -d -p 11300:11300 pig4cloud/beanstalkd
複製代碼
  • 客戶端使用,pom 依賴
<!--封裝了 官方的 java sdk,只支持 springboot 2.X-->
<dependency>
    <groupId>com.pig4cloud.beanstalk</groupId>
    <artifactId>beanstalkd-client-spring-boot-starter</artifactId>
    <version>0.0.1</version>
</dependency>
複製代碼
  • 默認配置

  • 代碼使用
@Autowired
private JobProducer producer;

/**
 * @param delay    是一個整形數,表示將job放入ready隊列須要等待的秒數
 * @param ttr      time to run—是一個整形數,表示容許一個worker執行該job的秒數。這個時間將從一個worker 獲取一個job開始計算。
 *                 若是該worker沒能在<ttr> 秒內刪除、釋放或休眠該job,這個job就會超時,服務端會主動釋放該job。
 *                 最小ttr爲1。若是客戶端設置了0,服務端會默認將其增長到1。
 * @param priority 優先級 0~2**32的整數,最高優先級是0
 */
@Test
public void testSend() {
	String taskId = "1";// 業務對象信息
    producer.putJob(0, 10, 10, taskId.getBytes());
}
複製代碼
@Component
public class DemoJobConsumer extends AbstractTubeConsumerListener {

    @Override
    public void work(JobConsumer consumer) {
        // 阻塞多少秒獲取一次 Job
        Job job = consumer.reserveJob(1000L);

        // 消費此Job
        consumer.deleteJob(job.getId());

        // 執行延時的業務邏輯
        String biz = new String(job.getData());
    }
}
複製代碼

擴展

  • 數據庫,利用定時任務輪詢實現,業務量大會性能瓶頸。
  • 延時隊列的其餘實現,好比 rabbitmq 利用ttl特性能夠實現。沒法取消已放入隊列裏面的數據,使用時特別注意死信隊列的配置等。
  • 還能夠本身根據 時間輪片的算法 自行實現 。
  • 總之一切,都要有補償的邏輯,不管是業務人員手動觸發仍是自動補償。

相關文章
相關標籤/搜索