Springboot 整合 Netty 實戰

前言

這一篇文章主要介紹如何用Springboot 整合 Netty,因爲本人尚處於學習Netty的過程當中,並無將Netty 運用到實際生產項目的經驗,這裏也是在網上搜尋了一些Netty例子學習後總結來的,借鑑了他人的寫法和經驗。若有重複部分,還請見諒。java

關於SpringBoot 如何整合使用 Netty ,我將分爲如下幾步進行分析與討論:python

  • 構建Netty 服務端
  • 構建Netty 客戶端
  • 利用protobuf定義消息格式
  • 服務端空閒檢測
  • 客戶端發送心跳包與斷線重連

PS: 我這裏爲了簡單起見(主要是懶),將 Netty 服務端與客戶端放在了同一個SpringBoot工程裏,固然也能夠將客戶端和服務端分開。git

構建 Netty 服務端

Netty 服務端的代碼其實比較簡單,代碼以下:github

@Component
@Slf4j
public class NettyServer {
    /** * boss 線程組用於處理鏈接工做 */
    private EventLoopGroup boss = new NioEventLoopGroup();
    /** * work 線程組用於數據處理 */
    private EventLoopGroup work = new NioEventLoopGroup();
    @Value("${netty.port}")
    private Integer port;
    /** * 啓動Netty Server * * @throws InterruptedException */
    @PostConstruct
    public void start() throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, work)
                // 指定Channel
                .channel(NioServerSocketChannel.class)
                //使用指定的端口設置套接字地址
                .localAddress(new InetSocketAddress(port))

                //服務端可鏈接隊列數,對應TCP/IP協議listen函數中backlog參數
                .option(ChannelOption.SO_BACKLOG, 1024)

                //設置TCP長鏈接,通常若是兩個小時內沒有數據的通訊時,TCP會自動發送一個活動探測數據報文
                .childOption(ChannelOption.SO_KEEPALIVE, true)

                //將小的數據包包裝成更大的幀進行傳送,提升網絡的負載,即TCP延遲傳輸
                .childOption(ChannelOption.TCP_NODELAY, true)

                .childHandler(new NettyServerHandlerInitializer());
        ChannelFuture future = bootstrap.bind().sync();
        if (future.isSuccess()) {
            log.info("啓動 Netty Server");
        }
    }

    @PreDestroy
    public void destory() throws InterruptedException {
        boss.shutdownGracefully().sync();
        work.shutdownGracefully().sync();
        log.info("關閉Netty");
    }
}

複製代碼

由於咱們在springboot 項目中使用 Netty ,因此咱們將Netty 服務器的啓動封裝在一個 start()方法,並使用 @PostConstruct註解,在指定的方法上加上 @PostConstruct註解來表示該方法在 Spring 初始化 NettyServer類後調用。spring

考慮到使用心跳機制等操做,關於ChannelHandler邏輯處理鏈的部分將在後面進行闡述。json

構建 Netty 客戶端

Netty 客戶端代碼與服務端相似,代碼以下:bootstrap

@Component
@Slf4j
public class NettyClient {
    private EventLoopGroup group = new NioEventLoopGroup();
    @Value("${netty.port}")
    private int port;
    @Value("${netty.host}")
    private String host;
    private SocketChannel socketChannel;

    public void sendMsg(MessageBase.Message message) {
        socketChannel.writeAndFlush(message);
    }

    @PostConstruct
    public void start() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(host, port)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ClientHandlerInitilizer());
        ChannelFuture future = bootstrap.connect();
        //客戶端斷線重連邏輯
        future.addListener((ChannelFutureListener) future1 -> {
            if (future1.isSuccess()) {
                log.info("鏈接Netty服務端成功");
            } else {
                log.info("鏈接失敗,進行斷線重連");
                future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
            }
        });
        socketChannel = (SocketChannel) future.channel();
    }
}
複製代碼

上面還包含了客戶端斷線重連的邏輯,更多細節問題,將在下面進行闡述。segmentfault

使用 protobuf 構建通訊協議

在整合使用 Netty 的過程當中,咱們使用 Google 的protobuf定義消息格式,下面來簡單介紹下 protobufspringboot

protobuf簡介

Google 官方給 protobuf的定義以下:bash

Protocol Buffers 是一種輕便高效的結構化數據存儲格式,能夠用於結構化數據序列化,很適合作數據存儲或 RPC 數據交換格式。它可用於通信協議、數據存儲等領域的語言無關、平臺無關、可擴展的序列化結構數據格式。

在 Netty 中經常使用 protobuf 來作序列化方案,固然也能夠用 protobuf來構建 客戶端與服務端之間的通訊協議

爲何要用protobuf

