Netty學習(八)-Netty的心跳機制

版權聲明:本文爲博主原創文章,未經博主容許不得轉載。 https://blog.csdn.net/a953713428/article/details/69378412
咱們知道在TCP長鏈接或者WebSocket長鏈接中通常咱們都會使用心跳機制–即發送特殊的數據包來通告對方本身的業務尚未辦完,不要關閉連接。那麼心跳機制能夠用來作什麼呢?咱們知道網絡的傳輸是不可靠的,當咱們發起一個連接請求的過程之中會發生什麼事情誰都沒法預料,或者斷電,服務器重啓,斷網線之類。若是有這種狀況的發生對方也沒法判斷你是否還在線。因此這時候咱們引入心跳機制,在長連接中雙方沒有數據交互的時候互相發送數據(多是空包,也多是特殊數據),對方收到該數據以後也回覆相應的數據用以確保雙方都在線,這樣就能夠確保當前連接是有效的。bootstrap

1. 如何實現心跳機制
通常實現心跳機制由兩種方式:服務器

TCP協議自帶的心跳機制來實現;
在應用層來實現。
可是TCP協議自帶的心跳機制系統默認是設置的是2小時的心跳頻率。它檢查不到機器斷電、網線拔出、防火牆這些斷線。並且邏輯層處理斷線可能也不是那麼好處理。另外該心跳機制是與TCP協議綁定的,那若是咱們要是使用UDP協議豈不是用不了?因此通常咱們都不用。網絡

而通常咱們本身實現呢大體的策略是這樣的:socket

Client啓動一個定時器,不斷髮送心跳;
Server收到心跳後,作出迴應;
Server啓動一個定時器,判斷Client是否存在,這裏作判斷有兩種方法:時間差和簡單標識。
時間差:ide

收到一個心跳包以後記錄當前時間;
判判定時器到達時間,計算多久沒收到心跳時間=當前時間-上次收到心跳時間。若是改時間大於設定值則認爲超時。
簡單標識:oop

收到心跳後設置鏈接標識爲true;
判判定時器到達時間,若是未收到心跳則設置鏈接標識爲false;
今天咱們來看一下Netty的心跳機制的實現,在Netty中提供了IdleStateHandler類來進行心跳的處理,它能夠對一個 Channel 的 讀/寫設置定時器, 當 Channel 在必定事件間隔內沒有數據交互時(即處於 idle 狀態), 就會觸發指定的事件。測試

該類能夠對三種類型的超時作心跳機制檢測:this

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
1
2
3
readerIdleTimeSeconds:設置讀超時時間;
writerIdleTimeSeconds:設置寫超時時間;
allIdleTimeSeconds:同時爲讀或寫設置超時時間;
下面咱們仍是經過一個例子來說解IdleStateHandler的使用。.net

服務端:netty

public class HeartBeatServer {
private int port;

public HeartBeatServer(int port) {
this.port = port;
}

public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();

ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HeartBeatServerChannelInitializer());

try {
ChannelFuture future = server.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}

public static void main(String[] args) {
HeartBeatServer server = new HeartBeatServer(7788);
server.start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
服務端Initializer:

public class HeartBeatServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();

pipeline.addLast("handler",new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new HeartBeatServerHandler());
}
}
1
2
3
4
5
6
7
8
9
10
11
在這裏IdleStateHandler也是handler的一種,因此加入addLast。咱們分別設置4個參數:讀超時時間爲3s,寫超時和讀寫超時爲0,而後加入時間控制單元。

服務端handler:

public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
private int loss_connect_time = 0;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "Server :" + msg.toString());
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
//服務端對應着讀事件,當爲READER_IDLE時觸發
IdleStateEvent event = (IdleStateEvent)evt;
if(event.state() == IdleState.READER_IDLE){
loss_connect_time++;
System.out.println("接收消息超時");
if(loss_connect_time > 2){
System.out.println("關閉不活動的連接");
ctx.channel().close();
}
}else{
super.userEventTriggered(ctx,evt);
}
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
咱們看到在handler中調用了userEventTriggered方法,IdleStateEvent的state()方法一個有三個值:
READER_IDLE,WRITER_IDLE,ALL_IDLE。正好對應讀事件寫事件和讀寫事件。

