sidekiq任務調度流程分析

sidekiq是 Ruby 中一個很是優秀並且可靠的後臺任務處理軟件,其依賴 Redis 實現隊列任務的增長、重試以及調度等。而 sidekiq 從啓動到開始不斷處理任務、定時任務以及失敗任務的重試,都是如何調度的呢?遇到問題的時候,又該如何調優呢?git

注意

  1. 今天的分析所參考的 sidekiq 的源碼對應版本是 4.2.3;github

  2. 今天所討論的內容,將主要圍繞任務調度過程進行分析,無關細節將不贅述,若有須要,請自行翻閱 sidekiq 源碼;redis

  3. 文章內容真的很長,請作好心理準備。數據庫

你將瞭解到什麼?

  1. sidekiq 的任務調度機制:定時任務、重試任務的檢查機制,隊列任務的排隊以及隊列權重對處理優先級的影響;json

  2. sidekiq 的中間件機制以及在此基礎上實現的任務重試機制。數組

先拋結論

時序圖

對於複雜的調用關係,我習慣用時序圖幫助我理解其中各部分代碼之間相互協做的關係(注意:爲了不太多細節形成閱讀負擔,我將參數傳遞以及返回值等冗雜過程去除了,只保留與任務調度相關的關鍵調用):
圖片描述ruby

人話

Sidekiq 整個任務調度過程當中依賴幾個不一樣角色的代碼共同協做,其分工以下:數據結構

角色 對應類型 職責
定時任務拉取器 Sidekiq::Scheduled::Poller 負責在必定時間範圍內不定時檢查定時任務(scheduled)以及重試任務(retry),將計劃時間已經超過當前時間的任務追加到各自對應任務隊列中
worker 管理器 Sidekiq::Manager 負責按照配置的 concurrency 參數建立匹配數量的worker,以及worker的管理(中止等)
worker Sidekiq::Processor 負責執行指定的任務

源碼之旅 —— 啓動

當咱們在執行 sidekiq 時,源碼中的 bin/sidekiq.rb 文件即是第一個開始執行的文件,讓咱們看看裏邊的主要代碼架構

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/bin/sidekiq#L9-L12
begin
  cli = Sidekiq::CLI.instance
  cli.parse
  cli.run # <===== 這邊走
# ...

緊靠 begin 後邊的兩行代碼首先建立 Sidekiq::CLI 類的一個實例,接着調用實例方法 #parse 解析 sidekiq 的配置參數,其中包括隊列的配置、worker 數量的配置等,在此不展開了。接着實例方法 #run 將帶着咱們繼續往下走,讓咱們繼續看 lib/sidekiq/cli.rb 裏邊的代碼:app

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L46-L106
def run
  # 這裏打印控制檯歡迎信息、打印日誌以及運行環境(不一樣 Rails 版本)加載等

  require 'sidekiq/launcher'
  @launcher = Sidekiq::Launcher.new(options)

  begin
    launcher.run # <===== 這邊走

  # 進程接收到的信號處理以及退出處理
end

上面的代碼主要是實例化了一個 Sidekiq::Launcher 的對象,緊隨其後又調用了實例方法 #run,因此讓咱們繼續順藤摸瓜,看看 Sidekiq::Launcher#run 方法到底作了哪些事情?

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L24-L28
def run
  @thread = safe_thread("heartbeat", &method(:start_heartbeat))
  @poller.start
  @manager.start
end

#run 方法首先經過 safe_thread 建立了一個新的線程,線程主要負責執行 start_heartbeat 方法的代碼,從方法名稱上,咱們猜想其主要是心跳代碼,負責定時檢查 sidekiq 健康狀態,跟以前同樣,這裏不往下挖,咱們繼續看後邊的兩行代碼:

@poller.start
@manager.start

這裏的 @poller@manager 都是什麼呢?讓咱們回頭看一下,前面講到 lib/cli.rb#run 方法會負責建立 Sidekiq::Launcher 的實例,那讓咱們看下後者的 initialize 方法定義:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L17-L22
def initialize(options)
  @manager = Sidekiq::Manager.new(options)
  @poller = Sidekiq::Scheduled::Poller.new
  @done = false
  @options = options
end

能夠看到,實際上,@manager是在建立 Sidekiq::Launcher 實例的過程當中同步建立的 Sidekiq::Manager 的實例,同理,@poller 是同步建立的 Sidekiq::Scheduled::Poller的實例。那咱們按照代碼執行順序,先看下 @poller.start 也就是 Sidekiq::Scheduled::Poller#start 方法的定義:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73
def start
  @thread ||= safe_thread("scheduler") do
    initial_wait

    while !@done
      enqueue
      wait
    end
    Sidekiq.logger.info("Scheduler exiting...")
  end
