多線程編程之保護性暫掛模式

       保護性暫掛模式,也稱爲Guarded Suspension模式,指的是當前線程在執行某個任務以前,須要檢查某一條件,只有在該條件成立的狀況下,當前線程才能夠繼續往下執行當前任務。顧名思義,保護性暫掛模式是一種廣義的概念,其主要載體有兩個:預備條件和任務,在任何須要使用預先檢查的狀況中均可以使用保護性暫掛模式。java

1. 角色描述

保護性暫掛類圖

       以下是類圖中各個角色定位的描述:服務器

  • GuardedObject:受保護的對象,供給客戶端調用,用於生成並執行受保護的行爲的和檢查並控制狀態改變的,以下是其各個方法的做用:
    • guardedMethod():生成而且執行受保護的行爲;
    • stateOperation():用於檢查當前狀態是否知足特定的狀態,從而控制受保護行爲的狀態;
  • Blocker:該類主要提供了一些模板方法,主要是用於調用受保護行爲,或者喚醒當前因爲先驗條件不經過而致使等待的線程的,以下是其各個方法的做用:
    • callWithGuard(GuardedAction):該方法首先會檢查GuardedAction提供的先驗條件是否知足,若是不知足,則會阻塞當前線程,不然會執行GuardedAction中的任務;
    • signalAfter(Callable):該方法會先檢查先驗條件是否知足,若是知足先驗條件則會喚醒一個正在等待的線程;
    • broadcastAfter(Callable):該方法會先檢查先驗條件是否知足,若是先驗條件知足則會喚醒全部正在等待的線程;
    • signal:直接喚醒一個正在等待先驗條件知足的線程;
    • broadcast():直接喚醒全部正在等待先驗條件知足的線程;
  • GuardedAction:受保護方法的載體,而且提供了進行先驗檢查的條件,其各方法和屬性做用以下:
    • predicate:提供了進行先驗檢查的條件;
    • call():提供了須要執行的任務;
  • Predicate:承載了進行先驗條件檢查的條件。

2. 實例演示

       好比咱們會遇到這種場景,在進行某些操做時,好比經過elasticsearch服務器進行查詢或更新操做,咱們須要鏈接es服務器,而在es服務器鏈接上以前,全部的查詢和更新操做都是須要被阻塞的。即便在服務器鏈接上以後,咱們也須要常常對服務器進行心跳測試,以檢查與服務器的鏈接是否還存活在,若是不存活,則仍是須要繼續阻塞其他的操做,而且嘗試從新鏈接es服務器,這種狀況咱們就可使用到保護性暫掛模式。保護性條件便是與es服務器的鏈接還存活在,若是不存活則須要掛起全部嘗試鏈接服務器執行任務的線程,而且當前線程會嘗試鏈接服務器。以下是示例代碼:elasticsearch

public class ElasticSearchAgent {
  private volatile boolean connectedToServer = false;

  private final Predicate agentConnected = () -> connectedToServer;

  private final Blocker blocker = new ConditionVarBlocker();

  private final Timer heartbeatTimer = new Timer(true);

  public void update(final UpdateCondition condition) throws Exception {
    GuardedAction<Void> guardedAction = new GuardedAction<Void>(agentConnected) {
      @Override
      public Void call() {
        doUpdate(condition);
        return null;
      }
    };

    blocker.callWithGuard(guardedAction);
  }

