原 薦 簡單說說Kafka中的時間輪算法

零、時間輪定義算法

簡單說說時間輪吧,它是一個高效的延時隊列,或者說定時器。實際上如今網上對於時間輪算法的解釋不少,定義也很全,這裏引用一下 朱小廝博客 裏出現的定義:sql

參考下圖,Kafka中的時間輪(TimingWheel)是一個存儲定時任務的環形隊列,底層採用數組實現,數組中的每一個元素能夠存放一個定時任務列表(TimerTaskList)。TimerTaskList是一個環形的雙向鏈表,鏈表中的每一項表示的都是定時任務項(TimerTaskEntry),其中封裝了真正的定時任務TimerTask。數組

 

 

 

 

 

若是你理解了上面的定義,那麼就沒必要往下看了。但若是你第一次看到和我同樣懵比,而且有很多疑問,那麼這篇博文將帶你進一步瞭解時間輪,甚至理解時間輪算法。數據結構

若是有興趣,能夠去看看其餘的定時器 你真的瞭解延時隊列嗎 。博主認爲,時間輪定時器最大的優勢:架構

  1. 是任務的添加與移除,都是O(1)級的複雜度;
  2. 不會佔用大量的資源;
  3. 只須要有一個線程去推動時間輪就能夠工做了。

咱們將對時間輪作層層推動的解析:併發

1、爲何使用環形隊列分佈式

假設咱們如今有一個很大的數組,專門用於存放延時任務。它的精度達到了毫秒級!那麼咱們的延遲任務實際上須要將定時的那個時間簡單轉換爲毫秒便可,而後將定時任務存入其中:高併發

好比說當前的時間是2018/10/24 19:43:45,那麼就將任務存入Task[1540381425000],value則是定時任務的內容。性能

private Task[很長] tasks;
public List<Task> getTaskList(long timestamp) {
	return task.get(timestamp)
}
// 僞裝這裏真的能一毫秒一個循環
public void run(){
	while (true){
		getTaskList(System.currentTimeMillis()).後臺執行()
		Thread.sleep(1);
	}
}

假如這個數組長度達到了億億級,咱們確實能夠這麼幹。 那若是將精度縮減到秒級呢?咱們也須要一個百億級長度的數組。學習

先不說內存夠不夠,顯然你的定時器要這麼大的內存顯然很浪費。

固然若是咱們本身寫一個map,並保證它不存在hash衝突問題,那也是徹底可行的。(我不肯定個人想法是否正確,若是錯誤,請指出)

/* 一個精度爲秒級的延時任務管理類 */
private Map<Long, Task> taskMap;
public List<Task> getTaskList(long timestamp) {
	return taskMap.get(timestamp - timestamp % 1000)
}
// 新增一個任務
public void addTask(long timestamp, Task task) {
	List<Task> taskList = getTaskList(timestamp - timestamp % 1000);
		if (taskList == null){
			taskList = new ArrayList();
		}
	taskList.add(task);
}
// 僞裝這裏真的能一秒一個循環
public void run(){
	while (true){
		getTaskList(System.currentTimeMillis()).後臺執行()
		Thread.sleep(1000);
	}
}

其實時間輪就是一個不存在hash衝突的數據結構

拋開其餘疑問,咱們看看手腕上的手錶(若是沒有去找個鐘錶,或者想象一個),是否是不管當前是什麼時間,總能用咱們的錶盤去表示它(忽略精度)

 

 

 

 

 

就拿秒錶來講,它老是落在 0 - 59 秒,每走一圈,又會從新開始。

用僞代碼模擬一下咱們這個秒錶:

private Bucket[60] buckets;// 表示60秒
public void addTask(long timestamp, Task task) {
	Bucket bucket = buckets[timestamp / 1000 % 60];
	bucket.add(task);
}
public Bucket getBucket(long timestamp) {
	return buckets[timestamp / 1000 % 60];
}
// 僞裝這裏真的能一秒一個循環
public void run(){
	while (true){
		getBucket(System.currentTimeMillis()).後臺執行()
		Thread.sleep(1000);
	}
}

這樣,咱們的時間總能落在0 - 59任意一個bucket上,就如同咱們的秒鐘老是落在0 - 59刻度上同樣,這即是 時間輪的環形隊列 。

2、表示的時間有限

可是細心的小夥伴也會發現這麼一個問題:若是隻能表示60秒內的定時任務應該怎麼存儲與取出,那是否是太有侷限性了? 若是想要加入一小時後的延遲任務,該怎麼辦?