end

這裏看到,#start方法也建立了一個線程,在線程裏執行了兩個部分代碼:1. 初始化等待;2. 循環裏的 enqueuewait。這都是什麼呢?

注意: #start 方法在線程建立完成後就馬上返回了,至於 #start 方法裏的邏輯,請移步後面章節「繼續深挖 Sidekiq::Scheduled::Poller#start」做更深一步分析。這裏,咱們先繼續接着看看 #start 方法返回後接下來執行的 @manager.start 方法又作了什麼:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L45-L49
def start
  @workers.each do |x|
    x.start
  end
end

這裏的 @workers 又是什麼?一個數組?怎樣的數組?咱們回顧下,前面說在建立 Sidekiq::Launcher 實例的過程當中同步建立了 Sidekiq::Manager 的實例,讓咱們就看看 Sidekiq::Manager#initialize 方法:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L31-L43
def initialize(options={})
  logger.debug { options.inspect }
  @options = options
  @count = options[:concurrency] || 25
  raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1

  @done = false
  @workers = Set.new
  @count.times do
    @workers << Processor.new(self)
  end
  @plock = Mutex.new
end

能夠看到,在建立了 Sidekiq::Manager 的實例以後,又同步建立了多個 Sidekiq::Processor 的實例,實例的個數取決於 options[:concurrency] || 25,也就是配置的 :concurrency 的值,缺省值爲 25。至此,咱們知道,sidekiq 中的 worker 的數量就是在此其做用的,Sidekiq::Manager 按照配置的數量建立指定數量的 worker。
往回看剛纔的 #start 方法中:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L46-L48
@workers.each do |x|
  x.start
end

簡言之,就是 Sidekiq::Managerstart 的時候只作一件事:分別調用其管理的全部 worker 的 #start 方法,也就是 Sidekiq::Processor#start。繼續往下走:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L60-L62
def start
  @thread ||= safe_thread("processor", &method(:run))
end

又是咱們熟悉的 safe_thread 方法,一樣是建立了一個新的線程,意味着每個 worker 都是基於本身的一個新線程的,而這個線程裏執行的代碼是私有方法 #run 裏的代碼:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77
def run
  begin
    while !@done
      process_one
    end
    @mgr.processor_stopped(self)
  rescue Sidekiq::Shutdown
    @mgr.processor_stopped(self)
  rescue Exception => ex
    @mgr.processor_died(self, ex)
  end
end

能夠發現,又是一個 while 循環!而這個循環體裏只調用了一個 #process_one 實例方法,顧名思義,這裏是說每一個 worker 在沒被結束以前,都重複每次處理一個新的任務,那這個 #process_one 裏又作了什麼呢?怎麼決定該先作哪一個任務呢?別急,請看後面章節「繼續深挖 Sidekiq::Processor#process_one」。

小結

sidekiq 在啓動後(此處可借文章開頭的時序圖輔助理解):

  1. 首先建立了 Sidekiq::CLI 的實例,並調用其 run 方法;

  2. Sidekiq::CLI 的實例在 #run 的過程當中,建立了 Sidekiq::Launcher 的實例,並調用其 run 方法;

  3. Sidekiq::Launcher 的實例在建立後,同步建立了一個 Sidekiq::Scheduled::Poller 的實例以及 Sidekiq::Manager 的實例,而在其執行 #run 的過程當中,則分別調用了這兩個實例的 start 方法;

  4. Sidekiq::Scheduled::Poller 的實例在執行 start 過程當中,建立了一個內部循環執行的線程,周而復始地執行 enqueue -> wait

  5. Sidekiq::Manager 的實例在建立後,同步建立若干個指定的 worker,也就是 Sidekiq::Processor 的實例,並在執行 start 方法的過程當中對每個 worker 發起 start 調用;

  6. Sidekiq::Processor 實例在執行 start 方法的過程當中建立了一個新的線程,新的線程裏一樣有一個 while 循環,反覆執行 process_one

以上就是 Sidekiq 的主要啓動過程,如下分別針對 Sidekiq::Scheduled::Poller 以及 Sidekiq::Manager 展開源碼分析。

定時任務拉取器的工做 Sidekiq::Scheduled::Poller#start

通過前面較表層的代碼分析,咱們接下來繼續展開 Sidekiq::Scheduled::Poller#start 方法的探索之旅,首先重溫下其代碼定義:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73
def start
  @thread ||= safe_thread("scheduler") do
    initial_wait

    while !@done
      enqueue
      wait
    end
    Sidekiq.logger.info("Scheduler exiting...")
  end
