最近在學習了下Google的Guava包,發現這真是一個好東西啊。。因爲平時也會寫一些基於多線程的東西,因此特地瞭解了下這個Service框架。這裏Guava包裏的Service接口用於封裝一個服務對象的運行狀態、包括start和stop等方法。例如web服務器,RPC服務器、計時器等能夠實現這個接口。對此類服務的狀態管理並不輕鬆、須要對服務的開啓/關閉進行妥善管理、特別是在多線程環境下尤其複雜。Guava包提供了一些基礎類幫助你管理複雜的狀態轉換邏輯和同步細節。java
一個服務正常生命週期有:web
Service也提供了一些方法用於等待服務狀態轉換的完成:經過 addListener()方法異步添加監聽器。此方法容許你添加一個 Service.Listener 、它會在每次服務狀態轉換的時候被調用。注意:最好在服務啓動以前添加Listener(這時的狀態是NEW)、不然以前已發生的狀態轉換事件是沒法在新添加的Listener上被從新觸發的。安全
同步使用awaitRunning()。這個方法不能被打斷、不強制捕獲異常、一旦服務啓動就會返回。若是服務沒有成功啓動,會拋出IllegalStateException異常。一樣的, awaitTerminated() 方法會等待服務達到終止狀態(TERMINATED 或者 FAILED)。兩個方法都有重載方法容許傳入超時時間。服務器
Service 接口自己實現起來會比較複雜、且容易碰到一些捉摸不透的問題。所以咱們不推薦直接實現這個接口。而是請繼承Guava包裏已經封裝好的基礎抽象類。每一個基礎類支持一種特定的線程模型。網絡
AbstractIdleService在咱們服務處於running狀態時,不會作執行任何動做,咱們僅僅只有在startup和shutdown的時候才執行一些動做,因此咱們在實現這個方法時,只是簡單的實現startUp() 和 shutDown() 這兩個方法便可,在startUp方法中作一些好比初始化,註冊等操做,在shutDown中作一些清理操做等。舉個例子,也就是官網的例子:多線程
protected void startUp() { servlets.add(new GcStatsServlet()); } protected void shutDown() {}
咱們在startUp()方法的時候,實例化了一個GcStatsServlet,當咱們在運行的時候,會有現成的線程處理這個Servlet,因此在服務運行時就不須要作什麼額外動做了。這個比較簡單,就不舉例子了,應該用的狀況應該不會不少吧?。。。。併發
AbstractExecutionThreadService在單個線程中執行startup, running, and shutdown,咱們必須實現run()方法,同事在方法中要能響應中止服務的請求,好比在一個循環中:框架
import com.google.common.util.concurrent.AbstractExecutionThreadService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import java.util.concurrent.TimeUnit; /** * User: hupeng * Date: 14-12-22 * Time: 下午10:17 */ public class AbstractExecutionThreadServiceTest extends AbstractExecutionThreadService { private volatile boolean running = true; //聲明一個狀態 @Override protected void startUp() { ////在這裏咱們能夠作一些初始化操做 } @Override public void run() { while (running) { // do our work try { Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); System.out.println("do our work....."); } catch (Exception e) { //處理異常,這裏若是拋出異常,會使服務狀態變爲failed同時致使任務終止。 } } } @Override protected void triggerShutdown() { running = false; //這裏咱們改變狀態值,run方法中就可以獲得響應。= //能夠作一些清理操做,也能夠移到shutDown()方法中執行 } @Override protected void shutDown() throws Exception { //能夠關閉資源,關閉鏈接。。。 } public static void main(String[] args) throws InterruptedException { AbstractExecutionThreadServiceTest service = new AbstractExecutionThreadServiceTest(); service.addListener(new Listener() { @Override public void starting() { System.out.println("服務開始啓動....."); } @Override public void running() { System.out.println("服務開始運行");; } @Override public void stopping(State from) { System.out.println("服務關閉中"); } @Override public void terminated(State from) { System.out.println("服務終止"); } @Override public void failed(State from, Throwable failure) { System.out.println("失敗,cause:" + failure.getCause()); } }, MoreExecutors.directExecutor()); service.startAsync().awaitRunning(); System.out.println("服務狀態爲:" + service.state()); Thread.sleep(10 * 1000); service.stopAsync().awaitTerminated(); System.out.println("服務狀態爲:" + service.state()); } }
triggerShutdown() 方法會在執行方法stopAsync調用,startUp方法會在執行startAsync方法時調用,這個類的實現都是委託給AbstractService這個方法實現的。。具體代碼能夠本身看一下異步
AbstractScheduledService類用於在運行時處理一些週期性的任務。子類能夠實現 runOneIteration()方法定義一個週期執行的任務,以及相應的startUp()和shutDown()方法。爲了可以描述執行週期,你須要實現scheduler()方法。一般狀況下,你可使用AbstractScheduledService.Scheduler類提供的兩種調度器:newFixedRateSchedule(initialDelay, delay, TimeUnit) 和newFixedDelaySchedule(initialDelay, delay, TimeUnit),相似於JDK併發包中ScheduledExecutorService類提供的兩種調度方式。如要自定義schedules則可使用 CustomScheduler類來輔助實現。一個實現類看起來應該是這樣的ide
import com.google.common.util.concurrent.AbstractScheduledService; import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.TimeUnit; /** * User: hupeng * Date: 14-12-22 * Time: 下午7:43 */ public class AbstractScheduledServiceTest extends AbstractScheduledService { @Override protected void startUp() throws Exception { } @Override protected void shutDown() throws Exception { } @Override protected void runOneIteration() throws Exception { // //處理異常,這裏若是拋出異常,會使服務狀態變爲failed同時致使任務終止 try { System.out.println("do work...."); } catch (Exception e) { //處理異常 } } @Override protected Scheduler scheduler() { return Scheduler.newFixedDelaySchedule(1, 5, TimeUnit.SECONDS); } public static void main(String[] args) throws InterruptedException { AbstractScheduledServiceTest service = new AbstractScheduledServiceTest(); service.addListener(new Listener() { @Override public void starting() { System.out.println("服務開始啓動....."); } @Override public void running() { System.out.println("服務開始運行"); ; } @Override public void stopping(State from) { System.out.println("服務關閉中"); } @Override public void terminated(State from) { System.out.println("服務終止"); } @Override public void failed(State from, Throwable failure) { System.out.println("失敗,cause:" + failure.getCause()); } }, MoreExecutors.directExecutor()); service.startAsync().awaitRunning(); System.out.println("服務狀態爲:" + service.state()); Thread.sleep(10 * 1000); service.stopAsync().awaitTerminated(); System.out.println("服務狀態爲:" + service.state()); } }
固然這個Listener的註冊只是爲了測試觀察。AbstractScheduledServic默認使用Executors.newSingleThreadScheduledExecutor來執行的
/** * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this * service {@linkplain Service.State#TERMINATED terminates} or * {@linkplain Service.State#TERMINATED fails}. Subclasses may override this method to supply a * custom {@link ScheduledExecutorService} instance. This method is guaranteed to only be called * once. * * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. * Also, the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the * service {@linkplain Service.State#TERMINATED terminates} or * {@linkplain Service.State#TERMINATED fails}. */ protected ScheduledExecutorService executor() { final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( new ThreadFactory() { @Override public Thread newThread(Runnable runnable) { return MoreExecutors.newThread(serviceName(), runnable); } }); // Add a listener to shutdown the executor after the service is stopped. This ensures that the // JVM shutdown will not be prevented from exiting after this service has stopped or failed. // Technically this listener is added after start() was called so it is a little gross, but it // is called within doStart() so we know that the service cannot terminate or fail concurrently // with adding this listener so it is impossible to miss an event that we are interested in. addListener(new Listener() { @Override public void terminated(State from) { executor.shutdown(); } @Override public void failed(State from, Throwable failure) { executor.shutdown(); } }, directExecutor()); return executor; }
你能夠參照這個實現override這個方法,獲得你想要的ScheduledExecutorService。
如須要自定義的線程管理、能夠經過擴展 AbstractService類來實現。通常狀況下、使用上面的幾個實現類就已經知足需求了,但若是在服務執行過程當中有一些特定的線程處理需求、則建議繼承AbstractService類。
繼承AbstractService方法必須實現兩個方法.
doStart(): 首次調用startAsync()時會同時調用doStart(),doStart()內部須要處理全部的初始化工做、若是啓動成功則調用notifyStarted()方法;啓動失敗則調用notifyFailed()
doStop(): 首次調用stopAsync()會同時調用doStop(),doStop()要作的事情就是中止服務,若是中止成功則調用 notifyStopped()方法;中止失敗則調用 notifyFailed()方法。
doStart和doStop方法的實現須要考慮下性能,儘量的低延遲。若是初始化的開銷較大,如讀文件,打開網絡鏈接,或者其餘任何可能引發阻塞的操做,建議移到另一個單獨的線程去處理。
除了對Service接口提供基礎的實現類,Guava還提供了 ServiceManager類使得涉及到多個Service集合的操做更加容易。經過實例化ServiceManager類來建立一個Service集合,你能夠經過如下方法來管理它們:
startAsync() : 將啓動全部被管理的服務。若是當前服務的狀態都是NEW的話、那麼你只能調用該方法一次、這跟 Service#startAsync()是同樣的。
stopAsync() :將中止全部被管理的服務。
addListener :會添加一個ServiceManager.Listener,在服務狀態轉換中會調用該Listener
awaitHealthy() :會等待全部的服務達到Running狀態
awaitStopped():會等待全部服務達到終止狀態
檢測類的方法有:
isHealthy() :若是全部的服務處於Running狀態、會返回True
servicesByState():以狀態爲索引返回當前全部服務的快照
startupTimes() :返回一個Map對象,記錄被管理的服務啓動的耗時、以毫秒爲單位,同時Map默認按啓動時間排序。
咱們建議整個服務的生命週期都能經過ServiceManager來管理,不過即便狀態轉換是經過其餘機制觸發的、也不影響ServiceManager方法的正確執行。例如:當一個服務不是經過startAsync()、而是其餘機制啓動時,listeners 仍然能夠被正常調用、awaitHealthy()也可以正常工做。ServiceManager 惟一強制的要求是當其被建立時全部的服務必須處於New狀態。
參考:http://ifeve.com/google-guava-serviceexplained/