在以前的文章《Sidekiq任務調度流程分析》中,咱們一塊兒仔細分析了 Sidekiq 是如何基於多線程完成隊列任務處理以及調度的。咱們在以前的分析裏,看到了不論是 Sidekiq::Scheduled::Poller
仍是 Sidekiq::Processor
的核心代碼裏,都會有一個由 @done
實例變量控制的循環體:
<!-- More -->html
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73 def start @thread ||= safe_thread("scheduler") do initial_wait while !@done # 這是 poller 的循環控制 enqueue wait end Sidekiq.logger.info("Scheduler exiting...") end end
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77 def run begin while !@done # 這是咱們常說的 worker 循環控制 process_one end @mgr.processor_stopped(self) rescue Sidekiq::Shutdown @mgr.processor_stopped(self) rescue Exception => ex @mgr.processor_died(self, ex) end end
也就是說,這些 @done
實例變量決定了 poller
線程跟 worker
線程是否循環執行?一旦 @done
被改成 true
,那循環體就再也不執行,線程天然也就是退出了。因而,單從這些代碼,咱們能夠判定, Sidekiq 就是經過設置 @done
的值來通知一個線程安全退出(graceful exit)的。咱們也知道,生產環境中,咱們是經過發送信號的方式來告訴 sidekiq 退出或者進入靜默(quiet)狀態的,那麼,這裏的 @done
是怎麼跟信號處理聯繫起來的呢?這些就是今天這篇文章的重點了!git
今天的分析所參考的 sidekiq 的源碼對應版本是 4.2.3;github
今天所討論的內容,將主要圍繞系統信號處理進行分析,無關細節將不贅述,若有須要,請自行翻閱 sidekiq 源碼;數組
今天的文章跟上篇的《Sidekiq任務調度流程分析》緊密相關,上篇文章介紹的啓動過程跟任務調度會幫助這篇文章的理解,若是尚未閱讀上篇文章的,建議先閱讀後再來閱讀這一篇信號處理的文章。安全
Sidekiq 信號處理機制;ruby
爲何重啓 Sidekiq 時,USR1
信號(即進入 quiet
模式)須要儘量早,而進程的退出重啓須要儘量晚。多線程
由於前一篇文章着眼於任務調度,因此略過了其餘無關細節,包括信號處理,這篇文章則將鏡頭對準信號處理,因此讓咱們從頭再來一遍,只是這一次,咱們只關心與信號處理有關的代碼。async
依舊是從 cli.rb
文件開始,它是 Sidekiq 核心代碼的生命起點,由於 Sidekiq 命令行啓動後,它是第一個被執行的代碼,Sidekiq 啓動過程當中調用了 Sidekiq::CLI#run
方法:ide
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L49-L106 def run boot_system print_banner self_read, self_write = IO.pipe %w(INT TERM USR1 USR2 TTIN).each do |sig| begin trap sig do self_write.puts(sig) end rescue ArgumentError puts "Signal #{sig} not supported" end end # ... other codes begin launcher.run while readable_io = IO.select([self_read]) signal = readable_io.first[0].gets.strip handle_signal(signal) end rescue Interrupt logger.info 'Shutting down' launcher.stop # Explicitly exit so busy Processor threads can't block # process shutdown. logger.info "Bye!" exit(0) end
以上的代碼就是整個 Sidekiq 最頂層的信號處理的核心代碼了,讓咱們慢慢分析!
首先,self_read, self_write = IO.pipe
建立了一個模擬管道的 IO 對象,而且同時返回這個 管道的一個寫端以及一個讀端,經過這兩端,就能夠實現對管道的讀寫了。須要注意的是,IO.pipe
建立的讀端在讀的時候不會自動生成 EOF
符,因此這就要求讀時,寫端是關閉的,而寫時,讀端是關閉的,一句話說,就是這樣的管道不容許讀寫端同時打開。關於 IO.pipe
還有挺多細節跟須要注意的點,若是還須要瞭解,請閱讀官方文檔。學習
上面說的管道本質上只是一個 IO 對象而已,暫時不用糾結太多,讓咱們接着往下讀:
%w(INT TERM USR1 USR2 TTIN).each do |sig| begin trap sig do self_write.puts(sig) end rescue ArgumentError puts "Signal #{sig} not supported" end end
這段代碼就比較有意思了,最外層遍歷了一個系統信號的數組,而後逐個信號進行監聽(trap,或者叫捕捉?)。讓咱們聚焦在 trap
方法的調用跟其 block 上,查閱 Ruby 文檔,發現 trap
是 Signal
模塊下的一個方法,Signal
主要是處理與系統信號有關的任務,而後 trap
的做用是:
Specifies the handling of signals. The first parameter is a signal name (a string such as 「SIGALRM」, 「SIGUSR1」, and so on) or a signal number...
因此,前面的那段代碼的意思就很容易理解了,Sidekiq 註冊了對 INT
、TERM
、USR1
、USR2
以及TTIN
等系統信號的處理,而在進程收到這些信號時,就會執行 self_write.puts(sig)
,也就是將收到的信號經過以前介紹的管道寫端 self_write
記錄下來。什麼?只記錄下來,那還得處理啊?!
稍安勿躁,讓咱們接着往下分析 Sidekiq::CLI#run
方法末尾的代碼:
begin launcher.run while readable_io = IO.select([self_read]) signal = readable_io.first[0].gets.strip handle_signal(signal) end rescue Interrupt logger.info 'Shutting down' launcher.stop # Explicitly exit so busy Processor threads can't block # process shutdown. logger.info "Bye!" exit(0) end
看到沒有,這裏有個循環,循環控制條件裏,readable_io = IO.select([self_read])
是從前面的管道的讀端 self_read
阻塞地等待信號的到達。對於 IO.select
,Ruby 官方文檔介紹以下:
Calls select(2) system call. It monitors given arrays of IO objects, waits until one or more of IO objects are ready for reading, are ready for writing, and have pending exceptions respectively, and returns an array that contains arrays of those IO objects.
因此這裏就是說 Sidekiq 主線程首先負責執行完其餘初始化工做,最後阻塞在信號等待以及處理。在其等到新的信號以後,進入上面代碼展現的循環體:
signal = readable_io.first[0].gets.strip handle_signal(signal)
這裏語法細節先不深究,咱們看下這兩行代碼第一行是從前面說的管道中讀取信號,而且將信號傳遞給 handle_signal
方法,讓咱們接着往下看 handle_signal
方法的定義:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L125-L153 def handle_signal(sig) Sidekiq.logger.debug "Got #{sig} signal" case sig when 'INT' # Handle Ctrl-C in JRuby like MRI # http://jira.codehaus.org/browse/JRUBY-4637 raise Interrupt when 'TERM' # Heroku sends TERM and then waits 10 seconds for process to exit. raise Interrupt when 'USR1' Sidekiq.logger.info "Received USR1, no longer accepting new work" launcher.quiet when 'USR2' if Sidekiq.options[:logfile] Sidekiq.logger.info "Received USR2, reopening log file" Sidekiq::Logging.reopen_logs end when 'TTIN' Thread.list.each do |thread| Sidekiq.logger.warn "Thread TID-#{thread.object_id.to_s(36)} #{thread['label']}" if thread.backtrace Sidekiq.logger.warn thread.backtrace.join("\n") else Sidekiq.logger.warn "<no backtrace available>" end end end end
這裏的代碼挺長,可是一點都不難理解,我簡單解釋下就夠了。當進程:
收到 TERM
或者 INT
信號時,直接拋出 Interrupt
中斷;
收到 USR1
信號時,則通知 launcher
執行 .quiet
方法,Sidekiq 在這裏進入 Quiet 模式(怎麼進入?);
收到 USR2
信號時,從新打開日誌;
收到 TTIN
信號時,打印全部線程當前正在執行的代碼列表。
到此,一個信號從收到被存下,到被取出處理的大體過程就是這樣的,至於具體的處理方式,咱們下個章節詳細展開。如今有一點須要補充的是,上面講當 Sidekiq 收到 TERM
或者 INT
信號時,都會拋出 Interrupt
中斷異常,那這個異常又是如何處理的呢?咱們回過頭去看剛纔最開始的 Sidekiq::CLI#run
方法末尾的代碼:
begin launcher.run while readable_io = IO.select([self_read]) signal = readable_io.first[0].gets.strip handle_signal(signal) end rescue Interrupt logger.info 'Shutting down' launcher.stop # Explicitly exit so busy Processor threads can't block # process shutdown. logger.info "Bye!" exit(0) end
原來是 run
方法在處理信號時,聲明瞭 rescue Interrupt
,捕捉了 Interrupt
中斷異常,而且在異常處理時打印必要日誌,同時執行 launcher.stop
通知各個線程中止工做,最後調用 exit
方法強制退出進程,到此,一個 Sidekiq 進程就完全退出了。
可是問題又來了,信號處理的大體過程我是知道了,可是具體的 launcher.quiet
跟 launcher.stop
都幹了些什麼呢?
老規矩,先上代碼:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L32-L36 def quiet @done = true @manager.quiet @poller.terminate end
代碼只有短短三行。 Launcher 對象首先設置本身的實例變量 @done
的值爲 true
,接着執行 @manager.quiet
以及 @poller.terminate
。看方法命名上理解,應該是 Luancher 對象又將 quiet 的消息傳遞給了 @manager
即 Sidekiq::Manager
對象,同時通知 @poller
即 Sidekiq::Scheduled::Poller
對象結束工做。那究竟是不是真的這樣呢?讓咱們繼續深挖!
讓咱們來看看 Sidekiq::Manager#quiet
方法的代碼
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L51-L58 def quiet return if @done @done = true logger.info { "Terminating quiet workers" } @workers.each { |x| x.terminate } fire_event(:quiet, true) end
上面的代碼也很短,首先將 Sidekiq::Manager
對象自身的 @done
實例變量的值設置爲 true
,接着對其所管理的每個 worker,都發出一個 terminate
消息。讓咱們接着往下看 worker 對象(Sidekiq::Processor
對象)的 #terminate
方法定義:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L42-L46 def terminate(wait=false) @done = true return if !@thread @thread.value if wait end
這裏的代碼依然保持了精短的特色!跟上一層邏輯同樣,worker 在處理 terminate
時,一樣設置本身的 @done
實例變量爲 true
後返回,可是,若是其參數 wait
爲 true
,則會保持主線程等待,直到 @thread
線程退出(@thread.value
至關於執行 @thread.join
而且返回線程的返回值,可參考 Ruby 文檔)。
那麼,這裏就要問了,worker 設置 @done
爲 true 是要幹嗎?這裏好像也沒有作什麼特別的事啊?!勿急,還記得上篇文章介紹 worker 運行時的核心代碼嗎?
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77 def run begin while !@done process_one end @mgr.processor_stopped(self) rescue Sidekiq::Shutdown @mgr.processor_stopped(self) rescue Exception => ex @mgr.processor_died(self, ex) end end
看到了吧,@done
變量但是一個重要的開關,當 @done
爲 false
時,worker 一直周而復始地從隊列中取任務而且老老實實幹活;而當 @done
爲 true
時,worker 在處理完當前的任務以後,便再也不執行新的任務,執行 @msg.processor_stopped(self)
通知 worker 管理器本身已經退出工做,最終 #run
方法返回。因爲 #run
方法是在獨立線程裏執行的,因此當 #run
方法返回時,其所在的線程天然也就退出了。
那關於 worker 的 quiet 模式進入過程就是這麼簡單,經過一個共享變量 @done
便實現了對工做線程的控制。
前面說到 Sidekiq::Launcher#quiet
執行時,先將消息傳遞給了 worker 管理器,隨後執行了 @poller.terminate
,那咱們來看看 #terminate
方法的定義:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L53-L61 def terminate @done = true if @thread t = @thread @thread = nil @sleeper << 0 t.value end end
又是如此簡短的代碼。poller 退出的邏輯跟 worker 退出的邏輯很是一致,都是一樣先設置本身的 @done
實例變量的值爲 true
,接着等待線程 @thread
退出,最後 poller 返回。
那麼,poller 的 @done
是否是也是用來控制線程退出呢?答案是確定的!
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73 def start @thread ||= safe_thread("scheduler") do initial_wait while !@done enqueue wait end Sidekiq.logger.info("Scheduler exiting...") end end
還記得上面這段代碼嗎? poller 在每次將定時任務壓回任務隊列以後,等待必定時間,而後從新檢查 @done
的值,若是爲 true
,則 poller 直接返回退出,由於 #start
方法裏的循環體在新線程中執行,當循環結束時,線程天然也退出了。
當 Sidekiq 收到 USR1
系統信號時,Sidekiq 主線程向 @launcher
發送 quiet
消息,@launcher
又將消息傳遞給 @manager
,同時向 @poller
發出 terminate
消息;
@manager
在收到 quiet
消息時,逐一對運行中的 worker 發送 terminate
消息,worker 收到消息後,設置本身的 @done
爲 true
,標識再也不處理新任務,當前任務處理完成後退出線程;
@poller
在收到 terminate
消息後,也是設置本身的 @done
爲 true
,在本次任務執行完畢後,線程也退出;
Sidekiq 進入 quiet 模式以後,全部未處理任務以及新任務都再也不處理,直到 sidekiq 的下一次重啓。
前面介紹的是 Sidekiq 進入 quiet 模式的過程,那 Sidekiq 的中止過程又是怎樣的呢?
讓咱們從 Sidekiq::Launcher#stop
方法開始尋找答案:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L41-L56 def stop deadline = Time.now + @options[:timeout] @done = true @manager.quiet @poller.terminate @manager.stop(deadline) # Requeue everything in case there was a worker who grabbed work while stopped # This call is a no-op in Sidekiq but necessary for Sidekiq Pro. strategy = (@options[:fetch] || Sidekiq::BasicFetch) strategy.bulk_requeue([], @options) clear_heartbeat end
首先,Sidekiq::Launcher
對象設定了一個強制退出的 deadline
,時間是以當前時間加上配置的 timeout
,這個時間默認是 8 秒。
接着,設定對象自己的 @done
變量的值爲 true
,而後分別對 @manager
和 @poller
發送 quiet
和 terminate
消息,這個過程就是咱們上面說的 Sidekiq::Launcher#quiet
的過程,因此,這裏的代碼主要是 Sidekiq 要確保退出前已經通知各個線程準備退出。
接下來的代碼就比較重要了,咱們先看這一行:
@manager.stop(deadline)
在通知完 @manager
進入 quiet 模式以後,launcher 向 @manager
發送了 stop
消息,而且同時傳遞了 deadline
參數。讓咱們接着繼續往下看:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L61-L83 PAUSE_TIME = STDOUT.tty? ? 0.1 : 0.5 def stop(deadline) quiet fire_event(:shutdown, true) # some of the shutdown events can be async, # we don't have any way to know when they're done but # give them a little time to take effect sleep PAUSE_TIME return if @workers.empty? logger.info { "Pausing to allow workers to finish..." } remaining = deadline - Time.now while remaining > PAUSE_TIME return if @workers.empty? sleep PAUSE_TIME remaining = deadline - Time.now end return if @workers.empty? hard_shutdown end
上面的代碼,manager 首先調用了自身的 quiet
方法(這裏就真的畫蛇添足了,由於外層的 launcher 已經調用過一次了),而後 manager 執行 sleep
系統調用進入休眠,持續時間爲 0.5 秒,休眠結束後檢查全部 worker 是否已經都退出,若是退出,則直接返回,任務提早結束;若是仍有 worker 未退出,則檢查當前時間是否接近強制退出的 deadline,若是不是,則重複「檢查全部 worker 退出 - 休眠」 的過程,直到 deadline 來臨,或者 worker 線程都已經所有退出。若是最後到達 deadline,仍有 worker 線程未退出,則最後執行 hard_shutdown
。
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L108-L135 def hard_shutdown cleanup = nil @plock.synchronize do cleanup = @workers.dup end if cleanup.size > 0 jobs = cleanup.map {|p| p.job }.compact # ... other codes strategy = (@options[:fetch] || Sidekiq::BasicFetch) strategy.bulk_requeue(jobs, @options) end cleanup.each do |processor| processor.kill end end
這裏 hard_shutdown
方法在執行時,首先克隆了當前仍未退出的 @workers
列表,接着獲取每一個 worker 當前正在處理的任務,將這些正在執行中的任務數據經過 strategy.bulk_requeue(jobs, @options)
從新寫回隊列,而最後對每個 worker 發送 kill
消息:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L48-L58 def kill(wait=false) @done = true return if !@thread @thread.raise ::Sidekiq::Shutdown @thread.value if wait end
worker 在收到 kill
消息時,首先設置本身的 @done
爲 true
,最後向 worker 所關聯的線程拋出 ::Sidekiq::Shutdown
異常。讓咱們看看 worker 的線程又是如何處理異常的:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77 def run begin while !@done process_one end @mgr.processor_stopped(self) rescue Sidekiq::Shutdown @mgr.processor_stopped(self) rescue Exception => ex @mgr.processor_died(self, ex) end end
又回到 worker 的 run
方法這裏,能夠看到,run
方法捕捉了 Sidekiq::Shutdown
異常,而且在處理異常時,只是執行 @mgr.processor_stopped(self)
,通知 manager 本身已經退出,因爲已經跳出正常流程,worker 的 run
方法返回,線程也所以得以退出。至此,worker 也都正常退出了。
launcher 在執行退出時,首先按照 quiet 的流程先通知各個線程準備退出;
接着 launcher 向 manager 下達 stop
指令,而且給出最後期限(deadline
);
manager 在給定的限時內,儘量等待全部 worker 執行完本身退出,對於到達限時仍未退出的 worker,manager 備份了每一個 worker 的當前任務,從新加入隊列,確保任務至少完整執行一次,而後經過向線程拋出異常的方式,迫使 worker 的線程被動退出。
Sidekiq 簡單高效利用了系統信號,而且有比較清晰明瞭的信號處理過程;
Sidekiq 在信號處理的過程當中,各個組件協調頗有條理,消息逐級傳遞,並且對被強制中止的任務也有備份方案;
咱們能夠從 Sidekiq 的系統信號處理機制上借鑑很多東西,好比經常使用系統信號的分類處理等;
對於多線程的控制,經過共享變量以及異常的方式作到 graceful
以及 hard
兩種方式的退出處理。
還有不少,一百我的心中有一百個哈姆萊特,一樣一份代碼,不一樣的人學習閱讀,確定收穫不一樣,你能夠在評論區留下你的感悟,跟看到這篇文章的人一塊兒分享!
爲了儘量確保全部 Sidekiq 的任務可以正常主動退出,因此在部署腳本中,都會盡量早地讓 Sidekiq 進入 quiet 模式,可是 Sidekiq 的 quiet 是不可逆的,因此一旦部署腳本中途失敗,Sidekiq 得不到重啓,將會一直保持 quiet 狀態,若是長時間未重啓,任務就會積壓。因此,通常我都會在部署腳本中,額外捕捉部署腳本失敗異常,而後主動執行 sidekiq 的重啓。若是你的部署腳本中有涉及 Sidekiq 的,必定要注意檢查部署失敗是否會影響 Sidekiq 的狀態
雖然 Sidekiq 在強制退出當前長時間未退出的任務時,會將 job 的數據寫回隊列,等待重啓後從新執行,那麼這裏就有個細節須要注意了,就是你的 job 必須是冪等的,不然就不能容許從新執行了。因此,請注意,若是你有須要長時間運行的 job,請注意檢查其冪等性。
好了,今天就寫到這吧!仍然挺長一篇,囉嗦了。感謝看到這裏!