Netty 4 實現一個 NettyClient

本文章爲做者原創,有問題的地方請提出指正。java

一、類繼承Diagram

二、定義EndPoint類

目前僅僅定義了2個方法,分別用來獲取本地或遠程服務器的地址。算法

package netty;

import java.net.InetSocketAddress;

/**
 * @author xfyou
 * @date 2019/8/28
 */
public interface EndPoint {

  /**
   * Return  the local Inet address
   *
   * @return The local Inet address to which this <code>EndPoint</code> is bound, or <code>null</code>
   * if this <code>EndPoint</code> does not represent a network connection.
   */
  InetSocketAddress getLocalAddress();

  /**
   * Return the remote Inet address
   *
   * @return The remote Inet address to which this <code>EndPoint</code> is bound, or <code>null</code>
   * if this <code>EndPoint</code> does not represent a network connection.
   */
  InetSocketAddress getRemoteAddress();

}

二、定義AbstractClass類

主要是定義幾個抽象方法:apache

  • doOpen - 建立引導類Bootstrap;
  • doConnect - 建立Channel並鏈接遠程服務器;
  • getChannel - 獲取已建立的Channel

另外,提供了2個公共的方法給外部調用:bootstrap

  • send - 發送消息(OutBound)
  • receive - 接收消息 (InBound)

內部私有的write()方法。write方法負責在connect成功後,把消息寫到遠程peer。翻閱源碼,咱們能夠看到以下的調用棧:服務器

  • channel.writeAndFlush
    • pipeline.writeAndFlush (pipleline爲channel實例所關聯的pipleline實例)
      • AbstractChannelHandlerContext.writeAndFlush (每一個ChannelHanlder都有一個對應的ChannelHandlerContext,能夠從這個ChannelHanlderConext獲取Channel、ChannelHanlder和ChannelPipeline)
        • AbstractChannelHandlerContext.write(在這個方法裏面有一個executor.inEventLoop()的判斷,這個地方很重要,它主要是判斷當前線程是不是EventLoop分配的線程,若是是則直接使用EventLoop分配的線程執行,不然會將當前要執行的任務封裝成一個Task,而後塞到一個LinkedBlockQueue裏面去等待後續的調度執行。這樣作的目的主要是就是把用戶線程的操做封裝成Task放入隊列,統一由I/O線程來處理)
package netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.NotImplementedException;

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @author xfyou
 * @date 2019/8/29
 */
@Slf4j
@RequiredArgsConstructor
abstract class AbstractClient implements EndPoint {

  @NonNull
  private String hostName;

  @NonNull
  private int port;

  @NonNull
  @Getter(value = AccessLevel.PROTECTED)
  private int connectionTimeout;

  protected final CountDownLatch countDownLatch = new CountDownLatch(1);
  protected String respMsg;

  @SneakyThrows
  public void send(Object message) {
    doOpen();
    doConnect();
    write(message);
  }

  @SneakyThrows
  public String receive() {
    boolean b = countDownLatch.await(getConnectionTimeout(), TimeUnit.MILLISECONDS);
    if (!b) {
      log.error("Timeout(" + getConnectionTimeout() + "ms) when receiving response message");
    }
    return respMsg;
  }

  private void write(Object message) {
    Channel channel = getChannel();
    if (null != channel) {
      ChannelFuture f = channel.writeAndFlush(byteBufferFrom(message)).syncUninterruptibly();
      if (!f.isSuccess()) {
        log.error("Failed to send message to " + getRemoteAddress() + f.cause().getMessage());
      }
    }
  }

  private ByteBuf byteBufferFrom(Object message) {
    return message instanceof String ? Unpooled.copiedBuffer((String) message, StandardCharsets.UTF_8) : Unpooled.copiedBuffer((byte[]) message);
  }

  @Override
  public InetSocketAddress getRemoteAddress() {
    return new InetSocketAddress(hostName, port);
  }

  @Override
  public InetSocketAddress getLocalAddress() {
    throw new NotImplementedException("This method is not need to be implemented");
  }

  /**
   * Open client.
   *
   * @throws Throwable
   */
  protected abstract void doOpen() throws Throwable;

