寫在前面
貼個Kula高清圖鎮樓:git
在以前的跟我一塊兒開發商業級IM(1)—— 技術選型及協議定義和跟我一塊兒開發商業級IM(2)—— 接口定義及封裝兩篇文章,咱們已經瞭解IMS的技術選型及接口定義與封裝,接下來,咱們來真正實現鏈接及重連部分。github
一個社交產品,長鏈接穩定是前提,絕大部分業務邏輯的正常運行都須要穩定的長鏈接支撐,可謂重中之重。本篇文章將會講述如何去實現並維護一個穩定的長鏈接,以及各類異常狀況的處理等。閱讀完本篇文章,你將會學到鏈接、重連機制、心跳機制等知識。同時,會在Github(https://github.com/FreddyChen)上開源相關代碼(包含Android客戶端/Java服務端、基於TCP/WebSocket),廢話不說,咱們開始吧。web
初始化配置
初始化配置,也就是在應用程序啓動並進行IMS
初始化時,傳入所需配置參數,可根據本身的業務需求自定義。下面咱們來看看NettyTCPIMS
初始化接口的代碼實現(「因爲基於Netty
和WebSocket
實現的NettyWebSocketIMS
大部分代碼及邏輯都與NettyTCPIMS
相同,就不單獨貼出NettyWebSocketIMS
代碼了,下面只會講解WebSocket
對比TCP
實現所不一樣的地方」,有須要完整代碼的話能夠跳轉Github(https://github.com/FreddyChen/ims_kula)查看):算法
/**
* 初始化
* @param context
* @param options IMS初始化配置
* @param connectStatusListener IMS鏈接狀態監聽
* @param msgReceivedListener IMS消息接收監聽
* @return
*/
@Override
public boolean init(Context context, IMSOptions options, IMSConnectStatusListener connectStatusListener, IMSMsgReceivedListener msgReceivedListener) {
if (context == null) {
Log.d(TAG, "初始化失敗:Context is null.");
initialized = false;
return false;
}
if (options == null) {
Log.d(TAG, "初始化失敗:IMSOptions is null.");
initialized = false;
return false;
}
this.mContext = context;
this.mIMSOptions = options;
this.mIMSConnectStatusListener = connectStatusListener;
this.mIMSMsgReceivedListener = msgReceivedListener;
executors = new ExecutorServiceFactory();
// 初始化重連線程池
executors.initBossLoopGroup();
// 註冊網絡鏈接狀態監聽
NetworkManager.getInstance().registerObserver(context, this);
// 標識ims初始化成功
initialized = true;
// 標識ims已打開
isClosed = false;
callbackIMSConnectStatus(IMSConnectStatus.Unconnected);
return true;
}
如上圖,簡單講講初始化的幾個步驟:bootstrap
-
「參數」
-
「context」應用程序上下文,方便IMS獲取系統資源並進行一些系統操做等。 -
「options」IMS初始化所需配置,其中定義了通訊實現方式、通訊協議、傳輸協議、鏈接超時時間、重連延時時間、重連次數、心跳先後臺間隔時間、服務器地址等一些支持自定義的參數。 -
「connectStatusListener」IMS鏈接狀態回調,便於把鏈接狀態反饋到應用層。 -
「msgReceivedListener」消息接收回調,便於IMS把接收到的消息回調到應用層( 「本篇文章主要講解鏈接及重連,因此不涉及消息部分,後續會詳細講解」)。
-
「建立線程池組」
線程池組分爲boss線程池和work線程池,其中boss線程池負責鏈接及重連部分;work線程池負責心跳部分,均爲單線程線程池(由於同時只能有一個線程進行鏈接或心跳)。至於爲何用線程池,純屬我的習慣,你們也能夠分別用一個子線程實現便可。 -
「註冊網絡狀態監聽」
網絡變化時,進行IMS重連。
初始Bootstrap
初始化Bootstrap,可參考Netty ChannelOption並根據實際業務場景進行定製,下面貼出我本身定製的配置:服務器
/**
* 初始化bootstrap
*/
void initBootstrap() {
closeBootstrap();// 初始化前先關閉
NioEventLoopGroup loopGroup = new NioEventLoopGroup(4);
bootstrap = new Bootstrap();
bootstrap.group(loopGroup).channel(NioSocketChannel.class)
// 設置該選項之後,若是在兩小時內沒有數據的通訊時,TCP會自動發送一個活動探測數據報文
.option(ChannelOption.SO_KEEPALIVE, true)
// 設置禁用nagle算法,若是要求高實時性,有數據發送時就立刻發送,就將該選項設置爲true關閉Nagle算法;若是要減小發送次數減小網絡交互,就設置爲false等累積必定大小後再發送。默認爲false
.option(ChannelOption.TCP_NODELAY, true)
// 設置TCP發送緩衝區大小(字節數)
.option(ChannelOption.SO_SNDBUF, 32 * 1024)
// 設置TCP接收緩衝區大小(字節數)
.option(ChannelOption.SO_RCVBUF, 32 * 1024)
// 設置鏈接超時時長,單位:毫秒
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, mIMSOptions.getConnectTimeout())
// 設置初始化ChannelHandler
.handler(new NettyTCPChannelInitializerHandler(this));
}
至於參數的含義,你們可參照官方文檔的介紹。微信
鏈接
鏈接也能夠認爲是重連,執行重連響應邏輯便可:websocket
/**
* 鏈接
*/
@Override
public void connect() {
if(!initialized) {
Log.w(TAG, "IMS初始化失敗,請查看日誌");
return;
}
isExecConnect = true;// 標識已執行過鏈接
this.reconnect(true);
}
「因此咱們直接看重連部分,也是整篇文章中最核心最複雜的部分」。網絡
重連
由於鏈接及重連部分代碼較多及邏輯較複雜,爲了使NettyTCPIMS
代碼儘可能簡潔及邏輯清晰,因此將鏈接及重連部分代碼抽取到NettyTCPReconnectTask
:框架
public class NettyTCPReconnectTask implements Runnable {
private static final String TAG = NettyTCPReconnectTask.class.getSimpleName();
private NettyTCPIMS ims;
private IMSOptions mIMSOptions;
NettyTCPReconnectTask(NettyTCPIMS ims) {
this.ims = ims;
this.mIMSOptions = ims.getIMSOptions();
}
@Override
public void run() {
try {
// 重連時,釋放工做線程組,也就是中止心跳
ims.getExecutors().destroyWorkLoopGroup();
// ims未關閉而且網絡可用的狀況下,纔去鏈接
while (!ims.isClosed() && ims.isNetworkAvailable()) {
IMSConnectStatus status;
if ((status = connect()) == IMSConnectStatus.Connected) {
ims.callbackIMSConnectStatus(status);
break;// 鏈接成功,跳出循環
}
if (status == IMSConnectStatus.ConnectFailed
|| status == IMSConnectStatus.ConnectFailed_IMSClosed
|| status == IMSConnectStatus.ConnectFailed_ServerListEmpty
|| status == IMSConnectStatus.ConnectFailed_ServerEmpty
|| status == IMSConnectStatus.ConnectFailed_ServerIllegitimate
|| status == IMSConnectStatus.ConnectFailed_NetworkUnavailable) {
ims.callbackIMSConnectStatus(status);
if(ims.isClosed() || !ims.isNetworkAvailable()) {
return;
}
// 一個服務器地址列表都鏈接失敗後,說明網絡狀況可能不好,延時指定時間(重連間隔時間*2)再去進行下一個服務器地址的鏈接
Log.w(TAG, String.format("一個週期鏈接失敗,等待%1$dms後再次嘗試重連", mIMSOptions.getReconnectInterval() * 2));
try {
Thread.sleep(mIMSOptions.getReconnectInterval() * 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} finally {
// 標識重連任務中止
ims.setReconnecting(false);
}
}
/**
* 鏈接服務器
* @return
*/
private IMSConnectStatus connect() {
if (ims.isClosed()) return IMSConnectStatus.ConnectFailed_IMSClosed;
ims.initBootstrap();
List<String> serverList = mIMSOptions.getServerList();
if (serverList == null || serverList.isEmpty()) {
return IMSConnectStatus.ConnectFailed_ServerListEmpty;
}
for (int i = 0; i < serverList.size(); i++) {
String server = serverList.get(i);
if (StringUtil.isNullOrEmpty(server)) {
return IMSConnectStatus.ConnectFailed_ServerEmpty;
}
String[] params = null;
try {
params = server.split(" ");
} catch (Exception e) {
e.printStackTrace();
}
if (params == null || params.length < 2) {
return IMSConnectStatus.ConnectFailed_ServerIllegitimate;
}
if(i == 0) {
ims.callbackIMSConnectStatus(IMSConnectStatus.Connecting);
}
// +1是由於首次鏈接也認爲是重連,因此若是重連次數設置爲3,則最大鏈接次數爲3+1次
for (int j = 0; j < mIMSOptions.getReconnectCount() + 1; j++) {
if (ims.isClosed()) {
return IMSConnectStatus.ConnectFailed_IMSClosed;
}
if (!ims.isNetworkAvailable()) {
return IMSConnectStatus.ConnectFailed_NetworkUnavailable;
}
Log.d(TAG, String.format("正在進行【%1$s】的第%2$d次鏈接", server, j + 1));
try {
String host = params[0];
int port = Integer.parseInt(params[1]);
Channel channel = toServer(host, port);
if (channel != null && channel.isOpen() && channel.isActive() && channel.isRegistered() && channel.isWritable()) {
ims.setChannel(channel);
return IMSConnectStatus.Connected;
} else {
if (j == mIMSOptions.getReconnectCount()) {
// 若是當前已達到最大重連次數,而且是最後一個服務器地址,則回調鏈接失敗
if(i == serverList.size() - 1) {
Log.w(TAG, String.format("【%1$s】鏈接失敗", server));
return IMSConnectStatus.ConnectFailed;
}
// 不然,無需回調鏈接失敗,等待一段時間再去進行下一個服務器地址鏈接便可
// 也就是說,當服務器地址列表裏的地址都鏈接失敗,才認爲是鏈接失敗
else {
// 一個服務器地址鏈接失敗後,延時指定時間再去進行下一個服務器地址的鏈接
Log.w(TAG, String.format("【%1$s】鏈接失敗,正在等待進行下一個服務器地址的重連,當前重連延時時長:%2$dms", server, mIMSOptions.getReconnectInterval()));
Log.w(TAG, "=========================================================================================");
Thread.sleep(mIMSOptions.getReconnectInterval());
}
} else {
// 鏈接失敗,則線程休眠(重連間隔時長 / 2 * n) ms
int delayTime = mIMSOptions.getReconnectInterval() + mIMSOptions.getReconnectInterval() / 2 * j;
Log.w(TAG, String.format("【%1$s】鏈接失敗,正在等待重連,當前重連延時時長:%2$dms", server, delayTime));
Thread.sleep(delayTime);
}
}
} catch (InterruptedException e) {
break;// 線程被中斷,則強制關閉
}
}
}
return IMSConnectStatus.ConnectFailed;
}
/**
* 真正鏈接服務器的地方
* @param host
* @param port
* @return
*/
private Channel toServer(String host, int port) {
Channel channel;
try {
channel = ims.getBootstrap().connect(host, port).sync().channel();
} catch (Exception e) {
e.printStackTrace();
channel = null;
}
return channel;
}
}
從以上代碼,能夠看到主要分爲三個方法:
-
「run()」重連任務是一個Thread,
run()
方法也就是線程啓動時執行的方法,主要是判斷IMS是否關閉和網絡狀態,知足這兩個條件就一直循環去鏈接,鏈接成功後,回調鏈接狀態並中止線程,不然,一個週期鏈接失敗後(一個鏈接失敗週期,表明從開始鏈接到全部服務器地址達到最大重連次數),延時一段時間再去嘗試重連(你們可能會問爲何要去延時,直接鏈接很差嗎?主要是由於若是鏈接失敗的話,大多數狀況下多是客戶端網絡環境很差或者是服務端存在問題,延時是爲了在下一個時間節點時網絡恢復等,避免頻繁鏈接,節約性能),直至鏈接成功爲止。 -
「toServer()」
toServer()
主要是Netty框架進行TCP長鏈接的代碼,比較簡單。 -
「connect()」鏈接及重連的全部邏輯,都放到
connect()
方法中進行。TCP
和WebSocket
的方式有細微的區別,下面主要以TCP
爲例,至於WebSocket
的區別,稍後會列出來。
「注:ims_kula
SDK中固定的TCP服務器地址的格式爲:IP地址 端口號,例:192.168.0.1 8808,你們也能夠根據本身的需求來定義格式。」connect()
方法大致邏輯以下: -
判斷IMS是否已關閉或網絡是否不可用,若知足兩個條件的其中之一,即返回鏈接失敗狀態; -
判斷用戶是否設置了服務器地址列表,若未設置,即返回鏈接失敗狀態; -
若以上條件都未知足,也就是IMS未關閉,網絡可用,而且服務器地址已設置,則初始化Bootstrap; -
接着須要兩個for循環,外層循環負責遍歷服務器地址列表,取出每個服務器地址;內層循環負責遍歷用戶設置的最大重連次數,默認爲3次,加上鍊接所需的一次,也就是說在不設置最大重連次數的狀況下, ims_kula
SDK會對每一個服務器地址進行 「4」次鏈接。同時,重連間隔時間爲 reconnectInterval + reconnectInterval / 2 * n,也就是若是設置重連間隔時間爲8000ms,那麼第二次重連間隔時間將爲12000ms,第三次爲16000ms,以此類推; -
獲取服務器地址後,對地址進行字符串分割,分別獲取host和port; -
接着調用Netty鏈接TCP的方法( toServer(String host, int port)
)進行鏈接便可。
「注:WebSocket
鏈接方式與TCP
大同小異,惟一的區別就是WebSocket
的服務器地址格式與TCP
不一樣,ws://IP地址:端口號/websocket,例:ws://192.168.0.1:8808/websocket,因此WebSocket
獲取host和port代碼以下:」
(僞代碼,具體代碼可見NettyWebSocketReconnectTask
)
URI uri = URI.create(server);
String host = uri.getHost();
int port = uri.getPort();
至於其他鏈接及重連部分代碼,WebSocket
與TCP
是一致的,由於WebSocket
自己就是基於TCP
協議做一層封裝。
什麼時候重連及斷開鏈接
首先明確一下,「重連是相對於客戶端來講的,服務端不存在主動鏈接;斷開鏈接是相對於服務端來講的,嚴格來講是移除響應的Channel。」
客戶端重連時機:
-
網絡切換 -
斷開網絡鏈接 -
可感知的服務端異常 -
心跳超時等
服務端斷開鏈接時機:
-
可感知的客戶端異常 -
心跳超時 -
同一IP的客戶端重複鏈接等
不知道你們注意到沒有,上述的客戶端重連時機和服務端斷開鏈接時機,都分別有一個可感知異常。什麼是可感知異常?也就是不管客戶端仍是服務端,在對方斷開鏈接的時候,能夠感知到,就是可感知異常。
經測試,在雙方創建鏈接成功的狀態下,對於客戶端來講,若是服務端手動中止服務,Netty會回調exceptionCaught()
方法,以下:
「服務端直接關機或者拔網線時,客戶端沒法感知,須要利用心跳超時機制進行重連。」
同理,對於服務端來講,若是客戶端手動殺死進程,Netty會回調channelInactive()
方法,以下:
「客戶端直接關機或者斷網時,服務端沒法感知,一樣須要利用心跳機制進行斷開客戶端鏈接(移除channel)。」
「注:利用心跳超時機制進行重連及斷開鏈接會在後續文章講解,本篇文章主要講解鏈接及重連,就不在此展開了。」
效果展現
考慮到GIF圖片體積過大,暫時先把鏈接超時時間和重連間隔時間適當縮短,下面展現幾種狀況下的客戶端鏈接變化:
-
正常鏈接
-
客戶端主動斷網重連
-
服務端中止服務重連
「注:以上GIF圖,客戶端與服務端創建鏈接成功時,會顯示「消息」字樣,不然會顯示鏈接狀態。」
客戶端日誌以下:
服務端日誌以下:
因爲客戶端殺死進程及服務端主動中止服務,日誌會清空,因此就不貼更詳細的日誌了,感興趣的同窗能夠pull代碼自行驗證。
寫在最後
經過以上代碼實現,若是不考慮長鏈接穩定性的狀況下(未加入心跳超時重連邏輯),已經能夠進行客戶端與服務端消息的收發,本文主要講解鏈接及重連模塊,因此暫未加入消息收發功能。
在下一篇文章中,將會講解TCP/WebSocket的拆包與粘包處理,因爲Netty已封裝了各類不一樣的消息編解碼器,因此若是使用我定義的消息格式,拆包與粘包的處理將會很簡單,直接拿來用便可。考慮到你們可能有不一樣業務協議的需求,因此會加入自定義協議的消息編解碼器的實現,敬請期待。
相關代碼已提交Github,須要自取:
-
KulaChat(https://github.com/FreddyChen/KulaChat) -
kulachat-server(https://github.com/FreddyChen/kulachat-server) -
ims_kula(https://github.com/FreddyChen/ims_kula)
下篇文章見,古德拜~ ~ ~
本文分享自微信公衆號 - FreddyChen(FreddyChenAndroid)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。