支持內部晉升的無鎖併發優先級線程池

支持內部晉升的無鎖併發優先級線程池

[TOC]java

引言

在技術羣討論到一個有意思的業務需求,能夠描述爲:git

有一個內部按照優先級進行任務排序的線程池。線程池會優先執行高優先級的任務。隨着時間的流逝,線程池內部低優先級的任務的優先級會逐漸晉升變爲高優先級,以免被不斷新增的高優先級任務阻塞致使餓死。算法

考慮到 JDK 已經爲開發者提供了自定義線程池ThreadPoolExecutor以及優先級隊列PriorityBlockingQueue,二者相結合而且按期調整隊列中低優先級任務的優先級再進行resort將低優先級的任務調整到隊列的前頭,也能夠必定程度上避免被餓死。數組

這種方案的問題在於resort的消耗比較高,而且還須要從新計算每個任務的優先級。爲此,引出咱們下面的設計,但願使用無鎖併發的數據結構存儲任務,而且任務支持自動的優先級晉升,保證低優先級的任務最終可以執行而不會被不斷增長的高優先級任務餓死。緩存

歡迎加入技術交流羣186233599討論交流,也歡迎關注筆者公衆號:風火說。markdown

推導過程

如何實現優先級晉升

聲明一個數組,按照循環隊列的方式使用。每個數組槽位上都掛載一個任務列表。有一個當前指針指向數組中的某一個槽位,該槽位即爲當前最高優先級任務插入的槽位。指針數字遞增方向優先級依次下降。指針以某種方式沿遞增方向移動,由於指針指向的槽位表明最高優先級,所以指針的移動實際上意味着全部槽位的優先級都晉升了。數據結構

那麼這裏的優先級只能是離散化的整型數字,而且優先級的範圍爲 0 到 數組長度減 1 。最高優先級爲0。多線程

用圖形化的方式表達就是以下的狀況併發

圖中優先級的範圍是[0,6],current指針指向的槽位即爲最高優先級,current左側槽位爲最低優先級,current右側槽位爲次高優先級。每個槽位上都掛載一個隊列,隊列中的任務的優先級都相同(後續算法中能夠看到會有不一樣的優先級混合)。測試

每次取任務時老是從current指針指向的槽位的隊列讀取任務。當必定時間流逝後,current指針沿着右側移動一位,此時意味着全部槽位的優先級都被晉升了,除了本來的current指向的槽位,它變爲了最低優先級槽位。

因爲current指針老是在移動,所以最終會移動到以前低優先級的槽位,此時該槽位下的任務就成了最高優先級任務,被讀取執行。這樣就避免了在運行過程不斷有高優先級任務被加入致使本來的低優先級餓死的狀況發生。

數據結構設計

根據上面的優先級晉升思路,顯然應該有一個數組,其不一樣的槽位表明着不一樣的優先級。每個槽位上掛載一個 MPMC 類型的隊列,用於該優先級下任務的添加和讀取。

使用一個當前指針,該指針指向的槽位爲最高優先級槽位。

一個指針產生的問題

若是隻有一個指針,意味着讀取任務時,從該指針指向的槽位讀取,所以此時指針指向的槽位是最高優先級。而插入任務的時候,須要根據當前指針進行計算。這種模式在優先級晉升時存在併發問題。

當指針從槽位1指向更新到槽位2。此時槽位1可能還存在部分剩餘的任務,這部分任務的實際優先級應該是高於槽位2當中的。而若是在這個時候插入最低優先級的任務,可能就會插入到槽位1中。那麼槽位1的任務隊列實際就混合了最高優先級和最低優先級的任務,沒法區分。

爲了解決不一樣優先級任務在同一個隊列中混合的問題,咱們能夠在指針移動時,將以前槽位的剩餘移動到當前槽位的隊列頭。這實際上就意味着要求隊列是出於雙端隊列模式。可是由於指針移動和任務移動沒法原子化進行,仍是會形成槽位1的隊列中最高優先級任務和最低優先級任務混在一塊兒的狀況。

從實現效果而言,咱們須要的是在指針移動的時候,保證槽位1中剩餘的本來高優先級的任務執行完畢後才能去執行槽位2這個「本來的次高優先級,如今的最高優先級「的任務。從效果來看,並不須要必定移動任務,能夠經過一種手段,保證槽位1中本來高優先級任務執行完畢後再去執行槽位2的任務便可。

基於這種考量,咱們將一個指針拆分爲兩個:任務插入指針和任務讀取指針。

任務插入指針和任務讀取指針