end

能夠看到,#start 方法的核心就是中間的 while 循環,在循環前面,執行了 #initial_wait 方法,讓咱們先看看這個方法究竟是幹些什麼的:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L133-L143
def initial_wait
  # Have all processes sleep between 5-15 seconds.  10 seconds
  # to give time for the heartbeat to register (if the poll interval is going to be calculated by the number
  # of workers), and 5 random seconds to ensure they don't all hit Redis at the same time.
  total = 0
  total += INITIAL_WAIT unless Sidekiq.options[:poll_interval_average]
  total += (5 * rand)

  @sleeper.pop(total)
rescue Timeout::Error
end

結合註釋理解,原來私有方法 #initial_wait 只是爲了不全部進程在後續邏輯中同時觸發 Redis IO 而作的設計,若是對大型系統有過架構經驗的童鞋就會明白,這裏其實就是爲了防止相似雪崩之類的系統故障出現。讓當前進程隨機等待必定範圍的時間,從而就能夠跟其餘進程錯開了。

在理解完 initial_wait 以後,咱們接着看到循環體裏的代碼:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L68-L69
enqueue
wait

enqueue?幹嗎呢?爲何是入隊列呢?帶着疑問往下看:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L75-L86
def enqueue
  begin
    @enq.enqueue_jobs
  rescue => ex
    # ...
  end
end

這裏看到 #enqueue 代碼很是簡單,只是調用了實例變量 @enq#enqueue_jobs 方法而已,那麼,@enq 是什麼類型的實例呢?它的 #enqueue_jobs 方法又作了什麼呢?讓咱們回過頭來看一遍 Sidekiq::Scheduled::Poller#initialize 方法:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L45-L50
def initialize
  @enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new
  @sleeper = ConnectionPool::TimedStack.new
  # ...
end

原來缺省狀況下,@enq 就是 Sidekiq::Scheduled::Enq 的實例。而代碼上看的話,sidekiq 支持用戶經過 :scheduled_enq 配置項自定義 @enq 的類型,可是官方文檔未對此參數說起以及說明,這裏實際上是一種策略模式的實現,用戶自定義的類型必須實現 enqueue_jobs 方法。我估計,是 sidekiq pro 裏邊纔會用到的配置項吧。

知道了 @enq 的類型後,讓咱們繼續看下 Sidekiq::Scheduled::Enq#enqueue_jobs 方法的定義:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L11-L33
def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
  # A job's "score" in Redis is the time at which it should be processed.
  # Just check Redis for the set of jobs with a timestamp before now.
  Sidekiq.redis do |conn|
    sorted_sets.each do |sorted_set|
      # Get the next item in the queue if it's score (time to execute) is <= now.
      # We need to go through the list one at a time to reduce the risk of something
      # going wrong between the time jobs are popped from the scheduled queue and when
      # they are pushed onto a work queue and losing the jobs.
      while job = conn.zrangebyscore(sorted_set, '-inf'.freeze, now, :limit => [0, 1]).first do

        # Pop item off the queue and add it to the work queue. If the job can't be popped from
        # the queue, it's because another process already popped it so we can move on to the
        # next one.
        if conn.zrem(sorted_set, job)
          Sidekiq::Client.push(Sidekiq.load_json(job))
          Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
        end
      end
    end
  end

其實這裏這個方法的寓意,經過代碼裏的註釋都已經很明晰了,不過我以爲仍是有幾個點須要強調下。
首先,在無參數調用 #enqueue_jobs 方法時,定義中的參數 now 缺省爲當前時間,而 sorted_sets 缺省爲 Sidekiq::Scheduled::SETS 的值,其值定義爲:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L8
SETS = %w(retry schedule)

也就是數組 ["retry", "schedule"],而這兩個隊列名稱所對應的隊列就是 sidekiq 的重試以及定時任務隊列,在 sidekiq 裏邊,重試任務以及定時任務本質上都是 scheduled jobs,這兩個隊列使用了特殊的 Redis 的數據結構,進入隊列的任務以其執行時間做爲數據的 score,寫入 Redis 以後按照 score 排序,也就是按任務的計劃時間排序。

接着往下看:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L14-L30
Sidekiq.redis do |conn|
  sorted_sets.each do |sorted_set|
    while job = conn.zrangebyscore(sorted_set, '-inf'.freeze, now, :limit => [0, 1]).first do
      if conn.zrem(sorted_set, job)
        Sidekiq::Client.push(Sidekiq.load_json(job))
        Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
      end
    end
  end
