sidekiq是 Ruby 中一個很是優秀並且可靠的後臺任務處理軟件,其依賴 Redis 實現隊列任務的增長、重試以及調度等。而 sidekiq 從啓動到開始不斷處理任務、定時任務以及失敗任務的重試,都是如何調度的呢?遇到問題的時候,又該如何調優呢?git
今天的分析所參考的 sidekiq 的源碼對應版本是 4.2.3;github
今天所討論的內容,將主要圍繞任務調度過程進行分析,無關細節將不贅述,若有須要,請自行翻閱 sidekiq 源碼;redis
文章內容真的很長,請作好心理準備。數據庫
sidekiq 的任務調度機制:定時任務、重試任務的檢查機制,隊列任務的排隊以及隊列權重對處理優先級的影響;json
sidekiq 的中間件機制以及在此基礎上實現的任務重試機制。數組
對於複雜的調用關係,我習慣用時序圖幫助我理解其中各部分代碼之間相互協做的關係(注意:爲了不太多細節形成閱讀負擔,我將參數傳遞以及返回值等冗雜過程去除了,只保留與任務調度相關的關鍵調用):
ruby
Sidekiq 整個任務調度過程當中依賴幾個不一樣角色的代碼共同協做,其分工以下:數據結構
角色 | 對應類型 | 職責 |
定時任務拉取器 | Sidekiq::Scheduled::Poller | 負責在必定時間範圍內不定時檢查定時任務(scheduled)以及重試任務(retry),將計劃時間已經超過當前時間的任務追加到各自對應任務隊列中 |
worker 管理器 | Sidekiq::Manager | 負責按照配置的 concurrency 參數建立匹配數量的worker,以及worker的管理(中止等) |
worker | Sidekiq::Processor | 負責執行指定的任務 |
當咱們在執行 sidekiq
時,源碼中的 bin/sidekiq.rb
文件即是第一個開始執行的文件,讓咱們看看裏邊的主要代碼:架構
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/bin/sidekiq#L9-L12 begin cli = Sidekiq::CLI.instance cli.parse cli.run # <===== 這邊走 # ...
緊靠 begin
後邊的兩行代碼首先建立 Sidekiq::CLI
類的一個實例,接着調用實例方法 #parse
解析 sidekiq 的配置參數,其中包括隊列的配置、worker 數量的配置等,在此不展開了。接着實例方法 #run
將帶着咱們繼續往下走,讓咱們繼續看 lib/sidekiq/cli.rb
裏邊的代碼:app
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L46-L106 def run # 這裏打印控制檯歡迎信息、打印日誌以及運行環境(不一樣 Rails 版本)加載等 require 'sidekiq/launcher' @launcher = Sidekiq::Launcher.new(options) begin launcher.run # <===== 這邊走 # 進程接收到的信號處理以及退出處理 end
上面的代碼主要是實例化了一個 Sidekiq::Launcher
的對象,緊隨其後又調用了實例方法 #run
,因此讓咱們繼續順藤摸瓜,看看 Sidekiq::Launcher#run
方法到底作了哪些事情?
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L24-L28 def run @thread = safe_thread("heartbeat", &method(:start_heartbeat)) @poller.start @manager.start end
#run
方法首先經過 safe_thread
建立了一個新的線程,線程主要負責執行 start_heartbeat
方法的代碼,從方法名稱上,咱們猜想其主要是心跳代碼,負責定時檢查 sidekiq 健康狀態,跟以前同樣,這裏不往下挖,咱們繼續看後邊的兩行代碼:
@poller.start @manager.start
這裏的 @poller
跟 @manager
都是什麼呢?讓咱們回頭看一下,前面講到 lib/cli.rb
的 #run
方法會負責建立 Sidekiq::Launcher
的實例,那讓咱們看下後者的 initialize
方法定義:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L17-L22 def initialize(options) @manager = Sidekiq::Manager.new(options) @poller = Sidekiq::Scheduled::Poller.new @done = false @options = options end
能夠看到,實際上,@manager
是在建立 Sidekiq::Launcher
實例的過程當中同步建立的 Sidekiq::Manager
的實例,同理,@poller
是同步建立的 Sidekiq::Scheduled::Poller
的實例。那咱們按照代碼執行順序,先看下 @poller.start
也就是 Sidekiq::Scheduled::Poller#start
方法的定義:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73 def start @thread ||= safe_thread("scheduler") do initial_wait while !@done enqueue wait end Sidekiq.logger.info("Scheduler exiting...") end end
這裏看到,#start
方法也建立了一個線程,在線程裏執行了兩個部分代碼:1. 初始化等待;2. 循環裏的 enqueue
與 wait
。這都是什麼呢?
注意: #start
方法在線程建立完成後就馬上返回了,至於 #start
方法裏的邏輯,請移步後面章節「繼續深挖 Sidekiq::Scheduled::Poller#start」做更深一步分析。這裏,咱們先繼續接着看看 #start
方法返回後接下來執行的 @manager.start
方法又作了什麼:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L45-L49 def start @workers.each do |x| x.start end end
這裏的 @workers
又是什麼?一個數組?怎樣的數組?咱們回顧下,前面說在建立 Sidekiq::Launcher
實例的過程當中同步建立了 Sidekiq::Manager
的實例,讓咱們就看看 Sidekiq::Manager
的 #initialize
方法:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L31-L43 def initialize(options={}) logger.debug { options.inspect } @options = options @count = options[:concurrency] || 25 raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1 @done = false @workers = Set.new @count.times do @workers << Processor.new(self) end @plock = Mutex.new end
能夠看到,在建立了 Sidekiq::Manager
的實例以後,又同步建立了多個 Sidekiq::Processor
的實例,實例的個數取決於 options[:concurrency] || 25
,也就是配置的 :concurrency
的值,缺省值爲 25
。至此,咱們知道,sidekiq 中的 worker 的數量就是在此其做用的,Sidekiq::Manager
按照配置的數量建立指定數量的 worker。
往回看剛纔的 #start
方法中:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L46-L48 @workers.each do |x| x.start end
簡言之,就是 Sidekiq::Manager
在 start
的時候只作一件事:分別調用其管理的全部 worker 的 #start
方法,也就是 Sidekiq::Processor#start
。繼續往下走:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L60-L62 def start @thread ||= safe_thread("processor", &method(:run)) end
又是咱們熟悉的 safe_thread
方法,一樣是建立了一個新的線程,意味着每個 worker 都是基於本身的一個新線程的,而這個線程裏執行的代碼是私有方法 #run
裏的代碼:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77 def run begin while !@done process_one end @mgr.processor_stopped(self) rescue Sidekiq::Shutdown @mgr.processor_stopped(self) rescue Exception => ex @mgr.processor_died(self, ex) end end
能夠發現,又是一個 while 循環!而這個循環體裏只調用了一個 #process_one
實例方法,顧名思義,這裏是說每一個 worker 在沒被結束以前,都重複每次處理一個新的任務,那這個 #process_one
裏又作了什麼呢?怎麼決定該先作哪一個任務呢?別急,請看後面章節「繼續深挖 Sidekiq::Processor#process_one」。
sidekiq 在啓動後(此處可借文章開頭的時序圖輔助理解):
首先建立了 Sidekiq::CLI
的實例,並調用其 run
方法;
Sidekiq::CLI
的實例在 #run
的過程當中,建立了 Sidekiq::Launcher
的實例,並調用其 run
方法;
Sidekiq::Launcher
的實例在建立後,同步建立了一個 Sidekiq::Scheduled::Poller
的實例以及 Sidekiq::Manager
的實例,而在其執行 #run
的過程當中,則分別調用了這兩個實例的 start
方法;
Sidekiq::Scheduled::Poller
的實例在執行 start
過程當中,建立了一個內部循環執行的線程,周而復始地執行 enqueue
-> wait
;
Sidekiq::Manager
的實例在建立後,同步建立若干個指定的 worker,也就是 Sidekiq::Processor
的實例,並在執行 start
方法的過程當中對每個 worker 發起 start
調用;
Sidekiq::Processor
實例在執行 start
方法的過程當中建立了一個新的線程,新的線程裏一樣有一個 while
循環,反覆執行 process_one
。
以上就是 Sidekiq 的主要啓動過程,如下分別針對 Sidekiq::Scheduled::Poller
以及 Sidekiq::Manager
展開源碼分析。
通過前面較表層的代碼分析,咱們接下來繼續展開 Sidekiq::Scheduled::Poller#start
方法的探索之旅,首先重溫下其代碼定義:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73 def start @thread ||= safe_thread("scheduler") do initial_wait while !@done enqueue wait end Sidekiq.logger.info("Scheduler exiting...") end end
能夠看到,#start
方法的核心就是中間的 while
循環,在循環前面,執行了 #initial_wait
方法,讓咱們先看看這個方法究竟是幹些什麼的:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L133-L143 def initial_wait # Have all processes sleep between 5-15 seconds. 10 seconds # to give time for the heartbeat to register (if the poll interval is going to be calculated by the number # of workers), and 5 random seconds to ensure they don't all hit Redis at the same time. total = 0 total += INITIAL_WAIT unless Sidekiq.options[:poll_interval_average] total += (5 * rand) @sleeper.pop(total) rescue Timeout::Error end
結合註釋理解,原來私有方法 #initial_wait
只是爲了不全部進程在後續邏輯中同時觸發 Redis IO 而作的設計,若是對大型系統有過架構經驗的童鞋就會明白,這裏其實就是爲了防止相似雪崩之類的系統故障出現。讓當前進程隨機等待必定範圍的時間,從而就能夠跟其餘進程錯開了。
在理解完 initial_wait
以後,咱們接着看到循環體裏的代碼:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L68-L69 enqueue wait
enqueue
?幹嗎呢?爲何是入隊列呢?帶着疑問往下看:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L75-L86 def enqueue begin @enq.enqueue_jobs rescue => ex # ... end end
這裏看到 #enqueue
代碼很是簡單,只是調用了實例變量 @enq
的 #enqueue_jobs
方法而已,那麼,@enq
是什麼類型的實例呢?它的 #enqueue_jobs
方法又作了什麼呢?讓咱們回過頭來看一遍 Sidekiq::Scheduled::Poller
的 #initialize
方法:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L45-L50 def initialize @enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new @sleeper = ConnectionPool::TimedStack.new # ... end
原來缺省狀況下,@enq
就是 Sidekiq::Scheduled::Enq
的實例。而代碼上看的話,sidekiq 支持用戶經過 :scheduled_enq
配置項自定義 @enq
的類型,可是官方文檔未對此參數說起以及說明,這裏實際上是一種策略模式的實現,用戶自定義的類型必須實現 enqueue_jobs
方法。我估計,是 sidekiq pro 裏邊纔會用到的配置項吧。
知道了 @enq
的類型後,讓咱們繼續看下 Sidekiq::Scheduled::Enq#enqueue_jobs
方法的定義:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L11-L33 def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS) # A job's "score" in Redis is the time at which it should be processed. # Just check Redis for the set of jobs with a timestamp before now. Sidekiq.redis do |conn| sorted_sets.each do |sorted_set| # Get the next item in the queue if it's score (time to execute) is <= now. # We need to go through the list one at a time to reduce the risk of something # going wrong between the time jobs are popped from the scheduled queue and when # they are pushed onto a work queue and losing the jobs. while job = conn.zrangebyscore(sorted_set, '-inf'.freeze, now, :limit => [0, 1]).first do # Pop item off the queue and add it to the work queue. If the job can't be popped from # the queue, it's because another process already popped it so we can move on to the # next one. if conn.zrem(sorted_set, job) Sidekiq::Client.push(Sidekiq.load_json(job)) Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" } end end end end
其實這裏這個方法的寓意,經過代碼裏的註釋都已經很明晰了,不過我以爲仍是有幾個點須要強調下。
首先,在無參數調用 #enqueue_jobs
方法時,定義中的參數 now
缺省爲當前時間,而 sorted_sets
缺省爲 Sidekiq::Scheduled::SETS
的值,其值定義爲:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L8 SETS = %w(retry schedule)
也就是數組 ["retry", "schedule"]
,而這兩個隊列名稱所對應的隊列就是 sidekiq 的重試以及定時任務隊列,在 sidekiq 裏邊,重試任務以及定時任務本質上都是 scheduled jobs,這兩個隊列使用了特殊的 Redis 的數據結構,進入隊列的任務以其執行時間做爲數據的 score,寫入 Redis 以後按照 score 排序,也就是按任務的計劃時間排序。
接着往下看:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L14-L30 Sidekiq.redis do |conn| sorted_sets.each do |sorted_set| while job = conn.zrangebyscore(sorted_set, '-inf'.freeze, now, :limit => [0, 1]).first do if conn.zrem(sorted_set, job) Sidekiq::Client.push(Sidekiq.load_json(job)) Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" } end end end end
能夠看到,sidekiq 分別針對 "retry"
和 "schedule"
隊列作了一個循環,循環體裏每次經過 Redis 的 ZRANGEBYSCORE
命令取出一個計劃時間小於等於當前時間的任務,而且調用 Sidekiq::Client
的 .push
方法將此任務加到指定隊列中(job 中包含隊列名稱等信息,在此不展開,有興趣的同窗請自行閱讀 Sidekiq::Client
的代碼)。
至此,能夠明白,enqueue_jobs
就是分別從 "retry"
和 "schedule"
隊列中取出已經到達計劃時間的任務,將其一一加入原來隊列。注意,定時任務以及重試任務的計劃時間只是計劃加進執行中隊列的時間,並不是執行時間,執行的時間就只能取決於隊列的長度以及隊列執行速度了。
接着往回點,繼續看 enqueue_jobs
以後的 wait
方法:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L90-L100 def wait @sleeper.pop(random_poll_interval) rescue Timeout::Error # expected rescue => ex #... end
這裏的 wait
方法只是作一個休眠,休眠的實現依賴於 @sleeper
的 #pop
方法調用,回顧 Sidekiq::Scheduled::Poller
的 #initialize
方法的實現能夠確認 @sleeper
是 ConnectionPool::TimedStack
的實例,然後者是 Ruby gem connection_pool 裏的實現,其 pop
方法會阻塞當前代碼的執行,直到有值返回或者到達指定的超時時間,這裏 sidekiq 利用了其阻塞的特性,做爲 wait
方法休眠器的實現。
而代碼裏的休眠時間則不是固定的,依賴 #random_poll_interval
方法的實現:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L103-L105 # Calculates a random interval that is ±50% the desired average. def random_poll_interval poll_interval_average * rand + poll_interval_average.to_f / 2 end
其實現依賴一個 #poll_interval_average
方法的返回值,顧名思義,這個方法將決定定時任務按期檢查的平均時間週期。讓咱們繼續深挖下去:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L107-L122 # We do our best to tune the poll interval to the size of the active Sidekiq # cluster. If you have 30 processes and poll every 15 seconds, that means one # Sidekiq is checking Redis every 0.5 seconds - way too often for most people # and really bad if the retry or scheduled sets are large. # # Instead try to avoid polling more than once every 15 seconds. If you have # 30 Sidekiq processes, we'll poll every 30 * 15 or 450 seconds. # To keep things statistically random, we'll sleep a random amount between # 225 and 675 seconds for each poll or 450 seconds on average. Otherwise restarting # all your Sidekiq processes at the same time will lead to them all polling at # the same time: the thundering herd problem. # # We only do this if poll_interval_average is unset (the default). def poll_interval_average Sidekiq.options[:poll_interval_average] ||= scaled_poll_interval end
這個方法的重要性經過其幾倍於代碼的註釋就能夠看出來,大概意思是,sidekiq 爲了不在進程重啓後,有大量的進程同時密集地訪問 redis,因此設計了這個機制,就是每一個進程對定時任務的檢查都是按照一個公式來計算的,保證每一個進程兩次檢查之間的平均休眠時間可以在一個範圍內動態變化,從而將全部進程的 Redis IO 均勻錯開。
從代碼上看,sidekiq 的這個平均拉取時間支持配置項配置,可是目前也並無在 wiki 上有所說起。而缺省狀況下,其值由方法 #scaled_poll_interval
決定:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L124-L131 def scaled_poll_interval pcount = Sidekiq::ProcessSet.new.size pcount = 1 if pcount == 0 pcount * Sidekiq.options[:average_scheduled_poll_interval] end
正如前面一段代碼的註釋所說,缺省狀況下,sidekiq 認爲定時任務拉取器的平均休眠時間正是:
sidekiq 進程數量 x 平均拉取時間 average_scheduled_poll_interval
而 :average_scheduled_poll_interval
的缺省配置是 15 秒:
# https://github.com/mperham/sidekiq/blob/master/lib/sidekiq.rb#L25 DEFAULTS = { # ... average_scheduled_poll_interval: 15, # ...
因此回過頭來,在沒有相關自定義配置的狀況下,假設你只開啓了一個 sidekiq 進程,那麼 sidekiq 的定時任務拉取器的拉取時間平均間隔爲 1 x 15 = 15 秒,那按照上面的 #random_poll_interval
方法的定義,則實際每次拉取的時間間隔則是在 7.5 秒到 22.5 秒之間!
從這個章節的分析,咱們能夠明白 Sidekiq 對定時任務和重試任務是一視同仁的,其處理流程都是:
全部定時任務(包括重試任務,本質上重試任務也是定時的,後邊會單獨講解)以其計劃時間爲 score,加入特殊的 "retry"
或 "schedule"
有序隊列中;
sidekiq 的定時任務拉取器從 "retry"
和 "schedule"
隊列中一一取出已到達計劃時間的任務,將其加入該任務計劃的隊列中,後續的執行則跟其餘普通隊列中的任務一致;
拉取器休眠必定時間(random_poll_interval
)後,從步驟 2 從新開始,周而復始。
因此,定時任務的計劃時間不是確切的任務時間!只是容許加回隊列的時間,具體執行時間還得另外看隊列長度以及隊列處理速度!
前面咱們分析過 sidekiq 的 worker 的核心代碼就是在線程裏循環執行 #process_one
方法,那麼這個方法到底作了些什麼啊?別急,如今就來一探究竟:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L79-L83 def process_one @job = fetch process(@job) if @job @job = nil end
代碼中,#process_one
先經過 #fetch
方法獲取一個任務,當任務獲取成功後,就將其做爲參數調用 #process
方法,完成對任務的處理;若是沒有獲取到任務,則直接從新嘗試獲取新的任務。
首先讓咱們看看 #fetch
方法的實現:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L96-L104 def fetch j = get_one # 吐槽一下這個 `j` 變量,命名真的不敢恭維,這個庫就這裏寫得不雅 if j && @done j.requeue nil else j end end
#fetch
方法經過 #get_one
方法從隊列中獲取任務,當獲取到任務後,判斷當前 worker 是否已經中止(@done
爲 true
),是則將任務從新壓回隊列。
讓咱們接着看 #get_one
方法的實現:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L85-L94 def get_one begin work = @strategy.retrieve_work (logger.info { "Redis is online, #{Time.now - @down} sec downtime" }; @down = nil) if @down work rescue Sidekiq::Shutdown rescue => ex handle_fetch_exception(ex) end end
核心代碼則是 work = @strategy.retrieve_work
,爲了瞭解 @strategy
,咱們仍舊往回看#initialize
方法的定義:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L32-L40 def initialize(mgr) # ... @strategy = (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options) # ... end
又是一個策略模式,缺省下,使用了 Sidekiq::BasicFetch
生成實例,而且經過實例變量 @strategy
引用。
回到前面的 @strategy.retrieve_work
,讓咱們繼續看看 Sidekiq::BasicFetch#retrieve_work
的實現:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/fetch.rb#L35-L38 def retrieve_work work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) } UnitOfWork.new(*work) if work end
經過上面的代碼,能夠知道 Sidekiq::BasicFetch
的取任務邏輯比較直接,是經過 Redis 的 BRPOP
命令從「全部隊列」中阻塞地取出第一個任務:
BRPOP is a blocking list pop primitive. It is the blocking version of RPOP because it blocks the connection when there are no elements to pop from any of the given lists. An element is popped from the tail of the first list that is non-empty, with the given keys being checked in the order that they are given.
因此,理解了 BRPOP
命令的工做細節以後,咱們把注意力縮放到 #queues_cmd
方法上:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/fetch.rb#L40-L53 def queues_cmd if @strictly_ordered_queues @queues else queues = @queues.shuffle.uniq queues << TIMEOUT queues end end
首先,代碼中檢查了 @strictly_ordered_queues
這個實例變量的值,讓咱們回頭看下這個變量的值的來源,也就是 #initialize
方法的定義:
# https://github.com/mperham/sidekiq/blob/d8f11c26518dbe967880f76fd23bb99e9d2411d5/lib/sidekiq/fetch.rb#L26-L33 def initialize(options) @strictly_ordered_queues = !!options[:strict] @queues = options[:queues].map { |q| "queue:#{q}" } if @strictly_ordered_queues @queues = @queues.uniq @queues << TIMEOUT end end end
缺省狀況下,此值爲 false
。因此讓咱們看 #queues_cmd
方法的 else
分支裏的代碼:
queues = @queues.shuffle.uniq
而這裏的 @queues
就是來自 options[:queues]
中的配置: options[:queues].map { |q| "queue:#{q}" }
。那麼,這個 options[:queues]
的值又是什麼呢?
讓咱們一步一步沿着調用鏈上參數往回走:
Sidekiq::BasicFetch.new
的參數 options
來自 worker 在 Sidekiq::Processor#initialize
方法中的參數 mgr
的 options
屬性;
worker 的 mgr 參數正是 Sidekiq::Manager
的實例,其 options
屬性則是 Sidekiq::Launcher
建立 Sidekiq::Manager
實例時傳入的 options
變量;
而 Sidekiq::Launcher#initialize
接收到的 options
變量則是更外層的 Sidekiq::CLI
的實例方法 options
的值;
而 Sidekiq::CLI
的實例的 options
則是在其接收到 #parse
調用時設置的。
爲了節省篇幅,省略這裏其中的太多調用棧,咱們直接看最根源代碼:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L389-L399 def parse_queues(opts, queues_and_weights) queues_and_weights.each { |queue_and_weight| parse_queue(opts, *queue_and_weight) } end def parse_queue(opts, q, weight=nil) [weight.to_i, 1].max.times do (opts[:queues] ||= []) << q end opts[:strict] = false if weight.to_i > 0 end
能夠看到,sidekiq 在解析 :queues
的相關配置時,按照每一個隊列以及其權重,生成了一個重複次數等於隊列權重的隊列的新數組,假設用戶提供以下配置:
:queues: - default - [myqueue, 2]
則此處生成的 options[:queues]
則爲 ["default", "myqueue", "myqueue"]
。因此,這裏權重主要用於後邊肯定各個不一樣隊列被處理到的優先權的比重。
瞭解了 @queues
的來源以後,咱們回到最開始討論的地方:
queues = @queues.shuffle.uniq
也就是說,每次 worker 在請求新的任務時,sidekiq 都按照原來的 @queues
執行 shuffle
方法,而 shuffle
則表示將數組元素從新隨機排序,亦即「洗牌」。結合前面的權重,那麼每一個隊列洗牌後排在第一位的機率與其權重掛鉤。最後的 #uniq
方法確保隊列名稱沒有重複,避免 Redis 在執行 BRPOP
命令時重複檢查同一隊列。這裏使用 BRPOP
還有個好處就是,加入當前面優先的隊列裏邊沒有任務時,能夠依次將機會讓給後面的隊列。
然後邊的:
queues << TIMEOUT
則是在命令末尾追加超時設定,即 Redis 的 BRPOP
命令最多阻塞 2 秒,超時則直接放棄。
瞭解了任務的獲取以後,咱們接着看 sidekiq 如何處理獲取到的任務,回到 retrieve_work
的代碼:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/fetch.rb#L36-L37 work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) } UnitOfWork.new(*work) if work
看到在獲取到任務以後,任務經過 Sidekiq::BasicFetch::UnitOfWork
結構體實例化後返回給調用方。
直接回到 Sidekiq::Processor#process_one
:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L79-L83 def process_one @job = fetch process(@job) if @job @job = nil end
能夠明白,@job 就是返回的 UnitOfWork
實例,那麼 process(@job)
會作些什麼呢?
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L118-L152 def process(work) jobstr = work.job queue = work.queue_name @reloader.call do ack = false begin job = Sidekiq.load_json(jobstr) klass = job['class'.freeze].constantize worker = klass.new worker.jid = job['jid'.freeze] stats(worker, job, queue) do Sidekiq.server_middleware.invoke(worker, job, queue) do # Only ack if we either attempted to start this job or # successfully completed it. This prevents us from # losing jobs if a middleware raises an exception before yielding ack = true execute_job(worker, cloned(job['args'.freeze])) end end # ...
上面代碼中,sidekiq 從 work
中獲取任務的相關信息,包括隊列名稱,任務對應的類型(job['class'.freeze]
)、任務調用所需的參數等,根據這些信息從新實例化任務對象,而且將實例化的任務對象 worker
以及任務參數都傳遞給對 execute_job
的調用。讓咱們看看 #execute_job
的實現:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L154-L156 def execute_job(worker, cloned_args) worker.perform(*cloned_args) end
看到了吧?咱們最熟悉的 #perform
方法!這下知道咱們爲何須要在每一個 sidekiq Worker 或者 ActiveJob 的 Job 類中定義這個方法了吧?由於這個方法就是最終任務執行時所需調用的方法,這就是約定!
至此,任務的調度過程就到此爲止了,剩下的就是周而復始的重複了。
通過上面的分析,咱們能夠明白 sidekiq 中 worker 的工做流程:
按照全部隊列以及其權重,每次從新排列待處理隊列順序,高權重的隊列有更高的優先級;
將從新排好的隊列順序傳遞給 Redis 的 BRPOP 命令,同時設置 2 秒超時;
sidekiq 將從隊列中獲取到的任務實例化,而且根據攜帶的參數調用了任務的 #perform
方法。
等等,上面都只是正常流程,那若是任務執行過程當中出錯了怎麼辦???重試的機制是如何運轉的呢?
注意:閱讀本章節前,建議先閱讀官方 Wiki 的 Error Handling。
細心的童鞋確定發現了上面的 Sidekiq::Processor#process
方法中有個關鍵的代碼:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L131-L137 Sidekiq.server_middleware.invoke(worker, job, queue) do # ... execute_job(worker, cloned(job['args'.freeze])) end
這個 server_middleware
是什麼呢?讓咱們來簡單過一下吧:
全局搜索了代碼,發現 Sidekiq.server_middleware
的來源是:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq.rb#L140-L144 def self.server_middleware @server_chain ||= default_server_middleware yield @server_chain if block_given? @server_chain end
缺省狀況下,.server_middleware
依賴 .default_server_middleware
的實現:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq.rb#L146-L154 def self.default_server_middleware #... Middleware::Chain.new do |m| m.add Middleware::Server::Logging m.add Middleware::Server::RetryJobs end end
能夠明白 Sidekiq.default_server_middleware
返回一個 Middleware::Chain
實例,而且調用了其 #add
方法將 Middleware::Server::Logging
以及 Middleware::Server::RetryJobs
兩個中間件加到中間件的 Chain 上。此中間件的實現以及實現相似 rackup,有興趣的童鞋自行閱讀源碼,在此不展開,讓咱們直接跳到 Middleware::Server::RetryJobs
的 call
方法中:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/middleware/server/retry_jobs.rb#L73-L84 def call(worker, msg, queue) yield rescue Sidekiq::Shutdown # ignore, will be pushed back onto queue during hard_shutdown raise rescue Exception => e # ignore, will be pushed back onto queue during hard_shutdown raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e) raise e unless msg['retry'] attempt_retry(worker, msg, queue, e) end
讓咱們聚焦方法的最後一行代碼 attempt_retry(worker, msg, queue, e)
,此處表示當執行中的任務出現異常時,除去停機的因素以及禁用了重試機制後,嘗試進行下次重試運行:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/middleware/server/retry_jobs.rb#L88-L137 def attempt_retry(worker, msg, queue, exception) max_retry_attempts = retry_attempts_from(msg['retry'], @max_retries) # ... count = if msg['retry_count'] msg['retried_at'] = Time.now.to_f msg['retry_count'] += 1 else msg['failed_at'] = Time.now.to_f msg['retry_count'] = 0 end # ... if count < max_retry_attempts delay = delay_for(worker, count, exception) logger.debug { "Failure! Retry #{count} in #{delay} seconds" } retry_at = Time.now.to_f + delay payload = Sidekiq.dump_json(msg) Sidekiq.redis do |conn| conn.zadd('retry', retry_at.to_s, payload) end else # Goodbye dear message, you (re)tried your best I'm sure. retries_exhausted(worker, msg, exception) end raise exception end
從上面的代碼中看出,sidekiq 在捕捉到異常後,首先檢查此任務此前是否已經重試過,是的話,則在重試累計次數上加 1,更新最後重試時間;不然初始化重試累計次數爲 0,設定初次失敗時間。接着,sidekiq 檢查重試累計次數是否超過限定最大重試次數,是的話則放棄重試,任務今後再也不重試,進入 Dead 狀態,sidekiq 拋出異常;不然計算任務下次重試時間,將任務按照計劃的下次重試時間加到 retry
有序隊列中,最後拋出異常。關於重試任務的檢查跟執行,請閱讀前面的相關章節,接下來咱們主要分析 sidekiq 如何計算任務的下次重試時間 delay
。
讓咱們展開對 #delay_for
方法的探索:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/middleware/server/retry_jobs.rb#L172-L174 def delay_for(worker, count, exception) worker.sidekiq_retry_in_block? && retry_in(worker, count, exception) || seconds_to_delay(count) end
首先了解下 worker.sidekiq_retry_in_block?
的定義:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/worker.rb#L32 base.class_attribute :sidekiq_retry_in_block
其定義了每一個 Worker 類的 sidekiq_retry_in_block
屬性,而其又能夠經過 Worker 類的 #sidekiq_retry_in
方法完成賦值:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/worker.rb#L96-L98 def sidekiq_retry_in(&block) self.sidekiq_retry_in_block = block end
回過頭來,前面的
worker.sidekiq_retry_in_block? && retry_in(worker, count, exception) || seconds_to_delay(count)
表示當具體的 Worker 配置了 :sidekiq_retry_in_block
時,則直接使用這個配置的 block 執行的值做爲失敗任務下次重試的時間間隔;不然使用缺省的計算公式:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/middleware/server/retry_jobs.rb#L177-L179 def seconds_to_delay(count) (count ** 4) + 15 + (rand(30)*(count+1)) end
其中 count
爲任務累計重試次數,從公式上看,隨着失敗重試次數的累計增長,任務的下次重試時間間隔也會指數式增加,按照官方文檔說法:
Sidekiq will retry failures with an exponential backoff using the formula (retry_count * 4) + 15 + (rand(30) (retry_count + 1)) (i.e. 15, 16, 31, 96, 271, ... seconds + a random amount of time). It will perform 25 retries over approximately 21 days.
更多失敗任務重試的相關配置請看文檔:Error Handling: Configuration。
sidekiq 在執行任務時,經過自行實現的中間件架構以及對應的簡單的中間件,及時捕捉失敗的任務,針對容許再次重試的任務,按失敗次數計算新的重試時間,缺省爲指數增加的時間間隔;
用戶能夠經過配置修改缺省的公式,也能夠指定最大重試次數等。
注意:結合失敗任務捕捉處理以及重試任務的檢查,缺省狀況下,一個首次失敗任務下次重回隊列(不是執行)的理論最大時間間隔大概是 67.5 秒!(固定的 15 秒 + 最大隨機時間 30 秒 + 最大理論檢查時間 22.5 秒)。因此,若是你的任務很重要,又須要儘快重試,就須要對幾部分時間的相關配置參數進行調優了哦!在我本身的工做中,我針對某個隊列任務設置的 sidekiq_retry_in
公式爲線性時間,即1s、2s、...50s,而後在重試檢查那裏設置了 :poll_interval_average
爲 5 秒,新的下次執行時間理論最大時間間隔就是 8.5 秒!不過這些配置須要慎重調整,綜合考慮業務以及業務量,既要儘量保證任務儘早處理完,又得保證 Redis 沒被 IO 壓垮。
sidekiq 的源碼比較簡潔,不多看到長方法定義,大部分方法都在幾行以內,讀的過程當中很是舒服;
sidekiq 的註釋也很充足,比較重要又比較核心的代碼都有大量詳細的註釋跟例子,除此以外大部分重點在 Wiki 中都有說起,很是好的一份代碼庫;
sidekiq 將 Redis 的各類數據結構用得都恰到好處,能夠經過 sidekiq 加深對 Redis 的印象以及學習到如何恰當高效地結合 Redis 實現業務邏輯;
正是由於 sidekiq 將 Redis 充分利用以及高度結合,我終於理解 sidekiq 的做者爲何表示 sidekiq 不考慮其餘數據庫了;
sidekiq 的代碼沒有太多花俏的代碼,很是推薦各位童鞋仔細研讀。
帶着問題去閱讀,效率一般很高;
讀的過程當中適當放棄無關細節,只追擊與問題相關的線索;
有些文檔中沒有說起的配置項,每每都藏匿在代碼之中;
只有充分了解了工具的運行機制,在遇到問題調優的時候才能駕輕就熟。
若是你能從頭看到結尾,那麼很是感謝你的時間,畢竟這篇文章確實不短,儘管我已經儘可能去除無用的部分,一些代碼也直接跳過了,可是系統得了解一個框架或者一個軟件,確實也是不少細節。
這是今年第二篇博客,今年的產出遠不比去年,然而去年的產出遠不比千年,因此,可能這篇也是今年最後一篇了。洋洋灑灑幾萬字,從下午兩三點寫到如今,七個多小時,可貴能夠靜下心來寫這麼多,哎,這兩年心態太浮躁,技術路上,仍是繼續保持「stay foolish, stay hungry」的好。