其實仍是能夠看一看鐘表,對於只有三個指針的表(通常的表)來講,最大能表示12個小時,超過了12小時這個範圍,時間就會產生歧義。若是咱們加多幾個指針呢?好比說咱們有秒針,分針,時針,上下午針,天針,月針,年針...... 那不就能表示很長很長的一段時間了?並且,它並不須要佔用很大的內存。

好比說秒針咱們能夠用一個長度爲60的數組來表示,分針也一樣能夠用一個長度爲60的數組來表示,時針能夠用一個長度爲24的數組來表示。那麼表示一天內的全部時間,只須要三個數組便可。

動手來作吧,咱們將這個數據結構稱做時間輪,tickMs表示一個刻度,好比說上面說的一秒。wheelSize表示一圈有多少個刻度,即上面說的60。interval表示一圈能表示多少時間,即 tickMs * wheelSize = 60秒。

overflowWheel表示上一層的時間輪,好比說,對於秒鐘來講,overflowWheel就表示分鐘,以此類推。

public class TimeWheel {
    /** 一個時間槽的時間 */
    private long tickMs;
    /** 時間輪大小 */
    private int wheelSize;
    /** 時間跨度 */
    private long interval;
    /** 槽 */
    private Bucket[] buckets;
    /** 時間輪指針 */
    private long currentTimestamp;
    /** 上層時間輪 */
    private volatile TimeWheel overflowWheel;
    public TimeWheel(long tickMs, int wheelSize, long currentTimestamp) {
        this.currentTimestamp = currentTimestamp;
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.interval = tickMs * wheelSize;
        this.buckets = new Bucket[wheelSize];
        this.currentTimestamp = currentTimestamp - (currentTimestamp % tickMs);
        for (int i = 0; i < wheelSize; i++) {
            buckets[i] = new Bucket();
        }
    }
}

將任務添加到時間輪中十分簡單,對於每一個時間輪來講,好比說秒級時間輪,和分級時間輪,都有它本身的過時槽。也就是delayMs < tickMs的時候。

添加延時任務的時候一共就這幾種狀況:

####1、時間到期

  • 1)好比說有一個任務要在 16:29:07 執行,從秒級時間輪中來看,當咱們的當前時間走到16:29:06的時候,則表示這個任務已通過期了。由於它的delayMs = 1000ms,小於了咱們的秒級時間輪的tickMs(1000ms)。
  1. 好比說有一個任務要在 16:41:25 執行,從分級時間輪中來看,當咱們的當前時間走到 16:41的時候( 分級時間輪沒有秒針!它的最小精度是分鐘(必定要理解這一點) ),則表示這個任務已經到期,由於它的delayMs = 25000ms,小於了咱們的分級時間輪的tickMs(60000ms)。

2、時間未到期,且delayMs小於interval。

對於秒級時間輪來講,就是延遲時間小於60s,那麼確定能找到一個秒鐘槽扔進去。

3、時間未到期,且delayMs大於interval。

對於妙級時間輪來講,就是延遲時間大於等於60s,這時候就須要藉助上層時間輪的力量了,很簡單的代碼實現,就是拿到上層時間輪,而後相似遞歸同樣,把它扔進去。

好比說一個有一個延時爲一年後的定時任務,就會在這個遞歸中不斷建立更上層的時間輪,直到找到知足delayMs小於interval的那個時間輪。

這裏爲了避免把代碼寫的那麼複雜,咱們每一層時間輪的刻度都同樣,也就是秒級時間輪表示60秒,上面則表示60分鐘,再上面則表示60小時,再上層則表示60個60小時,再上層則表示60個60個60小時 = 216000小時。

也就是若是將最底層時間輪的tickMs(精度)設置爲1000ms。wheelSize設置爲60。 那麼只須要5層時間輪,可表示的時間跨度已經長達24年(216000小時) 。

/**
     * 添加任務到某個時間輪
     */
    public boolean addTask(TimedTask timedTask) {
        long expireTimestamp = timedTask.getExpireTimestamp();
        long delayMs = expireTimestamp - currentTimestamp;
        if (delayMs < tickMs) {// 到期了
            return false;
        } else {
            // 扔進當前時間輪的某個槽中,只有時間【大於某個槽】,纔會放進去
            if (delayMs < interval) {
                int bucketIndex = (int) (((delayMs + currentTimestamp) / tickMs) % wheelSize);
                Bucket bucket = buckets[bucketIndex];
                bucket.addTask(timedTask);
            } else {
			// 當maybeInThisBucket大於等於wheelSize時,須要將它扔到上一層的時間輪
                TimeWheel timeWheel = getOverflowWheel();
                timeWheel.addTask(timedTask);
            }
        }
        return true;
    }
   /**
     * 獲取或建立一個上層時間輪
     */
	private TimeWheel getOverflowWheel() {
        if (overflowWheel == null) {
            synchronized (this) {
                if (overflowWheel == null) {
                    overflowWheel = new TimeWheel(interval, wheelSize, currentTimestamp, delayQueue);
                }
            }
        }
        return overflowWheel;
    }