基於併發讀寫的考慮,兩個指針都是AtomicInteger類型。兩個指針的做用分別爲:

  • 任務插入指針:該指針指向的槽位爲當前最高優先級槽位(後續會引入輪次這個概念,所以這裏對當前加粗)。
  • 任務讀取指針:從結構體中獲取任務時使用該指針指向的槽位上獲取任務隊列進行任務讀取。

任務插入指針和任務讀取指針分離的好處在於,任務插入指針的移動意味着不一樣槽位優先級的實際晉升。而讀取能夠依照讀取指針指向的槽位上的隊列讀取任務,直到對應優先級的任務讀取完畢後再移動讀取指針到下一個槽位。這樣一來,保證了按照入列的順序被公平的處理,也保證了同一個時間單位高優先級的任務優於低優先級任務被處理,也避免了單一指針移動須要的任務拷貝帶來的不一樣優先級任務污染問題。

任務插入指針如何移動

插入指針能夠按照兩種策略移動:

  • 天然時間流逝移動,必定時間後移動。
  • 以讀取次數爲單位,必定次數後移動。

若是選擇策略一,須要後臺配置一個線程,按照固定時間移動插入指針;若是選擇策略二,須要一個全局的AtomicInteger對象,用於次數斷定。

若是選擇方案一,可能會存在一種場景,往線程池中投入了大量的同一個優先級的任務,使得某個槽位上的隊列長度很長。若是任務處理相對緩存,則任務插入指針可能會被移動屢次。這種移動會使得槽位上隊列有了不少不一樣優先級的任務。而讀取任務時按照優先級逐步去處理,這使得產生了這麼多不一樣的優先級實際上意義是不大的。

所以採用策略二會更加合適一些。

因爲讀取任務時是多線程的,所以策略二實現上須要注意的點包括:

  • AtomicInteger#incrementAndGet實現任務讀取次數累加。若是返回的數字是閾值的倍數,則意味着能夠移動任務插入指針。
  • 使用AtomicInteger#incrementAndGet來移動插入指針。

在這裏對插入指針移動的併發考量在於,因爲讀取線程對讀取計數使用AtomicInteger#incrementAndGet方式累加是必然成功,而返回數值是晉升閾值的倍數時必然須要實現插入指針的遞增。由於遞增的必然性,所以一樣使用AtomicInteger#incrementAndGet方式來實現。

任務插入指針移動到同一位置致使的優先級任務混合問題

假定系統初始狀態,插入和讀取指針都指向了槽位1,在槽位1上插入了大量的任務。隨着任務的讀取,插入指針移動到了槽位2,此時該槽位上插入了一些任務。隨着任務的讀取,插入指針繼續移動,移動過數組的長度後,再次指向了槽位2。假定此時讀取指針仍然在槽位1,而若是這個時候插入插入任務。那麼實際上槽位2隊列中任務應該分爲兩種:前半部分是上一個輪次插入的任務,後半部分是當前剛插入的任務

若是讀取指針移動到槽位2,應該將前半部分任務執行完畢後就去執行槽位3上的任務,而不是將全部的任務都執行完。所以槽位3上的任務實際優先級應該高於槽位2隊列中後半部分的任務。

基於上述狀況,問題能夠轉化爲依靠讀取指針在讀取任務時,如何識別當前隊列中不是本輪次要處理的任務進而移動讀取指針?

考慮到任務插入指針和任務讀取指針自己是有值的,這個值單調遞增,實際上能夠當作是一種「順序」概念的表達。所以任務的準備添加時,能夠將插入指針的值加上任務的優先級,聲明爲任務的插入優先級。讀取指針在讀取任務時,只有當前任務的插入優先級等於讀取指針的值,意味着該任務時本輪次讀取指針應該要處理的任務。若是讀取的任務的插入優先級與讀取指針不等時,意味着當前隊列不能再讀取任務,應該移動讀取指針。

經過任務自己的插入優先級避免了不一樣輪次的任務在一個隊列中被混合致使的優先級混亂。

任務讀取指針如何移動

上個章節提出任務的插入優先級,解決了不一樣輪次的任務在同一個隊列可能會混合的問題。這個問題的解決引出了讀取指針的移動策略:在讀取到的任務的插入優先級與讀取指針的值不等時意味着須要移動。

可是這裏又產生了新的問題:併發移動讀取指針的問題。在讀取併發的狀況下,會遇到一個問題:讀取出來的任務的優先級不符合指針,此時要從新放回隊列,可是從新放入,就可能和任務的插入混合,形成數據混亂。

有幾種可能的解決方式:

  • 任務的讀取採用Sync關鍵字修飾,若是讀取任務不符合,則放回,而且移動指針。因爲沒有讀取併發,但仍然可能由於讀取的放回和新任務的添加形成數據混亂。
  • 採用分段機制,每個分段是一個隊列,分段和分段構成一個隊列。一個分段內的優先級是固定的,所以當分段耗盡時,就是切換讀取指針的時候。

