Sidekiq 信號處理源碼分析

引言

在以前的文章《Sidekiq任務調度流程分析》中,咱們一塊兒仔細分析了 Sidekiq 是如何基於多線程完成隊列任務處理以及調度的。咱們在以前的分析裏,看到了不論是 Sidekiq::Scheduled::Poller 仍是 Sidekiq::Processor 的核心代碼裏,都會有一個由 @done 實例變量控制的循環體:
<!-- More -->html

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73
def start
  @thread ||= safe_thread("scheduler") do
    initial_wait

    while !@done           # 這是 poller 的循環控制
      enqueue
      wait
    end
    Sidekiq.logger.info("Scheduler exiting...")
  end
end
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77
def run
  begin
    while !@done           # 這是咱們常說的 worker 循環控制
      process_one
    end
    @mgr.processor_stopped(self)
  rescue Sidekiq::Shutdown
    @mgr.processor_stopped(self)
  rescue Exception => ex
    @mgr.processor_died(self, ex)
  end
end

也就是說,這些 @done 實例變量決定了 poller 線程跟 worker 線程是否循環執行?一旦 @done 被改成 true,那循環體就再也不執行,線程天然也就是退出了。因而,單從這些代碼,咱們能夠判定, Sidekiq 就是經過設置 @done 的值來通知一個線程安全退出(graceful exit)的。咱們也知道,生產環境中,咱們是經過發送信號的方式來告訴 sidekiq 退出或者進入靜默(quiet)狀態的,那麼,這裏的 @done 是怎麼跟信號處理聯繫起來的呢?這些就是今天這篇文章的重點了!git

注意

  1. 今天的分析所參考的 sidekiq 的源碼對應版本是 4.2.3;github

  2. 今天所討論的內容,將主要圍繞系統信號處理進行分析,無關細節將不贅述,若有須要,請自行翻閱 sidekiq 源碼;數組

  3. 今天的文章跟上篇的《Sidekiq任務調度流程分析》緊密相關,上篇文章介紹的啓動過程跟任務調度會幫助這篇文章的理解,若是尚未閱讀上篇文章的,建議先閱讀後再來閱讀這一篇信號處理的文章。安全

你將瞭解到什麼?

  1. Sidekiq 信號處理機制;ruby

  2. 爲何重啓 Sidekiq 時,USR1 信號(即進入 quiet 模式)須要儘量早,而進程的退出重啓須要儘量晚。多線程

從頭再來

由於前一篇文章着眼於任務調度,因此略過了其餘無關細節,包括信號處理,這篇文章則將鏡頭對準信號處理,因此讓咱們從頭再來一遍,只是這一次,咱們只關心與信號處理有關的代碼。async

依舊是從 cli.rb 文件開始,它是 Sidekiq 核心代碼的生命起點,由於 Sidekiq 命令行啓動後,它是第一個被執行的代碼,Sidekiq 啓動過程當中調用了 Sidekiq::CLI#run 方法:ide

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L49-L106
def run
  boot_system
  print_banner

  self_read, self_write = IO.pipe

  %w(INT TERM USR1 USR2 TTIN).each do |sig|
    begin
      trap sig do
        self_write.puts(sig)
      end
    rescue ArgumentError
      puts "Signal #{sig} not supported"
    end
  end

  # ... other codes

  begin
    launcher.run

    while readable_io = IO.select([self_read])
      signal = readable_io.first[0].gets.strip
      handle_signal(signal)
    end
  rescue Interrupt
    logger.info 'Shutting down'
    launcher.stop
    # Explicitly exit so busy Processor threads can't block
    # process shutdown.
    logger.info "Bye!"
    exit(0)
  end

以上的代碼就是整個 Sidekiq 最頂層的信號處理的核心代碼了,讓咱們慢慢分析!
首先,self_read, self_write = IO.pipe 建立了一個模擬管道的 IO 對象,而且同時返回這個 管道的一個寫端以及一個讀端,經過這兩端,就能夠實現對管道的讀寫了。須要注意的是,IO.pipe 建立的讀端在讀的時候不會自動生成 EOF 符,因此這就要求讀時,寫端是關閉的,而寫時,讀端是關閉的,一句話說,就是這樣的管道不容許讀寫端同時打開。關於 IO.pipe 還有挺多細節跟須要注意的點,若是還須要瞭解,請閱讀官方文檔學習

