netty(5)高級篇-私有協議棧

來源:《Netty權威指南》  做者:李林峯

1、私有協議介紹

因爲現代軟件的複雜性,一個大型軟件系統每每會被人爲地拆分稱爲多個模塊,另外隨着移動互聯網的興起,網站的規模愈來愈大,業務功能愈來愈多,每每須要集羣和分佈式部署。模塊之間的通訊就須要進行跨節點通訊。
傳統的Java應用中節點通訊的經常使用方式:java

  • rmi遠程服務調用
  • Java Socket + Java序列化
  • RPC框架 Thrift、Apache的Avro等
  • 利用標準的公有協議進行跨節點調用,例如HTTP+XML,Restful+JSON或WebService

下面使用Netty設計私有協議node

除了鏈路層的物理鏈接外,還須要對請求和響應消息進行編解碼。 在請求和應答以外,還須要控制和管理類指令,例如鏈路創建的握手信息,鏈路檢測的心跳信息。這些功能組合到一塊兒後,就會造成私有協議。apache

  • 每一個Netty節點(Netty進程)之間創建長鏈接,使用Netty協議進行通訊。
  • Netty節點沒有客戶端和服務端的區別,誰首先發起鏈接,誰就是客戶端。

1. 網絡拓撲圖:

2. 協議棧功能描述:

  1. 基於Netty的NIO通訊框架,提供高性能的異步通訊能力;
  2. 提供消息的編解碼框架,實現POJO的序列化和反序列化
  3. 提供基於IP地址的白名單接入認證機制;
  4. 鏈路的有效性校驗機制;
  5. 鏈路的斷線重連機制;

3. 通訊模型:

具體步驟:bootstrap

  1. Netty協議棧客戶端發送握手請求信息,攜帶節點ID等有效身份認證信息;
  2. Netty協議服務端對握手請求消息進行合法性校驗,包括節點ID有效性校驗、節點重複登陸校驗和IP地址合法性校驗,校驗經過後,返回登陸成功的握手應答消息;
  3. 鏈路創建成功以後,客戶端發送業務消息;
  4. 鏈路成功以後,服務端發送心跳消息;
  5. 鏈路創建成功以後,客戶端發送心跳消息;
  6. 鏈路創建成功以後,服務端發送業務消息;
  7. 服務端退出時,服務端關閉鏈接,客戶端感知對方關閉鏈接後,被動關閉客戶端鏈接。

4. 消息定義

相似於http協議,消息分爲消息頭消息體。其中消息體是一個Object類型,消息頭則以下所示:數組

名稱 類型 長度 描述
length 整型 int 32 消息長度,整個消息,包括消息頭和消息體
sessionId 長整型long 64 集羣節點內全局惟一,由會話ID生成器生成
type Byte 8

0: 表示請求消息緩存

1: 業務響應消息安全

2: 業務ONE WAY消息(便是請求又是響應消息)服務器

3: 握手請求消息網絡

4: 握手應答消息session

5: 心跳請求消息

6: 心跳應答消息

priority Byte 8 消息優先級: 0-255
attachment Map<String,Object> 變長 可選字段,用於擴展消息頭

 

5. 支持的字段類型:

6. Netty協議的編解碼規範

編碼規範:

(1) crcCode: java.nio.ByteBuffer.putInt(int value),若是採用其它緩存區實現,必須與其等價

(2) length: java.nio.ByteBuffer.putInt(int value),若是採用其它緩衝區實現,必須與其等價

(3) sessionID: java.nio.ByteBuffer.putLong(long value),若是採用其它緩衝區實現,必須與其等價

(4) type: java.nio.ByteBuffer.put(byte b),若是採用其它緩衝區實現,必須與其等價

(5) priority: java.nio.ByteBuffer.put(byte b),若是採用其它緩衝區實現,必須與其等價

(6) attachment: 若是長度爲0,表示沒有可選附件,則將長度編碼爲0,即java.nio.ByteBuffer.putInt(0),若是大於0,表示有附件須要編碼,具體規則以下:

首先對附件的個數進行編碼,java.nio.ByteBuffer.putInt(attachment.size());

而後對Key進行編碼,先編碼長度,而後再將它轉換成byte數組以後編碼內容,具體代碼以下:

String key = null;
byte[] value = null;
for (Map.Entry<String, Object> param: attachment:entrySet()) {
    key = param.getKey();
    buffer.writeString(key);
    value = marshaller.writeObject(param.getValue());
    buffer.writeBinary(value);
}
key = null;
value = null;

(7) body的編碼: 經過JBoss Marshalling將其序列化爲byte數組,而後調用java.nio.ByteBuffer.put(byte[] src);將其寫入ByteBuffer緩衝區中。

