分佈式 延時任務解決方案

在開發中,每每會遇到一些關於延時任務的需求。例如java


  • 生成訂單30分鐘未支付,則自動取消redis

  • 生成訂單60秒後,給用戶發短信緩存


對上述的任務,咱們給一個專業的名字來形容,那就是延時任務。那麼這裏就會產生一個問題,這個延時任務和定時任務的區別究竟在哪裏呢?一共有以下幾點區別併發


  1. 定時任務有明確的觸發時間,延時任務沒有app

  2. 定時任務有執行週期,而延時任務在某事件觸發後一段時間內執行,沒有執行週期運維

  3. 定時任務通常執行的是批處理操做是多個任務,而延時任務通常是單個任務分佈式


下面,咱們以判斷訂單是否超時爲例,進行方案分析ide

 

redis緩存高併發


- 思路一性能


利用redis的zset,zset是一個有序集合,每個元素(member)都關聯了一個score,經過score排序來取集合中的值


添加元素:ZADD key score member [[score member] [score member] …]

按順序查詢元素:ZRANGE key start stop [WITHSCORES]

查詢元素score:ZSCORE key member

移除元素:ZREM key member [member …]


測試以下


# 添加單個元素

 

redisZADD page_rank 10 google.com

(integer) 1

 

 

# 添加多個元素

 

redisZADD page_rank 9 baidu.com 8 bing.com

(integer) 2

 

redisZRANGE page_rank 0 -1 WITHSCORES

1) "bing.com"

2) "8"

3) "baidu.com"

4) "9"

5) "google.com"

6) "10"

 

# 查詢元素的score值

redisZSCORE page_rank bing.com

"8"

 

# 移除單個元素

 

redisZREM page_rank google.com

(integer) 1

 

redisZRANGE page_rank 0 -1 WITHSCORES

1) "bing.com"

2) "8"

3) "baidu.com"

4) "9"


那麼如何實現呢?咱們將訂單超時時間戳與訂單號分別設置爲score和member,系統掃描第一個元素判斷是否超時,具體以下圖所示



實現一


package com.rjzheng.delay4;

 

import java.util.Calendar;

import java.util.Set;

 

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.Tuple;

 

public class AppTest {

    private static final String ADDR = "127.0.0.1";

    private static final int PORT = 6379;

    private static JedisPool jedisPool = new JedisPool(ADDR, PORT);

    

    public static Jedis getJedis() {

       return jedisPool.getResource();

    }

    

    //生產者,生成5個訂單放進去

    public void productionDelayMessage(){

        for(int i=0;i<5;i++){

            //延遲3秒

            Calendar cal1 = Calendar.getInstance();

            cal1.add(Calendar.SECOND, 3);

            int second3later = (int) (cal1.getTimeInMillis() / 1000);

            AppTest.getJedis().zadd("OrderId", second3later,"OID0000001"+i);

            System.out.println(System.currentTimeMillis()+"ms:redis生成了一個訂單任務:訂單ID爲"+"OID0000001"+i);

        }

    }

    

    //消費者,取訂單

    public void consumerDelayMessage(){

        Jedis jedis = AppTest.getJedis();

        while(true){

            Set<Tupleitems = jedis.zrangeWithScores("OrderId", 0, 1);

            if(items == null || items.isEmpty()){

                System.out.println("當前沒有等待的任務");

                try {

                    Thread.sleep(500);

                } catch (InterruptedException e) {

                    // TODO Auto-generated catch block

                    e.printStackTrace();

                }

                continue;

            }

            int  score = (int) ((Tuple)items.toArray()[0]).getScore();

            Calendar cal = Calendar.getInstance();

            int nowSecond = (int) (cal.getTimeInMillis() / 1000);

            if(nowSecond >= score){

                String orderId = ((Tuple)items.toArray()[0]).getElement();

                jedis.zrem("OrderId", orderId);

                System.out.println(System.currentTimeMillis() +"ms:redis消費了一個任務:消費的訂單OrderId爲"+orderId);

            }

        }

    }

    

    public static void main(String[] args) {

        AppTest appTest =new AppTest();

        appTest.productionDelayMessage();

        appTest.consumerDelayMessage();

    }

    

}


此時對應輸出以下



能夠看到,幾乎都是3秒以後,消費訂單。


然而,這一版存在一個致命的硬傷,在高併發條件下,多消費者會取到同一個訂單號,咱們上測試代碼ThreadTest


package com.rjzheng.delay4;

 

import java.util.concurrent.CountDownLatch;

 

public class ThreadTest {

    private static final int threadNum = 10;

    private static CountDownLatch cdl = new CountDownLatch(threadNum);

    static class DelayMessage implements Runnable{