上面說的管道本質上只是一個 IO 對象而已,暫時不用糾結太多,讓咱們接着往下讀:

%w(INT TERM USR1 USR2 TTIN).each do |sig|
  begin
    trap sig do
      self_write.puts(sig)
    end
  rescue ArgumentError
    puts "Signal #{sig} not supported"
  end
end

這段代碼就比較有意思了,最外層遍歷了一個系統信號的數組,而後逐個信號進行監聽(trap,或者叫捕捉?)。讓咱們聚焦在 trap 方法的調用跟其 block 上,查閱 Ruby 文檔,發現 trapSignal 模塊下的一個方法,Signal 主要是處理與系統信號有關的任務,而後 trap 的做用是:

Specifies the handling of signals. The first parameter is a signal name (a string such as 「SIGALRM」, 「SIGUSR1」, and so on) or a signal number...

因此,前面的那段代碼的意思就很容易理解了,Sidekiq 註冊了對 INTTERMUSR1USR2以及TTIN等系統信號的處理,而在進程收到這些信號時,就會執行 self_write.puts(sig),也就是將收到的信號經過以前介紹的管道寫端 self_write 記錄下來。什麼?只記錄下來,那還得處理啊?!

稍安勿躁,讓咱們接着往下分析 Sidekiq::CLI#run 方法末尾的代碼:

begin
  launcher.run

  while readable_io = IO.select([self_read])
    signal = readable_io.first[0].gets.strip
    handle_signal(signal)
  end
rescue Interrupt
  logger.info 'Shutting down'
  launcher.stop
  # Explicitly exit so busy Processor threads can't block
  # process shutdown.
  logger.info "Bye!"
  exit(0)
end

看到沒有,這裏有個循環,循環控制條件裏,readable_io = IO.select([self_read]) 是從前面的管道的讀端 self_read 阻塞地等待信號的到達。對於 IO.selectRuby 官方文檔介紹以下:

Calls select(2) system call. It monitors given arrays of IO objects, waits until one or more of IO objects are ready for reading, are ready for writing, and have pending exceptions respectively, and returns an array that contains arrays of those IO objects.

因此這裏就是說 Sidekiq 主線程首先負責執行完其餘初始化工做,最後阻塞在信號等待以及處理。在其等到新的信號以後,進入上面代碼展現的循環體:

signal = readable_io.first[0].gets.strip
handle_signal(signal)

這裏語法細節先不深究,咱們看下這兩行代碼第一行是從前面說的管道中讀取信號,而且將信號傳遞給 handle_signal 方法,讓咱們接着往下看 handle_signal 方法的定義:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L125-L153
def handle_signal(sig)
  Sidekiq.logger.debug "Got #{sig} signal"
  case sig
  when 'INT'
    # Handle Ctrl-C in JRuby like MRI
    # http://jira.codehaus.org/browse/JRUBY-4637
    raise Interrupt
  when 'TERM'
    # Heroku sends TERM and then waits 10 seconds for process to exit.
    raise Interrupt
  when 'USR1'
    Sidekiq.logger.info "Received USR1, no longer accepting new work"
    launcher.quiet
  when 'USR2'
    if Sidekiq.options[:logfile]
      Sidekiq.logger.info "Received USR2, reopening log file"
      Sidekiq::Logging.reopen_logs
    end
  when 'TTIN'
    Thread.list.each do |thread|
      Sidekiq.logger.warn "Thread TID-#{thread.object_id.to_s(36)} #{thread['label']}"
      if thread.backtrace
        Sidekiq.logger.warn thread.backtrace.join("\n")
      else
        Sidekiq.logger.warn "<no backtrace available>"
      end
    end
  end
end

這裏的代碼挺長,可是一點都不難理解,我簡單解釋下就夠了。當進程:

  1. 收到 TERM 或者 INT信號時,直接拋出 Interrupt 中斷;

  2. 收到 USR1 信號時,則通知 launcher 執行 .quiet 方法,Sidekiq 在這裏進入 Quiet 模式(怎麼進入?);

  3. 收到 USR2 信號時,從新打開日誌;

  4. 收到 TTIN 信號時,打印全部線程當前正在執行的代碼列表。

