基於Netty UDP實現的簡單心跳機制

package com.simple.netty;

import java.util.Hashtable;
import java.util.Iterator;
import java.util.Set;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class UDPServer {
    public static void main(String[] args) {
        Bootstrap strap = new Bootstrap();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        strap.channel(NioDatagramChannel.class).group(workerGroup)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel)
                            throws Exception {
                        channel.pipeline().addLast(
                                new LoggingHandler(LogLevel.INFO),
                                new StringEncoder(), new StringDecoder());
                        channel.pipeline().addLast(
                                new MyHeartbeatChannelHandler());
                    }
                });
        try {
            ChannelFuture future = strap.bind(7777).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

}

class MyHeartbeatChannelHandler extends
        SimpleChannelInboundHandler<DatagramPacket> implements Runnable {
    private Hashtable<String, Long> observe = new Hashtable<String, Long>();

    public MyHeartbeatChannelHandler() {
        new Thread(this).start();
    }

    @Override
    protected void channelRead0(
            ChannelHandlerContext paramChannelHandlerContext,
            DatagramPacket paramI) throws Exception {

    }

    private int flag = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        synchronized (Object.class) {
            if (msg instanceof DatagramPacket) {
                observe.put(((DatagramPacket) msg).sender().toString(),
                        System.currentTimeMillis());
            }
            System.out.println(++flag);
        }
    }

    @Override
    public boolean acceptInboundMessage(Object msg) throws Exception {
        return super.acceptInboundMessage(msg);
    }

    @Override
    public void run() {
        while (true) {
            synchronized (Object.class) {
                Set<String> keys = observe.keySet();
                Iterator<String> mIterator = keys.iterator();
                while (mIterator.hasNext()) {
                    String k = mIterator.next();
                    long l = observe.get(k);
                    if (l > 0 && System.currentTimeMillis() - l > 3 * 1000) {
                        mIterator.remove();
                        System.out.println(k + "掉線");
                    }
                }
            }
        }
    }
}
java

package com.simple.netty;

import java.util.Random;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

public class UDPClient {
    public static void main(String[] args) {

        for (int i = 0; i < 1000; i++) {
            new Thread() {
                public void run() {
                    Bootstrap strap = new Bootstrap();

                    strap.channel(NioDatagramChannel.class)
                            .group(new NioEventLoopGroup())
                            .handler(new ChannelInitializer<Channel>() {
                                @Override
                                protected void initChannel(Channel channel)
                                        throws Exception {
                                    channel.pipeline().addLast(
                                            new LoggingHandler(LogLevel.INFO),
                                            new StringEncoder(),
                                            new StringDecoder());
                                    channel.pipeline().addLast("heartbeat",
                                            new HeartbeatChannelHandler());
                                }
                            });
                    try {
                        ChannelFuture future = strap.connect("127.0.0.1", 7777)
                                .sync();

                        int number = 3 + new Random().nextInt(5);

                        while (true) {
                            future.channel()
                                    .writeAndFlush("88888888")
                                    .addListener(
                                            new GenericFutureListener<Future<? super Void>>() {
                                                @Override
                                                public void operationComplete(
                                                        Future<? super Void> paramF)
                                                        throws Exception {
                                                    System.out.println(paramF
                                                            .isSuccess());
                                                }
                                            });
                            Thread.sleep(1000);
                            --number;
                            if (number == 0)
                                break;
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                };
            }.start();
        }

    }
}

class HeartbeatChannelHandler extends
        SimpleChannelInboundHandler<DatagramPacket> {
    @Override
    protected void channelRead0(
            ChannelHandlerContext paramChannelHandlerContext,
            DatagramPacket paramI) throws Exception {
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        ctx.writeAndFlush("8888888888888");
    }
}bootstrap

相關文章
相關標籤/搜索