策略一併不能完全解決問題,在這裏咱們採用策略二的方案。

策略二的引入實際上改變了上面的一個數據結構,也便是數組存儲的元素再也不是一個任務隊列,而是一個分段隊列。而每個分段內部又存儲了任務隊列,而且分段的隊列的任務的插入優先級均是相同的。這意味着分段在建立的時候就具有了插入優先級這個值。分段和分段的插入優先級必然不一樣,這個結構就自然的支持了輪次的概念。

分段結構的引入致使了數據結構的變化,這實際上會改變任務插入和任務讀取的流程。下文會再來細說具體的實現。分析到這裏,讀取指針的移動時機就很明白了,在分段內數據耗盡,就意味着某個具體插入優先級的任務都被讀取完畢了。

固然,考慮到讀寫併發的緣由,讀取線程發現分段內數據耗盡並不意味着該插入優先級的任務全被讀取了,後文會針對併發場景在處理流程上解決。

插入和讀取併發

插入和讀取可能在同一個槽位同一個分段上併發。分段的隊列自己是支持MPMC的,這並無問題。

可能會出現一種併發異常就是插入線程讀取了插入指針的值,而且準備插入數據,可是由於線程調度的緣由,失去了CPU資源,還沒有完成數據插入。此時讀取線程將槽位內的任務讀取完畢後認爲沒有數據,則移動了讀取指針到下一個槽位。在讀取指針移動後,插入線程才完成數據的插入。這樣致使原本應該是高優先級的任務變成最低優先級槽位上的任務。而當下一輪次讀取指針再次指向該槽位時,讀取指針獲取的到任務的任務優先級又會和讀取指針自己的數值衝突。

針對併發的異常場景,有一種常見的解決思路就是二次檢查。也就是讀取線程在移動任務讀取指針後,再次檢查下當前分段內是否出現了新的任務,若是有,則協助遷移到下一個槽位上;寫入線程在放入任務後,檢查是否讀取指針移動過,若是有,則協助遷移到下一個槽位上。

然而,讀取線程檢查分段內的隊列是否剩餘,寫入線程檢查讀取指針是否移動,這些狀態都是在動態變化的,仍然會產生一些其餘問題。雙重檢查通常會引入一個終止狀態來來減小可能的變化場景。在這裏,咱們爲分段引入狀態:使用中和終止。一個分段初始化時是使用中狀態,當讀取線程認爲該分段內的任務都被消耗後,則應該更新爲終止狀態。一旦分段進入終止狀態,則被拋棄,不該該再有任務數據添加到該分段中。

經過分段狀態,咱們能夠將任務區分爲終止前添加到分段和終止後添加到分段兩類。前者須要被正常讀取,後者則須要遷移到其它合適的分段中再被處理。

到這裏爲止,咱們針對數據結構和其元素屬性的變化就完成了。

將數組經過循環隊列的方式來表達不一樣的優先級。經過任務寫指針的移動來實現內部任務優先級的晉升。經過讀指針來實現任務嚴格按照優先級順序被處理,且避免低優先級任務被高優先級任務餓死。數組的元素指向一個該槽位上插入優先級最低的分段。一同散列到同一個槽位上的分段按照插入優先級的順序造成隊列。

代碼實現

整個代碼當中,最爲複雜的就是任務的插入和讀取,下面分別來設計流程。

任務插入

上面推導過程分析了插入和讀取併發可能致使的衝突場景。這裏咱們細化其解決流程。對於插入線程而言,要處理的狀況包括有:

  • 元素對應槽位上沒有分段。
  • 元素對應槽位上的分段的插入優先級和插入指針的值不相等。
  • 元素對應槽位上分段列表中插入優先級與插入指針相符的分段處於終止狀態
  • 元素對應槽位上的分段插入優先級與插入指針相等,且處於使用狀態。

能夠看到,只有第四種狀況任務能夠在當前分段插入成功,且插入完畢後還須要再次檢查分段的狀態。基於這些考量,咱們將插入流程設計爲

能夠看到,這個流程中沒有處理槽位上沒有分段的狀況,這個在下一個章節咱們會分析。

任務的讀取

有了分段的存在,讀取指針的移動斷定更加複雜,讀取線程可能碰到的場景有:

  • 讀取指針散列的槽位上沒有分段。
  • 讀取指針散列的槽位上有分段且狀態爲使用,分段內沒有任務。
  • 讀取指針散列的槽位上有分段且狀態爲使用,分段內有任務。
  • 讀取指針散列的槽位上有分段且狀態爲關閉,分段內沒有任務。
  • 讀取指針散列的槽位上有分段且狀態爲關閉,分段內有任務。