到此,一個信號從收到被存下,到被取出處理的大體過程就是這樣的,至於具體的處理方式,咱們下個章節詳細展開。如今有一點須要補充的是,上面講當 Sidekiq 收到 TERM 或者 INT 信號時,都會拋出 Interrupt 中斷異常,那這個異常又是如何處理的呢?咱們回過頭去看剛纔最開始的 Sidekiq::CLI#run 方法末尾的代碼:

begin
  launcher.run

  while readable_io = IO.select([self_read])
    signal = readable_io.first[0].gets.strip
    handle_signal(signal)
  end
rescue Interrupt
  logger.info 'Shutting down'
  launcher.stop
  # Explicitly exit so busy Processor threads can't block
  # process shutdown.
  logger.info "Bye!"
  exit(0)
end

原來是 run 方法在處理信號時,聲明瞭 rescue Interrupt,捕捉了 Interrupt 中斷異常,而且在異常處理時打印必要日誌,同時執行 launcher.stop 通知各個線程中止工做,最後調用 exit 方法強制退出進程,到此,一個 Sidekiq 進程就完全退出了。
可是問題又來了,信號處理的大體過程我是知道了,可是具體的 launcher.quietlauncher.stop 都幹了些什麼呢?

Sidekiq::Launcher#quiet 源碼探索

老規矩,先上代碼:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L32-L36
def quiet
  @done = true
  @manager.quiet
  @poller.terminate
end

代碼只有短短三行。 Launcher 對象首先設置本身的實例變量 @done 的值爲 true,接着執行 @manager.quiet 以及 @poller.terminate。看方法命名上理解,應該是 Luancher 對象又將 quiet 的消息傳遞給了 @managerSidekiq::Manager 對象,同時通知 @pollerSidekiq::Scheduled::Poller 對象結束工做。那究竟是不是真的這樣呢?讓咱們繼續深挖!

Sidekiq::Manager#quiet

讓咱們來看看 Sidekiq::Manager#quiet 方法的代碼

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L51-L58
def quiet
  return if @done
  @done = true

  logger.info { "Terminating quiet workers" }
  @workers.each { |x| x.terminate }
  fire_event(:quiet, true)
end

上面的代碼也很短,首先將 Sidekiq::Manager 對象自身的 @done 實例變量的值設置爲 true,接着對其所管理的每個 worker,都發出一個 terminate 消息。讓咱們接着往下看 worker 對象(Sidekiq::Processor 對象)的 #terminate 方法定義:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L42-L46
def terminate(wait=false)
  @done = true
  return if !@thread
  @thread.value if wait
end

這裏的代碼依然保持了精短的特色!跟上一層邏輯同樣,worker 在處理 terminate 時,一樣設置本身的 @done 實例變量爲 true 後返回,可是,若是其參數 waittrue,則會保持主線程等待,直到 @thread 線程退出(@thread.value 至關於執行 @thread.join而且返回線程的返回值,可參考 Ruby 文檔)。

那麼,這裏就要問了,worker 設置 @done 爲 true 是要幹嗎?這裏好像也沒有作什麼特別的事啊?!勿急,還記得上篇文章介紹 worker 運行時的核心代碼嗎?

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77
def run
  begin
    while !@done
      process_one
    end
    @mgr.processor_stopped(self)
  rescue Sidekiq::Shutdown
    @mgr.processor_stopped(self)
  rescue Exception => ex
    @mgr.processor_died(self, ex)
  end
end

看到了吧,@done 變量但是一個重要的開關,當 @donefalse 時,worker 一直周而復始地從隊列中取任務而且老老實實幹活;而當 @donetrue 時,worker 在處理完當前的任務以後,便再也不執行新的任務,執行 @msg.processor_stopped(self) 通知 worker 管理器本身已經退出工做,最終 #run 方法返回。因爲 #run 方法是在獨立線程裏執行的,因此當 #run 方法返回時,其所在的線程天然也就退出了。

那關於 worker 的 quiet 模式進入過程就是這麼簡單,經過一個共享變量 @done 便實現了對工做線程的控制。

Sidekiq::Scheduled::Poller#terminate

前面說到 Sidekiq::Launcher#quiet 執行時,先將消息傳遞給了 worker 管理器,隨後執行了 @poller.terminate,那咱們來看看 #terminate 方法的定義:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L53-L61
def terminate
  @done = true
  if @thread
    t = @thread
    @thread = nil
    @sleeper << 0
    t.value
  end
