Goolge-Guava Concurrent中的Service

最近在學習了下Google的Guava包,發現這真是一個好東西啊。。因爲平時也會寫一些基於多線程的東西,因此特地瞭解了下這個Service框架。這裏Guava包裏的Service接口用於封裝一個服務對象的運行狀態、包括start和stop等方法。例如web服務器,RPC服務器、計時器等能夠實現這個接口。對此類服務的狀態管理並不輕鬆、須要對服務的開啓/關閉進行妥善管理、特別是在多線程環境下尤其複雜。Guava包提供了一些基礎類幫助你管理複雜的狀態轉換邏輯和同步細節。java

概述

一個服務正常生命週期有:web

  • Service.State.NEW
  • Service.State.STARTING
  • Service.State.RUNNING
  • Service.State.STOPPING
  • Service.State.TERMINATED
    服務一旦被中止就沒法再從新啓動了。若是服務在starting、running、stopping狀態出現問題、會進入Service.State.FAILED.狀態。調用 startAsync()方法能夠異步開啓一個服務,同時返回this對象造成方法調用鏈。注意:只有在當前服務的狀態是NEW時才能調用startAsync()方法,所以最好在應用中有一個統一的地方初始化相關服務。中止一個服務也是相似的、使用異步方法stopAsync() 。可是不像startAsync(),屢次調用這個方法是安全的。這是爲了方便處理關閉服務時候的鎖競爭問題。

Service也提供了一些方法用於等待服務狀態轉換的完成:經過 addListener()方法異步添加監聽器。此方法容許你添加一個 Service.Listener 、它會在每次服務狀態轉換的時候被調用。注意:最好在服務啓動以前添加Listener(這時的狀態是NEW)、不然以前已發生的狀態轉換事件是沒法在新添加的Listener上被從新觸發的。安全

同步使用awaitRunning()。這個方法不能被打斷、不強制捕獲異常、一旦服務啓動就會返回。若是服務沒有成功啓動,會拋出IllegalStateException異常。一樣的, awaitTerminated() 方法會等待服務達到終止狀態(TERMINATED 或者 FAILED)。兩個方法都有重載方法容許傳入超時時間。服務器

Service 接口自己實現起來會比較複雜、且容易碰到一些捉摸不透的問題。所以咱們不推薦直接實現這個接口。而是請繼承Guava包裏已經封裝好的基礎抽象類。每一個基礎類支持一種特定的線程模型。網絡

Service接口的實現

AbstractIdleService

AbstractIdleService在咱們服務處於running狀態時,不會作執行任何動做,咱們僅僅只有在startup和shutdown的時候才執行一些動做,因此咱們在實現這個方法時,只是簡單的實現startUp() 和 shutDown() 這兩個方法便可,在startUp方法中作一些好比初始化,註冊等操做,在shutDown中作一些清理操做等。舉個例子,也就是官網的例子:多線程

protected void startUp() {
  servlets.add(new GcStatsServlet());
}
protected void shutDown() {}

咱們在startUp()方法的時候,實例化了一個GcStatsServlet,當咱們在運行的時候,會有現成的線程處理這個Servlet,因此在服務運行時就不須要作什麼額外動做了。這個比較簡單,就不舉例子了,應該用的狀況應該不會不少吧?。。。。併發

AbstractExecutionThreadService

AbstractExecutionThreadService在單個線程中執行startup, running, and shutdown,咱們必須實現run()方法,同事在方法中要能響應中止服務的請求,好比在一個循環中:框架

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;

import java.util.concurrent.TimeUnit;

/**
 * User: hupeng
 * Date: 14-12-22
 * Time: 下午10:17
 */
public class AbstractExecutionThreadServiceTest extends AbstractExecutionThreadService {
    private volatile boolean running = true; //聲明一個狀態

    @Override
    protected void startUp() {
        ////在這裏咱們能夠作一些初始化操做
    }

    @Override
    public void run() {
        while (running) {
            // do our work
            try {
                Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
                System.out.println("do our work.....");
            } catch (Exception e) {
                //處理異常,這裏若是拋出異常,會使服務狀態變爲failed同時致使任務終止。
            }
        }
    }

