跟我一塊兒開發商業級IM(3)—— 長鏈接穩定性之鏈接及重連

寫在前面

貼個Kula高清圖鎮樓:
git

在以前的跟我一塊兒開發商業級IM(1)—— 技術選型及協議定義跟我一塊兒開發商業級IM(2)—— 接口定義及封裝兩篇文章,咱們已經瞭解IMS的技術選型及接口定義與封裝,接下來,咱們來真正實現鏈接及重連部分。github

一個社交產品,長鏈接穩定是前提,絕大部分業務邏輯的正常運行都須要穩定的長鏈接支撐,可謂重中之重。本篇文章將會講述如何去實現並維護一個穩定的長鏈接,以及各類異常狀況的處理等。閱讀完本篇文章,你將會學到鏈接、重連機制、心跳機制等知識。同時,會在Github(https://github.com/FreddyChen)上開源相關代碼(包含Android客戶端/Java服務端、基於TCP/WebSocket),廢話不說,咱們開始吧。web

初始化配置

初始化配置,也就是在應用程序啓動並進行IMS初始化時,傳入所需配置參數,可根據本身的業務需求自定義。下面咱們來看看NettyTCPIMS初始化接口的代碼實現(「因爲基於NettyWebSocket實現的NettyWebSocketIMS大部分代碼及邏輯都與NettyTCPIMS相同,就不單獨貼出NettyWebSocketIMS代碼了,下面只會講解WebSocket對比TCP實現所不一樣的地方」,有須要完整代碼的話能夠跳轉Github(https://github.com/FreddyChen/ims_kula)查看):算法

 /**
  * 初始化
  * @param context
  * @param options               IMS初始化配置
  * @param connectStatusListener IMS鏈接狀態監聽
  * @param msgReceivedListener   IMS消息接收監聽
  * @return
  */
 @Override
 public boolean init(Context context, IMSOptions options, IMSConnectStatusListener connectStatusListener, IMSMsgReceivedListener msgReceivedListener) {
    if (context == null) {
        Log.d(TAG, "初始化失敗:Context is null.");
        initialized = false;
        return false;
    }

    if (options == null) {
        Log.d(TAG, "初始化失敗:IMSOptions is null.");
        initialized = false;
        return false;
    }
    this.mContext = context;
    this.mIMSOptions = options;
    this.mIMSConnectStatusListener = connectStatusListener;
    this.mIMSMsgReceivedListener = msgReceivedListener;
    executors = new ExecutorServiceFactory();
    // 初始化重連線程池
    executors.initBossLoopGroup();
    // 註冊網絡鏈接狀態監聽
    NetworkManager.getInstance().registerObserver(context, this);
    // 標識ims初始化成功
    initialized = true;
    // 標識ims已打開
    isClosed = false;
    callbackIMSConnectStatus(IMSConnectStatus.Unconnected);
    return true;
}

如上圖,簡單講講初始化的幾個步驟:bootstrap

  1. 「參數」
  • 「context」應用程序上下文,方便IMS獲取系統資源並進行一些系統操做等。
  • 「options」IMS初始化所需配置,其中定義了通訊實現方式、通訊協議、傳輸協議、鏈接超時時間、重連延時時間、重連次數、心跳先後臺間隔時間、服務器地址等一些支持自定義的參數。
  • 「connectStatusListener」IMS鏈接狀態回調,便於把鏈接狀態反饋到應用層。
  • 「msgReceivedListener」消息接收回調,便於IMS把接收到的消息回調到應用層( 「本篇文章主要講解鏈接及重連,因此不涉及消息部分,後續會詳細講解」)。
  1. 「建立線程池組」
    線程池組分爲boss線程池和work線程池,其中boss線程池負責鏈接及重連部分;work線程池負責心跳部分,均爲單線程線程池(由於同時只能有一個線程進行鏈接或心跳)。至於爲何用線程池,純屬我的習慣,你們也能夠分別用一個子線程實現便可。
  2. 「註冊網絡狀態監聽」
    網絡變化時,進行IMS重連。

初始Bootstrap

初始化Bootstrap,可參考Netty ChannelOption並根據實際業務場景進行定製,下面貼出我本身定製的配置:服務器

/**
 * 初始化bootstrap
 */
void initBootstrap() {
    closeBootstrap();// 初始化前先關閉
    NioEventLoopGroup loopGroup = new NioEventLoopGroup(4);
    bootstrap = new Bootstrap();
    bootstrap.group(loopGroup).channel(NioSocketChannel.class)
            // 設置該選項之後,若是在兩小時內沒有數據的通訊時,TCP會自動發送一個活動探測數據報文
            .option(ChannelOption.SO_KEEPALIVE, true)
            // 設置禁用nagle算法,若是要求高實時性,有數據發送時就立刻發送,就將該選項設置爲true關閉Nagle算法;若是要減小發送次數減小網絡交互,就設置爲false等累積必定大小後再發送。默認爲false
            .option(ChannelOption.TCP_NODELAY, true)
            // 設置TCP發送緩衝區大小(字節數)
            .option(ChannelOption.SO_SNDBUF, 32 * 1024)
            // 設置TCP接收緩衝區大小(字節數)
            .option(ChannelOption.SO_RCVBUF, 32 * 1024)
            // 設置鏈接超時時長,單位:毫秒
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, mIMSOptions.getConnectTimeout())
            // 設置初始化ChannelHandler
            .handler(new NettyTCPChannelInitializerHandler(this));
}

至於參數的含義,你們可參照官方文檔的介紹。微信

鏈接

鏈接也能夠認爲是重連,執行重連響應邏輯便可:websocket

/**
 * 鏈接
 */
@Override
public void connect() {
    if(!initialized) {
        Log.w(TAG, "IMS初始化失敗,請查看日誌");
        return;
    }
    isExecConnect = true;// 標識已執行過鏈接
    this.reconnect(true);
}

「因此咱們直接看重連部分,也是整篇文章中最核心最複雜的部分」網絡

重連

由於鏈接及重連部分代碼較多及邏輯較複雜,爲了使NettyTCPIMS代碼儘可能簡潔及邏輯清晰,因此將鏈接及重連部分代碼抽取到NettyTCPReconnectTask框架

public class NettyTCPReconnectTask implements Runnable {

    private static final String TAG = NettyTCPReconnectTask.class.getSimpleName();
    private NettyTCPIMS ims;
    private IMSOptions mIMSOptions;

    NettyTCPReconnectTask(NettyTCPIMS ims) {
        this.ims = ims;
        this.mIMSOptions = ims.getIMSOptions();
    }

    @Override
    public void run() {
        try {
            // 重連時,釋放工做線程組,也就是中止心跳
            ims.getExecutors().destroyWorkLoopGroup();

            // ims未關閉而且網絡可用的狀況下,纔去鏈接
            while (!ims.isClosed() && ims.isNetworkAvailable()) {
                IMSConnectStatus status;
                if ((status = connect()) == IMSConnectStatus.Connected) {
                    ims.callbackIMSConnectStatus(status);
                    break;// 鏈接成功,跳出循環
                }

                if (status == IMSConnectStatus.ConnectFailed
                        || status == IMSConnectStatus.ConnectFailed_IMSClosed
                        || status == IMSConnectStatus.ConnectFailed_ServerListEmpty
                        || status == IMSConnectStatus.ConnectFailed_ServerEmpty
                        || status == IMSConnectStatus.ConnectFailed_ServerIllegitimate
                        || status == IMSConnectStatus.ConnectFailed_NetworkUnavailable) {
                    ims.callbackIMSConnectStatus(status);

                    if(ims.isClosed() || !ims.isNetworkAvailable()) {
                        return;
                    }
                    // 一個服務器地址列表都鏈接失敗後,說明網絡狀況可能不好,延時指定時間(重連間隔時間*2)再去進行下一個服務器地址的鏈接
                    Log.w(TAG, String.format("一個週期鏈接失敗,等待%1$dms後再次嘗試重連", mIMSOptions.getReconnectInterval() * 2));
                    try {
                        Thread.sleep(mIMSOptions.getReconnectInterval() * 2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } finally {
            // 標識重連任務中止
            ims.setReconnecting(false);
        }
    }

    /**
     * 鏈接服務器
     * @return
     */
    private IMSConnectStatus connect() {
        if (ims.isClosed()) return IMSConnectStatus.ConnectFailed_IMSClosed;
        ims.initBootstrap();
        List<String> serverList = mIMSOptions.getServerList();
        if (serverList == null || serverList.isEmpty()) {
            return IMSConnectStatus.ConnectFailed_ServerListEmpty;
        }

        for (int i = 0; i < serverList.size(); i++) {
            String server = serverList.get(i);
            if (StringUtil.isNullOrEmpty(server)) {
                return IMSConnectStatus.ConnectFailed_ServerEmpty;
            }

            String[] params = null;
            try {
                params = server.split(" ");
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (params == null || params.length < 2) {
                return IMSConnectStatus.ConnectFailed_ServerIllegitimate;
            }

            if(i == 0) {
                ims.callbackIMSConnectStatus(IMSConnectStatus.Connecting);
            }

            // +1是由於首次鏈接也認爲是重連,因此若是重連次數設置爲3,則最大鏈接次數爲3+1次
            for (int j = 0; j < mIMSOptions.getReconnectCount() + 1; j++) {
                if (ims.isClosed()) {
                    return IMSConnectStatus.ConnectFailed_IMSClosed;
                }
                if (!ims.isNetworkAvailable()) {
                    return IMSConnectStatus.ConnectFailed_NetworkUnavailable;
                }

                Log.d(TAG, String.format("正在進行【%1$s】的第%2$d次鏈接", server, j + 1));
                try {
                    String host = params[0];
                    int port = Integer.parseInt(params[1]);
                    Channel channel = toServer(host, port);
                    if (channel != null && channel.isOpen() && channel.isActive() && channel.isRegistered() && channel.isWritable()) {
                        ims.setChannel(channel);
                        return IMSConnectStatus.Connected;
                    } else {
                        if (j == mIMSOptions.getReconnectCount()) {
                            // 若是當前已達到最大重連次數,而且是最後一個服務器地址,則回調鏈接失敗
                            if(i == serverList.size() - 1) {
                                Log.w(TAG, String.format("【%1$s】鏈接失敗", server));
                                return IMSConnectStatus.ConnectFailed;
                            }
                            // 不然,無需回調鏈接失敗,等待一段時間再去進行下一個服務器地址鏈接便可
                            // 也就是說,當服務器地址列表裏的地址都鏈接失敗,才認爲是鏈接失敗
                            else {
                                // 一個服務器地址鏈接失敗後,延時指定時間再去進行下一個服務器地址的鏈接
                                Log.w(TAG, String.format("【%1$s】鏈接失敗,正在等待進行下一個服務器地址的重連,當前重連延時時長:%2$dms", server, mIMSOptions.getReconnectInterval()));
                                Log.w(TAG, "=========================================================================================");
                                Thread.sleep(mIMSOptions.getReconnectInterval());
                            }
                        } else {
                            // 鏈接失敗,則線程休眠(重連間隔時長 / 2 * n) ms
                            int delayTime = mIMSOptions.getReconnectInterval() + mIMSOptions.getReconnectInterval() / 2 * j;
                            Log.w(TAG, String.format("【%1$s】鏈接失敗,正在等待重連,當前重連延時時長:%2$dms", server, delayTime));
                            Thread.sleep(delayTime);
                        }
                    }
                } catch (InterruptedException e) {
                    break;// 線程被中斷,則強制關閉
                }
            }
        }

        return IMSConnectStatus.ConnectFailed;
    }

    /**
     * 真正鏈接服務器的地方
     * @param host
     * @param port
     * @return
     */
    private Channel toServer(String host, int port) {
        Channel channel;
        try {
            channel = ims.getBootstrap().connect(host, port).sync().channel();
        } catch (Exception e) {
            e.printStackTrace();
            channel = null;
        }

        return channel;
    }
}

從以上代碼,能夠看到主要分爲三個方法:

  • 「run()」重連任務是一個Thread,run()方法也就是線程啓動時執行的方法,主要是判斷IMS是否關閉和網絡狀態,知足這兩個條件就一直循環去鏈接,鏈接成功後,回調鏈接狀態並中止線程,不然,一個週期鏈接失敗後(一個鏈接失敗週期,表明從開始鏈接到全部服務器地址達到最大重連次數),延時一段時間再去嘗試重連(你們可能會問爲何要去延時,直接鏈接很差嗎?主要是由於若是鏈接失敗的話,大多數狀況下多是客戶端網絡環境很差或者是服務端存在問題,延時是爲了在下一個時間節點時網絡恢復等,避免頻繁鏈接,節約性能),直至鏈接成功爲止。

  • 「toServer()」toServer()主要是Netty框架進行TCP長鏈接的代碼,比較簡單。

  • 「connect()」鏈接及重連的全部邏輯,都放到connect()方法中進行。TCPWebSocket的方式有細微的區別,下面主要以TCP爲例,至於WebSocket的區別,稍後會列出來。
    「注:ims_kulaSDK中固定的TCP服務器地址的格式爲:IP地址 端口號,例:192.168.0.1 8808,你們也能夠根據本身的需求來定義格式。」

    connect()方法大致邏輯以下:

    • 判斷IMS是否已關閉或網絡是否不可用,若知足兩個條件的其中之一,即返回鏈接失敗狀態;
    • 判斷用戶是否設置了服務器地址列表,若未設置,即返回鏈接失敗狀態;
    • 若以上條件都未知足,也就是IMS未關閉,網絡可用,而且服務器地址已設置,則初始化Bootstrap;
    • 接着須要兩個for循環,外層循環負責遍歷服務器地址列表,取出每個服務器地址;內層循環負責遍歷用戶設置的最大重連次數,默認爲3次,加上鍊接所需的一次,也就是說在不設置最大重連次數的狀況下, ims_kulaSDK會對每一個服務器地址進行 「4」次鏈接。同時,重連間隔時間爲 reconnectInterval + reconnectInterval / 2 * n,也就是若是設置重連間隔時間爲8000ms,那麼第二次重連間隔時間將爲12000ms,第三次爲16000ms,以此類推;
    • 獲取服務器地址後,對地址進行字符串分割,分別獲取host和port;
    • 接着調用Netty鏈接TCP的方法( toServer(String host, int port))進行鏈接便可。

「注:WebSocket鏈接方式與TCP大同小異,惟一的區別就是WebSocket的服務器地址格式與TCP不一樣,ws://IP地址:端口號/websocket,例:ws://192.168.0.1:8808/websocket,因此WebSocket獲取host和port代碼以下:」
(僞代碼,具體代碼可見NettyWebSocketReconnectTask

URI uri = URI.create(server);
String host = uri.getHost();
int port = uri.getPort();

至於其他鏈接及重連部分代碼,WebSocketTCP是一致的,由於WebSocket自己就是基於TCP協議做一層封裝。

什麼時候重連及斷開鏈接

首先明確一下,「重連是相對於客戶端來講的,服務端不存在主動鏈接;斷開鏈接是相對於服務端來講的,嚴格來講是移除響應的Channel。」

客戶端重連時機:

  • 網絡切換
  • 斷開網絡鏈接
  • 可感知的服務端異常
  • 心跳超時等

服務端斷開鏈接時機:

  • 可感知的客戶端異常
  • 心跳超時
  • 同一IP的客戶端重複鏈接等

不知道你們注意到沒有,上述的客戶端重連時機和服務端斷開鏈接時機,都分別有一個可感知異常。什麼是可感知異常?也就是不管客戶端仍是服務端,在對方斷開鏈接的時候,能夠感知到,就是可感知異常。

經測試,在雙方創建鏈接成功的狀態下,對於客戶端來講,若是服務端手動中止服務,Netty會回調exceptionCaught()方法,以下:

「服務端直接關機或者拔網線時,客戶端沒法感知,須要利用心跳超時機制進行重連。」

同理,對於服務端來講,若是客戶端手動殺死進程,Netty會回調channelInactive()方法,以下:

「客戶端直接關機或者斷網時,服務端沒法感知,一樣須要利用心跳機制進行斷開客戶端鏈接(移除channel)。」

「注:利用心跳超時機制進行重連及斷開鏈接會在後續文章講解,本篇文章主要講解鏈接及重連,就不在此展開了。」

效果展現

考慮到GIF圖片體積過大,暫時先把鏈接超時時間和重連間隔時間適當縮短,下面展現幾種狀況下的客戶端鏈接變化:

  • 正常鏈接
  • 客戶端主動斷網重連
  • 服務端中止服務重連

「注:以上GIF圖,客戶端與服務端創建鏈接成功時,會顯示「消息」字樣,不然會顯示鏈接狀態。」

客戶端日誌以下:

服務端日誌以下:

因爲客戶端殺死進程及服務端主動中止服務,日誌會清空,因此就不貼更詳細的日誌了,感興趣的同窗能夠pull代碼自行驗證。

寫在最後

經過以上代碼實現,若是不考慮長鏈接穩定性的狀況下(未加入心跳超時重連邏輯),已經能夠進行客戶端與服務端消息的收發,本文主要講解鏈接及重連模塊,因此暫未加入消息收發功能。

在下一篇文章中,將會講解TCP/WebSocket的拆包與粘包處理,因爲Netty已封裝了各類不一樣的消息編解碼器,因此若是使用我定義的消息格式,拆包與粘包的處理將會很簡單,直接拿來用便可。考慮到你們可能有不一樣業務協議的需求,因此會加入自定義協議的消息編解碼器的實現,敬請期待。

相關代碼已提交Github,須要自取:

  • KulaChat(https://github.com/FreddyChen/KulaChat)
  • kulachat-server(https://github.com/FreddyChen/kulachat-server)
  • ims_kula(https://github.com/FreddyChen/ims_kula)

下篇文章見,古德拜~ ~ ~


本文分享自微信公衆號 - FreddyChen(FreddyChenAndroid)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索