在全部的內容都編碼完成以後更新消息頭的length字段。

 

解碼規範:

(1) crcCode: java.nio.ByteBuffer.getInt()獲取校驗碼字段,若是採用其它緩存區實現,必須與其等價

(2) length: java.nio.ByteBuffer.getInt()獲取Netty消息的長度,若是採用其它緩衝區實現,必須與其等價

(3) sessionID: java.nio.ByteBuffer.getLong()獲取會話ID,若是採用其它緩衝區實現,必須與其等價

(4) type: java.nio.ByteBuffer.get()獲取消息類型,若是採用其它緩衝區實現,必須與其等價

(5) priority: java.nio.ByteBuffer.get()獲取消息優先級,若是採用其它緩衝區實現,必須與其等價

(6) attachment: 它的解碼規則爲-首先建立一個新的attachment對象,調用java.nio.ByteBuffer.getInt()獲取附件的長度,若是爲0,說明附件爲空,解碼結束,解析解消息體,不然,根據長度經過for循環進行解碼。

(7) body: 使用JBoss marshaller對其進行解碼

 

7. 鏈路的創建

不區分客戶端和服務端:若是A節點須要B節點的服務,可是A和B之間尚未創建物理鏈路,則由調用方主動發起鏈接,此時調用方爲客戶端,被調用方爲服務端。

使用簡單的黑白名單進行認證,實際環境中,應該使用密鑰,用戶名密碼等方式。

客戶端發送請求消息:

  • 消息頭的type字段爲3;
  • 可選附件個數爲0;
  • 消息體爲空;
  • 握手消息的長度爲22個字節;

 

服務端接收到握手請求消息,若是IP校驗經過,返回握手成功應答給客戶端,應用層鏈路創建成功。握手應答消息:

  • 消息頭type爲4
  • 可選附件個數爲0
  • 消息體爲byte類型的結果,"0"表示認證成功,"-1"表示認證失敗。

鏈路成功創建後,客戶端和服務端就能夠相互發送業務消息了。

 

8. 鏈路的關閉

因爲採用長鏈接通訊,正常的業務運行期間,雙方經過心跳和業務消息維持鏈路,任何一方不須要主動關閉鏈接。

可是,在如下狀況下,客戶端和服務端須要關閉鏈接。

(1) 當對方宕機或者重啓時,會主動釋放鏈路,另外一方讀取到操做系統的通知信號,獲得對方REST鏈路,須要關閉鏈接,釋放自身的句柄等資源。因爲採用TCP全雙工通訊,通訊雙方都須要關閉鏈接,釋放資源;

(2) 消息在讀寫過程當中,發生了I/O異常,須要主動關閉鏈接;

(3) 心跳消息讀寫過程當中發生了I/O異常,須要主動關閉鏈接;

(4) 心跳超時,須要主動關閉鏈接;

(5) 發生編碼異常等不可恢復的錯誤時,須要主動關閉鏈接;

9. 可靠性設計

網絡環境是惡劣的。意外沒法避免,須要在出現意外的時候正常工做或者說是恢復,須要可靠性設計的保證。

(1) 心跳機制

在凌晨等業務低谷期,若是發生網絡閃斷、鏈接被Hang住等網絡問題,因爲沒有業務消息,應用進程很難發現。到了白天業務高峯期,會發生大量的網絡通訊失敗,嚴重的會致使一段時間進程內沒法處理業務消息。

爲了解決這個問題,在網絡空閒的時候採用心跳機制來檢測鏈路的互通性,一旦發現了網絡故障,當即關閉鏈路,主動重連。

設計思路:

  • 當網絡處於空閒時間達到了T(連續週期T沒有讀寫消息)時,客戶端主動發送Ping心跳消息給服務端;
  • 若是在下一個週期T到來時客戶端沒有收到對方發送的Pong心跳應答消息或者讀取到服務端發送的其餘業務消息,則心跳失敗計數器+1
  • 每當客戶端接收到服務的業務消息或者Pong應答消息時,將心跳失敗計數器清0;連續N次沒有接收到服務端的Pong消息或者業務消息,則關閉鏈路間隔INTERVAL時間後發起重連操做;
  • 服務端網絡空閒狀態持續時間達到T後,服務器端將心跳失敗計數器+1;只要接收到客戶端發送的Ping消息或者其餘業務消息,計數器清0
  • 服務器端連續N次沒有接收到客戶端的Ping消息或者其餘業務消息,則關閉鏈路,釋放資源,等待客戶端重連。
(2) 重連機制