  /**
   * Connect to server.
   *
   * @throws Throwable
   */
  protected abstract void doConnect() throws Throwable;

  /**
   * Get the connected channel.
   *
   * @return channel
   */
  protected abstract Channel getChannel();

}

四、定義NettyClient類

NettyClient類繼承了AbstractClient類,主要是實現了doOpen、doConnect、getChannel類;同時實現了一個自定義的ChannelHander用來在ChannelActive時獲取Channel以及有消息返回時讀取消息。異步

  • doOpen方法的實現。建立引導類並在引導類上註冊相關屬性;
    • 註冊NioEventLoopGroup,基於java NIO傳輸的一個線程池,線程池的默認大小爲:CPU核數*2。當一個新的Channel被建立後,Netty會從這個NioEventLoopGroup中選擇一個線程來爲此Channel建立一個關聯的EventLoop(用來監聽關聯Channel的全部的I/O事件,好比鏈接、斷開鏈接、讀、寫等);
    • 註冊NioSocketChannel類類型,這個類型說明將要建立的Channel的實例的類型,客戶端爲:NioSocketChannel,服務器端爲:NioServerSocketChannel;Bootstrap會根據這個class來建立一個BootstrapChannelFactory<NioSocketChannel>實例(Channel工廠類,用於未來在connect時建立Channel);
    • 設置相關Option選項
    • 註冊自定義的ChannelHandler,這些ChannelHandler會被註冊到與Channel相關聯的ChannelPipleline中,用來攔截消息並作相應的處理。
  • doConnect方法的實現。經過已建立的Channel來鏈接到遠程服務器。前面咱們已經在Bootstrap中設置的超時時間,因此connect時可使用忽略線程中斷阻塞的方式去鏈接,直到超時。connect時會先經過BootstrapChannelFactory<NioSocketChannel>來建立一個NioSocketChannel實例,並把這個NioSocketChannel實例註冊到NioEventGroup中去(從線程池中按某種算法選擇一個EventLoop來和當前的Channel創建對應關係,能夠是1:N,即一個EventLoop能夠對應多個Channel )。EventLoop同時也是一個EventLoopExecutor,EventLoop和Channel對應起來後就能夠處理全部這個Channel的I/O操做了。一句話,某個Channel的全部I/O操做都是線程池(NioEventGroup)中的某個I/O線程(EventLoopExecutor)來異步處理的。
package netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

/**
 * @author xfyou
 * @date 2019/8/28
 */
@Slf4j
public class NettyClient extends AbstractClient {

  private Bootstrap bootstrap;

  private volatile Channel channel;

  private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();

  public NettyClient(String hostName, int port, int connectionTimeout) {
    super(hostName, port, connectionTimeout);
  }

  private class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
      super.channelActive(ctx);
      channel = ctx.channel();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
      try {
        respMsg = msg.toString(StandardCharsets.UTF_8);
      } finally {
        countDownLatch.countDown();
        ctx.close();
      }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      log.error("An exception was thrown, cause:" + cause.getMessage());
      ctx.close();
    }
  }

  @Override
  protected void doOpen() throws Throwable {
    bootstrap = new Bootstrap();
    bootstrap
        .group(NIO_GROUP)
        .remoteAddress(getRemoteAddress())
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectionTimeout())
        .handler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new ClientHandler());
          }
        });
  }

  @Override
  public void doConnect() {
    ChannelFuture f = bootstrap.connect().syncUninterruptibly();
    if (!f.isSuccess() && null != f.cause()) {
      log.error("The client failed to connect the server:" + getRemoteAddress() + ",error message is:" + f.cause().getMessage());
    }
  }

  @Override
  protected Channel getChannel() {
    return channel;
  }

}

五、測試類

package netty;

import lombok.SneakyThrows;

/**
 * Test
 *
 * @author xfyou
 */
public class Test {

  @SneakyThrows
  public static void main(String[] args) {
    NettyClient client = new NettyClient("127.0.0.1", 8080, 45000);
    client.send("aaa".getBytes());
    // maybe do something else
    System.out.println(client.receive());
  }
  
}
相關文章
相關標籤/搜索