保護性暫掛模式,也稱爲Guarded Suspension模式,指的是當前線程在執行某個任務以前,須要檢查某一條件,只有在該條件成立的狀況下,當前線程才能夠繼續往下執行當前任務。顧名思義,保護性暫掛模式是一種廣義的概念,其主要載體有兩個:預備條件和任務,在任何須要使用預先檢查的狀況中均可以使用保護性暫掛模式。java
以下是類圖中各個角色定位的描述:服務器
好比咱們會遇到這種場景,在進行某些操做時,好比經過elasticsearch服務器進行查詢或更新操做,咱們須要鏈接es服務器,而在es服務器鏈接上以前,全部的查詢和更新操做都是須要被阻塞的。即便在服務器鏈接上以後,咱們也須要常常對服務器進行心跳測試,以檢查與服務器的鏈接是否還存活在,若是不存活,則仍是須要繼續阻塞其他的操做,而且嘗試從新鏈接es服務器,這種狀況咱們就可使用到保護性暫掛模式。保護性條件便是與es服務器的鏈接還存活在,若是不存活則須要掛起全部嘗試鏈接服務器執行任務的線程,而且當前線程會嘗試鏈接服務器。以下是示例代碼:elasticsearch
public class ElasticSearchAgent { private volatile boolean connectedToServer = false; private final Predicate agentConnected = () -> connectedToServer; private final Blocker blocker = new ConditionVarBlocker(); private final Timer heartbeatTimer = new Timer(true); public void update(final UpdateCondition condition) throws Exception { GuardedAction<Void> guardedAction = new GuardedAction<Void>(agentConnected) { @Override public Void call() { doUpdate(condition); return null; } }; blocker.callWithGuard(guardedAction); } private void doUpdate(UpdateCondition condition) { try { TimeUnit.MICROSECONDS.sleep(20); // 模擬進行更新 } catch (InterruptedException e) { e.printStackTrace(); } } public void init() { Thread connectingThread = new Thread(new ConnectingTask()); connectingThread.start(); heartbeatTimer.schedule(new HeartBeatTask(), 60000, 2000); } public void disconnect() { connectedToServer = false; } protected void onConnected() { try { blocker.signalAfter(() -> { connectedToServer = true; return Boolean.TRUE; }); } catch (Exception e) { e.printStackTrace(); } } protected void onDisconnected() { connectedToServer = false; } private class ConnectingTask implements Runnable { @Override public void run() { try { Thread.sleep(100); } catch (InterruptedException e) {} onConnected(); } } private class HeartBeatTask extends TimerTask { @Override public void run() { if (!testConnection()) { onDisconnected(); reconnect(); } } private boolean testConnection() { return true; } private void reconnect() { ConnectingTask connectingTask = new ConnectingTask(); connectingTask.run(); } } }
能夠看到,在進行update()操做時,首先會建立一個GuardedAction對象,真正的更新操做是在該對象中進行的,這裏的保護性條件是經過一個volatile類型的變量connectedToServer來控制的,若是當前與es服務器的鏈接還存活在,則該變量置爲true。HeartBeatTask是一個定時任務,在60s延遲以後每隔2s會向服務器發送心跳測試,以檢查鏈接是否存活,若是不存活,則會將connectedToServer變量置爲false,而且會嘗試鏈接服務器。在init()方法中首先會建立一個鏈接服務器的任務,以保證服務器鏈接在初始時的可用狀態,而且其還會啓動心跳測試的定時任務。以下是Blocker和ConditionVarBlocker的實現代碼:ide
public interface Blocker { <V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception; void signalAfter(Callable<Boolean> stateOperation) throws Exception; void signal() throws InterruptedException; void broadcastAfter(Callable<Boolean> stateOperation) throws Exception; }
public class ConditionVarBlocker implements Blocker { private final Lock lock; private final Condition condition; public ConditionVarBlocker(Lock lock) { this.lock = lock; this.condition = lock.newCondition(); } public ConditionVarBlocker() { this.lock = new ReentrantLock(); this.condition = lock.newCondition(); } @Override public <V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception { lock.lockInterruptibly(); try { final Predicate guard = guardedAction.guard; while (!guard.evaluate()) { condition.await(); } return guardedAction.call(); } finally { lock.unlock(); } } @Override public void signalAfter(Callable<Boolean> stateOperation) throws Exception { lock.lockInterruptibly(); try { if (stateOperation.call()) { condition.signal(); } } finally { lock.unlock(); } } @Override public void signal() throws InterruptedException { lock.lockInterruptibly(); try { condition.signal(); } finally { lock.unlock(); } } @Override public void broadcastAfter(Callable<Boolean> stateOperation) throws Exception { lock.lockInterruptibly(); try { if (stateOperation.call()) { condition.signalAll(); } } finally { lock.unlock(); } } }
能夠看到,ConditionVarBlocker中基本上都是模板代碼,其聲明瞭一個Lock對象和一個Condition對象,Lock對象用於對當前的先驗條件檢查過程進行同步處理,Condition對象則用於在先驗條件不知足的狀況下阻塞當前線程的。測試
在callWithGuard()方法中,首先會在一個循環中檢查當前的先驗條件是否知足,若是不知足,則使當前線程進入等待狀態,若是知足,則當前線程繼續執行其任務。這裏須要注意的是,咱們使用了while()循環用於判斷先驗條件是否知足,由於有可能當前線程被意外的喚醒,或者說被喚醒以後先驗條件仍是不知足,於是這裏使用循環判斷,以使當前線程在先驗條件不知足的狀況下繼續等待。this
在signalAfter()方法中,其首先調用stateOperation.call()方法,判斷當前的先驗條件是否知足,只有在先驗條件知足的狀況下才會喚醒一個等待的線程。這裏stateOperation是ElasticSearchAgent傳入的用於判斷當前是否處於鏈接狀態的一個條件載體。lua
以下是GuardedAction的實現代碼:線程
public abstract class GuardedAction<V> implements Callable<V> { protected final Predicate guard; public GuardedAction(Predicate guard) { this.guard = guard; } }
這裏GuardedAction是一個抽象類,其主要封裝了一個Predicate屬性。GuardedAction的主要實如今ElasticSearchAgent.guardedMethod()方法中生成的,由於具體須要執行的任務須要調用方生成,這裏只是提供了一個模板方法。以下是Predicate的代碼:code
@FunctionalInterface public interface Predicate { boolean evaluate(); }
這裏Predicate也只是一個聲明而已,其具體的實現也是在ElasticSearchAgent中,本例中主要是判斷connectedToServer是否爲true,即處於鏈接服務器的狀態。對象
public ConditionVarBlocker(Lock lock) { this.lock = lock; this.condition = lock.newCondition(); }
該方法用於防止ElasticSearchAgent因爲某種緣由而須要加鎖時可能會形成嵌套監視器鎖死的問題的。所謂的嵌套監視器鎖死的問題指的是,若是某個線程執行依次獲取了兩個鎖,而因爲先驗條件不知足,從而致使當前線程釋放了內層鎖從而進入等待狀態,而另外的線程爲了檢查當前的先驗條件須要獲取到外層鎖,這就致使了鎖循環等待的問題,在等待先驗條件知足的線程持有外層鎖,其沒法釋放,而嘗試改變先驗條件的線程正在嘗試獲取外層鎖,但其一直沒法獲取到,從而形成了死鎖。這種狀況下就提供了該構造方法,若是ElasticSearchAgent須要對其方法進行加鎖,那麼其須要經過該構造方法將鎖傳遞給ConditionVarBlocker,這樣當前線程在釋放鎖的時候就會將外層鎖和內層鎖同時釋放了(由於都是同一個鎖)。