Netty實現羣聊

如題,只是爲了實驗,我將全部的客戶端的channel存在靜態變量ChannelGroup實例中,對消息進行羣發。java

固然,若是實際的環境中,我估計要將channel存在緩存數據庫中,具體怎麼作,後面再研究。git

如今,咱們來作此次簡單的實驗:github

源代碼:https://github.com/YangZhouChaoFan/netty-learn/tree/master/netty-start數據庫


1:NettyServerbootstrap

package com.netty.start.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;


/**
 * 服務器類.
 */
public class NettyServer {

    public void start(int port) throws Exception {

        //建立接收者的事件循環組
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        //建立訪問者的事件循環組
        EventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            //建立服務器引導程序
            ServerBootstrap b = new ServerBootstrap();
            //設置消息循環
            b.group(parentGroup, childGroup);
            //設置通道
            b.channel(NioServerSocketChannel.class);
            //配置通道參數:鏈接隊列的鏈接數
            b.option(ChannelOption.SO_BACKLOG, 1024);
            //設置客戶端請求的處理操做
            b.childHandler(new ChildChannelHandler());
            //綁定端口,並獲取通道io操做的結果
            ChannelFuture f = b.bind(port).sync();
            //等待服務端監聽端口關閉
            f.channel().closeFuture().sync();
        } finally {
            //關閉接收器事件循環
            parentGroup.shutdownGracefully();
            //關閉訪問者的事件循環
            childGroup.shutdownGracefully();
        }

    }

}


2:ChildChannelHandlerpromise

package com.netty.start.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * 客戶端通道處理類.
 */
public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel e) throws Exception {

        ChannelPipeline pipeline = e.pipeline();
        // 以("\n")爲結尾分割的 解碼器
        pipeline.addLast(new LineBasedFrameDecoder(1024));
        // 字符串解碼 和 編碼
        pipeline.addLast(new StringDecoder());
        pipeline.addLast(new StringEncoder());
        //添加消息處理
        e.pipeline().addLast(new NettyServerHandler());

    }

}


3:NettyServerHandler緩存

package com.netty.start.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.SocketAddress;

/**
 * 服務器處理類.
 */
public class NettyServerHandler extends ChannelHandlerAdapter {

    static private Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);

    //建立頻道組
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 鏈接通道.
     *
     * @param ctx
     * @param remoteAddress
     * @param localAddress
     * @param promise
     * @throws Exception
     */
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        logger.info(remoteAddress + ":鏈接通道");
        super.connect(ctx, remoteAddress, localAddress, promise);
    }

    /**
     * 活躍通道.
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info(ctx.channel().remoteAddress() + ":通道激活");
        super.channelActive(ctx);
        ctx.writeAndFlush("歡迎訪問服務器\r\n");
        channels.add(ctx.channel());
    }

    /**
     * 非活躍通道.
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info(ctx.channel().remoteAddress() + ":通道失效");
        super.channelInactive(ctx);
        channels.remove(ctx.channel());
    }

    /**
     * 接收消息.
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        logger.info(ctx.channel().remoteAddress() + ":" + msg);
        Channel currentChannel = ctx.channel();
        for (Channel channel : channels) {
            if (channel != currentChannel) {
                channel.writeAndFlush("[" + currentChannel.remoteAddress() + "]" + msg + "\n");
            }
        }
    }

    /**
     * 接收完畢.
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    /**
     * 關閉通道.
     *
     * @param ctx
     * @param promise
     * @throws Exception
     */
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        logger.info(ctx.channel().remoteAddress() + ":關閉通道");
        super.close(ctx, promise);
    }

    /**
     * 異常處理.
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.info("異常信息:" + cause.getMessage());
    }
}


4:NettyClient服務器

package com.netty.start.client;

import com.netty.start.server.ChildChannelHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.BufferedReader;
import java.io.InputStreamReader;

/**
 * 客戶端類.
 */
public class NettyClient {

    public void connect(String host, int port) throws Exception {

        //建立事件循環組
        EventLoopGroup group = new NioEventLoopGroup();

        try {

            //建立引導程序
            Bootstrap b = new Bootstrap();
            //設置消息循環
            b.group(group);
            //設置通道
            b.channel(NioSocketChannel.class);
            //配置通道參數:tcp不延遲
            b.option(ChannelOption.TCP_NODELAY, true);
            //設置通道處理
            b.handler(new ChannelHandler());
            //發起異步連接,等待輸入參數
            Channel channel = b.connect(host, port).sync().channel();
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                channel.writeAndFlush(in.readLine() + "\r\n");
            }

        } finally {
            //關閉
            group.shutdownGracefully();
        }

    }

}


5:ChannelHandler異步

package com.netty.start.client;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * 通道處理類.
 */
public class ChannelHandler extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {

        ChannelPipeline pipeline = socketChannel.pipeline();
        // 以("\n")爲結尾分割的 解碼器
        pipeline.addLast(new LineBasedFrameDecoder(1024));
        // 字符串解碼 和 編碼
        pipeline.addLast(new StringDecoder());
        pipeline.addLast(new StringEncoder());

        socketChannel.pipeline().addLast(new NettyClientHandler());

    }
}


6:NettyClientHandlersocket

package com.netty.start.client;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.SocketAddress;

/**
 * 客戶端處理類.
 */
public class NettyClientHandler extends ChannelHandlerAdapter {

    static private Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

    /**
     * 鏈接通道.
     *
     * @param ctx
     * @param remoteAddress
     * @param localAddress
     * @param promise
     * @throws Exception
     */
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        logger.info(remoteAddress + ":鏈接通道");
        super.connect(ctx, remoteAddress, localAddress, promise);
    }

    /**
     * 活躍通道.
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info(ctx.channel().remoteAddress() + ":通道激活");
        super.channelActive(ctx);
    }

    /**
     * 非活躍通道.
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info(ctx.channel().remoteAddress() + ":通道失效");
        super.channelInactive(ctx);
    }

    /**
     * 接收消息.
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        logger.info(ctx.channel().remoteAddress() + ":" + msg);
    }

    /**
     * 接收完畢.
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    /**
     * 關閉通道.
     *
     * @param ctx
     * @param promise
     * @throws Exception
     */
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        super.close(ctx, promise);
    }

    /**
     * 異常處理.
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.info("異常信息:" + cause.getMessage());
    }
}


7:ServerTest

package com.netty.start.test;

import com.netty.start.server.NettyServer;

/**
 * Created by chenhao on 2016/3/17.
 */
public class ServerTest {

    public static void main(String[] args) throws Exception {

        NettyServer server = new NettyServer();
        server.start(3000);

    }

}


8:ClientTest

package com.netty.start.test;

import com.netty.start.client.NettyClient;

/**
 * Created by chenhao on 2016/3/17.
 */
public class ClientTest {

    public static void main(String[] args) throws Exception {

        NettyClient client = new NettyClient();
        client.connect("127.0.0.1", 3000);

    }
}


經過7和8兩個類,啓動測試實例。

相關文章
相關標籤/搜索