end

能夠看到,sidekiq 分別針對 "retry""schedule" 隊列作了一個循環,循環體裏每次經過 Redis 的 ZRANGEBYSCORE命令取出一個計劃時間小於等於當前時間的任務,而且調用 Sidekiq::Client.push 方法將此任務加到指定隊列中(job 中包含隊列名稱等信息,在此不展開,有興趣的同窗請自行閱讀 Sidekiq::Client 的代碼)。

至此,能夠明白,enqueue_jobs 就是分別從 "retry""schedule" 隊列中取出已經到達計劃時間的任務,將其一一加入原來隊列。注意,定時任務以及重試任務的計劃時間只是計劃加進執行中隊列的時間,並不是執行時間,執行的時間就只能取決於隊列的長度以及隊列執行速度了。

接着往回點,繼續看 enqueue_jobs 以後的 wait 方法:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L90-L100
def wait
  @sleeper.pop(random_poll_interval)
rescue Timeout::Error
  # expected
rescue => ex
  #...
end

這裏的 wait 方法只是作一個休眠,休眠的實現依賴於 @sleeper#pop 方法調用,回顧 Sidekiq::Scheduled::Poller#initialize 方法的實現能夠確認 @sleeperConnectionPool::TimedStack 的實例,然後者是 Ruby gem connection_pool 裏的實現,其 pop 方法會阻塞當前代碼的執行,直到有值返回或者到達指定的超時時間,這裏 sidekiq 利用了其阻塞的特性,做爲 wait 方法休眠器的實現。

而代碼裏的休眠時間則不是固定的,依賴 #random_poll_interval 方法的實現:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L103-L105
# Calculates a random interval that is ±50% the desired average.
def random_poll_interval
  poll_interval_average * rand + poll_interval_average.to_f / 2
end

其實現依賴一個 #poll_interval_average 方法的返回值,顧名思義,這個方法將決定定時任務按期檢查的平均時間週期。讓咱們繼續深挖下去:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L107-L122
# We do our best to tune the poll interval to the size of the active Sidekiq
# cluster.  If you have 30 processes and poll every 15 seconds, that means one
# Sidekiq is checking Redis every 0.5 seconds - way too often for most people
# and really bad if the retry or scheduled sets are large.
#
# Instead try to avoid polling more than once every 15 seconds.  If you have
# 30 Sidekiq processes, we'll poll every 30 * 15 or 450 seconds.
# To keep things statistically random, we'll sleep a random amount between
# 225 and 675 seconds for each poll or 450 seconds on average.  Otherwise restarting
# all your Sidekiq processes at the same time will lead to them all polling at
# the same time: the thundering herd problem.
#
# We only do this if poll_interval_average is unset (the default).
def poll_interval_average
  Sidekiq.options[:poll_interval_average] ||= scaled_poll_interval
end

這個方法的重要性經過其幾倍於代碼的註釋就能夠看出來,大概意思是,sidekiq 爲了不在進程重啓後,有大量的進程同時密集地訪問 redis,因此設計了這個機制,就是每一個進程對定時任務的檢查都是按照一個公式來計算的,保證每一個進程兩次檢查之間的平均休眠時間可以在一個範圍內動態變化,從而將全部進程的 Redis IO 均勻錯開。
從代碼上看,sidekiq 的這個平均拉取時間支持配置項配置,可是目前也並無在 wiki 上有所說起。而缺省狀況下,其值由方法 #scaled_poll_interval 決定:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L124-L131

def scaled_poll_interval
  pcount = Sidekiq::ProcessSet.new.size
  pcount = 1 if pcount == 0
  pcount * Sidekiq.options[:average_scheduled_poll_interval]
end

正如前面一段代碼的註釋所說,缺省狀況下,sidekiq 認爲定時任務拉取器的平均休眠時間正是:

sidekiq 進程數量 x 平均拉取時間 average_scheduled_poll_interval

:average_scheduled_poll_interval 的缺省配置是 15 秒:

# https://github.com/mperham/sidekiq/blob/master/lib/sidekiq.rb#L25
DEFAULTS = {
  # ...
  average_scheduled_poll_interval: 15,
  # ...

因此回過頭來,在沒有相關自定義配置的狀況下,假設你只開啓了一個 sidekiq 進程,那麼 sidekiq 的定時任務拉取器的拉取時間平均間隔爲 1 x 15 = 15 秒,那按照上面的 #random_poll_interval 方法的定義,則實際每次拉取的時間間隔則是在 7.5 秒到 22.5 秒之間!

小結

從這個章節的分析,咱們能夠明白 Sidekiq 對定時任務和重試任務是一視同仁的,其處理流程都是:

  1. 全部定時任務(包括重試任務,本質上重試任務也是定時的,後邊會單獨講解)以其計劃時間爲 score,加入特殊的 "retry""schedule" 有序隊列中;

  2. sidekiq 的定時任務拉取器從 "retry""schedule" 隊列中一一取出已到達計劃時間的任務,將其加入該任務計劃的隊列中,後續的執行則跟其餘普通隊列中的任務一致;

  3. 拉取器休眠必定時間(random_poll_interval)後,從步驟 2 從新開始,周而復始。

因此,定時任務的計劃時間不是確切的任務時間!只是容許加回隊列的時間,具體執行時間還得另外看隊列長度以及隊列處理速度!

Sidekie worker 的祕密: Sidekiq::Processor#process_one

前面咱們分析過 sidekiq 的 worker 的核心代碼就是在線程裏循環執行 #process_one 方法,那麼這個方法到底作了些什麼啊?別急,如今就來一探究竟:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L79-L83
def process_one
  @job = fetch
  process(@job) if @job
  @job = nil
end

代碼中,#process_one 先經過 #fetch 方法獲取一個任務,當任務獲取成功後,就將其做爲參數調用 #process 方法,完成對任務的處理;若是沒有獲取到任務,則直接從新嘗試獲取新的任務。

首先讓咱們看看 #fetch 方法的實現:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L96-L104
def fetch
  j = get_one         # 吐槽一下這個 `j` 變量,命名真的不敢恭維,這個庫就這裏寫得不雅
  if j && @done
    j.requeue
    nil
  else
    j
  end
end

#fetch 方法經過 #get_one 方法從隊列中獲取任務,當獲取到任務後,判斷當前 worker 是否已經中止(@donetrue),是則將任務從新壓回隊列。

讓咱們接着看 #get_one 方法的實現:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L85-L94
def get_one
  begin
    work = @strategy.retrieve_work
    (logger.info { "Redis is online, #{Time.now - @down} sec downtime" }; @down = nil) if @down
    work
  rescue Sidekiq::Shutdown
  rescue => ex
    handle_fetch_exception(ex)
  end
end

核心代碼則是 work = @strategy.retrieve_work,爲了瞭解 @strategy,咱們仍舊往回看#initialize 方法的定義:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L32-L40
def initialize(mgr)
  # ...
  @strategy = (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options)
  # ...
end

又是一個策略模式,缺省下,使用了 Sidekiq::BasicFetch 生成實例,而且經過實例變量 @strategy 引用。

回到前面的 @strategy.retrieve_work,讓咱們繼續看看 Sidekiq::BasicFetch#retrieve_work 的實現:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/fetch.rb#L35-L38
def retrieve_work
  work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
  UnitOfWork.new(*work) if work
end

經過上面的代碼,能夠知道 Sidekiq::BasicFetch 的取任務邏輯比較直接,是經過 Redis 的 BRPOP 命令從「全部隊列」中阻塞地取出第一個任務:

BRPOP is a blocking list pop primitive. It is the blocking version of RPOP because it blocks the connection when there are no elements to pop from any of the given lists. An element is popped from the tail of the first list that is non-empty, with the given keys being checked in the order that they are given.

因此,理解了 BRPOP 命令的工做細節以後,咱們把注意力縮放到 #queues_cmd 方法上:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/fetch.rb#L40-L53
def queues_cmd
  if @strictly_ordered_queues
    @queues
  else
    queues = @queues.shuffle.uniq
    queues << TIMEOUT
    queues
  end
end

首先,代碼中檢查了 @strictly_ordered_queues 這個實例變量的值,讓咱們回頭看下這個變量的值的來源,也就是 #initialize 方法的定義:

# https://github.com/mperham/sidekiq/blob/d8f11c26518dbe967880f76fd23bb99e9d2411d5/lib/sidekiq/fetch.rb#L26-L33
def initialize(options)
  @strictly_ordered_queues = !!options[:strict]
  @queues = options[:queues].map { |q| "queue:#{q}" }
    if @strictly_ordered_queues
      @queues = @queues.uniq
      @queues << TIMEOUT
    end
  end
end

缺省狀況下,此值爲 false。因此讓咱們看 #queues_cmd 方法的 else 分支裏的代碼:

queues = @queues.shuffle.uniq

而這裏的 @queues 就是來自 options[:queues] 中的配置: options[:queues].map { |q| "queue:#{q}" }。那麼,這個 options[:queues] 的值又是什麼呢?
讓咱們一步一步沿着調用鏈上參數往回走:

  1. Sidekiq::BasicFetch.new 的參數 options 來自 worker 在 Sidekiq::Processor#initialize 方法中的參數 mgroptions 屬性;

  2. worker 的 mgr 參數正是 Sidekiq::Manager 的實例,其 options 屬性則是 Sidekiq::Launcher 建立 Sidekiq::Manager 實例時傳入的 options 變量;

  3. Sidekiq::Launcher#initialize 接收到的 options 變量則是更外層的 Sidekiq::CLI 的實例方法 options 的值;

  4. Sidekiq::CLI 的實例的 options 則是在其接收到 #parse 調用時設置的。
    爲了節省篇幅,省略這裏其中的太多調用棧,咱們直接看最根源代碼:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L389-L399
def parse_queues(opts, queues_and_weights)
  queues_and_weights.each { |queue_and_weight| parse_queue(opts, *queue_and_weight) }
end

def parse_queue(opts, q, weight=nil)
  [weight.to_i, 1].max.times do
   (opts[:queues] ||= []) << q
  end
  opts[:strict] = false if weight.to_i > 0
end

能夠看到,sidekiq 在解析 :queues 的相關配置時,按照每一個隊列以及其權重,生成了一個重複次數等於隊列權重的隊列的新數組,假設用戶提供以下配置:

:queues:
  - default
  - [myqueue, 2]

則此處生成的 options[:queues] 則爲 ["default", "myqueue", "myqueue"]。因此,這裏權重主要用於後邊肯定各個不一樣隊列被處理到的優先權的比重。

瞭解了 @queues 的來源以後,咱們回到最開始討論的地方:

queues = @queues.shuffle.uniq

也就是說,每次 worker 在請求新的任務時,sidekiq 都按照原來的 @queues 執行 shuffle 方法,而 shuffle 則表示將數組元素從新隨機排序,亦即「洗牌」。結合前面的權重,那麼每一個隊列洗牌後排在第一位的機率與其權重掛鉤。最後的 #uniq 方法確保隊列名稱沒有重複,避免 Redis 在執行 BRPOP 命令時重複檢查同一隊列。這裏使用 BRPOP 還有個好處就是,加入當前面優先的隊列裏邊沒有任務時,能夠依次將機會讓給後面的隊列。

然後邊的:

queues << TIMEOUT

則是在命令末尾追加超時設定,即 Redis 的 BRPOP 命令最多阻塞 2 秒,超時則直接放棄。

瞭解了任務的獲取以後,咱們接着看 sidekiq 如何處理獲取到的任務,回到 retrieve_work 的代碼:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/fetch.rb#L36-L37
work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
UnitOfWork.new(*work) if work

看到在獲取到任務以後,任務經過 Sidekiq::BasicFetch::UnitOfWork 結構體實例化後返回給調用方。

直接回到 Sidekiq::Processor#process_one:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L79-L83
def process_one
  @job = fetch
  process(@job) if @job
  @job = nil
end

能夠明白,@job 就是返回的 UnitOfWork 實例,那麼 process(@job) 會作些什麼呢?

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L118-L152
def process(work)
  jobstr = work.job
  queue = work.queue_name

  @reloader.call do
    ack = false
    begin
      job = Sidekiq.load_json(jobstr)
      klass  = job['class'.freeze].constantize
      worker = klass.new
      worker.jid = job['jid'.freeze]

      stats(worker, job, queue) do
        Sidekiq.server_middleware.invoke(worker, job, queue) do
          # Only ack if we either attempted to start this job or
          # successfully completed it. This prevents us from
          # losing jobs if a middleware raises an exception before yielding
          ack = true
          execute_job(worker, cloned(job['args'.freeze]))
        end
      end
  # ...

上面代碼中,sidekiq 從 work 中獲取任務的相關信息,包括隊列名稱,任務對應的類型(job['class'.freeze])、任務調用所需的參數等,根據這些信息從新實例化任務對象,而且將實例化的任務對象 worker 以及任務參數都傳遞給對 execute_job 的調用。讓咱們看看 #execute_job 的實現:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L154-L156
def execute_job(worker, cloned_args)
  worker.perform(*cloned_args)
end

看到了吧?咱們最熟悉的 #perform 方法!這下知道咱們爲何須要在每一個 sidekiq Worker 或者 ActiveJob 的 Job 類中定義這個方法了吧?由於這個方法就是最終任務執行時所需調用的方法,這就是約定!

至此,任務的調度過程就到此爲止了,剩下的就是周而復始的重複了。

小結

通過上面的分析,咱們能夠明白 sidekiq 中 worker 的工做流程:

  1. 按照全部隊列以及其權重,每次從新排列待處理隊列順序,高權重的隊列有更高的優先級;

  2. 將從新排好的隊列順序傳遞給 Redis 的 BRPOP 命令,同時設置 2 秒超時;

  3. sidekiq 將從隊列中獲取到的任務實例化,而且根據攜帶的參數調用了任務的 #perform 方法。

等等,上面都只是正常流程,那若是任務執行過程當中出錯了怎麼辦???重試的機制是如何運轉的呢?

重試機制:基於中間件的實現

注意:閱讀本章節前,建議先閱讀官方 Wiki 的 Error Handling

細心的童鞋確定發現了上面的 Sidekiq::Processor#process 方法中有個關鍵的代碼:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L131-L137
Sidekiq.server_middleware.invoke(worker, job, queue) do
  # ...
  execute_job(worker, cloned(job['args'.freeze]))
end

這個 server_middleware 是什麼呢?讓咱們來簡單過一下吧:

全局搜索了代碼,發現 Sidekiq.server_middleware 的來源是:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq.rb#L140-L144
def self.server_middleware
  @server_chain ||= default_server_middleware
  yield @server_chain if block_given?
  @server_chain
end

缺省狀況下,.server_middleware 依賴 .default_server_middleware 的實現:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq.rb#L146-L154
def self.default_server_middleware
  #...

  Middleware::Chain.new do |m|
    m.add Middleware::Server::Logging
    m.add Middleware::Server::RetryJobs
  end
end

能夠明白 Sidekiq.default_server_middleware 返回一個 Middleware::Chain 實例,而且調用了其 #add 方法將 Middleware::Server::Logging 以及 Middleware::Server::RetryJobs 兩個中間件加到中間件的 Chain 上。此中間件的實現以及實現相似 rackup,有興趣的童鞋自行閱讀源碼,在此不展開,讓咱們直接跳到 Middleware::Server::RetryJobscall 方法中:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/middleware/server/retry_jobs.rb#L73-L84
def call(worker, msg, queue)
  yield
rescue Sidekiq::Shutdown
  # ignore, will be pushed back onto queue during hard_shutdown
  raise
rescue Exception => e
  # ignore, will be pushed back onto queue during hard_shutdown
  raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)

  raise e unless msg['retry']
  attempt_retry(worker, msg, queue, e)