固然咱們的時間輪還須要一個指針的推動機制,總不能讓時間永遠停留在當前吧?推動的時候,同時相似遞歸,去推動一下上一層的時間輪。

注意:要強調一點的是,咱們這個時間輪更像是電子錶,它不存在時間的中間狀態,也就是精度這個概念必定要理解好。好比說,對於秒級時間輪來講,它的精度只能保證到1秒,小於1秒的,都會當成是已到期

對於分級時間輪來講,它的精度只能保證到1分,小於1分的,都會當成是已到期

/**
     * 嘗試推動一下指針
     */
    public void advanceClock(long timestamp) {
        if (timestamp >= currentTimestamp + tickMs) {
            currentTimestamp = timestamp - (timestamp % tickMs);
            if (overflowWheel != null) {
                this.getOverflowWheel()
                    .advanceClock(timestamp);
            }
        }
    }

3、對於高層時間輪來講,精度愈來愈不許,會不會有影響?

上面說到,分級時間輪,精度只有分鐘級,總不能延遲1秒的定時任務和延遲59秒的定時任務同時執行吧?

有這個疑問的同窗很好!實際上很好解決,只需再入時間輪便可。好比說,對於分鐘級時間輪來講,delayMs爲1秒和delayMs爲59秒的都已通過期,咱們將其取出,再扔進底層的時間輪不就能夠了?

1秒的會被扔到秒級時間輪的下一個執行槽中,而59秒的會被扔到秒級時間輪的後59個時間槽中。

細心的同窗會發現,咱們的添加任務方法,返回的是一個bool

public boolean addTask(TimedTask timedTask)

再倒回去好好看看,添加到最底層時間輪失敗的(咱們只能直接操做最底層的時間輪,不能直接操做上層的時間輪),是否是會直接返回flase? 對於再入失敗的任務,咱們直接執行便可。

/**
     * 將任務添加到時間輪
     */
    public void addOrSubmitTask(TimedTask timedTask) {
        if (!timeWheel.addTask(timedTask)) {
            taskExecutor.submit(timedTask.getTask());
        }
    }

4、如何知道一個任務已通過期?

記得咱們將任務存儲在槽中嘛?好比說秒級時間輪中,有60個槽,那麼一共有60個槽。若是時間輪共有兩層,也僅僅只有120個槽。咱們只需將槽扔進一個delayedQueue之中便可。

咱們輪詢地從delayedQueue取出已通過期的槽便可。(前面的全部代碼,爲了簡單說明,並無引入這個DelayQueue的概念,因此不用去上面翻了,並無。博主以爲... 已經看到這裏了,應該很明白這個DelayQueue的意義了。 )

其實簡單來講,實際上定時任務單單使用DelayQueue來實現,也是能夠的,可是一旦任務的數量多了起來,達到了百萬級,千萬級,針對這個delayQueue的增刪,將很是的慢。

** 1、面向槽的delayQueue**

而對於時間輪來講,它只須要往delayQueue裏面扔各類槽便可,好比咱們的定時任務長短不一,最長的跨度到了24年,這個delayQueue也僅僅只有300個元素。

** 2、處理過時的槽**

而這個槽到期後,也就是被咱們從delayQueue中poll出來後,咱們只須要將槽中的全部任務循環一次,從新加到新的槽中(添加失敗則直接執行)便可。

/**
     * 推動一下時間輪的指針,而且將delayQueue中的任務取出來再從新扔進去
     */
    public void advanceClock(long timeout) {
        try {
            Bucket bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
            if (bucket != null) {
                timeWheel.advanceClock(bucket.getExpire());
                bucket.flush(this::addTask);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

歡迎工做一到五年的Java工程師朋友們加入Java架構開發: 855835163 羣內提供免費的Java架構學習資料(裏面有高可用、高併發、高性能及分佈式、Jvm性能調優、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用本身每一分每一秒的時間來學習提高本身,不要再用"沒有時間「來掩飾本身思想上的懶惰!趁年輕,使勁拼,給將來的本身一個交代!

相關文章
相關標籤/搜索