若是鏈路中斷,等待INTERVAL時間後,由客戶端發起重連操做,若是重連失敗,間隔週期INTERVAL以後再繼續重連。

不管什麼場景下的重連失敗,客戶端必須保證自身資源被成功及時釋放

重連失敗,須要記錄異常堆棧信息,方便問題定位。

(3) 重複登陸保護

客戶端握手成功以後,鏈路處於正常狀態下,不容許客戶端重複登陸,以防止客戶端在異常狀態下反覆重連致使句柄資源被耗盡

server在接收到握手消息後,首先進行ip合法性校驗,若是成功,則在緩存的地址表中查看客戶端是否已經登陸,若是已經登陸,則拒絕重複登陸,返回錯誤碼-1,同時關閉鏈路,而且在服務端日誌中打印錯誤信息。

爲了防止由服務端和客戶端對鏈路狀態理解不一致的問題,當服務端連續N次心跳超時以後須要主動關閉鏈路,同時清空該客戶端的緩存信息,保證後續的客戶端能夠重連。

(5) 消息緩存重發

不管是客戶端仍是服務端,在發生鏈路中斷以後,恢復鏈路以前,緩存在消息隊列的待發送的消息不能丟失。同時考慮到內存溢出風險,應該在消息緩存隊列中設置上限。

10  可擴展性設計

Netty協議棧須要具有必定的擴展能力,例如統一的消息攔截、接口日誌、安全、加密解密等能夠被方便地添加和刪除,推薦使用Servelt的FilterChain機制,考慮到性能因素,不推薦AOP。

2、Netty協議棧開發

2.1 數據結構定義

無論心跳消息、握手請求和握手應答消息均可以用NettyMessage來定義,只是type不一樣而已。

消息頭:
import java.util.HashMap;
import java.util.Map;

/**
 * @author Lilinfeng
 * @version 1.0
 * @date 2014年3月14日
 */
public final class Header {

    private int crcCode = 0xabef0101;

    private int length;// 消息長度

    private long sessionID;// 會話ID

    private byte type;// 消息類型

    private byte priority;// 消息優先級

    private Map<String, Object> attachment = new HashMap<String, Object>(); // 附件

    /**
     * @return the crcCode
     */
    public final int getCrcCode() {
        return crcCode;
    }

    /**
     * @param crcCode the crcCode to set
     */
    public final void setCrcCode(int crcCode) {
        this.crcCode = crcCode;
    }

    /**
     * @return the length
     */
    public final int getLength() {
        return length;
    }

    /**
     * @param length the length to set
     */
    public final void setLength(int length) {
        this.length = length;
    }

    /**
     * @return the sessionID
     */
    public final long getSessionID() {
        return sessionID;
    }

    /**
     * @param sessionID the sessionID to set
     */
    public final void setSessionID(long sessionID) {
        this.sessionID = sessionID;
    }

    /**
     * @return the type
     */
    public final byte getType() {
        return type;
    }

    /**
     * @param type the type to set
     */
    public final void setType(byte type) {
        this.type = type;
    }

    /**
     * @return the priority
     */
    public final byte getPriority() {
        return priority;
    }

    /**
     * @param priority the priority to set
     */
    public final void setPriority(byte priority) {
        this.priority = priority;
    }

    /**
     * @return the attachment
     */
    public final Map<String, Object> getAttachment() {
        return attachment;
    }

    /**
     * @param attachment the attachment to set
     */
    public final void setAttachment(Map<String, Object> attachment) {
        this.attachment = attachment;
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "Header [crcCode=" + crcCode + ", length=" + length
                + ", sessionID=" + sessionID + ", type=" + type + ", priority="
                + priority + ", attachment=" + attachment + "]";
    }

}
消息:
/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年3月14日
 */
public final class NettyMessage {

    private Header header;

    private Object body;

    /**
     * @return the header
     */
    public final Header getHeader() {
        return header;
    }

    /**
     * @param header the header to set
     */
    public final void setHeader(Header header) {
        this.header = header;
    }

    /**
     * @return the body
     */
    public final Object getBody() {
        return body;
    }

    /**
     * @param body the body to set
     */
    public final void setBody(Object body) {
        this.body = body;
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "NettyMessage [header=" + header + "]";
    }
}

2.2 消息編解碼

因爲依賴於JBoss Marshalling...,添加maven依賴

        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling</artifactId>
            <version>1.4.10.Final</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>1.4.10.Final</version>
        </dependency>

JBossMarshallingFactory:

import org.jboss.marshalling.*;

import java.io.IOException;

/**
 * @author Administrator
 * @version 1.0
 * @date 2014年3月15日
 */
public final class MarshallingCodecFactory {