end

讓咱們聚焦方法的最後一行代碼 attempt_retry(worker, msg, queue, e),此處表示當執行中的任務出現異常時,除去停機的因素以及禁用了重試機制後,嘗試進行下次重試運行:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/middleware/server/retry_jobs.rb#L88-L137
def attempt_retry(worker, msg, queue, exception)
  max_retry_attempts = retry_attempts_from(msg['retry'], @max_retries)
  # ...

  count = if msg['retry_count']
    msg['retried_at'] = Time.now.to_f
    msg['retry_count'] += 1
  else
    msg['failed_at'] = Time.now.to_f
    msg['retry_count'] = 0
  end

  # ...
  if count < max_retry_attempts
    delay = delay_for(worker, count, exception)
    logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
    retry_at = Time.now.to_f + delay
    payload = Sidekiq.dump_json(msg)
    Sidekiq.redis do |conn|
      conn.zadd('retry', retry_at.to_s, payload)
    end
  else
    # Goodbye dear message, you (re)tried your best I'm sure.
    retries_exhausted(worker, msg, exception)
  end

  raise exception
end

從上面的代碼中看出,sidekiq 在捕捉到異常後,首先檢查此任務此前是否已經重試過,是的話,則在重試累計次數上加 1,更新最後重試時間;不然初始化重試累計次數爲 0,設定初次失敗時間。接着,sidekiq 檢查重試累計次數是否超過限定最大重試次數,是的話則放棄重試,任務今後再也不重試,進入 Dead 狀態,sidekiq 拋出異常;不然計算任務下次重試時間,將任務按照計劃的下次重試時間加到 retry 有序隊列中,最後拋出異常。關於重試任務的檢查跟執行,請閱讀前面的相關章節,接下來咱們主要分析 sidekiq 如何計算任務的下次重試時間 delay

