Alluxio Source Code Analysis: Netty (Part 2)

Introduction to Netty

Netty is an asynchronous event-driven network application framework for the rapid development of maintainable, high-performance protocol servers and clients.
 
 

Netty is one of the most important communication components in Alluxio.

It is involved in the common client upload and download paths.

Interaction diagram

For an introduction to this diagram, see the previous article:

https://www.cnblogs.com/victor2302/p/10490253.html

 
 
As an important component in Alluxio's interactions, Netty plays several key roles:
  • Decoupling the UFS from the worker's caching functionality
  • Decoupling BlockHandler and ShortCircuitBlockHandler
  • Decoupling asynchronous and synchronous uploads
  • High-performance transfer
 

Netty client side:

1. Fixed handlers: alluxio.network.netty.NettyClient

final Bootstrap boot = new Bootstrap();

boot.group(WORKER_GROUP)
    .channel(NettyUtils.getClientChannelClass(!(address instanceof InetSocketAddress)));
boot.option(ChannelOption.SO_KEEPALIVE, true);
boot.option(ChannelOption.TCP_NODELAY, true);
boot.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
if (NettyUtils.USER_CHANNEL_TYPE == ChannelType.EPOLL) {
  boot.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
}

// After 10 missed heartbeat attempts and no write activity, the server will close the channel.
final long timeoutMs = Configuration.getMs(PropertyKey.NETWORK_NETTY_HEARTBEAT_TIMEOUT_MS);
final long heartbeatPeriodMs = Math.max(timeoutMs / 10, 1);
boot.handler(new ChannelInitializer<Channel>() {
  @Override
  public void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();

    pipeline.addLast(RPCMessage.createFrameDecoder());  // split the inbound byte stream into frames
    pipeline.addLast(ENCODER);  // RPCMessage -> network bytes
    pipeline.addLast(DECODER);  // network bytes -> RPCMessage
    // fires a writer-idle event when nothing has been written for heartbeatPeriodMs
    pipeline.addLast(new IdleStateHandler(0, heartbeatPeriodMs, 0, TimeUnit.MILLISECONDS));
    pipeline.addLast(new IdleWriteHandler());  // sends a heartbeat on each writer-idle event
  }
});
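
The bootstrap above only configures new channels; the connection itself is established elsewhere (in Alluxio the channels are pooled and reused). A minimal sketch of that step, with illustrative variable names, just to show where the fixed pipeline takes effect:

// Sketch: connecting creates a channel and runs initChannel(), which
// installs the fixed encoder/decoder/heartbeat handlers on it.
ChannelFuture connectFuture = boot.connect(address).sync();
Channel channel = connectFuture.channel();
// ... the channel can now carry RPCProtoMessage traffic ...
channel.close().sync();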

 

2. Temporary handlers: registering a callback for a generic response (used for ShortCircuitBlockHandler requests)

public static ProtoMessage call(final NettyRPCContext context, ProtoMessage request)
      throws IOException {
    Channel channel = Preconditions.checkNotNull(context.getChannel());
    final Promise<ProtoMessage> promise = channel.eventLoop().newPromise();
    channel.pipeline().addLast(new RPCHandler(promise));
    channel.writeAndFlush(new RPCProtoMessage(request)).addListener((ChannelFuture future) -> {
      if (future.cause() != null) {
        future.channel().close();
        promise.tryFailure(future.cause());
      }
    });
    ProtoMessage message;
    try {
      message = promise.get(context.getTimeoutMs(), TimeUnit.MILLISECONDS);
    } catch (ExecutionException | TimeoutException e) {
      CommonUtils.closeChannel(channel);
      throw new IOException(e);
    } catch (InterruptedException e) {
      CommonUtils.closeChannel(channel);
      throw new RuntimeException(e);
    } finally {
      if (channel.isOpen()) {
        channel.pipeline().removeLast();
      }
    }
    if (message.isResponse()) {
      CommonUtils.unwrapResponseFrom(message.asResponse(), context.getChannel());
    }
    return message;
  }
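
The RPCHandler added above is the temporary handler: it is appended to the pipeline for a single request and removed again in the finally block. A minimal sketch of what such a handler can look like (the real class lives in alluxio.network.netty.NettyRPC and may differ in its details):

// Sketch: a per-request handler that completes the promise with the
// first proto message it receives on the channel.
final class RPCHandler extends ChannelInboundHandlerAdapter {
  private final Promise<ProtoMessage> mPromise;

  RPCHandler(Promise<ProtoMessage> promise) {
    mPromise = promise;
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof RPCProtoMessage) {
      mPromise.trySuccess(((RPCProtoMessage) msg).getMessage());
    } else {
      ctx.fireChannelRead(msg);
    }
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    mPromise.tryFailure(cause);
    ctx.close();
  }
}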

 

3. Temporary handlers: registering callbacks for read/write operations (BlockHandler)

  private NettyPacketReader(FileSystemContext context, WorkerNetAddress address,
      Protocol.ReadRequest readRequest) throws IOException {
    mContext = context;
    mAddress = address;
    mPosToRead = readRequest.getOffset();
    mReadRequest = readRequest;

    mChannel = mContext.acquireNettyChannel(address);
    mChannel.pipeline().addLast(new PacketReadHandler());
    mChannel.writeAndFlush(new RPCProtoMessage(new ProtoMessage(mReadRequest)))
        .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
  }

private NettyPacketWriter(FileSystemContext context, final WorkerNetAddress address, long id,
      long length, long packetSize, Protocol.RequestType type, OutStreamOptions options,
      Channel channel) {
    mContext = context;
    mAddress = address;
    mLength = length;
    Protocol.WriteRequest.Builder builder =
        Protocol.WriteRequest.newBuilder().setId(id).setTier(options.getWriteTier()).setType(type);
    if (type == Protocol.RequestType.UFS_FILE) {
      Protocol.CreateUfsFileOptions ufsFileOptions =
          Protocol.CreateUfsFileOptions.newBuilder().setUfsPath(options.getUfsPath())
              .setOwner(options.getOwner()).setGroup(options.getGroup())
              .setMode(options.getMode().toShort()).setMountId(options.getMountId()).build();
      builder.setCreateUfsFileOptions(ufsFileOptions);
    }
    mPartialRequest = builder.buildPartial();
    mPacketSize = packetSize;
    mChannel = channel;
    mChannel.pipeline().addLast(new PacketWriteResponseHandler());
  }
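
The counterpart of this registration is cleanup: when the reader or writer is closed, the per-request handler has to be removed so the pooled channel can be reused by other requests. A rough sketch of that idea, assuming releaseNettyChannel is the counterpart of the acquireNettyChannel call shown above (the real close() also waits for in-flight packets):

// Sketch: drop the per-request handler and return the channel to the pool.
public void close() throws IOException {
  if (mChannel.isOpen()) {
    // removes the PacketReadHandler / PacketWriteResponseHandler added in the constructor
    mChannel.pipeline().removeLast();
  }
  mContext.releaseNettyChannel(mAddress, mChannel);
}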

 

 

Netty server side:

The list of registered handlers:

alluxio.worker.netty.PipelineHandler
protected void initChannel(Channel ch) throws Exception {
  ChannelPipeline pipeline = ch.pipeline();
 
  final long timeoutMs = Configuration.getMs(PropertyKey.NETWORK_NETTY_HEARTBEAT_TIMEOUT_MS);
 
  // Decoders & Encoders
  pipeline.addLast("frameDecoder", RPCMessage.createFrameDecoder());
  pipeline.addLast("RPCMessageDecoder", new RPCMessageDecoder());
  pipeline.addLast("RPCMessageEncoder", new RPCMessageEncoder());
 
  // Idle Event Handlers
  pipeline.addLast("idleEventHandler", new IdleStateHandler(timeoutMs, 0, 0,
      TimeUnit.MILLISECONDS));
  pipeline.addLast("idleReadHandler", new IdleReadHandler());
  pipeline.addLast("heartbeatHandler", new HeartbeatHandler());
 
  // Block Handlers
  pipeline.addLast("blockReadHandler",
      new BlockReadHandler(NettyExecutors.BLOCK_READER_EXECUTOR,
          mWorkerProcess.getWorker(BlockWorker.class), mFileTransferType));
  pipeline.addLast("blockWriteHandler", new BlockWriteHandler(
      NettyExecutors.BLOCK_WRITER_EXECUTOR, mWorkerProcess.getWorker(BlockWorker.class),
      mWorkerProcess.getUfsManager()));
  pipeline.addLast("shortCircuitBlockReadHandler",
      new ShortCircuitBlockReadHandler(NettyExecutors.RPC_EXECUTOR,
          mWorkerProcess.getWorker(BlockWorker.class)));
  pipeline.addLast("shortCircuitBlockWriteHandler",
      new ShortCircuitBlockWriteHandler(NettyExecutors.RPC_EXECUTOR,
          mWorkerProcess.getWorker(BlockWorker.class)));
  pipeline.addLast("asyncCacheHandler", new AsyncCacheHandler(mRequestManager));
 
  // UFS Handlers
  pipeline.addLast("ufsFileWriteHandler", new UfsFileWriteHandler(
      NettyExecutors.FILE_WRITER_EXECUTOR, mWorkerProcess.getUfsManager()));
 
  // Unsupported Message Handler
  pipeline.addLast("unsupportedMessageHandler", new UnsupportedMessageHandler());
}
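
This initializer is registered once for the worker's data server: every accepted connection runs initChannel and receives the full pipeline above. A rough sketch of that wiring (the constructor arguments of PipelineHandler shown here are illustrative, not the exact worker data-server code):

// Sketch: the worker's data server installs PipelineHandler as the child
// handler, so each accepted channel is initialized with the pipeline above.
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new PipelineHandler(mWorkerProcess, mFileTransferType))
    .childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture bindFuture = bootstrap.bind(bindAddress).sync();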

 

Write/read configuration

alluxio.client.file.options.CreateFileOptions is the second parameter of the FileSystem createFile method and lets you choose among different write strategies.

For example:

  • MUST_CACHE (write only to Alluxio; the data must be stored in Alluxio)
  • CACHE_THROUGH (cache in Alluxio and synchronously write to the UnderFS)
  • THROUGH (no caching; synchronously write to the UnderFS)
  • ASYNC_THROUGH (asynchronously write to the UnderFS; an experimental feature)
FileOutStream createFile(AlluxioURI path, CreateFileOptions options)
    throws FileAlreadyExistsException, InvalidPathException, IOException, AlluxioException;
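
A brief usage sketch of choosing a write strategy (assuming a setWriteType setter on CreateFileOptions; check the exact API of your Alluxio version):

// Sketch: create a file that is cached in Alluxio and synchronously
// persisted to the under file system.
void writeThroughExample(FileSystem fs) throws Exception {
  CreateFileOptions options = CreateFileOptions.defaults()
      .setWriteType(WriteType.CACHE_THROUGH);
  try (FileOutStream out = fs.createFile(new AlluxioURI("/demo/file"), options)) {
    out.write("hello alluxio".getBytes());
  }
}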

 

These write options take effect by setting a flag on the netty message that is sent; on the worker, Netty dispatches the message to the matching pipeline handler, and each handler implements the behavior for its own write type.

Code example:

Whether data needs to be written to the UFS is decided in UfsFileWriteHandler's acceptMessage method.

alluxio.worker.netty.UfsFileWriteHandler#acceptMessage

  protected boolean acceptMessage(Object object) {
    if (!super.acceptMessage(object)) {
      return false;
    }
    Protocol.WriteRequest request = ((RPCProtoMessage) object).getMessage().asWriteRequest();
    return request.getType() == Protocol.RequestType.UFS_FILE;
  }
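
For comparison, the block write handler performs the symmetric check, so a write request only reaches the handler that matches its type. A sketch of that counterpart, following the same pattern (the actual BlockWriteHandler may differ slightly):

  @Override
  protected boolean acceptMessage(Object object) {
    if (!super.acceptMessage(object)) {
      return false;
    }
    Protocol.WriteRequest request = ((RPCProtoMessage) object).getMessage().asWriteRequest();
    // only handle writes that target an Alluxio block, not a UFS file
    return request.getType() == Protocol.RequestType.ALLUXIO_BLOCK;
  }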