    /**
     * 建立Jboss Marshaller
     *
     * @return
     * @throws IOException
     */
    protected static Marshaller buildMarshalling() throws IOException {
        final MarshallerFactory marshallerFactory = Marshalling
                .getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        Marshaller marshaller = marshallerFactory
                .createMarshaller(configuration);
        return marshaller;
    }

    /**
     * 建立Jboss Unmarshaller
     *
     * @return
     * @throws IOException
     */
    protected static Unmarshaller buildUnMarshalling() throws IOException {
        final MarshallerFactory marshallerFactory = Marshalling
                .getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        final Unmarshaller unmarshaller = marshallerFactory
                .createUnmarshaller(configuration);
        return unmarshaller;
    }
}

增長JBossMarshalling序列化對象->ByteBuf工具

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import org.jboss.marshalling.Marshaller;

import java.io.IOException;

/**
 * @author Lilinfeng
 * @version 1.0
 * @date 2014年3月14日
 */
@Sharable
public class MarshallingEncoder {

    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
    Marshaller marshaller;

    public MarshallingEncoder() throws IOException {
        marshaller = MarshallingCodecFactory.buildMarshalling();
    }

    // 使用marshall對Object進行編碼,而且寫入bytebuf...
    protected void encode(Object msg, ByteBuf out) throws Exception {
        try {
            //1. 獲取寫入位置
            int lengthPos = out.writerIndex();
            //2. 先寫入4個bytes,用於記錄Object對象編碼後長度
            out.writeBytes(LENGTH_PLACEHOLDER);
            //3. 使用代理對象,防止marshaller寫完以後關閉byte buf
            ChannelBufferByteOutput output = new ChannelBufferByteOutput(out);
            //4. 開始使用marshaller往bytebuf中編碼
            marshaller.start(output);
            marshaller.writeObject(msg);
            //5. 結束編碼
            marshaller.finish();
            //6. 設置對象長度
            out.setInt(lengthPos, out.writerIndex() - lengthPos - 4);
        } finally {
            marshaller.close();
        }
    }
}
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteOutput;

import java.io.IOException;

/**
 * {@link ByteOutput} implementation which writes the data to a {@link ByteBuf}
 *
 *
 */
class ChannelBufferByteOutput implements ByteOutput {

    private final ByteBuf buffer;

    /**
     * Create a new instance which use the given {@link ByteBuf}
     */
    public ChannelBufferByteOutput(ByteBuf buffer) {
        this.buffer = buffer;
    }

    @Override
    public void close() throws IOException {
        // Nothing to do
    }

    @Override
    public void flush() throws IOException {
        // nothing to do
    }

    @Override
    public void write(int b) throws IOException {
        buffer.writeByte(b);
    }

    @Override
    public void write(byte[] bytes) throws IOException {
        buffer.writeBytes(bytes);
    }

    @Override
    public void write(byte[] bytes, int srcIndex, int length) throws IOException {
        buffer.writeBytes(bytes, srcIndex, length);
    }

    /**
     * Return the {@link ByteBuf} which contains the written content
     *
     */
    ByteBuf getBuffer() {
        return buffer;
    }
}

增長JBossMarshalling反序列化對象<-ByteBuf工具

import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteInput;
import org.jboss.marshalling.Unmarshaller;

import java.io.IOException;
import java.io.StreamCorruptedException;

/**
 * @author Lilinfeng
 * @version 1.0
 * @date 2014年3月14日
 */
public class MarshallingDecoder {

    private final Unmarshaller unmarshaller;

    /**
     * Creates a new decoder whose maximum object size is {@code 1048576} bytes.
     * If the size of the received object is greater than {@code 1048576} bytes,
     * a {@link StreamCorruptedException} will be raised.
     *
     * @throws IOException
     */
    public MarshallingDecoder() throws IOException {
        unmarshaller = MarshallingCodecFactory.buildUnMarshalling();
    }

    protected Object decode(ByteBuf in) throws Exception {
        //1. 讀取第一個4bytes,裏面放置的是object對象的byte長度
        int objectSize = in.readInt();
        ByteBuf buf = in.slice(in.readerIndex(), objectSize);
        //2 . 使用bytebuf的代理類
        ByteInput input = new ChannelBufferByteInput(buf);
        try {
            //3. 開始解碼
            unmarshaller.start(input);
            Object obj = unmarshaller.readObject();
            unmarshaller.finish();
            //4. 讀完以後設置讀取的位置
            in.readerIndex(in.readerIndex() + objectSize);
            return obj;
        } finally {
            unmarshaller.close();
        }
    }
}
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteInput;

import java.io.IOException;