咱們這裏是用 protobuf 作爲咱們的序列化手段,那咱們爲何要使用 protobuf,而不使用其餘序列化方案呢,好比 jdk 自帶的序列化,Thrift,fastjson等。

首先 jdk 自帶序列化手段有不少缺點,好比:

  • 序列化後的碼流太大
  • 性能過低
  • 沒法跨語言

而 Google Protobuf 跨語言,支持C++、java和python。而後利用protobuf 編碼後的消息更小,有利於存儲和傳輸,而且其性能也很是高,相比其餘序列化框架,它也是很是有優點的,具體的關於Java 各類序列化框架比較此處就很少說了。總之,目前Google Protobuf 普遍的被使用到各類項目,它的諸多優勢讓咱們選擇使用它。

怎麼使用protobuf

對於 Java 而言,使用 protobuf 主要有如下幾步:

  • .proto 文件中定義消息格式
  • 使用 protobuf 編譯器編譯 .proto文件 成 Java 類
  • 使用 Java 對應的 protobuf API來寫或讀消息

定義 protobuf 協議格式

這裏爲我Demo裏的 message.proto文件爲例,以下:

//protobuf語法有 proto2和proto3兩種,這裏指定 proto3
syntax = "proto3"; 
// 文件選項
option java_package = "com.pjmike.server.protocol.protobuf";
option java_outer_classname = "MessageBase";
// 消息模型定義
message Message {
    string requestId = 1;
    CommandType cmd = 2;
    string content = 3;
    enum CommandType {
        NORMAL = 0; //常規業務消息
        HEARTBEAT_REQUEST = 1; //客戶端心跳消息
        HEARTBEAT_RESPONSE = 2; //服務端心跳消息
    }
}
複製代碼

文件解讀:

  • 文中的第一行指定正在使用 proto3語法,若是沒有指定,編譯器默認使用 proto2的語法。如今新項目中可能通常多用 proto3的語法,proto3proto2支持更多的語言但更簡潔。若是首次使用 protobuf,能夠選擇使用 proto3
  • 定義 .proto文件時,能夠標註一系列的選項,一些選項是文件級別的,好比上面的第二行和第三行,java_package文件選項代表protocol編譯器編譯 .proto文件生成的 Java 類所在的包,java_outer_classname選項代表想要生成的 Java 類的名稱
  • Message中定義了具體的消息格式,我這裏定義了三個字段,每一個字段都有惟一的一個數字標識符,這些標識符用來在消息的二進制格式中識別各個字段的
  • Message中還添加了一個枚舉類型,該枚舉中含有類型 CommandType中全部的值,每一個枚舉類型必須將其第一個類型映射爲 0,該0值爲默認值。

消息模型定義

關於消息格式,此處我只是很是很是簡單的定義了幾個字段,requestId表明消息Id,CommandType表示消息的類型,這裏簡單分爲心跳消息類型和業務消息類型,而後content就是具體的消息內容。這裏的消息格式定義是十分簡陋,真正的項目實戰中,關於自定義消息格式的要求是很是多的,是比較複雜的。

上面簡單的介紹了 protobuf的一些語法規則,關於 protobuf語法的更多介紹參考官方文檔:developers.google.com/protocol-bu…

使用 .proto編譯器編譯

第一步已經定義好了 protobuf的消息格式,而後咱們用 .proto文件的編譯器將咱們定義的 消息格式編譯生成對應的 Java類,以便於咱們在項目中使用該消息類。

關於protobuf編譯器的安裝這裏我就不細說,詳情見官方文檔: developers.google.com/protocol-bu…

安裝好編譯器之後,使用如下命令編譯.proto文件:

protoc -I = ./ --java_out=./ ./Message.proto
複製代碼
  • -I 選項用於指定待編譯的 .proto消息定義文件所在的目錄,該選項也能夠寫做爲 --proto_path
  • --java_out選項表示生成 Java代碼後存放位置,對於不一樣語言,咱們的選項可能不一樣,好比生成C++代碼爲 --cpp_out
  • 在前兩個選項後再加上 待編譯的消息定義文件

使用 Java 對應 的 protobuf API來讀寫消息

前面已經根據 .proto消息定義文件生成的Java類,咱們這裏代碼根據 Message.proto生成了MessageBase類,可是要正常的使用生成的 Java 類,咱們還須要引入 protobuf-java的依賴:

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.5.1</version>
</dependency>
複製代碼

使用 protobuf 生成的每個 Java類中,都會包含兩種內部類:Msg 和 Msg 包含的 Builder(這裏的Msg就是實際消息傳輸類)。具體是.proto中定義的每個message 都會生成一個 Msg,每個Msg對應一個 Builder:

  • Buidler提供了構建類,查詢類的API
  • Msg提供了查詢,序列化,反序列化的API

好比咱們使用 Builder來構建 Msg,例子以下:

public class MessageBaseTest {
    public static void main(String[] args) {
        MessageBase.Message message = MessageBase.Message.newBuilder()
                .setRequestId(UUID.randomUUID().toString())
                .setContent("hello world").build();
        System.out.println("message: "+message.toString());
    }
}
複製代碼

這裏就很少介紹protobuf-java API的相關用法了,更多詳情仍是參考官方文檔:developers.google.com/protocol-bu…

protobuf的編解碼器

上面說了這麼多,消息傳輸格式已經定義好了,可是在客戶端和服務端傳輸過程當中咱們還須要對這種 protobuf格式進行編解碼,固然咱們能夠自定義消息的編解碼,protobuf-java 的API中提供了相關的序列化和反序列化方法。好消息是,Netty 爲了支持 protobuf提供了針對 protobuf的編解碼器,以下表所示(摘自《Netty實戰》) :

名稱 描述
ProtobufDecoder 使用 protobuf 對消息進行解碼
ProtobufEncoder 使用 protobuf 對消息進行編碼
ProtobufVarint32FrameDecoder 根據消息中的 Google Protocol Buffers 的 「Base 128 Varint" 整型長度字段值動態地分割所接收到的 ByteBuf
ProtobufVarint32LengthFieldPrepender 向 ByteBuf 前追加一個Google Protocol Buffers 的 「Base 128 Varint" 整型長度字段值

有了這些編解碼器,將其加入客戶端和服務端的 ChannelPipeline中以用於對消息進行編解碼,以下:

public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline()
                //空閒檢測
                .addLast(new ServerIdleStateHandler())
                .addLast(new ProtobufVarint32FrameDecoder())
                .addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))
                .addLast(new ProtobufVarint32LengthFieldPrepender())
                .addLast(new ProtobufEncoder())
                .addLast(new NettyServerHandler());
    }
}
複製代碼

客戶端心跳機制

心跳機制簡介

心跳是在TCP長鏈接中,客戶端與服務端之間按期發送的一種特殊的數據包,通知對方在線以確保TCP鏈接的有效性。

如何實現心跳機制

有兩種方式實現心跳機制:

  • 使用TCP協議層面的 keepalive 機制
  • 在應用層上自定義的心跳機制

TCP層面的 keepalive 機制咱們在以前構建 Netty服務端和客戶端啓動過程當中也有定義,咱們須要手動開啓,示例以下:

// 設置TCP的長鏈接,默認的 keepalive的心跳時間是兩個小時
childOption(ChannelOption.SO_KEEPALIVE, true)
複製代碼

除了開啓 TCP協議的 keepalive 以外,在我研究了github的一些開源Demo發現,人們每每也會自定義本身的心跳機制,定義心跳數據包。而Netty也提供了 IdleStateHandler 來實現心跳機制

Netty 實現心跳機制

下面來看看客戶端如何實現心跳機制:

@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
                log.info("已經10s沒有發送消息給服務端");
                //向服務端送心跳包
                //這裏使用 protobuf定義的消息格式
                MessageBase.Message heartbeat = new MessageBase.Message().toBuilder().setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)
                        .setRequestId(UUID.randomUUID().toString())
                        .setContent("heartbeat").build();
                //發送心跳消息,並在發送失敗時關閉該鏈接
                ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
複製代碼

咱們這裏建立了一個ChannelHandler類並重寫了userEventTriggered方法,在該方法裏實現發送心跳數據包的邏輯,同時將 IdleStateEvent類加入邏輯處理鏈上。

其實是當鏈接空閒時間太長時,將會觸發一個 IdleStateEvent事件,而後咱們調用 userEventTriggered來處理該 IdleStateEvent事件。

當啓動客戶端和服務端以後,控制檯打印心跳消息以下:

2018-10-28 16:30:46.825  INFO 42648 --- [ntLoopGroup-2-1] c.pjmike.server.client.HeartbeatHandler  : 已經10s沒有發送消息給服務端
2018-10-28 16:30:47.176  INFO 42648 --- [ntLoopGroup-4-1] c.p.server.server.NettyServerHandler     : 收到客戶端發來的心跳消息:requestId: "80723780-2ce0-4b43-ad3a-53060a6e81ab"
cmd: HEARTBEAT_REQUEST
content: "heartbeat"
複製代碼

上面咱們只討論了客戶端發送心跳消息給服務端,那麼服務端還須要發心跳消息給客戶端嗎?

通常狀況是,對於長鏈接而言,一種方案是兩邊都發送心跳消息,另外一種是服務端做爲被動接收一方,若是一段時間內服務端沒有收到心跳包那麼就直接斷開鏈接。

咱們這裏採用第二種方案,只須要客戶端發送心跳消息,而後服務端被動接收,而後設置一段時間,在這段時間內若是服務端沒有收到任何消息,那麼就主動斷開鏈接,這也就是後面要說的 空閒檢測

