Reading the Curator Source - Managing and Listening to ConnectionState

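Let's look at how the Curator framework is structured to support listening for changes to the connection state, ConnectionState. The same structure can later be applied to all sorts of listeners in our own business code.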

Curator 2.13 implementation

Interface: Listener

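The Listener interface is implemented by the user. stateChanged() receives the new state, and the user's implementation decides what logic to run in response to it.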

public interface ConnectionStateListener
{
    /**
     * Called when there is a state change in the connection
     * @param client the client
     * @param newState the new state
     */
    public void stateChanged(CuratorFramework client, ConnectionState newState);
}

Interface: Listenable

/**
 * Abstracts a listenable object
 */
public interface Listenable<T>
{
    /**
     * Add the given listener. The listener will be executed in the containing instance's thread.
     *
     * @param listener listener to add
     */
    public void     addListener(T listener);

    /**
     * Add the given listener. The listener will be executed using the given
     * executor
     *
     * @param listener listener to add
     * @param executor executor to run listener in
     */
    public void     addListener(T listener, Executor executor);

    public void     removeListener(T listener);
}

Class: ListenerContainer<T> implements Listenable<T>

/**
 * Abstracts an object that has listeners: a container that holds Listeners
 * <T> the Listener type
 */
public class ListenerContainer<T> implements Listenable<T>
{
    private final Map<T, ListenerEntry<T>> listeners = Maps.newConcurrentMap();

    @Override
    public void addListener(T listener)
    {
        addListener(listener, MoreExecutors.sameThreadExecutor());
    }

    @Override
    public void addListener(T listener, Executor executor)
    {
        listeners.put(listener, new ListenerEntry<T>(listener, executor));
    }
    
    /**
     * Encapsulates iteration over the list of listeners.
     * Utility - apply the given function to each listener. 
     * @param function function to call for each listener
     */
    public void forEach(final Function<T, Void> function)
    {
        for ( final ListenerEntry<T> entry : listeners.values() )
        {
            entry.executor.execute
            (
                new Runnable()
                {
                    @Override
                    public void run()
                    {
                        try
                        {
                            function.apply(entry.listener);
                        }
                        catch ( Throwable e )
                        {
                            ThreadUtils.checkInterrupted(e);
                            log.error(String.format("Listener (%s) threw an exception", entry.listener), e);
                        }
                    }
                }
            );
        }
    }
    
    
    public void clear()
    {
        listeners.clear();
    }

    public int size()
    {
        return listeners.size();
    }
    
}
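
To make the idea concrete outside of Curator, here is a minimal sketch of the same container technique using only the JDK. It is not Curator's code: the class name `SimpleListenerContainer` and the `demo()` method are hypothetical, and `Runnable::run` stands in for a same-thread executor. It shows the two things the container buys us: each listener runs on its own executor, and an exception thrown by one listener does not stop the others.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical stand-in for a listener container; not part of Curator.
class SimpleListenerContainer<T> {
    // Maps each listener to the executor that should run its callbacks.
    private final Map<T, Executor> listeners = new ConcurrentHashMap<>();

    // Registers a listener whose callbacks run on the calling thread.
    public void addListener(T listener) {
        addListener(listener, Runnable::run);
    }

    public void addListener(T listener, Executor executor) {
        listeners.put(listener, executor);
    }

    public void removeListener(T listener) {
        listeners.remove(listener);
    }

    // Applies the action to every listener on that listener's executor.
    // A failure in one listener is caught so the remaining ones still run.
    public void forEach(Consumer<T> action) {
        for (Map.Entry<T, Executor> entry : listeners.entrySet()) {
            entry.getValue().execute(() -> {
                try {
                    action.accept(entry.getKey());
                } catch (Throwable e) {
                    System.err.println("Listener threw an exception: " + e);
                }
            });
        }
    }

    public int size() {
        return listeners.size();
    }

    // Registers one failing and one working listener, then notifies both.
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        SimpleListenerContainer<Consumer<String>> container = new SimpleListenerContainer<>();
        container.addListener(s -> { throw new IllegalStateException("boom"); });
        container.addListener(s -> received.add("got " + s));
        container.forEach(listener -> listener.accept("CONNECTED"));
        return received;
    }
}
```

Calling `SimpleListenerContainer.demo()` returns a list containing only `got CONNECTED`: the failing listener is logged to stderr and skipped.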

ConnectionStateManager

// to manage connection state
public class ConnectionStateManager {
    // A queue again? Anything that passes messages around tends to use one. Here it holds ConnectionState values
    BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
    // Holds the ListenerContainer
    private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
    
    /**
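     * Start the manager: submits a task that runs processEvents() on a background thread.
     * Open question: what happens if that thread dies, and how does the framework handle exceptions?
     * Exceptions thrown by listeners, at least, are caught inside ListenerContainer.forEach().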
     */
    public void start()
    {
        service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        processEvents();
                        return null;
                    }
                }
            );
    }
    
    @Override
    public void close()
    {
        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
        {
            service.shutdownNow();
            listeners.clear();
        }
    }
    
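    // Processes the ConnectionState values as they keep arriving (this is the consumer side)
    private void processEvents(){
        // Loop for as long as the ConnectionStateManager is in the STARTED state
        while ( state.get() == State.STARTED )
        {
            // Keep taking connection states from the queue; take() blocks until one is available
            // (InterruptedException handling is omitted in this excerpt)
            final ConnectionState newState = eventQueue.take();
            // Apply the Function to every state listener, with the listener as the subject.
            // forEach is simply the method ListenerContainer provides for iterating over all listeners.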
            listeners.forEach(
                new Function<ConnectionStateListener, Void>() {
                    // ConnectionStateListener is the interface we implement ourselves; stateChanged is the method to implement
                    @Override 
                    public Void apply(ConnectionStateListener listener)
                    {
                        listener.stateChanged(client, newState);
                        return null;
                    }
                }
            );
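            /**
            About the block above:
            if the Listeners were not wrapped in a ListenerContainer, they would just be a plain List, and we would call each Listener's stateChanged method directly:
            for Listener {
                listener.stateChanged(client, newState);
            }

            Because the Listeners are wrapped in a ListenerContainer, forEach can add behavior of its own internally, for example running each Listener on its associated executor.
            **/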
        }
    }
    
    
    // The method above consumes ConnectionState values. So how do they get in? This is the producer side.
    /**
     * Post a state change. If the manager is already in that state the change
     * is ignored. Otherwise the change is queued for listeners.
     *
     * @param newConnectionState new state
     * @return true if the state actually changed, false if it was already at that state
     */
    public synchronized boolean addStateChange(ConnectionState newConnectionState)
    {
        // First check that the ConnectionStateManager has been started; state holds a value of an internal enum
        if ( state.get() != State.STARTED )
        {
            return false;
        }

        ConnectionState previousState = currentConnectionState;
        if ( previousState == newConnectionState )
        {
            return false;
        }
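        // Record the new state so the duplicate check above works on the next call
        currentConnectionState = newConnectionState;
        ConnectionState localState = newConnectionState;
        // Wake any threads waiting on this manager's monitor
        notifyAll();

        // Offer the ConnectionState itself; `state` is the manager's lifecycle flag, not a ConnectionState
        while ( !eventQueue.offer(localState) )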
        {
            eventQueue.poll();
            log.warn("ConnectionStateManager queue full - dropping events to make room");
        }
        return true;
    }
    
}
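
The producer side combines two policies that are easy to miss: repeated states are ignored, and when the bounded queue is full the oldest event is dropped so the producer never blocks. The sketch below isolates just those two policies. It is an illustration rather than Curator's implementation: `StateQueueSketch`, the `ConnState` enum, `drain()` and `demo()` are hypothetical names, and the consumer is reduced to draining the queue on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the "dedupe, then drop oldest when full" producer policy.
class StateQueueSketch {
    enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    private final BlockingQueue<ConnState> eventQueue;
    private ConnState current; // last state accepted by addStateChange()

    StateQueueSketch(int capacity) {
        eventQueue = new ArrayBlockingQueue<>(capacity);
    }

    // Producer side: ignore repeats, never block, drop the oldest event when full.
    synchronized boolean addStateChange(ConnState newState) {
        if (current == newState) {
            return false; // already in this state, nothing to queue
        }
        current = newState;
        while (!eventQueue.offer(newState)) {
            eventQueue.poll(); // discard the oldest queued event to make room
        }
        return true;
    }

    // Consumer side, simplified: remove and return whatever is queued right now.
    List<ConnState> drain() {
        List<ConnState> out = new ArrayList<>();
        eventQueue.drainTo(out);
        return out;
    }

    static List<ConnState> demo() {
        StateQueueSketch q = new StateQueueSketch(2);
        q.addStateChange(ConnState.CONNECTED);
        q.addStateChange(ConnState.CONNECTED);   // duplicate, ignored
        q.addStateChange(ConnState.SUSPENDED);
        q.addStateChange(ConnState.RECONNECTED); // queue is full: CONNECTED is dropped
        return q.drain();
    }
}
```

With a capacity of 2, `StateQueueSketch.demo()` returns `[SUSPENDED, RECONNECTED]`. The trade-off is that a slow consumer loses old events instead of stalling the thread that reports connection changes.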

Usage

Starting the framework

// Start the connectionStateManager, which then keeps processing connectionState changes
connectionStateManager.start(); // must be called before client.start()
// Register an anonymous default ConnectionStateListener
final ConnectionStateListener listener = new ConnectionStateListener()
{
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState)
    {
        if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
        {
            logAsErrorConnectionErrors.set(true);
        }
    }
};
this.getConnectionStateListenable().addListener(listener);

Producing ConnectionState: the state received from ZooKeeper is converted and then passed to addStateChange.

void validateConnection(Watcher.Event.KeeperState state)
{
    if ( state == Watcher.Event.KeeperState.Disconnected )
    {
        suspendConnection();
    }
    else if ( state == Watcher.Event.KeeperState.Expired )
    {
        connectionStateManager.addStateChange(ConnectionState.LOST);
    }
    else if ( state == Watcher.Event.KeeperState.SyncConnected )
    {
        connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
    }
    else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
    {
        connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
    }
}

Reuse?

Other kinds of Listener can also be kept in a ListenerContainer:

private final ListenerContainer<CuratorListener> listeners;
private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;


/**
 * Receives notifications about errors and background events
 */
public interface CuratorListener {
    /**
     * Called when a background task has completed or a watch has triggered
     * @param event the event
     * @throws Exception any errors
     */
    public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}

public interface UnhandledErrorListener
{
    /**
     * Called when an exception is caught in a background thread, handler, etc. Before this
     * listener is called, the error will have been logged and a {@link ConnectionState#LOST}
     * event will have been queued for any {@link ConnectionStateListener}s.
     * @param message Source message
     * @param e exception
     */
    public void     unhandledError(String message, Throwable e);
}

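Summary of techniques from the source

  1. ConnectionStateManager is a producer-consumer implementation. Its distinguishing feature: the public addStateChange() is exposed so that outside callers can produce ConnectionState values, the eventQueue carries them, and the private processEvents() consumes them internally.
  2. Anonymous classes are instantiated directly with new to supply a default implementation of an interface.
  3. The list of Listeners is wrapped in a Container that exposes a forEach method. The Function passed in is the business logic to run on each element, and the method body can add extras on top, such as per-listener executors and exception handling.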
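
As a sketch of how these techniques might carry over to a business listener, the example below wires a public producer method, a queue, and a private consumer loop around a made-up order-status event. Everything here is hypothetical (`OrderEventManager`, `OrderListener`, `post()`, the order ids and statuses), and it uses a plain `CopyOnWriteArrayList` instead of a per-listener executor to stay short.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical business-side manager following the producer-consumer listener pattern.
class OrderEventManager implements AutoCloseable {
    interface OrderListener {
        void statusChanged(String orderId, String newStatus);
    }

    private final BlockingQueue<String[]> events = new LinkedBlockingQueue<>();
    private final List<OrderListener> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService service = Executors.newSingleThreadExecutor();

    void addListener(OrderListener listener) {
        listeners.add(listener);
    }

    // Starts the single consumer thread.
    void start() {
        service.execute(this::processEvents);
    }

    // Producer side: callers post events and return immediately.
    void post(String orderId, String status) {
        events.offer(new String[] {orderId, status});
    }

    // Consumer side: take events in order and notify every listener.
    private void processEvents() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String[] event = events.take();
                for (OrderListener listener : listeners) {
                    try {
                        listener.statusChanged(event[0], event[1]);
                    } catch (RuntimeException e) {
                        System.err.println("Listener threw an exception: " + e);
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // close() was called, exit the loop
        }
    }

    @Override
    public void close() {
        service.shutdownNow(); // interrupts the consumer blocked in take()
    }

    static List<String> demo() {
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        try (OrderEventManager manager = new OrderEventManager()) {
            manager.addListener((id, status) -> {
                seen.add(id + ":" + status);
                done.countDown();
            });
            manager.start();
            manager.post("o-1", "PAID");
            manager.post("o-1", "SHIPPED");
            try {
                done.await(2, TimeUnit.SECONDS); // wait for both callbacks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen;
    }
}
```

Because a single thread consumes a FIFO queue, listeners observe events in the order they were posted, which is usually what a status listener wants.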