Netty學習-Echo服務器客戶端

Netty服務器構成

  1. 至少一個ChannelHandler——該組件實現了服務器對從客戶端接受的數據的處理,即它的業務邏輯
  2. 引導——配置服務器的啓動代碼。至少,它會將服務器綁定到它要監聽鏈接請求的端口上。

ChannelHandler和業務邏輯

 ChannelHandler是一個接口族的父接口,它的實現負責接受並響應事件通知,在Netty應用程序中,全部的數據處理邏輯都包含在這些核心抽象的實現中

 Echo服務器會響應傳入的消息,所以須要實現ChannelInboundHandler接口,用來定義響應入站事件的方法。因爲Echo服務器的應用程序只須要用到少許的方法,因此只須要繼承ChannelInboundHandlerAdapter類,它提供了ChannelInboundHandler的默認實現。

 在ChannelInboundHandler中,咱們感興趣的方法有:java

  1. channelRead()——對於每一個傳入的消息都要調用
  2. channelReadComplete()——通知ChannelInboundHandler最後一次對channelRead()的調用是當前批量讀取中的最後一條消息
  3. execeptionCaught()——在讀取操做期間,有異常拋出時會調用。

EchoServerHandler實現

package cn.sh.demo.echo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author sh
 * @ChannelHandler.Sharable 標示一個ChannelHandler能夠被多個Channel安全地共享
 */
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        //將接受到的消息輸出到客戶端
        System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8));
        //將接收到的消息寫給發送者,而不沖刷出站消息
        ctx.write(in);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //將消息沖刷到客戶端,而且關閉該Channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //打印異常堆棧跟蹤
        cause.printStackTrace();
        //關閉該Channel
        ctx.close();
    }
}

備註git

  1. ChannelInbounHandlerAdapter每一個方法均可以被重寫而後掛鉤到事件生命週期的恰當點上。
  2. 重寫exceptionCaught()方法容許你對Throwable的任何子類型做出反應。
  3. 每一個Channel都擁有一個與之關聯的ChannelPipeline,ChannelPipeline持有一個ChannelHandler的實例鏈。在默認狀況下,ChannelHandler會把對方法的調用轉發給鏈中的下一個ChannelHandler。所以,若是exceptionCaught()方法沒有被該鏈中的某處實現,那麼異常將會被傳遞到ChannelPipeline的末端進行記錄

引導服務器

主要涉及的內容github

  1. 綁定監聽並接受傳入鏈接請求的端口
  2. 配置Channel,將有關的入站消息通知給EchoServerHandler實例

Echo服務引導示例代碼bootstrap

package cn.sh.demo.echo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

public class EchoServer {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void startServer() throws InterruptedException {
        EchoServerHandler serverHandler = new EchoServerHandler();
        //建立EventLoopGroup
        EventLoopGroup group = new NioEventLoopGroup();
        //建立ServerBootstrap
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(group)
                //指定所使用的NIO傳輸Channel
                .channel(NioServerSocketChannel.class)
                //使用指定的端口套接字
                .localAddress(new InetSocketAddress(port))
                //添加一個EchoServerHandler到子Channel的ChannelPipeline
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        //此處因爲EchoServerHandler被註解標註爲@Shareble,因此咱們老是使用相同的實例
                        channel.pipeline().addLast(serverHandler);
                    }
                });
        try {
            //異步的綁定服務器,調用sync()方法阻塞等待直到綁定完成
            ChannelFuture channelFuture = bootstrap.bind().sync();
            //獲取Channel的CloseFuture,而且阻塞當前線程直到它完成
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //關閉EventLoopGroup,釋放全部的資源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        if (args.length != 1) {
            System.err.println("參數類型或者個數不正確");
            return;
        }
        //設置端口值
        int port = Integer.parseInt(args[0]);
        //啓動Echo服務器
        new EchoServer(port).startServer();
    }
}

備註數組

  1. 此處使用了一個特殊的類——ChannelInitializer。當一個新的鏈接被接受時,一個新的子Channel會被建立,此時ChannelInitializer就會把一個EchoServerHandler的示例添加到該Channel的ChannelPipeline中,這個ChannelHandler將會收到有關入站消息的通知。

回顧引導服務

  1. 建立一個ServerBootStrap實例來引導和綁定服務器
  2. 建立並分配一個NioEventLoopGroup實例進行事件的處理,如接受新鏈接以及讀/寫數據
  3. 指定服務器綁定的本地InetSocketAddress
  4. 使用EchoServerHandler的實例初始化每個新的Channel
  5. 調用ServerBootstrap.bind()方法來綁定服務器