  private void doUpdate(UpdateCondition condition) {
    try {
      TimeUnit.MICROSECONDS.sleep(20); // 模擬進行更新
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public void init() {
    Thread connectingThread = new Thread(new ConnectingTask());
    connectingThread.start();
    heartbeatTimer.schedule(new HeartBeatTask(), 60000, 2000);
  }

  public void disconnect() {
    connectedToServer = false;
  }

  protected void onConnected() {
    try {
      blocker.signalAfter(() -> {
        connectedToServer = true;
        return Boolean.TRUE;
      });
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  protected void onDisconnected() {
    connectedToServer = false;
  }

  private class ConnectingTask implements Runnable {
    @Override
    public void run() {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {}

      onConnected();
    }
  }

  private class HeartBeatTask extends TimerTask {

    @Override
    public void run() {
      if (!testConnection()) {
        onDisconnected();
        reconnect();
      }
    }

    private boolean testConnection() {
      return true;
    }

    private void reconnect() {
      ConnectingTask connectingTask = new ConnectingTask();
      connectingTask.run();
    }
  }
}

       能夠看到,在進行update()操做時,首先會建立一個GuardedAction對象,真正的更新操做是在該對象中進行的,這裏的保護性條件是經過一個volatile類型的變量connectedToServer來控制的,若是當前與es服務器的鏈接還存活在,則該變量置爲true。HeartBeatTask是一個定時任務,在60s延遲以後每隔2s會向服務器發送心跳測試,以檢查鏈接是否存活,若是不存活,則會將connectedToServer變量置爲false,而且會嘗試鏈接服務器。在init()方法中首先會建立一個鏈接服務器的任務,以保證服務器鏈接在初始時的可用狀態,而且其還會啓動心跳測試的定時任務。以下是Blocker和ConditionVarBlocker的實現代碼:ide

public interface Blocker {
  <V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception;

  void signalAfter(Callable<Boolean> stateOperation) throws Exception;

  void signal() throws InterruptedException;

  void broadcastAfter(Callable<Boolean> stateOperation) throws Exception;
}
public class ConditionVarBlocker implements Blocker {
  private final Lock lock;
  private final Condition condition;

  public ConditionVarBlocker(Lock lock) {
    this.lock = lock;
    this.condition = lock.newCondition();
  }

  public ConditionVarBlocker() {
    this.lock = new ReentrantLock();
    this.condition = lock.newCondition();
  }

  @Override
  public <V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception {
    lock.lockInterruptibly();
    try {
      final Predicate guard = guardedAction.guard;
      while (!guard.evaluate()) {
        condition.await();
      }

      return guardedAction.call();
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void signalAfter(Callable<Boolean> stateOperation) throws Exception {
    lock.lockInterruptibly();
    try {
      if (stateOperation.call()) {
        condition.signal();
      }
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void signal() throws InterruptedException {
    lock.lockInterruptibly();
    try {
      condition.signal();
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void broadcastAfter(Callable<Boolean> stateOperation) throws Exception {
    lock.lockInterruptibly();
    try {
      if (stateOperation.call()) {
        condition.signalAll();
      }
    } finally {
      lock.unlock();
    }
  }
}

        能夠看到,ConditionVarBlocker中基本上都是模板代碼,其聲明瞭一個Lock對象和一個Condition對象,Lock對象用於對當前的先驗條件檢查過程進行同步處理,Condition對象則用於在先驗條件不知足的狀況下阻塞當前線程的。測試

       在callWithGuard()方法中,首先會在一個循環中檢查當前的先驗條件是否知足,若是不知足,則使當前線程進入等待狀態,若是知足,則當前線程繼續執行其任務。這裏須要注意的是,咱們使用了while()循環用於判斷先驗條件是否知足,由於有可能當前線程被意外的喚醒,或者說被喚醒以後先驗條件仍是不知足,於是這裏使用循環判斷,以使當前線程在先驗條件不知足的狀況下繼續等待。this

       在signalAfter()方法中,其首先調用stateOperation.call()方法,判斷當前的先驗條件是否知足,只有在先驗條件知足的狀況下才會喚醒一個等待的線程。這裏stateOperation是ElasticSearchAgent傳入的用於判斷當前是否處於鏈接狀態的一個條件載體。lua

       以下是GuardedAction的實現代碼:線程

public abstract class GuardedAction<V> implements Callable<V> {
  protected final Predicate guard;

  public GuardedAction(Predicate guard) {
    this.guard = guard;
  }
}

       這裏GuardedAction是一個抽象類,其主要封裝了一個Predicate屬性。GuardedAction的主要實如今ElasticSearchAgent.guardedMethod()方法中生成的,由於具體須要執行的任務須要調用方生成,這裏只是提供了一個模板方法。以下是Predicate的代碼:code

@FunctionalInterface
public interface Predicate {
  boolean evaluate();
}

       這裏Predicate也只是一個聲明而已,其具體的實現也是在ElasticSearchAgent中,本例中主要是判斷connectedToServer是否爲true,即處於鏈接服務器的狀態。對象

3. Guarded Suspension實現考量

  • 能夠看到Guarded Suspension的實現中,Blocker、GuardedAction和Predicate只是提供的一個模板,其內主要是一些通用的代碼,而和具體業務相關的代碼主要在ElasticSearchAgent中,其會建立所需執行的GuardedAction對象,而且控制其所需的先驗條件。Guarded Suspension將關注點進行了分離,咱們在使用該模式的時候主要須要實現的也便是相似於ElasticSearchAgent的一個客戶端類;
  • 在執行使用Guarded Suspension模式的時候,須要注意的是,每次執行GuardedObject.guardedMethod()方法時都會建立一個GuardedAction對象,這可能會對JVM垃圾回收形成必定的負擔,於是在使用該模式時若是內存較小須要特別注意該問題;
  • 在Guarded Suspension模式中,ConditionVarBlocker的callWithGuard()和signal*()方法的執行都進行了加鎖處理,這是由於ConditionVarBlocker是全部線程所共有的一個對象,其lock和condition變量是須要全部線程都一致可見的,於是這裏須要對其進行加鎖處理;
  • 在ConditionVarBlocker.callWithGuard()方法中,對先驗條件的檢查是使用一個while循環進行的,這是爲了防止等待的線程被意外的喚醒,而先驗條件此時還不知足,使用while循環就能夠保證當前線程再次進入到等待狀態;
  • 上述ConditionVarBlocker還提供了一個以下的構造方法:
public ConditionVarBlocker(Lock lock) {
    this.lock = lock;
    this.condition = lock.newCondition();
}

該方法用於防止ElasticSearchAgent因爲某種緣由而須要加鎖時可能會形成嵌套監視器鎖死的問題的。所謂的嵌套監視器鎖死的問題指的是,若是某個線程執行依次獲取了兩個鎖,而因爲先驗條件不知足,從而致使當前線程釋放了內層鎖從而進入等待狀態,而另外的線程爲了檢查當前的先驗條件須要獲取到外層鎖,這就致使了鎖循環等待的問題,在等待先驗條件知足的線程持有外層鎖,其沒法釋放,而嘗試改變先驗條件的線程正在嘗試獲取外層鎖,但其一直沒法獲取到,從而形成了死鎖。這種狀況下就提供了該構造方法,若是ElasticSearchAgent須要對其方法進行加鎖,那麼其須要經過該構造方法將鎖傳遞給ConditionVarBlocker,這樣當前線程在釋放鎖的時候就會將外層鎖和內層鎖同時釋放了(由於都是同一個鎖)。

相關文章
相關標籤/搜索