package com.simple.netty;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Set;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class UDPServer {
public static void main(String[] args) {
Bootstrap strap = new Bootstrap();
EventLoopGroup workerGroup = new NioEventLoopGroup();
strap.channel(NioDatagramChannel.class).group(workerGroup)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel)
throws Exception {
channel.pipeline().addLast(
new LoggingHandler(LogLevel.INFO),
new StringEncoder(), new StringDecoder());
channel.pipeline().addLast(
new MyHeartbeatChannelHandler());
}
});
try {
ChannelFuture future = strap.bind(7777).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}
}
class MyHeartbeatChannelHandler extends
SimpleChannelInboundHandler<DatagramPacket> implements Runnable {
private Hashtable<String, Long> observe = new Hashtable<String, Long>();
public MyHeartbeatChannelHandler() {
new Thread(this).start();
}
@Override
protected void channelRead0(
ChannelHandlerContext paramChannelHandlerContext,
DatagramPacket paramI) throws Exception {
}
private int flag = 0;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
synchronized (Object.class) {
if (msg instanceof DatagramPacket) {
observe.put(((DatagramPacket) msg).sender().toString(),
System.currentTimeMillis());
}
System.out.println(++flag);
}
}
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
return super.acceptInboundMessage(msg);
}
@Override
public void run() {
while (true) {
synchronized (Object.class) {
Set<String> keys = observe.keySet();
Iterator<String> mIterator = keys.iterator();
while (mIterator.hasNext()) {
String k = mIterator.next();
long l = observe.get(k);
if (l > 0 && System.currentTimeMillis() - l > 3 * 1000) {
mIterator.remove();
System.out.println(k + "掉線");
}
}
}
}
}
}
java
package com.simple.netty;
import java.util.Random;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
public class UDPClient {
public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
new Thread() {
public void run() {
Bootstrap strap = new Bootstrap();
strap.channel(NioDatagramChannel.class)
.group(new NioEventLoopGroup())
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel)
throws Exception {
channel.pipeline().addLast(
new LoggingHandler(LogLevel.INFO),
new StringEncoder(),
new StringDecoder());
channel.pipeline().addLast("heartbeat",
new HeartbeatChannelHandler());
}
});
try {
ChannelFuture future = strap.connect("127.0.0.1", 7777)
.sync();
int number = 3 + new Random().nextInt(5);
while (true) {
future.channel()
.writeAndFlush("88888888")
.addListener(
new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(
Future<? super Void> paramF)
throws Exception {
System.out.println(paramF
.isSuccess());
}
});
Thread.sleep(1000);
--number;
if (number == 0)
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
}
}
}
class HeartbeatChannelHandler extends
SimpleChannelInboundHandler<DatagramPacket> {
@Override
protected void channelRead0(
ChannelHandlerContext paramChannelHandlerContext,
DatagramPacket paramI) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ctx.writeAndFlush("8888888888888");
}
}bootstrap