Echo客戶端

客戶端主要包括的操做:安全

  1. 鏈接到服務器
  2. 發送一個或多個消息
  3. 對於每一個消息,等待並接受從服務器發回相同的消息
  4. 關閉鏈接

編寫客戶端主要包括業務邏輯和引導服務器

ChannelHandler實現客戶端邏輯

在該示例中,咱們使用SimpleChannelInboundHandler類來處理全部的事件,主要的方法有:異步

  1. channelActive()——在和服務器的鏈接已經創建以後被調用
  2. channelRead0()——當從服務器接收到一條消息時被調用
  3. exceptionCaught()——在處理過程當中引起異常時被調用

示例代碼以下:socket

package cn.sh.demo.echo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * @author sh
 * @ChannelHandler.Sharable 標記該類的示例能夠被多個Channel共享
 */
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        //當一個鏈接被服務器接受並創建後,發送一條消息
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty", CharsetUtil.UTF_8));
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        //記錄客戶端接收到服務器的消息
        System.out.println("Client received:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 在發生異常時,記錄錯誤並關閉Channel
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

備註

 每次在接受數據時,都會調用channelRead0()方法。須要注意的是,由服務器發送的消息可能會被分塊接受。也就是說,若是服務器發送了5字節,那麼不能保證這5字節會被一次性接受。即便是對於這麼少許的數據,channelRead0()方法也可能會被調用兩次,第一次使用一個持有3字節的ByteBuf(Netty的字節容器),第二次使用一個持有2字節的ByteBuf。做爲一個面向流的協議,TCP保證了字節數組會按照服務器發送它們的順序被接受。ide

爲何客戶端使用SimpleChannelInboundHandler而不是ChannelInboundHandlerAdapter?

主要和業務邏輯如何處理消息以及Netty如何管理資源有關

客戶端中,當channelRead0()方法完成時,已經接受了消息而且處理完畢,當該方法返回時,SimpleChannelInboundHandler負責釋放指向保存該消息的ByteBuf的內存引用

可是在服務器端,你須要將消息返回給客戶端,write()操做是異步的,直到channelRead()方法返回後有可能仍然沒有完成,ChannelInboundHandlerAdapter在這個時間點上不會釋放消息。

服務端的消息是在channelComplete()方法中,經過writeAndFlush()方法調用時被釋放

引導客戶端

客戶端使用主機和端口參數來鏈接遠程地址,也就是Echo服務器的地址,而不是綁定到一個一直被監聽的端口。

示例代碼以下:

package cn.sh.demo.echo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

public class EchoClient {

    private final String host;

    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        //建立客戶端引導器
        Bootstrap bootstrap = new Bootstrap();
        //指定使用NioEventLoopGroup來處理客戶端事件
        bootstrap.group(group)
                //指定使用NIO傳輸的Channel類型
                .channel(NioSocketChannel.class)
                //設置服務器的InetSocketAddress
                .remoteAddress(new InetSocketAddress(host, port))
                //在建立Channel時,向ChannelPipeline中添加一個EchoHandler實例
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new EchoClientHandler());
                    }
                });
        try {
            //鏈接到遠程節點,阻塞等待直到鏈接完成
            ChannelFuture future = bootstrap.connect().sync();
            //阻塞直到Channel關閉
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //關閉線程池而且釋放全部的資源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        if (args.length != 2) {
            System.err.println("參數個數不正確");
            return;
        }
        int port = Integer.parseInt(args[1]);
        new EchoClient(args[0], port).start();
    }
}

備註

服務器和客戶端均使用了NIO傳輸,可是,客戶端和服務端能夠使用不一樣的傳輸,例如,在服務器使用NIO傳輸,客戶端能夠使用OIO傳輸

回顧引導服務器

  1. 建立一個Bootstrap實例,引導並建立客戶端
  2. 建立一個NioEventLoopGroup實例來進行事件處理,其中事件處理包括建立新的鏈接以及處理入站和出站數據
  3. 爲服務器鏈接建立了一個InetSocketAddress實例
  4. 當鏈接創建時,一個EchoClientHandler實例會被添加到(該Channel)的ChannelPipeline中
  5. 設置完成後,調用Bootstrap.connetc()鏈接到遠程節點

運行程序

  1. 啓動服務端
  2. 再啓動客戶端

服務端的輸出以下:

服務端輸出

客戶端的輸出以下:

客戶端輸出

代碼地址

該文章的示例代碼位於cn.sh.demo.echo包下。

示例代碼,點擊查看

相關文章
相關標籤/搜索