讓咱們展開對 #delay_for 方法的探索:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/middleware/server/retry_jobs.rb#L172-L174
def delay_for(worker, count, exception)
  worker.sidekiq_retry_in_block? && retry_in(worker, count, exception) || seconds_to_delay(count)
end

首先了解下 worker.sidekiq_retry_in_block? 的定義:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/worker.rb#L32
base.class_attribute :sidekiq_retry_in_block

其定義了每一個 Worker 類的 sidekiq_retry_in_block 屬性,而其又能夠經過 Worker 類的 #sidekiq_retry_in 方法完成賦值:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/worker.rb#L96-L98
def sidekiq_retry_in(&block)
  self.sidekiq_retry_in_block = block
end

回過頭來,前面的

worker.sidekiq_retry_in_block? && retry_in(worker, count, exception) || seconds_to_delay(count)

表示當具體的 Worker 配置了 :sidekiq_retry_in_block 時,則直接使用這個配置的 block 執行的值做爲失敗任務下次重試的時間間隔;不然使用缺省的計算公式:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/middleware/server/retry_jobs.rb#L177-L179
def seconds_to_delay(count)
  (count ** 4) + 15 + (rand(30)*(count+1))
end

其中 count 爲任務累計重試次數,從公式上看,隨着失敗重試次數的累計增長,任務的下次重試時間間隔也會指數式增加,按照官方文檔說法:

Sidekiq will retry failures with an exponential backoff using the formula (retry_count * 4) + 15 + (rand(30) (retry_count + 1)) (i.e. 15, 16, 31, 96, 271, ... seconds + a random amount of time). It will perform 25 retries over approximately 21 days.

更多失敗任務重試的相關配置請看文檔:Error Handling: Configuration

小結

  1. sidekiq 在執行任務時,經過自行實現的中間件架構以及對應的簡單的中間件,及時捕捉失敗的任務,針對容許再次重試的任務,按失敗次數計算新的重試時間,缺省爲指數增加的時間間隔;

  2. 用戶能夠經過配置修改缺省的公式,也能夠指定最大重試次數等。

注意:結合失敗任務捕捉處理以及重試任務的檢查,缺省狀況下,一個首次失敗任務下次重回隊列(不是執行)的理論最大時間間隔大概是 67.5 秒!(固定的 15 秒 + 最大隨機時間 30 秒 + 最大理論檢查時間 22.5 秒)。因此,若是你的任務很重要,又須要儘快重試,就須要對幾部分時間的相關配置參數進行調優了哦!在我本身的工做中,我針對某個隊列任務設置的 sidekiq_retry_in 公式爲線性時間,即1s、2s、...50s,而後在重試檢查那裏設置了 :poll_interval_average 爲 5 秒,新的下次執行時間理論最大時間間隔就是 8.5 秒!不過這些配置須要慎重調整,綜合考慮業務以及業務量,既要儘量保證任務儘早處理完,又得保證 Redis 沒被 IO 壓垮。

總結

關於 sidekiq 項目代碼

  1. sidekiq 的源碼比較簡潔,不多看到長方法定義,大部分方法都在幾行以內,讀的過程當中很是舒服;

  2. sidekiq 的註釋也很充足,比較重要又比較核心的代碼都有大量詳細的註釋跟例子,除此以外大部分重點在 Wiki 中都有說起,很是好的一份代碼庫;

  3. sidekiq 將 Redis 的各類數據結構用得都恰到好處,能夠經過 sidekiq 加深對 Redis 的印象以及學習到如何恰當高效地結合 Redis 實現業務邏輯;

  4. 正是由於 sidekiq 將 Redis 充分利用以及高度結合,我終於理解 sidekiq 的做者爲何表示 sidekiq 不考慮其餘數據庫了;

  5. sidekiq 的代碼沒有太多花俏的代碼,很是推薦各位童鞋仔細研讀。

關於源碼閱讀

  1. 帶着問題去閱讀,效率一般很高;

  2. 讀的過程當中適當放棄無關細節,只追擊與問題相關的線索;

  3. 有些文檔中沒有說起的配置項,每每都藏匿在代碼之中;

  4. 只有充分了解了工具的運行機制,在遇到問題調優的時候才能駕輕就熟。

最後

若是你能從頭看到結尾,那麼很是感謝你的時間,畢竟這篇文章確實不短,儘管我已經儘可能去除無用的部分,一些代碼也直接跳過了,可是系統得了解一個框架或者一個軟件,確實也是不少細節。

這是今年第二篇博客,今年的產出遠不比去年,然而去年的產出遠不比千年,因此,可能這篇也是今年最後一篇了。洋洋灑灑幾萬字,從下午兩三點寫到如今,七個多小時,可貴能夠靜下心來寫這麼多,哎,這兩年心態太浮躁,技術路上,仍是繼續保持「stay foolish, stay hungry」的好。

相關文章
相關標籤/搜索