   @Override
    protected void triggerShutdown() {
        running = false; //這裏咱們改變狀態值,run方法中就可以獲得響應。=
        //能夠作一些清理操做,也能夠移到shutDown()方法中執行
    }

    @Override
    protected void shutDown() throws Exception {
        //能夠關閉資源,關閉鏈接。。。
    }

    public static void main(String[] args) throws InterruptedException {
        AbstractExecutionThreadServiceTest service = new AbstractExecutionThreadServiceTest();

        service.addListener(new Listener() {
            @Override
            public void starting() {
                System.out.println("服務開始啓動.....");
            }

            @Override
            public void running() {
                System.out.println("服務開始運行");;
            }

            @Override
            public void stopping(State from) {
                System.out.println("服務關閉中");
            }

            @Override
            public void terminated(State from) {
                System.out.println("服務終止");
            }

            @Override
            public void failed(State from, Throwable failure) {
                System.out.println("失敗,cause:" + failure.getCause());
            }
        }, MoreExecutors.directExecutor());

        service.startAsync().awaitRunning();
        System.out.println("服務狀態爲:" + service.state());

        Thread.sleep(10 * 1000);

        service.stopAsync().awaitTerminated();

        System.out.println("服務狀態爲:" + service.state());
    }

}

triggerShutdown() 方法會在執行方法stopAsync調用,startUp方法會在執行startAsync方法時調用,這個類的實現都是委託給AbstractService這個方法實現的。。具體代碼能夠本身看一下異步

AbstractScheduledService

AbstractScheduledService類用於在運行時處理一些週期性的任務。子類能夠實現 runOneIteration()方法定義一個週期執行的任務,以及相應的startUp()和shutDown()方法。爲了可以描述執行週期,你須要實現scheduler()方法。一般狀況下,你可使用AbstractScheduledService.Scheduler類提供的兩種調度器:newFixedRateSchedule(initialDelay, delay, TimeUnit) 和newFixedDelaySchedule(initialDelay, delay, TimeUnit),相似於JDK併發包中ScheduledExecutorService類提供的兩種調度方式。如要自定義schedules則可使用 CustomScheduler類來輔助實現。一個實現類看起來應該是這樣的ide

import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.TimeUnit;

/**
 * User: hupeng
 * Date: 14-12-22
 * Time: 下午7:43
 */
public class AbstractScheduledServiceTest extends AbstractScheduledService {


    @Override
    protected void startUp() throws Exception {

    }

    @Override
    protected void shutDown() throws Exception {

    }

    @Override
    protected void runOneIteration() throws Exception {
        // //處理異常,這裏若是拋出異常,會使服務狀態變爲failed同時致使任務終止
        try {
            System.out.println("do work....");
        } catch (Exception e) {
            //處理異常
        }
    }

    @Override
    protected Scheduler scheduler() {
        return Scheduler.newFixedDelaySchedule(1, 5, TimeUnit.SECONDS);
    }


    public static void main(String[] args) throws InterruptedException {
        AbstractScheduledServiceTest service = new AbstractScheduledServiceTest();

        service.addListener(new Listener() {
            @Override
            public void starting() {
                System.out.println("服務開始啓動.....");
            }

            @Override
            public void running() {
                System.out.println("服務開始運行");
                ;
            }

            @Override
            public void stopping(State from) {
                System.out.println("服務關閉中");
            }

            @Override
            public void terminated(State from) {
                System.out.println("服務終止");
            }

            @Override
            public void failed(State from, Throwable failure) {
                System.out.println("失敗,cause:" + failure.getCause());
            }
        }, MoreExecutors.directExecutor());

        service.startAsync().awaitRunning();
        System.out.println("服務狀態爲:" + service.state());

        Thread.sleep(10 * 1000);

        service.stopAsync().awaitTerminated();

        System.out.println("服務狀態爲:" + service.state());
    }
}

固然這個Listener的註冊只是爲了測試觀察。AbstractScheduledServic默認使用Executors.newSingleThreadScheduledExecutor來執行的