再來寫一下客戶端:

public class HeartBeatsClient {
private int port;
private String address;

public HeartBeatsClient(int port, String address) {
this.port = port;
this.address = address;
}

public void start(){
EventLoopGroup group = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new HeartBeatsClientChannelInitializer());

try {
ChannelFuture future = bootstrap.connect(address,port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}

}

public static void main(String[] args) {
HeartBeatsClient client = new HeartBeatsClient(7788,"127.0.0.1");
client.start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
客戶端Initializer:

public class HeartBeatsClientChannelInitializer extends ChannelInitializer<SocketChannel> {

protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();

pipeline.addLast("handler", new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new HeartBeatClientHandler());
}
}
1
2
3
4
5
6
7
8
9
10
11
這裏咱們設置了IdleStateHandler的寫超時爲3秒,客戶端執行的動做爲寫消息到服務端,服務端執行讀動做。

客戶端handler:

public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {

private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
CharsetUtil.UTF_8));

private static final int TRY_TIMES = 3;

private int currentTime = 0;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("激活時間是:"+new Date());
System.out.println("連接已經激活");
ctx.fireChannelActive();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("中止時間是:"+new Date());
System.out.println("關閉連接");
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("當前輪詢時間:"+new Date());
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
if(currentTime <= TRY_TIMES){
System.out.println("currentTime:"+currentTime);
currentTime++;
ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
}
}
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = (String) msg;
System.out.println(message);
if (message.equals("Heartbeat")) {
ctx.write("has read message from server");
ctx.flush();
}
ReferenceCountUtil.release(msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
啓動服務端和客戶端咱們看到輸出爲:

 

咱們再來屢一下思路:

首先客戶端激活channel,由於客戶端中並無發送消息因此會觸發客戶端的IdleStateHandler,它設置的寫超時時間爲3s;
而後觸發客戶端的事件機制進入userEventTriggered方法,在觸發器中計數並向客戶端發送消息;
服務端接收消息;
客戶端觸發器繼續輪詢發送消息,直到計數器滿再也不向服務端發送消息;
服務端在IdleStateHandler設置的讀消息超時時間5s內未收到消息,觸發了服務端中handler的userEventTriggered方法,因而關閉客戶端的連接。
大致咱們的簡單心跳機制就是這樣的思路,經過事件觸發機制以及計數器的方式來實現,上面咱們的案例中最後客戶端沒有發送消息的時候咱們是強制斷開了客戶端的連接,那麼既然能夠關閉,咱們是否是也但是從新連接客戶端呢?由於萬一客戶端自己並不想關閉而是因爲別的緣由致使他沒法與服務端通訊。下面咱們來講一下重連機制。

當咱們的服務端在未讀到客戶端消息超時而關閉客戶端的時候咱們通常在客戶端的finally塊中方的是關閉客戶端的代碼,這時咱們能夠作一下修改的,finally是必定會被執行新的,因此咱們能夠在finally塊中從新調用一下啓動客戶端的代碼,這樣就又從新啓動了客戶端了,上客戶端代碼:

/**
* 本Client爲測試netty重連機制
* Server端代碼都同樣,因此不作修改
* 只用在client端中作一下判斷便可
*/
public class HeartBeatsClient2 {

private int port;
private String address;
ChannelFuture future;

public HeartBeatsClient2(int port, String address) {
this.port = port;
this.address = address;
}

public void start(){
EventLoopGroup group = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new HeartBeatsClientChannelInitializer());

try {
future = bootstrap.connect(address,port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
//group.shutdownGracefully();
if (null != future) {
if (future.channel() != null && future.channel().isOpen()) {
future.channel().close();
}
}
System.out.println("準備重連");
start();
System.out.println("重連成功");
}

}

public static void main(String[] args) { HeartBeatsClient2 client = new HeartBeatsClient2(7788,"127.0.0.1"); client.start(); }}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748其他部分的代碼與上面的實例並沒有異同,只需改造客戶端便可,咱們再運行服務端和客戶端會看到客戶端雖然被關閉了,可是立馬又被重啓:

相關文章
相關標籤/搜索