/**
 * {@link ByteInput} implementation which reads its data from a {@link ByteBuf}
 */
class ChannelBufferByteInput implements ByteInput {

    private final ByteBuf buffer;

    public ChannelBufferByteInput(ByteBuf buffer) {
        this.buffer = buffer;
    }

    @Override
    public void close() throws IOException {
        // nothing to do
    }

    @Override
    public int available() throws IOException {
        return buffer.readableBytes();
    }

    @Override
    public int read() throws IOException {
        if (buffer.isReadable()) {
            return buffer.readByte() & 0xff;
        }
        return -1;
    }

    @Override
    public int read(byte[] array) throws IOException {
        return read(array, 0, array.length);
    }

    @Override
    public int read(byte[] dst, int dstIndex, int length) throws IOException {
        int available = available();
        if (available == 0) {
            return -1;
        }

        length = Math.min(available, length);
        buffer.readBytes(dst, dstIndex, length);
        return length;
    }

    @Override
    public long skip(long bytes) throws IOException {
        int readable = buffer.readableBytes();
        if (readable < bytes) {
            bytes = readable;
        }
        buffer.readerIndex((int) (buffer.readerIndex() + bytes));
        return bytes;
    }

}

下面根據上述所說的進行對消息編解碼:

import demo.protocol.netty.struct.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.io.IOException;
import java.util.Map;

/**
 * Created by carl.yu on 2016/12/19.
 */
public class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {

    MarshallingEncoder marshallingEncoder;

    public NettyMessageEncoder() throws IOException {
        this.marshallingEncoder = new MarshallingEncoder();
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf sendBuf) throws Exception {
        if (null == msg || null == msg.getHeader()) {
            throw new Exception("The encode message is null");
        }
        //---寫入crcCode---
        sendBuf.writeInt((msg.getHeader().getCrcCode()));
        //---寫入length---
        sendBuf.writeInt((msg.getHeader().getLength()));
        //---寫入sessionId---
        sendBuf.writeLong((msg.getHeader().getSessionID()));
        //---寫入type---
        sendBuf.writeByte((msg.getHeader().getType()));
        //---寫入priority---
        sendBuf.writeByte((msg.getHeader().getPriority()));
        //---寫入附件大小---
        sendBuf.writeInt((msg.getHeader().getAttachment().size()));

        String key = null;
        byte[] keyArray = null;
        Object value = null;
        for (Map.Entry<String, Object> param : msg.getHeader().getAttachment()
                .entrySet()) {
            key = param.getKey();
            keyArray = key.getBytes("UTF-8");
            sendBuf.writeInt(keyArray.length);
            sendBuf.writeBytes(keyArray);
            value = param.getValue();
            // marshallingEncoder.encode(value, sendBuf);
        }
        // for gc
        key = null;
        keyArray = null;
        value = null;

        if (msg.getBody() != null) {
            marshallingEncoder.encode(msg.getBody(), sendBuf);
        } else
            sendBuf.writeInt(0);
        // 以前寫了crcCode 4bytes,除去crcCode和length 8bytes即爲更新以後的字節
        sendBuf.setInt(4, sendBuf.readableBytes() - 8);
    }
}
import demo.protocol.netty.struct.Header;
import demo.protocol.netty.struct.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * @author Lilinfeng
 * @version 1.0
 * @date 2014年3月15日
 */
public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {

    MarshallingDecoder marshallingDecoder;

    public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset,
                               int lengthFieldLength) throws IOException {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
        marshallingDecoder = new MarshallingDecoder();
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in)
            throws Exception {
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null;
        }

        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setCrcCode(frame.readInt());
        header.setLength(frame.readInt());
        header.setSessionID(frame.readLong());
        header.setType(frame.readByte());
        header.setPriority(frame.readByte());

        int size = frame.readInt();
        if (size > 0) {
            Map<String, Object> attch = new HashMap<String, Object>(size);
            int keySize = 0;
            byte[] keyArray = null;
            String key = null;
            for (int i = 0; i < size; i++) {
                keySize = frame.readInt();
                keyArray = new byte[keySize];
                frame.readBytes(keyArray);
                key = new String(keyArray, "UTF-8");
                attch.put(key, marshallingDecoder.decode(frame));
            }
            keyArray = null;
            key = null;
            header.setAttachment(attch);
        }
        if (frame.readableBytes() > 4) {
            message.setBody(marshallingDecoder.decode(frame));
        }
        message.setHeader(header);
        return message;
    }
}

關鍵在於解碼器繼承了LengthFieldBasedFrameDecoder,三個參數:

