Netty 做爲一個網絡框架,提供了諸多功能,好比咱們以前說的編解碼,Netty 準備不少現成的編解碼,同時,Netty 還爲咱們準備了網絡中,很是重要的一個服務-----心跳機制。經過心跳檢查對方是否有效,這在 RPC 框架中是必不可少的功能。git
Netty 提供了 IdleStateHandler ,ReadTimeoutHandler,WriteTimeoutHandler 檢測鏈接的有效性。固然,你也能夠本身寫個任務。但咱們今天不許備使用自定義任務,而是使用 Netty 內部的。github
說如下這三個 handler 的做用。promise
序 號 | 名稱 | 做用 |
---|---|---|
1 | IdleStateHandler | 當鏈接的空閒時間(讀或者寫)太長時,將會觸發一個 IdleStateEvent 事件。而後,你能夠經過你的 ChannelInboundHandler 中重寫 userEventTrigged 方法來處理該事件。 |
2 | ReadTimeoutHandler | 若是在指定的事件沒有發生讀事件,就會拋出這個異常,並自動關閉這個鏈接。你能夠在 exceptionCaught 方法中處理這個異常。 |
3 | WriteTimeoutHandler | 當一個寫操做不能在必定的時間內完成時,拋出此異常,並關閉鏈接。你一樣能夠在 exceptionCaught 方法中處理這個異常。 |
注意:
其中,關於 WriteTimeoutHandler 的描述,著名的 《Netty 實戰》和 他的英文原版的描述都過期了,原文描述:服務器
若是在指定的時間間隔內沒有任何出站數據寫入,則拋出一個 WriteTimeoutException.網絡
此書出版的時候,Netty 的文檔確實是這樣的,但在 2015 年 12 月 28 號的時候,被一個同窗修改了邏輯,見下方 git 日誌:框架
image.pngide
貌似仍是個國人妹子。。。。而如今的文檔描述是:oop
Raises a {@link WriteTimeoutException} when a write operation cannot finish in a certain period of time.
當一個寫操做不能在必定的時間內完成時,就會產生一個 WriteTimeoutException。源碼分析
ReadTimeout 事件和 WriteTimeout 事件都會自動關閉鏈接,並且,屬於異常處理,因此,這裏只是介紹如下,咱們重點看 IdleStateHandler。性能
當鏈接的空閒時間(讀或者寫)太長時,將會觸發一個 IdleStateEvent 事件。而後,你能夠經過你的 ChannelInboundHandler 中重寫 userEventTrigged 方法來處理該事件。
IdleStateHandler 既是出站處理器也是入站處理器,繼承了 ChannelDuplexHandler 。一般在 initChannel 方法中將 IdleStateHandler 添加到 pipeline 中。而後在本身的 handler 中重寫 userEventTriggered 方法,當發生空閒事件(讀或者寫),就會觸發這個方法,並傳入具體事件。
這時,你能夠經過 Context 對象嘗試向目標 Socekt 寫入數據,並設置一個 監聽器,若是發送失敗就關閉 Socket (Netty 準備了一個ChannelFutureListener.CLOSE_ON_FAILURE
監聽器用來實現關閉 Socket 邏輯)。
這樣,就實現了一個簡單的心跳服務。
1.構造方法,該類有 3 個構造方法,主要對一下 4 個屬性賦值:
private final boolean observeOutput;// 是否考慮出站時較慢的狀況。默認值是false(不考慮)。 private final long readerIdleTimeNanos; // 讀事件空閒時間,0 則禁用事件 private final long writerIdleTimeNanos;// 寫事件空閒時間,0 則禁用事件 private final long allIdleTimeNanos; //讀或寫空閒時間,0 則禁用事件
2. handlerAdded 方法
當該 handler 被添加到 pipeline 中時,則調用 initialize 方法:
private void initialize(ChannelHandlerContext ctx) { switch (state) { case 1: case 2: return; } state = 1; initOutputChanged(ctx); lastReadTime = lastWriteTime = ticksInNanos(); if (readerIdleTimeNanos > 0) { // 這裏的 schedule 方法會調用 eventLoop 的 schedule 方法,將定時任務添加進隊列中 readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (writerIdleTimeNanos > 0) { writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (allIdleTimeNanos > 0) { allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS); } }
只要給定的參數大於0,就建立一個定時任務,每一個事件都建立。同時,將 state 狀態設置爲 1,防止重複初始化。調用 initOutputChanged 方法,初始化 「監控出站數據屬性」,代碼以下:
private void initOutputChanged(ChannelHandlerContext ctx) { if (observeOutput) { Channel channel = ctx.channel(); Unsafe unsafe = channel.unsafe(); ChannelOutboundBuffer buf = unsafe.outboundBuffer(); // 記錄了出站緩衝區相關的數據,buf 對象的 hash 碼,和 buf 的剩餘緩衝字節數 if (buf != null) { lastMessageHashCode = System.identityHashCode(buf.current()); lastPendingWriteBytes = buf.totalPendingWriteBytes(); } } }
首先說說這個 observeOutput 「監控出站數據屬性」 的做用。由於 github 上有人提了 issue ,issue 地址,原本是沒有這個參數的。爲何須要呢?
假設:當你的客戶端應用每次接收數據是30秒,而你的寫空閒時間是 25 秒,那麼,當你數據尚未寫出的時候,寫空閒時間觸發了。其實是不合乎邏輯的。由於你的應用根本不空閒。
怎麼解決呢?
Netty 的解決方案是:記錄最後一次輸出消息的相關信息,並使用一個值 firstXXXXIdleEvent 表示是否再次活動過,每次讀寫活動都會將對應的 first 值更新爲 true,若是是 false,說明這段時間沒有發生過讀寫事件。同時若是第一次記錄出站的相關數據和第二次獲得的出站相關數據不一樣,則說明數據在緩慢的出站,就不用觸發空閒事件。
總的來講,這個字段就是用來對付 「客戶端接收數據奇慢無比,慢到比空閒時間還多」 的極端狀況。因此,Netty 默認是關閉這個字段的。
3. 該類內部的 3 個定時任務類
以下圖:
這 3 個定時任務分別對應 讀,寫,讀或者寫 事件。共有一個父類。這個父類提供了一個模板方法:
當通道關閉了,就不執行任務了。反之,執行子類的 run 方法。
1. 讀事件的 run 方法
代碼以下:
protected void run(ChannelHandlerContext ctx) { long nextDelay = readerIdleTimeNanos; if (!reading) { nextDelay -= ticksInNanos() - lastReadTime; } if (nextDelay <= 0) { // Reader is idle - set a new timeout and notify the callback. // 用於取消任務 promise readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); boolean first = firstReaderIdleEvent; firstReaderIdleEvent = false; try { // 再次提交任務 IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); // 觸發用戶 handler use channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } else { // Read occurred before the timeout - set a new timeout with shorter delay. readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); } }
該方法很簡單:
總的來講,每次讀取操做都會記錄一個時間,定時任務時間到了,會計算當前時間和最後一次讀的時間的間隔,若是間隔超過了設置的時間,就觸發 UserEventTriggered 方法。就是這麼簡單。
再看看寫事件任務。
2. 寫事件的 run 方法
寫任務的邏輯基本和讀任務的邏輯同樣,惟一不一樣的就是有一個針對 出站較慢數據的判斷。
if (hasOutputChanged(ctx, first)) { return; }
若是這個方法返回 true,就不執行觸發事件操做了,即便時間到了。看看該方法實現:
private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) { if (observeOutput) { // 若是最後一次寫的時間和上一次記錄的時間不同,說明寫操做進行過了,則更新此值 if (lastChangeCheckTimeStamp != lastWriteTime) { lastChangeCheckTimeStamp = lastWriteTime; // 但若是,在這個方法的調用間隙修改的,就仍然不觸發事件 if (!first) { // #firstWriterIdleEvent or #firstAllIdleEvent return true; } } Channel channel = ctx.channel(); Unsafe unsafe = channel.unsafe(); ChannelOutboundBuffer buf = unsafe.outboundBuffer(); // 若是出站區有數據 if (buf != null) { // 拿到出站緩衝區的 對象 hashcode int messageHashCode = System.identityHashCode(buf.current()); // 拿到這個 緩衝區的 全部字節 long pendingWriteBytes = buf.totalPendingWriteBytes(); // 若是和以前的不相等,或者字節數不一樣,說明,輸出有變化,將 "最後一個緩衝區引用" 和 「剩餘字節數」 刷新 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) { lastMessageHashCode = messageHashCode; lastPendingWriteBytes = pendingWriteBytes; // 若是寫操做沒有進行過,則任務寫的慢,不觸發空閒事件 if (!first) { return true; } } } } return false; }
寫了一些註釋,仍是再梳理一下吧:
整個邏輯以下:
流程圖
這裏有個問題,爲何第一次的時候必定要觸發事件呢?假設,客戶端開始變得很慢,這個時候,定時任務監聽發現時間到了,就進入這裏判斷,當上次記錄的緩衝區相關數據已經不一樣,這個時候難道觸發事件嗎?
實際上,這裏是 Netty 的一個考慮:假設真的發生了很寫出速度很慢的問題,極可能引起 OOM,相比叫鏈接空閒,這要嚴重多了。爲何第一次必定要觸發事件呢?若是不觸發,用戶根本不知道發送了什麼,當一次寫空閒事件觸發,隨後出現了 OOM,用戶能夠感知到:多是寫的太慢,後面的數據根本寫不進去,因此發生了OOM。因此,這裏的一次警告仍是必要的。
固然,這是個人一個猜想。有必要的話,能夠去 Netty 那裏提個 issue。
好,關於客戶端寫的慢的特殊處理告一段落。再看看另外一個任務的邏輯。
3. 全部事件的 run 方法
這個類叫作 AllIdleTimeoutTask ,表示這個監控着全部的事件。當讀寫事件發生時,都會記錄。代碼邏輯和寫事件的的基本一致,除了這裏:
long nextDelay = allIdleTimeNanos; if (!reading) { // 當前時間減去 最後一次寫或讀 的時間 ,若大於0,說明超時了 nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime); }
這裏的時間計算是取讀寫事件中的最大值來的。而後像寫事件同樣,判斷是否發生了寫的慢的狀況。最後調用 ctx.fireUserEventTriggered(evt) 方法。
一般這個使用的是最多的。構造方法通常是:
pipeline.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS));
讀寫都是 0 表示禁用,30 表示 30 秒內沒有任務讀寫事件發生,就觸發事件。注意,當不是 0 的時候,這三個任務會重疊。
IdleStateHandler 能夠實現心跳功能,當服務器和客戶端沒有任何讀寫交互時,並超過了給定的時間,則會觸發用戶 handler 的 userEventTriggered 方法。用戶能夠在這個方法中嘗試向對方發送信息,若是發送失敗,則關閉鏈接。
IdleStateHandler 的實現基於 EventLoop 的定時任務,每次讀寫都會記錄一個值,在定時任務運行的時候,經過計算當前時間和設置時間和上次事件發生時間的結果,來判斷是否空閒。
內部有 3 個定時任務,分別對應讀事件,寫事件,讀寫事件。一般用戶監聽讀寫事件就足夠了。
同時,IdleStateHandler 內部也考慮了一些極端狀況:客戶端接收緩慢,一次接收數據的速度超過了設置的空閒時間
。Netty 經過構造方法中的 observeOutput 屬性來決定是否對出站緩衝區的狀況進行判斷。
若是出站緩慢,Netty 不認爲這是空閒,也就不觸發空閒事件。但第一次不管如何也是要觸發的。由於第一次沒法判斷是出站緩慢仍是空閒。固然,出站緩慢的話,OOM 比空閒的問題更大。
因此,當你的應用出現了內存溢出,OOM之類,而且寫空閒極少發生(使用了 observeOutput 爲 true),那麼就須要注意是否是數據出站速度過慢。
默認 observeOutput 是 false,意思是,即便你的應用出站緩慢,Netty 認爲是寫空閒。
可見這個 observeOutput 的做用好像不是那麼重要,若是真的發生了出站緩慢,判斷是否空閒根本就不重要了,重要的是 OOM。因此 Netty 選擇了默認 false。
還有一個注意的地方:剛開始咱們說的 ReadTimeoutHandler ,就是繼承自 IdleStateHandler,當觸發讀空閒事件的時候,就觸發 ctx.fireExceptionCaught 方法,並傳入一個 ReadTimeoutException,而後關閉 Socket。
而 WriteTimeoutHandler 的實現不是基於 IdleStateHandler 的,他的原理是,當調用 write 方法的時候,會建立一個定時任務,任務內容是根據傳入的 promise 的完成狀況來判斷是否超出了寫的時間。當定時任務根據指定時間開始運行,發現 promise 的 isDone 方法返回 false,代表尚未寫完,說明超時了,則拋出異常。當 write 方法完成後,會打判定時任務。
好了,關於 Netty 自帶的心跳相關的類就介紹到這裏。這些功能對於開發穩定的高性能 RPC 相當重要。