end

又是如此簡短的代碼。poller 退出的邏輯跟 worker 退出的邏輯很是一致,都是一樣先設置本身的 @done 實例變量的值爲 true,接着等待線程 @thread 退出,最後 poller 返回。

那麼,poller 的 @done 是否是也是用來控制線程退出呢?答案是確定的!

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73
def start
  @thread ||= safe_thread("scheduler") do
    initial_wait

    while !@done
      enqueue
      wait
    end
    Sidekiq.logger.info("Scheduler exiting...")
  end
end

還記得上面這段代碼嗎? poller 在每次將定時任務壓回任務隊列以後,等待必定時間,而後從新檢查 @done 的值,若是爲 true,則 poller 直接返回退出,由於 #start 方法裏的循環體在新線程中執行,當循環結束時,線程天然也退出了。

小結

  1. 當 Sidekiq 收到 USR1 系統信號時,Sidekiq 主線程向 @launcher 發送 quiet 消息,@launcher 又將消息傳遞給 @manager ,同時向 @poller 發出 terminate 消息;

  2. @manager 在收到 quiet 消息時,逐一對運行中的 worker 發送 terminate 消息,worker 收到消息後,設置本身的 @donetrue,標識再也不處理新任務,當前任務處理完成後退出線程;

  3. @poller 在收到 terminate 消息後,也是設置本身的 @donetrue,在本次任務執行完畢後,線程也退出;

  4. Sidekiq 進入 quiet 模式以後,全部未處理任務以及新任務都再也不處理,直到 sidekiq 的下一次重啓。

Sidekiq::Launcher#stop 源碼探索

前面介紹的是 Sidekiq 進入 quiet 模式的過程,那 Sidekiq 的中止過程又是怎樣的呢?

讓咱們從 Sidekiq::Launcher#stop 方法開始尋找答案:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L41-L56
def stop
  deadline = Time.now + @options[:timeout]

  @done = true
  @manager.quiet
  @poller.terminate

  @manager.stop(deadline)

  # Requeue everything in case there was a worker who grabbed work while stopped
  # This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
  strategy = (@options[:fetch] || Sidekiq::BasicFetch)
  strategy.bulk_requeue([], @options)

  clear_heartbeat
end

首先,Sidekiq::Launcher 對象設定了一個強制退出的 deadline,時間是以當前時間加上配置的 timeout,這個時間默認是 8 秒

接着,設定對象自己的 @done 變量的值爲 true,而後分別對 @manager@poller 發送 quietterminate 消息,這個過程就是咱們上面說的 Sidekiq::Launcher#quiet 的過程,因此,這裏的代碼主要是 Sidekiq 要確保退出前已經通知各個線程準備退出。

接下來的代碼就比較重要了,咱們先看這一行:

@manager.stop(deadline)

在通知完 @manager 進入 quiet 模式以後,launcher 向 @manager 發送了 stop 消息,而且同時傳遞了 deadline 參數。讓咱們接着繼續往下看:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L61-L83
PAUSE_TIME = STDOUT.tty? ? 0.1 : 0.5

def stop(deadline)
  quiet
  fire_event(:shutdown, true)

  # some of the shutdown events can be async,
  # we don't have any way to know when they're done but
  # give them a little time to take effect
  sleep PAUSE_TIME
  return if @workers.empty?

  logger.info { "Pausing to allow workers to finish..." }
  remaining = deadline - Time.now
  while remaining > PAUSE_TIME
    return if @workers.empty?
    sleep PAUSE_TIME
    remaining = deadline - Time.now
  end
  return if @workers.empty?

  hard_shutdown
end

上面的代碼,manager 首先調用了自身的 quiet 方法(這裏就真的畫蛇添足了,由於外層的 launcher 已經調用過一次了),而後 manager 執行 sleep 系統調用進入休眠,持續時間爲 0.5 秒,休眠結束後檢查全部 worker 是否已經都退出,若是退出,則直接返回,任務提早結束;若是仍有 worker 未退出,則檢查當前時間是否接近強制退出的 deadline,若是不是,則重複「檢查全部 worker 退出 - 休眠」 的過程,直到 deadline 來臨,或者 worker 線程都已經所有退出。若是最後到達 deadline,仍有 worker 線程未退出,則最後執行 hard_shutdown

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L108-L135
def hard_shutdown
  cleanup = nil
  @plock.synchronize do
    cleanup = @workers.dup
  end

  if cleanup.size > 0
    jobs = cleanup.map {|p| p.job }.compact

    # ... other codes

    strategy = (@options[:fetch] || Sidekiq::BasicFetch)
    strategy.bulk_requeue(jobs, @options)
  end

  cleanup.each do |processor|
    processor.kill
  end