        public void run() {

            try {

                cdl.await();

            } catch (InterruptedException e) {

                // TODO Auto-generated catch block

                e.printStackTrace();

            }

            AppTest appTest =new AppTest();

            appTest.consumerDelayMessage();

        }

    }

    public static void main(String[] args) {

        AppTest appTest =new AppTest();

        appTest.productionDelayMessage();

        for(int i=0;i<threadNum;i++){

            new Thread(new DelayMessage()).start();

            cdl.countDown();

        }

    }

}


輸出以下所示



顯然,出現了多個線程消費同一個資源的狀況。


解決方案


(1)用分佈式鎖,可是用分佈式鎖,性能降低了,該方案不細說。


(2)對ZREM的返回值進行判斷,只有大於0的時候,才消費數據,因而將consumerDelayMessage()方法裏的


if(nowSecond >= score){

    String orderId = ((Tuple)items.toArray()[0]).getElement();

    jedis.zrem("OrderId", orderId);

    System.out.println(System.currentTimeMillis()+"ms:redis消費了一個任務:消費的訂單OrderId爲"+orderId);

}


修改成


if(nowSecond >= score){

    String orderId = ((Tuple)items.toArray()[0]).getElement();

    Long num = jedis.zrem("OrderId", orderId);

    if( num != null && num>0){

        System.out.println(System.currentTimeMillis()+"ms:redis消費了一個任務:消費的訂單OrderId爲"+orderId);

    }

}


在這種修改後,從新運行ThreadTest類,發現輸出正常了


- 思路二


該方案使用redis的Keyspace Notifications,中文翻譯就是鍵空間機制,就是利用該機制能夠在key失效以後,提供一個回調,其實是redis會給客戶端發送一個消息。是須要redis版本2.8以上。


實現二


在redis.conf中,加入一條配置


notify-keyspace-events Ex


運行代碼以下


package com.rjzheng.delay5;

 

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPubSub;

 

public class RedisTest {

    private static final String ADDR = "127.0.0.1";

    private static final int PORT = 6379;

    private static JedisPool jedis = new JedisPool(ADDR, PORT);

    private static RedisSub sub = new RedisSub();

 

    public static void init() {

        new Thread(new Runnable() {

            public void run() {

                jedis.getResource().subscribe(sub, "__keyevent@0__:expired");

            }

        }).start();

    }

 

    public static void main(String[] args) throws InterruptedException {

        init();

        for(int i =0;i<10;i++){

            String orderId = "OID000000"+i;

            jedis.getResource().setex(orderId, 3, orderId);

            System.out.println(System.currentTimeMillis()+"ms:"+orderId+"訂單生成");

        }

    }

    

    static class RedisSub extends JedisPubSub {

        <a href='http://www.jobbole.com/members/wx610506454'>@Override</a>

        public void onMessage(String channel, String message) {

            System.out.println(System.currentTimeMillis()+"ms:"+message+"訂單取消");

        }

    }

}


輸出以下



能夠明顯看到3秒事後,訂單取消了


ps:redis的pub/sub機制存在一個硬傷,官網內容以下


原:Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost.


翻: Redis的發佈/訂閱目前是即發即棄(fire and forget)模式的,所以沒法實現事件的可靠通知。也就是說,若是發佈/訂閱的客戶端斷鏈以後又重連,則在客戶端斷鏈期間的全部事件都丟失了。
所以,方案二不是太推薦。固然,若是你對可靠性要求不高,可使用。


優缺點


優勢:(1)因爲使用Redis做爲消息通道,消息都存儲在Redis中。若是發送程序或者任務處理程序掛了,重啓以後,還有從新處理數據的可能性。
(2)作集羣擴展至關方便
(3)時間準確度高

缺點:(1)須要額外進行redis維護


(5)使用消息隊列


咱們能夠採用rabbitMQ的延時隊列。RabbitMQ具備如下兩個特性,能夠實現延遲隊列


  • RabbitMQ能夠針對Queue和Message設置 x-message-tt,來控制消息的生存時間,若是超時,則消息變爲dead letter

  • lRabbitMQ的Queue能夠配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數,用來控制隊列內出現了deadletter,則按照這兩個參數從新路由。
    結合以上兩個特性,就能夠模擬出延遲消息的功能,具體的,我改天再寫一篇文章,這裏再講下去,篇幅太長。


優缺點


優勢: 高效,能夠利用rabbitmq的分佈式特性輕易的進行橫向擴展,消息支持持久化增長了可靠性。


缺點:自己的易用度要依賴於rabbitMq的運維.由於要引用rabbitMq,因此複雜度和成本變高

 

 

原文連接: https://mp.weixin.qq.com/s/4RMT427vnsRezfV_s7RVGA 

相關文章
相關標籤/搜索