這一篇文章主要介紹如何用Springboot 整合 Netty,因爲本人尚處於學習Netty的過程當中,並無將Netty 運用到實際生產項目的經驗,這裏也是在網上搜尋了一些Netty例子學習後總結來的,借鑑了他人的寫法和經驗。若有重複部分,還請見諒。java
關於SpringBoot 如何整合使用 Netty ,我將分爲如下幾步進行分析與討論:python
PS: 我這裏爲了簡單起見(主要是懶),將 Netty 服務端與客戶端放在了同一個SpringBoot工程裏,固然也能夠將客戶端和服務端分開。git
Netty 服務端的代碼其實比較簡單,代碼以下:github
@Component
@Slf4j
public class NettyServer {
/** * boss 線程組用於處理鏈接工做 */
private EventLoopGroup boss = new NioEventLoopGroup();
/** * work 線程組用於數據處理 */
private EventLoopGroup work = new NioEventLoopGroup();
@Value("${netty.port}")
private Integer port;
/** * 啓動Netty Server * * @throws InterruptedException */
@PostConstruct
public void start() throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, work)
// 指定Channel
.channel(NioServerSocketChannel.class)
//使用指定的端口設置套接字地址
.localAddress(new InetSocketAddress(port))
//服務端可鏈接隊列數,對應TCP/IP協議listen函數中backlog參數
.option(ChannelOption.SO_BACKLOG, 1024)
//設置TCP長鏈接,通常若是兩個小時內沒有數據的通訊時,TCP會自動發送一個活動探測數據報文
.childOption(ChannelOption.SO_KEEPALIVE, true)
//將小的數據包包裝成更大的幀進行傳送,提升網絡的負載,即TCP延遲傳輸
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new NettyServerHandlerInitializer());
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
log.info("啓動 Netty Server");
}
}
@PreDestroy
public void destory() throws InterruptedException {
boss.shutdownGracefully().sync();
work.shutdownGracefully().sync();
log.info("關閉Netty");
}
}
複製代碼
由於咱們在springboot 項目中使用 Netty ,因此咱們將Netty 服務器的啓動封裝在一個 start()
方法,並使用 @PostConstruct
註解,在指定的方法上加上 @PostConstruct
註解來表示該方法在 Spring 初始化 NettyServer
類後調用。spring
考慮到使用心跳機制等操做,關於ChannelHandler邏輯處理鏈的部分將在後面進行闡述。json
Netty 客戶端代碼與服務端相似,代碼以下:bootstrap
@Component
@Slf4j
public class NettyClient {
private EventLoopGroup group = new NioEventLoopGroup();
@Value("${netty.port}")
private int port;
@Value("${netty.host}")
private String host;
private SocketChannel socketChannel;
public void sendMsg(MessageBase.Message message) {
socketChannel.writeAndFlush(message);
}
@PostConstruct
public void start() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ClientHandlerInitilizer());
ChannelFuture future = bootstrap.connect();
//客戶端斷線重連邏輯
future.addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
log.info("鏈接Netty服務端成功");
} else {
log.info("鏈接失敗,進行斷線重連");
future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
}
});
socketChannel = (SocketChannel) future.channel();
}
}
複製代碼
上面還包含了客戶端斷線重連的邏輯,更多細節問題,將在下面進行闡述。segmentfault
在整合使用 Netty 的過程當中,咱們使用 Google 的protobuf定義消息格式,下面來簡單介紹下 protobufspringboot
Google 官方給 protobuf的定義以下:bash
Protocol Buffers 是一種輕便高效的結構化數據存儲格式,能夠用於結構化數據序列化,很適合作數據存儲或 RPC 數據交換格式。它可用於通信協議、數據存儲等領域的語言無關、平臺無關、可擴展的序列化結構數據格式。
在 Netty 中經常使用 protobuf 來作序列化方案,固然也能夠用 protobuf來構建 客戶端與服務端之間的通訊協議
咱們這裏是用 protobuf 作爲咱們的序列化手段,那咱們爲何要使用 protobuf,而不使用其餘序列化方案呢,好比 jdk 自帶的序列化,Thrift,fastjson等。
首先 jdk 自帶序列化手段有不少缺點,好比:
而 Google Protobuf 跨語言,支持C++、java和python。而後利用protobuf 編碼後的消息更小,有利於存儲和傳輸,而且其性能也很是高,相比其餘序列化框架,它也是很是有優點的,具體的關於Java 各類序列化框架比較此處就很少說了。總之,目前Google Protobuf 普遍的被使用到各類項目,它的諸多優勢讓咱們選擇使用它。
對於 Java 而言,使用 protobuf 主要有如下幾步:
.proto
文件中定義消息格式.proto
文件 成 Java 類這裏爲我Demo裏的 message.proto
文件爲例,以下:
//protobuf語法有 proto2和proto3兩種,這裏指定 proto3
syntax = "proto3";
// 文件選項
option java_package = "com.pjmike.server.protocol.protobuf";
option java_outer_classname = "MessageBase";
// 消息模型定義
message Message {
string requestId = 1;
CommandType cmd = 2;
string content = 3;
enum CommandType {
NORMAL = 0; //常規業務消息
HEARTBEAT_REQUEST = 1; //客戶端心跳消息
HEARTBEAT_RESPONSE = 2; //服務端心跳消息
}
}
複製代碼
文件解讀:
proto3
語法,若是沒有指定,編譯器默認使用 proto2
的語法。如今新項目中可能通常多用 proto3
的語法,proto3
比 proto2
支持更多的語言但更簡潔。若是首次使用 protobuf,能夠選擇使用 proto3
.proto
文件時,能夠標註一系列的選項,一些選項是文件級別的,好比上面的第二行和第三行,java_package
文件選項代表protocol編譯器編譯 .proto
文件生成的 Java 類所在的包,java_outer_classname
選項代表想要生成的 Java 類的名稱Message
中定義了具體的消息格式,我這裏定義了三個字段,每一個字段都有惟一的一個數字標識符,這些標識符用來在消息的二進制格式中識別各個字段的Message
中還添加了一個枚舉類型,該枚舉中含有類型 CommandType
中全部的值,每一個枚舉類型必須將其第一個類型映射爲 0,該0值爲默認值。消息模型定義
關於消息格式,此處我只是很是很是簡單的定義了幾個字段,requestId
表明消息Id,CommandType
表示消息的類型,這裏簡單分爲心跳消息類型和業務消息類型,而後content
就是具體的消息內容。這裏的消息格式定義是十分簡陋,真正的項目實戰中,關於自定義消息格式的要求是很是多的,是比較複雜的。
上面簡單的介紹了 protobuf的一些語法規則,關於 protobuf語法的更多介紹參考官方文檔:developers.google.com/protocol-bu…
.proto
編譯器編譯第一步已經定義好了 protobuf的消息格式,而後咱們用 .proto
文件的編譯器將咱們定義的 消息格式編譯生成對應的 Java類,以便於咱們在項目中使用該消息類。
關於protobuf編譯器的安裝這裏我就不細說,詳情見官方文檔: developers.google.com/protocol-bu…
安裝好編譯器之後,使用如下命令編譯.proto
文件:
protoc -I = ./ --java_out=./ ./Message.proto
複製代碼
-I
選項用於指定待編譯的 .proto
消息定義文件所在的目錄,該選項也能夠寫做爲 --proto_path
--java_out
選項表示生成 Java代碼後存放位置,對於不一樣語言,咱們的選項可能不一樣,好比生成C++代碼爲 --cpp_out
前面已經根據 .proto
消息定義文件生成的Java類,咱們這裏代碼根據 Message.proto
生成了MessageBase
類,可是要正常的使用生成的 Java 類,咱們還須要引入 protobuf-java的依賴:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.1</version>
</dependency>
複製代碼
使用 protobuf 生成的每個 Java類中,都會包含兩種內部類:Msg 和 Msg 包含的 Builder(這裏的Msg就是實際消息傳輸類)。具體是.proto
中定義的每個message 都會生成一個 Msg,每個Msg對應一個 Builder:
好比咱們使用 Builder來構建 Msg,例子以下:
public class MessageBaseTest {
public static void main(String[] args) {
MessageBase.Message message = MessageBase.Message.newBuilder()
.setRequestId(UUID.randomUUID().toString())
.setContent("hello world").build();
System.out.println("message: "+message.toString());
}
}
複製代碼
這裏就很少介紹protobuf-java API的相關用法了,更多詳情仍是參考官方文檔:developers.google.com/protocol-bu…
上面說了這麼多,消息傳輸格式已經定義好了,可是在客戶端和服務端傳輸過程當中咱們還須要對這種 protobuf格式進行編解碼,固然咱們能夠自定義消息的編解碼,protobuf-java
的API中提供了相關的序列化和反序列化方法。好消息是,Netty 爲了支持 protobuf提供了針對 protobuf的編解碼器,以下表所示(摘自《Netty實戰》) :
名稱 | 描述 |
---|---|
ProtobufDecoder | 使用 protobuf 對消息進行解碼 |
ProtobufEncoder | 使用 protobuf 對消息進行編碼 |
ProtobufVarint32FrameDecoder | 根據消息中的 Google Protocol Buffers 的 「Base 128 Varint" 整型長度字段值動態地分割所接收到的 ByteBuf |
ProtobufVarint32LengthFieldPrepender | 向 ByteBuf 前追加一個Google Protocol Buffers 的 「Base 128 Varint" 整型長度字段值 |
有了這些編解碼器,將其加入客戶端和服務端的 ChannelPipeline中以用於對消息進行編解碼,以下:
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//空閒檢測
.addLast(new ServerIdleStateHandler())
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new NettyServerHandler());
}
}
複製代碼
心跳是在TCP長鏈接中,客戶端與服務端之間按期發送的一種特殊的數據包,通知對方在線以確保TCP鏈接的有效性。
有兩種方式實現心跳機制:
TCP層面的 keepalive 機制咱們在以前構建 Netty服務端和客戶端啓動過程當中也有定義,咱們須要手動開啓,示例以下:
// 設置TCP的長鏈接,默認的 keepalive的心跳時間是兩個小時
childOption(ChannelOption.SO_KEEPALIVE, true)
複製代碼
除了開啓 TCP協議的 keepalive 以外,在我研究了github的一些開源Demo發現,人們每每也會自定義本身的心跳機制,定義心跳數據包。而Netty也提供了 IdleStateHandler 來實現心跳機制
下面來看看客戶端如何實現心跳機制:
@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
log.info("已經10s沒有發送消息給服務端");
//向服務端送心跳包
//這裏使用 protobuf定義的消息格式
MessageBase.Message heartbeat = new MessageBase.Message().toBuilder().setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)
.setRequestId(UUID.randomUUID().toString())
.setContent("heartbeat").build();
//發送心跳消息,並在發送失敗時關閉該鏈接
ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
複製代碼
咱們這裏建立了一個ChannelHandler類並重寫了userEventTriggered
方法,在該方法裏實現發送心跳數據包的邏輯,同時將 IdleStateEvent
類加入邏輯處理鏈上。
其實是當鏈接空閒時間太長時,將會觸發一個 IdleStateEvent
事件,而後咱們調用 userEventTriggered
來處理該 IdleStateEvent
事件。
當啓動客戶端和服務端以後,控制檯打印心跳消息以下:
2018-10-28 16:30:46.825 INFO 42648 --- [ntLoopGroup-2-1] c.pjmike.server.client.HeartbeatHandler : 已經10s沒有發送消息給服務端
2018-10-28 16:30:47.176 INFO 42648 --- [ntLoopGroup-4-1] c.p.server.server.NettyServerHandler : 收到客戶端發來的心跳消息:requestId: "80723780-2ce0-4b43-ad3a-53060a6e81ab"
cmd: HEARTBEAT_REQUEST
content: "heartbeat"
複製代碼
上面咱們只討論了客戶端發送心跳消息給服務端,那麼服務端還須要發心跳消息給客戶端嗎?
通常狀況是,對於長鏈接而言,一種方案是兩邊都發送心跳消息,另外一種是服務端做爲被動接收一方,若是一段時間內服務端沒有收到心跳包那麼就直接斷開鏈接。
咱們這裏採用第二種方案,只須要客戶端發送心跳消息,而後服務端被動接收,而後設置一段時間,在這段時間內若是服務端沒有收到任何消息,那麼就主動斷開鏈接,這也就是後面要說的 空閒檢測
通常有如下兩種狀況,Netty 客戶端須要重連服務端:
第一種狀況實現 ChannelFutureListener
用來監測鏈接是否成功,不成功就進行斷連重試機制,代碼以下:
@Component
@Slf4j
public class NettyClient {
private EventLoopGroup group = new NioEventLoopGroup();
@Value("${netty.port}")
private int port;
@Value("${netty.host}")
private String host;
private SocketChannel socketChannel;
public void sendMsg(MessageBase.Message message) {
socketChannel.writeAndFlush(message);
}
@PostConstruct
public void start() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.handler(new ClientHandlerInitilizer());
ChannelFuture future = bootstrap.connect();
//客戶端斷線重連邏輯
future.addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
log.info("鏈接Netty服務端成功");
} else {
log.info("鏈接失敗,進行斷線重連");
future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
}
});
socketChannel = (SocketChannel) future.channel();
}
}
複製代碼
ChannelFuture添加一個監聽器,若是客戶端鏈接服務端失敗,調用 channel().eventLoop().schedule()
方法執行重試邏輯。
第二種狀況是運行過程當中 服務端忽然掛掉了,這種狀況咱們在處理數據讀寫的Handler中實現,代碼以下:
@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
@Autowired
private NettyClient nettyClient;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
log.info("已經10s沒有發送消息給服務端");
//向服務端送心跳包
MessageBase.Message heartbeat = new MessageBase.Message().toBuilder().setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)
.setRequestId(UUID.randomUUID().toString())
.setContent("heartbeat").build();
//發送心跳消息,並在發送失敗時關閉該鏈接
ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//若是運行過程當中服務端掛了,執行重連機制
EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(() -> nettyClient.start(), 10L, TimeUnit.SECONDS);
super.channelInactive(ctx);
}
}
複製代碼
咱們這裏直接在實現心跳機制的 Handler中重寫channelInactive
方法,而後在該方法中執行重試邏輯,這裏注入了 NettyClient
類,目的是方便調用 NettyClient
的start()
方法從新鏈接服務端
channelInactive()
方法是指若是當前Channel沒有鏈接到遠程節點,那麼該方法將會被調用。
空閒檢測是什麼?實際上空閒檢測是每隔一段時間,檢測這段時間內是否有數據讀寫。好比,服務端檢測一段時間內,是否收到客戶端發送來的數據,若是沒有,就及時釋放資源,關閉鏈接。
對於空閒檢測,Netty 特意提供了 IdleStateHandler 來實現這個功能。下面的代碼參考自《Netty 入門與實戰:仿寫微信 IM 即時通信系統》中空閒檢測部分的實現:
@Slf4j
public class ServerIdleStateHandler extends IdleStateHandler {
/** * 設置空閒檢測時間爲 30s */
private static final int READER_IDLE_TIME = 30;
public ServerIdleStateHandler() {
super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
log.info("{} 秒內沒有讀取到數據,關閉鏈接", READER_IDLE_TIME);
ctx.channel().close();
複製代碼
由於這是 SpringBoot 整合 Netty 的一個Demo,咱們建立一個Controller
方法對Netty 服務端與客戶端之間的通訊進行測試,controller代碼以下,很是簡單:
@RestController
public class ConsumerController {
@Autowired
private NettyClient nettyClient;
@GetMapping("/send")
public String send() {
MessageBase.Message message = new MessageBase.Message()
.toBuilder().setCmd(MessageBase.Message.CommandType.NORMAL)
.setContent("hello server")
.setRequestId(UUID.randomUUID().toString()).build();
nettyClient.sendMsg(message);
return "send ok";
}
}
複製代碼
注入 NettyClient
,調用其 sendMsg
方法發送消息,結果以下:
c.p.server.server.NettyServerHandler : 收到客戶端的業務消息:requestId: "aba74c28-1b6e-42b3-9f27-889e7044dcbf"
content: "hello server"
複製代碼
上面詳細闡述了 如何用 SpringBoot 整合 Netty ,其中借鑑不少前輩大佬的例子與文章,算是初步瞭解瞭如何使用 Netty。上文中若有錯誤之處,歡迎指出。github地址: github.com/pjmike/spri…