ch.pipeline().addLast(
                                    new NettyMessageDecoder(1024 * 1024, 4, 4));

第一個參數:1024*1024: 最大長度

第二個參數: 從第4個bytes開始表示是長度

第三個參數: 有4個bytes的長度表示是長度

2.3 握手和安全認證

Netty的機制大可能是基於Handler鏈。

client端在通道激活時構建login請求:

/**
 * @author Lilinfeng
 * @version 1.0
 * @date 2014年3月15日
 */
public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter {

    private final static Log LOG = LogFactory.getLog(LoginAuthRespHandler.class);

    /**
     * 本地緩存
     */
    private Map<String, Boolean> nodeCheck = new ConcurrentHashMap<String, Boolean>();
    private String[] whitekList = {"127.0.0.1", "192.168.1.104"};

    /**
     * Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward to
     * the next {@link ChannelHandler} in the {@link ChannelPipeline}.
     * <p>
     * Sub-classes may override this method to change behavior.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        NettyMessage message = (NettyMessage) msg;

        // 若是是握手請求消息,處理,其它消息透傳
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_REQ
                .value()) {
            String nodeIndex = ctx.channel().remoteAddress().toString();
            NettyMessage loginResp = null;
            // 重複登錄,拒絕
            if (nodeCheck.containsKey(nodeIndex)) {
                loginResp = buildResponse((byte) -1);
            } else {
                InetSocketAddress address = (InetSocketAddress) ctx.channel()
                        .remoteAddress();
                String ip = address.getAddress().getHostAddress();
                boolean isOK = false;
                for (String WIP : whitekList) {
                    if (WIP.equals(ip)) {
                        isOK = true;
                        break;
                    }
                }
                loginResp = isOK ? buildResponse((byte) 0)
                        : buildResponse((byte) -1);
                if (isOK)
                    nodeCheck.put(nodeIndex, true);
            }
            LOG.info("The login response is : " + loginResp
                    + " body [" + loginResp.getBody() + "]");
            ctx.writeAndFlush(loginResp);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private NettyMessage buildResponse(byte result) {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.LOGIN_RESP.value());
        message.setHeader(header);
        message.setBody(result);
        return message;
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        nodeCheck.remove(ctx.channel().remoteAddress().toString());// 刪除緩存
        ctx.close();
        ctx.fireExceptionCaught(cause);
    }
}

server端判斷是不是login請求,並對ip進行驗證:

/**
 * @author Lilinfeng
 * @version 1.0
 * @date 2014年3月15日
 */
public class LoginAuthReqHandler extends ChannelInboundHandlerAdapter {

    private static final Log LOG = LogFactory.getLog(LoginAuthReqHandler.class);

    /**
     * Calls {@link ChannelHandlerContext#fireChannelActive()} to forward to the
     * next {@link ChannelHandler} in the {@link ChannelPipeline}.
     * <p/>
     * Sub-classes may override this method to change behavior.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(buildLoginReq());
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward to
     * the next {@link ChannelHandler} in the {@link ChannelPipeline}.
     * <p/>
     * Sub-classes may override this method to change behavior.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        NettyMessage message = (NettyMessage) msg;

        // 若是是握手應答消息,須要判斷是否定證成功
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_RESP
                .value()) {
            byte loginResult = (byte) message.getBody();
            if (loginResult != (byte) 0) {
                // 握手失敗,關閉鏈接
                ctx.close();
            } else {
                LOG.info("Login is ok : " + message);
                ctx.fireChannelRead(msg);
            }
        } else
            //調用下一個channel鏈..
            ctx.fireChannelRead(msg);
    }

    /**
     * 構建登陸請求
     */
    private NettyMessage buildLoginReq() {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.LOGIN_REQ.value());
        message.setHeader(header);
        return message;
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

2.4 心跳機制檢測

握手成功以後,由客戶端主動發送心跳消息,服務端接收到心跳消息以後,返回應答,因爲心跳消息的目的是爲了檢測鏈路的可用性,所以不須要攜帶消息體。

/**
 * @author Lilinfeng
 * @version 1.0
 * @date 2014年3月15日
 */
public class HeartBeatReqHandler extends ChannelInboundHandlerAdapter {

    private static final Log LOG = LogFactory.getLog(HeartBeatReqHandler.class);

    //使用定時任務發送
    private volatile ScheduledFuture<?> heartBeat;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        NettyMessage message = (NettyMessage) msg;
        // 當握手成功後,Login響應向下透傳,主動發送心跳消息
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_RESP
                .value()) {
            //NioEventLoop是一個Schedule,所以支持定時器的執行,建立心跳計時器
            heartBeat = ctx.executor().scheduleAtFixedRate(
                    new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000,
                    TimeUnit.MILLISECONDS);
        } else if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.HEARTBEAT_RESP
                .value()) {
            LOG.info("Client receive server heart beat message : ---> "
                    + message);
        } else
            ctx.fireChannelRead(msg);
    }

    //Ping消息任務類
    private class HeartBeatTask implements Runnable {
        private final ChannelHandlerContext ctx;

        public HeartBeatTask(final ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            NettyMessage heatBeat = buildHeatBeat();
            LOG.info("Client send heart beat messsage to server : ---> "
                    + heatBeat);
            ctx.writeAndFlush(heatBeat);
        }

        private NettyMessage buildHeatBeat() {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setType(MessageType.HEARTBEAT_REQ.value());
            message.setHeader(header);
            return message;
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        if (heartBeat != null) {
            heartBeat.cancel(true);
            heartBeat = null;
        }
        ctx.fireExceptionCaught(cause);
    }
}
import demo.protocol.netty.MessageType;
import demo.protocol.netty.struct.Header;
import demo.protocol.netty.struct.NettyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * @author Lilinfeng
 * @version 1.0
 * @date 2014年3月15日
 */