/**
   * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp},
   * {@link #runOneIteration} and {@link #shutDown} methods.  If this method is overridden the 
   * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this 
   * service {@linkplain Service.State#TERMINATED terminates} or 
   * {@linkplain Service.State#TERMINATED fails}. Subclasses may override this method to supply a 
   * custom {@link ScheduledExecutorService} instance. This method is guaranteed to only be called 
   * once.
   * 
   * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread
   * pool that sets the name of the thread to the {@linkplain #serviceName() service name}.  
   * Also, the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the 
   * service {@linkplain Service.State#TERMINATED terminates} or 
   * {@linkplain Service.State#TERMINATED fails}.
   */
  protected ScheduledExecutorService executor() {
    final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactory() {
          @Override public Thread newThread(Runnable runnable) {
            return MoreExecutors.newThread(serviceName(), runnable);
          }
        });
    // Add a listener to shutdown the executor after the service is stopped.  This ensures that the
    // JVM shutdown will not be prevented from exiting after this service has stopped or failed.
    // Technically this listener is added after start() was called so it is a little gross, but it
    // is called within doStart() so we know that the service cannot terminate or fail concurrently
    // with adding this listener so it is impossible to miss an event that we are interested in.
    addListener(new Listener() {
      @Override public void terminated(State from) {
        executor.shutdown();
      }
      @Override public void failed(State from, Throwable failure) {
        executor.shutdown();
      }
    }, directExecutor());
    return executor;
  }

你能夠參照這個實現override這個方法,獲得你想要的ScheduledExecutorService。

AbstractService

如須要自定義的線程管理、能夠經過擴展 AbstractService類來實現。通常狀況下、使用上面的幾個實現類就已經知足需求了,但若是在服務執行過程當中有一些特定的線程處理需求、則建議繼承AbstractService類。

繼承AbstractService方法必須實現兩個方法.

doStart(): 首次調用startAsync()時會同時調用doStart(),doStart()內部須要處理全部的初始化工做、若是啓動成功則調用notifyStarted()方法;啓動失敗則調用notifyFailed()
doStop(): 首次調用stopAsync()會同時調用doStop(),doStop()要作的事情就是中止服務,若是中止成功則調用 notifyStopped()方法;中止失敗則調用 notifyFailed()方法。
doStart和doStop方法的實現須要考慮下性能,儘量的低延遲。若是初始化的開銷較大,如讀文件,打開網絡鏈接,或者其餘任何可能引發阻塞的操做,建議移到另一個單獨的線程去處理。

使用ServiceManager

除了對Service接口提供基礎的實現類,Guava還提供了 ServiceManager類使得涉及到多個Service集合的操做更加容易。經過實例化ServiceManager類來建立一個Service集合,你能夠經過如下方法來管理它們:

startAsync() : 將啓動全部被管理的服務。若是當前服務的狀態都是NEW的話、那麼你只能調用該方法一次、這跟 Service#startAsync()是同樣的。
stopAsync() :將中止全部被管理的服務。
addListener :會添加一個ServiceManager.Listener,在服務狀態轉換中會調用該Listener
awaitHealthy() :會等待全部的服務達到Running狀態
awaitStopped():會等待全部服務達到終止狀態
檢測類的方法有:

isHealthy() :若是全部的服務處於Running狀態、會返回True
servicesByState():以狀態爲索引返回當前全部服務的快照
startupTimes() :返回一個Map對象,記錄被管理的服務啓動的耗時、以毫秒爲單位,同時Map默認按啓動時間排序。
咱們建議整個服務的生命週期都能經過ServiceManager來管理,不過即便狀態轉換是經過其餘機制觸發的、也不影響ServiceManager方法的正確執行。例如:當一個服務不是經過startAsync()、而是其餘機制啓動時,listeners 仍然能夠被正常調用、awaitHealthy()也可以正常工做。ServiceManager 惟一強制的要求是當其被建立時全部的服務必須處於New狀態。

參考:http://ifeve.com/google-guava-serviceexplained/

相關文章
相關標籤/搜索