只有第三種狀況能夠讀取任務而且進行處理。有了輪次這個概念,讀取指針永遠只會讀取槽位上的第一個分段。若是槽位上沒有分段,或者分段的插入優先級與讀取指針不一樣,或者分段內沒有任務,則能夠考慮移動讀取指針。注意,分段狀態爲關閉並非讀取指針移動的條件,緣由下面會分析。

可是移動讀取指針的時候首先須要考慮當前讀取指針是否已經處於(寫入指針的值+最低優先級數字),若是是的話,意味着已經處於邊界,不該該在移動。

分段狀態的更新只能由讀取線程來進行。當讀取線程發現該分段已經沒有任務了,首先應該經過CAS的方式更新分段狀態。CAS競爭成功的線程再次檢查分段內是否出現了新的任務,若是出現的話,則提取任務,完成任務讀取。爲什麼不將任務移動到下一個槽位。由於下一個槽位上可能尚未分段,此時讀取線程可能和寫入線程競爭槽位上的分段寫入。若是寫入線程競爭成功,讀取線程移動過去的任務數據的優先級就放到了錯誤的分段中;若是讀取線程競爭成功,則讀取線程建立的分段必須是第一個分段,不然任務仍是移動到錯誤的地方。

解決這個問題最好的辦法就是不解決。不移動任務,仍然在該分段上讀取任務直到任務耗盡。而後再嘗試移動讀取指針。而對於寫入線程而言,當其發現分段的狀態變爲終止後,是提取出任務從新執行完整的放入流程,不會有併發的問題。

再次梳理下沒有任務狀況下的流程,應該是經過CAS修改分段的狀態。不管成功或失敗,均可以繼續檢查隊列是否有任務,若是有的話,則返回讀取到的任務。若是沒有的話,則CAS將讀取指針+1。競爭成功的線程將當前分段的下一個分段設置給槽位,而且從新執行讀取流程。競爭失敗的線程則反覆檢查讀取指針的值,發現變化後,從新執行讀取流程。

這裏有一個併發衝突須要考慮,當讀取線程嘗試將當前分段的下一個分段設置爲槽位的值時,可能此時當前分段的下一個分段是null,而寫入線程正在嘗試爲當前分段設置下一個分段。這種狀況下可能致使下一個分段丟失。特別的,若是當前分段的下一個分段已經被設置,而且有任務被放入其中,丟失這個分段就意味着數據丟失。

爲了不這個狀況,在當前分段的下一個分段爲null時,就不能將下一個分段(屬性值)設置給槽位。這使得在讀取到分段時,須要首先檢查分段的優先級,確認是否本輪次。若是是的話,再執行後續的流程。不然要麼移動(該分段沒有下一個分段),要麼將該分段的下一個分段設置給槽位後,在移動。

從這個角度出發,咱們能夠在初始化的時候,將數組中的元素都填充一個分段。這樣寫入線程就不須要處理槽位上可能爲空的場景了。

基於此,咱們將讀取任務的變化爲:

  • 槽位上的分段優先級小於讀取指針,且分段狀態爲終止。
  • 槽位上的分段優先級等於讀取指針。
  • 槽位上的分段優先級大於讀取指針。

第一種狀況,若是該分段有下一個分段,CAS更新到槽位上;若是沒有,則CAS移動讀取指針。

第二種狀況,按照上面分析的流程進行處理便可。

第三種狀況,CAS移動讀取指針。

綜上,咱們能夠將讀取流程設計爲

包裝爲BlockQueue

在JDK提供的ThreadPoolExecutor類的構造方法中,須要傳入BlockingQueue做爲隊列的接口。顯然,上述的存儲結構並不能支持BlockQueue,須要考慮包裝。

顯然,上面的存儲結果在寫入的時候並不會阻塞,所以只須要考慮如何包裝讀取數據不存在時的阻塞等待便可。

簡單的方式就是在讀取失敗的獲取鎖,而且在隊列空的condition對象執行等待;插入任務的時候執行喚醒。

效果展示

測試代碼以下

首先添加必定量的高優先級任務,隨後添加5個低優先級,最後經過CountLatch模擬在運行過程當中添加高優先級任務。

若是單純按照優先級排序,則須要全部高優先級任務輸出完畢後纔會輸出低優先級任務,顯然這是錯誤的。正確的實現應該是先輸出第一批高優先級任務,再輸出低優先級任務,最後輸出第三批高優先級任務。運行代碼,看到結果以下

與咱們的預期相吻合。

代碼託管地址

Gitee:gitee.com/eric_ds/eri…

相關文章
相關標籤/搜索