Netty 客戶端斷線重連

通常有如下兩種狀況,Netty 客戶端須要重連服務端:

  • Netty 客戶端啓動時,服務端掛掉,連不上服務端
  • 在程序運行過程當中,服務端忽然掛掉

第一種狀況實現 ChannelFutureListener用來監測鏈接是否成功,不成功就進行斷連重試機制,代碼以下:

@Component
@Slf4j
public class NettyClient {
    private EventLoopGroup group = new NioEventLoopGroup();
    @Value("${netty.port}")
    private int port;
    @Value("${netty.host}")
    private String host;
    private SocketChannel socketChannel;

    public void sendMsg(MessageBase.Message message) {
        socketChannel.writeAndFlush(message);
    }

    @PostConstruct
    public void start() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(host, port)
                .handler(new ClientHandlerInitilizer());
        ChannelFuture future = bootstrap.connect();
        //客戶端斷線重連邏輯
        future.addListener((ChannelFutureListener) future1 -> {
            if (future1.isSuccess()) {
                log.info("鏈接Netty服務端成功");
            } else {
                log.info("鏈接失敗,進行斷線重連");
                future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
            }
        });
        socketChannel = (SocketChannel) future.channel();
    }
}
複製代碼

ChannelFuture添加一個監聽器,若是客戶端鏈接服務端失敗,調用 channel().eventLoop().schedule()方法執行重試邏輯。

第二種狀況是運行過程當中 服務端忽然掛掉了,這種狀況咱們在處理數據讀寫的Handler中實現,代碼以下:

@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    private NettyClient nettyClient;
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
                log.info("已經10s沒有發送消息給服務端");
                //向服務端送心跳包
                MessageBase.Message heartbeat = new MessageBase.Message().toBuilder().setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)
                        .setRequestId(UUID.randomUUID().toString())
                        .setContent("heartbeat").build();
                //發送心跳消息,並在發送失敗時關閉該鏈接
                ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //若是運行過程當中服務端掛了,執行重連機制
        EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(() -> nettyClient.start(), 10L, TimeUnit.SECONDS);
        super.channelInactive(ctx);
    }
}
複製代碼

咱們這裏直接在實現心跳機制的 Handler中重寫channelInactive方法,而後在該方法中執行重試邏輯,這裏注入了 NettyClient類,目的是方便調用 NettyClientstart()方法從新鏈接服務端

channelInactive()方法是指若是當前Channel沒有鏈接到遠程節點,那麼該方法將會被調用。

服務端空閒檢測

空閒檢測是什麼?實際上空閒檢測是每隔一段時間,檢測這段時間內是否有數據讀寫。好比,服務端檢測一段時間內,是否收到客戶端發送來的數據,若是沒有,就及時釋放資源,關閉鏈接。

對於空閒檢測,Netty 特意提供了 IdleStateHandler 來實現這個功能。下面的代碼參考自《Netty 入門與實戰:仿寫微信 IM 即時通信系統》中空閒檢測部分的實現:

@Slf4j
public class ServerIdleStateHandler extends IdleStateHandler {
    /** * 設置空閒檢測時間爲 30s */
    private static final int READER_IDLE_TIME = 30;
    public ServerIdleStateHandler() {
        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        log.info("{} 秒內沒有讀取到數據,關閉鏈接", READER_IDLE_TIME);
        ctx.channel().close();
複製代碼

Controller方法測試

由於這是 SpringBoot 整合 Netty 的一個Demo,咱們建立一個Controller方法對Netty 服務端與客戶端之間的通訊進行測試,controller代碼以下,很是簡單:

@RestController
public class ConsumerController {
    @Autowired
    private NettyClient nettyClient;

    @GetMapping("/send")
    public String send() {
        MessageBase.Message message = new MessageBase.Message()
                .toBuilder().setCmd(MessageBase.Message.CommandType.NORMAL)
                .setContent("hello server")
                .setRequestId(UUID.randomUUID().toString()).build();
        nettyClient.sendMsg(message);
        return "send ok";
    }
}
複製代碼

注入 NettyClient,調用其 sendMsg方法發送消息,結果以下:

c.p.server.server.NettyServerHandler     : 收到客戶端的業務消息:requestId: "aba74c28-1b6e-42b3-9f27-889e7044dcbf"
content: "hello server"
複製代碼

小結

上面詳細闡述了 如何用 SpringBoot 整合 Netty ,其中借鑑不少前輩大佬的例子與文章,算是初步瞭解瞭如何使用 Netty。上文中若有錯誤之處,歡迎指出。github地址: github.com/pjmike/spri…

參考資料 & 鳴謝

相關文章
相關標籤/搜索