end

這裏 hard_shutdown 方法在執行時,首先克隆了當前仍未退出的 @workers 列表,接着獲取每一個 worker 當前正在處理的任務,將這些正在執行中的任務數據經過 strategy.bulk_requeue(jobs, @options) 從新寫回隊列,而最後對每個 worker 發送 kill 消息:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L48-L58
def kill(wait=false)
  @done = true
  return if !@thread

  @thread.raise ::Sidekiq::Shutdown
  @thread.value if wait
end

worker 在收到 kill 消息時,首先設置本身的 @donetrue,最後向 worker 所關聯的線程拋出 ::Sidekiq::Shutdown 異常。讓咱們看看 worker 的線程又是如何處理異常的:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77
def run
  begin
    while !@done
      process_one
    end
    @mgr.processor_stopped(self)
  rescue Sidekiq::Shutdown
    @mgr.processor_stopped(self)
  rescue Exception => ex
    @mgr.processor_died(self, ex)
  end
end

又回到 worker 的 run 方法這裏,能夠看到,run 方法捕捉了 Sidekiq::Shutdown 異常,而且在處理異常時,只是執行 @mgr.processor_stopped(self),通知 manager 本身已經退出,因爲已經跳出正常流程,worker 的 run 方法返回,線程也所以得以退出。至此,worker 也都正常退出了。

小結

  1. launcher 在執行退出時,首先按照 quiet 的流程先通知各個線程準備退出;

  2. 接着 launcher 向 manager 下達 stop 指令,而且給出最後期限(deadline);

  3. manager 在給定的限時內,儘量等待全部 worker 執行完本身退出,對於到達限時仍未退出的 worker,manager 備份了每一個 worker 的當前任務,從新加入隊列,確保任務至少完整執行一次,而後經過向線程拋出異常的方式,迫使 worker 的線程被動退出。

總結

  1. Sidekiq 簡單高效利用了系統信號,而且有比較清晰明瞭的信號處理過程;

  2. Sidekiq 在信號處理的過程當中,各個組件協調頗有條理,消息逐級傳遞,並且對被強制中止的任務也有備份方案;

  3. 咱們能夠從 Sidekiq 的系統信號處理機制上借鑑很多東西,好比經常使用系統信號的分類處理等;

  4. 對於多線程的控制,經過共享變量以及異常的方式作到 graceful 以及 hard 兩種方式的退出處理。

  5. 還有不少,一百我的心中有一百個哈姆萊特,一樣一份代碼,不一樣的人學習閱讀,確定收穫不一樣,你能夠在評論區留下你的感悟,跟看到這篇文章的人一塊兒分享!

問題思考

  1. 爲了儘量確保全部 Sidekiq 的任務可以正常主動退出,因此在部署腳本中,都會盡量早地讓 Sidekiq 進入 quiet 模式,可是 Sidekiq 的 quiet 是不可逆的,因此一旦部署腳本中途失敗,Sidekiq 得不到重啓,將會一直保持 quiet 狀態,若是長時間未重啓,任務就會積壓。因此,通常我都會在部署腳本中,額外捕捉部署腳本失敗異常,而後主動執行 sidekiq 的重啓。若是你的部署腳本中有涉及 Sidekiq 的,必定要注意檢查部署失敗是否會影響 Sidekiq 的狀態

  2. 雖然 Sidekiq 在強制退出當前長時間未退出的任務時,會將 job 的數據寫回隊列,等待重啓後從新執行,那麼這裏就有個細節須要注意了,就是你的 job 必須是冪等的,不然就不能容許從新執行了。因此,請注意,若是你有須要長時間運行的 job,請注意檢查其冪等性

好了,今天就寫到這吧!仍然挺長一篇,囉嗦了。感謝看到這裏!

相關文章
相關標籤/搜索