public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {

    private static final Log LOG = LogFactory.getLog(HeartBeatRespHandler.class);


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        NettyMessage message = (NettyMessage) msg;
        // 返回心跳應答消息
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.HEARTBEAT_REQ
                .value()) {
            LOG.info("Receive client heart beat message : ---> "
                    + message);
            NettyMessage heartBeat = buildHeatBeat();
            LOG.info("Send heart beat response message to client : ---> "
                    + heartBeat);
            ctx.writeAndFlush(heartBeat);
        } else
            ctx.fireChannelRead(msg);
    }

    private NettyMessage buildHeatBeat() {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.HEARTBEAT_RESP.value());
        message.setHeader(header);
        return message;
    }

}

心跳超時的機制很是簡單,直接利用Netty的ReadTimeoutHandler進行實現,當必定週期內(50s)沒有接收到任何對方消息時,須要主動關閉鏈路。若是是客戶端,則從新發起鏈接,若是是服務端,則釋放資源,清除客戶端登陸緩存信息,等待服務器端重連。

2.5 斷線重連機制

在client感知到斷連事件以後,釋放資源,從新發起鏈接,具體代碼如如下部分

首先監聽網絡斷連事件,若是Channel關閉,則執行後續的重連任務,經過Bootstrap從新發起鏈接,客戶端掛在closeFuture上監聽鏈路關閉信號,一旦關閉,則建立定時器,重連。

服務端在監聽到斷連事件後,還須要清空緩存中的登陸認證註冊信息,以保證後續客戶端能夠正常重連。

2.6 客戶端代碼

public final class NettyConstant {
    public static final String REMOTEIP = "127.0.0.1";
    public static final int PORT = 8080;
    public static final int LOCAL_PORT = 12088;
    public static final String LOCALIP = "127.0.0.1";
}
import demo.protocol.netty.NettyConstant;
import demo.protocol.netty.codec.NettyMessageDecoder;
import demo.protocol.netty.codec.NettyMessageEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author Lilinfeng
 * @version 1.0
 * @date 2014年3月15日
 */
public class NettyClient {

    private static final Log LOG = LogFactory.getLog(NettyClient.class);

    private ScheduledExecutorService executor = Executors
            .newScheduledThreadPool(1);

    EventLoopGroup group = new NioEventLoopGroup();

    public void connect(int port, String host) throws Exception {

        // 配置客戶端NIO線程組

        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(
                                    new NettyMessageDecoder(1024 * 1024, 4, 4));
                            ch.pipeline().addLast("MessageEncoder",
                                    new NettyMessageEncoder());
                            ch.pipeline().addLast("readTimeoutHandler",
                                    new ReadTimeoutHandler(50));
                            ch.pipeline().addLast("LoginAuthHandler",
                                    new LoginAuthReqHandler());
                            ch.pipeline().addLast("HeartBeatHandler",
                                    new HeartBeatReqHandler());
                        }
                    });
            // 發起異步鏈接操做
            ChannelFuture future = b.connect(
                    new InetSocketAddress(host, port),
                    new InetSocketAddress(NettyConstant.LOCALIP,
                            NettyConstant.LOCAL_PORT)).sync();
            // 當對應的channel關閉的時候,就會返回對應的channel。
            // Returns the ChannelFuture which will be notified when this channel is closed. This method always returns the same future instance.
            future.channel().closeFuture().sync();
        } finally {
            // 全部資源釋放完成以後,清空資源,再次發起重連操做
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        try {
                            connect(NettyConstant.PORT, NettyConstant.REMOTEIP);// 發起重連操做
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        new NettyClient().connect(NettyConstant.PORT, NettyConstant.REMOTEIP);
    }

}

 

