看看Curator
框架 爲實現對 鏈接狀態ConnectionState
的監聽,都是怎麼構造框架的。後面咱們也能夠應用到業務的各類監聽中。java
Listener
接口,給用戶實現stateChange()
傳入新的狀態,用戶實現對這新的狀態要作什麼邏輯處理。app
public interface ConnectionStateListener { /** * Called when there is a state change in the connection * @param client the client * @param newState the new state */ public void stateChanged(CuratorFramework client, ConnectionState newState); }
/** * Abstracts a listenable object */ public interface Listenable<T> { /** * Add the given listener. The listener will be executed in the containing instance's thread. * * @param listener listener to add */ public void addListener(T listener); /** * Add the given listener. The listener will be executed using the given * executor * * @param listener listener to add * @param executor executor to run listener in */ public void addListener(T listener, Executor executor); public void removeListener(T listener); }
/** * Abstracts an object that has listeners 裝Listener的容器 * <T> Listener類型 */ public class ListenerContainer<T> implements Listenable<T> { private final Map<T, ListenerEntry<T>> listeners = Maps.newConcurrentMap(); @Override public void addListener(T listener) { addListener(listener, MoreExecutors.sameThreadExecutor()); } @Override public void addListener(T listener, Executor executor) { listeners.put(listener, new ListenerEntry<T>(listener, executor)); } /** * 對 Listener 列表的遍歷進行封裝 * Utility - apply the given function to each listener. * @param function function to call for each listener */ public void forEach(final Function<T, Void> function) { for ( final ListenerEntry<T> entry : listeners.values() ) { entry.executor.execute ( new Runnable() { @Override public void run() { try { function.apply(entry.listener); } catch ( Throwable e ) { ThreadUtils.checkInterrupted(e); log.error(String.format("Listener (%s) threw an exception", entry.listener), e); } } } ); } } public void clear() { listeners.clear(); } public int size() { return listeners.size(); } }
// to manage connection state public class ConnectionStateManager { // 又是隊列? 玩消息什麼的都是用隊列。如今是存放 ConnectionState BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE); // 持有 ListenerContainer private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>(); /** * Start the manager,起一個線程去執行 processEvents(),要是這線程掛了怎麼辦?異常怎麼處理的?框架怎麼處理的。。 */ public void start() { service.submit ( new Callable<Object>() { @Override public Object call() throws Exception { processEvents(); return null; } } ); } @Override public void close() { if ( state.compareAndSet(State.STARTED, State.CLOSED) ) { service.shutdownNow(); listeners.clear(); } } // 對不斷產生的 ConnectionState 進行處理,生產者? private void processEvents(){ // 當 ConnectionStateManager 啓動完成 while ( state.get() == State.STARTED ) { // 不斷從隊列拿 Conection 狀態 final ConnectionState newState = eventQueue.take(); // 對每一個 狀態監聽接口 應用 Function, 狀態監聽接口做爲 主語 // forEach 是 listeners封裝的 遍歷全部 listener 的方法而已。。。 listeners.forEach( new Function<ConnectionStateListener, Void>() { // ConnectionStateListener是咱們本身要實現的接口,stateChanged是要實現的方法 @Override public Void apply(ConnectionStateListener listener) { listener.stateChanged(client, newState); return null; } } ); /** 上面這段 若是沒有封裝 Listener 到 ListenerContainer 的話, 全部 Listener 就是個 List列表,就直接調 Listener 的 stateChanged 方法了吧。 for Listener { listener.stateChanged(client, newState); } 由於 封裝 Listener 到 ListenerContainer了, 上面的 forEach 方法內部就能夠有些內部實現,好比 對每一個 Listener 都是用對應的 executor 來執行。 **/ } } // 上面的方法是處理 ConnectionState 的,那 ConnectionState 是怎麼傳進來的呢? 生產者? /** * Post a state change. If the manager is already in that state the change * is ignored. Otherwise the change is queued for listeners. * * @param newConnectionState new state * @return true if the state actually changed, false if it was already at that state */ public synchronized boolean addStateChange(ConnectionState newConnectionState) { // 先判斷 ConnectionStateManager 是否已經啓動好, state 是內部 Enum if ( state.get() != State.STARTED ) { return false; } ConnectionState previousState = currentConnectionState; if ( previousState == newConnectionState ) { return false; } ConnectionState localState = newConnectionState; // !!! notifyAll(); while ( !eventQueue.offer(state) ) { eventQueue.poll(); log.warn("ConnectionStateManager queue full - dropping events to make room"); } return true; } }
啓動框架
// 啓動 connectionStateManager,不斷檢測 connectionState 變化 connectionStateManager.start(); // must be called before client.start() // 來個匿名默認的 ConnectionStateListener final ConnectionStateListener listener = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState ) { logAsErrorConnectionErrors.set(true); } } }; this.getConnectionStateListenable().addListener(listener);
生產 ConnectionState
,把zk
那裏拿到的state
轉一下,而後addStateChange
ide
void validateConnection(Watcher.Event.KeeperState state) { if ( state == Watcher.Event.KeeperState.Disconnected ) { suspendConnection(); } else if ( state == Watcher.Event.KeeperState.Expired ) { connectionStateManager.addStateChange(ConnectionState.LOST); } else if ( state == Watcher.Event.KeeperState.SyncConnected ) { connectionStateManager.addStateChange(ConnectionState.RECONNECTED); } else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly ) { connectionStateManager.addStateChange(ConnectionState.READ_ONLY); } }
還有其餘各類Listener
,均可以放到 ListenerContainer
this
private final ListenerContainer<CuratorListener> listeners; private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners; /** * Receives notifications about errors and background events */ public interface CuratorListener { /** * Called when a background task has completed or a watch has triggered * @param event the event * @throws Exception any errors */ public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception; } public interface UnhandledErrorListener { /** * Called when an exception is caught in a background thread, handler, etc. Before this * listener is called, the error will have been logged and a {@link ConnectionState#LOST} * event will have been queued for any {@link ConnectionStateListener}s. * @param message Source message * @param e exception */ public void unhandledError(String message, Throwable e); }
ConnectionStateManager
就是個 生產者消費者模式的代碼,特色就是: public addStateChange()
暴露給外部用戶生產 ConnectionState
,經過隊列eventQueue
傳遞,private processEvents()
在內部對ConnectionState
進行消費。new
匿名類,對接口進行默認實現。Listener
列表對象進行Container
封裝,而後 封裝foreach
方法,傳入Function
接口 就是foreach
每一個元素要執行的業務邏輯,方法體就能夠加一些其餘福利。