2.7 服務端

import demo.protocol.netty.NettyConstant;
import demo.protocol.netty.codec.NettyMessageDecoder;
import demo.protocol.netty.codec.NettyMessageEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;

/**
 * @author Lilinfeng
 * @version 1.0
 * @date 2014年3月15日
 */
public class NettyServer {

    private static final Log LOG = LogFactory.getLog(NettyServer.class);

    public void bind() throws Exception {
        // 配置服務端的NIO線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch)
                            throws IOException {
                        ch.pipeline().addLast(
                                new NettyMessageDecoder(1024 * 1024, 4, 4));
                        ch.pipeline().addLast(new NettyMessageEncoder());
                        ch.pipeline().addLast("readTimeoutHandler",
                                new ReadTimeoutHandler(50));
                        ch.pipeline().addLast(new LoginAuthRespHandler());
                        ch.pipeline().addLast("HeartBeatHandler",
                                new HeartBeatRespHandler());
                    }
                });

        // 綁定端口,同步等待成功
        b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
        LOG.info("Netty server start ok : "
                + (NettyConstant.REMOTEIP + " : " + NettyConstant.PORT));
    }

    public static void main(String[] args) throws Exception {
        new NettyServer().bind();
    }
}

3、測試

3.1 正常測試

啓動server端,再啓動client端

2016-12-19 20:52:23 INFO  HeartBeatRespHandler:44 - Receive client heart beat message : ---> NettyMessage [header=Header [crcCode=-1410399999, length=18, sessionID=0, type=5, priority=0, attachment={}]]
2016-12-19 20:52:23 INFO  HeartBeatRespHandler:47 - Send heart beat response message to client : ---> NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionID=0, type=6, priority=0, attachment={}]]
2016-12-19 20:52:28 INFO  HeartBeatRespHandler:44 - Receive client heart beat message : ---> NettyMessage [header=Header [crcCode=-1410399999, length=18, sessionID=0, type=5, priority=0, attachment={}]]
2016-12-19 20:52:28 INFO  HeartBeatRespHandler:47 - Send heart beat response message to client : ---> NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionID=0, type=6, priority=0, attachment={}]]
2016-12-19 20:52:33 INFO  HeartBeatRespHandler:44 - Receive client heart beat message : ---> NettyMessage [header=Header [crcCode=-1410399999, length=18, sessionID=0, type=5, priority=0, attachment={}]]
2016-12-19 20:52:33 INFO  HeartBeatRespHandler:47 - Send heart beat response message to client : ---> NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionID=0, type=6, priority=0, attachment={}]]
2016-12-19 20:52:38 INFO  HeartBeatRespHandler:44 - Receive client heart beat message : ---> NettyMessage [header=Header [crcCode=-1410399999, length=18, sessionID=0, type=5, priority=0, attachment={}]]
2016-12-19 20:52:38 INFO  HeartBeatRespHandler:47 - Send heart beat response message to client : ---> NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionID=0, type=6, priority=0, attachment={}]]
2016-12-19 20:52:43 INFO  HeartBeatRespHandler:44 - Receive client heart beat message : ---> NettyMessage [header=Header [crcCode=-1410399999, length=18, sessionID=0, type=5, priority=0, attachment={}]]

3.2 服務端宕機重啓

關閉服務端,client因爲心跳,一直報錯:

io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8080
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:347)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:627)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:551)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:465)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:437)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)

須要測試信息以下:

(1) 客戶端是否可以正常發起重連

(2) 重連以後,再也不重連

(3) 斷連期間,心跳定時器中止工做,再也不發送心跳請求消息

(4) 服務器重啓成功後,容許客戶端從新登陸

(5) 服務器重啓成功之,客戶端可以重連和握手成功

(6) 重連成功以後,雙方的心跳可以正常護法

(7) 性能指標:重連期間,客戶端能源獲得了正常回收,不會致使句柄等資源泄露

使用vituralvm或者Jconsole工具,監控斷連期間,cpu,線程,堆內存等資源佔用正常.

重連以後,能夠繼續通訊

3.3 客戶端斷開重連

也能夠從新啓動,且清空緩存信息,清空代碼在LoginAuthHandler中的異常捕獲部分:

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        nodeCheck.remove(ctx.channel().remoteAddress().toString());// 刪除緩存
        ctx.close();
        ctx.fireExceptionCaught(cause);
    }
相關文章
相關標籤/搜索