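A block is the smallest unit in which Alluxio reads and stores file data, and all block I/O goes through BlockOutStream and BlockInStream.
The actual packet transfer has two implementations: short circuit and Netty.
In the input stream code, the logic that chooses between the two implementations is: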
public static BlockInStream create(FileSystemContext context, BlockInfo info,
    WorkerNetAddress dataSource, BlockInStreamSource dataSourceType, InStreamOptions options)
    throws IOException {
  URIStatus status = options.getStatus();
  OpenFileOptions readOptions = options.getOptions();

  boolean promote = readOptions.getReadType().isPromote();

  long blockId = info.getBlockId();
  long blockSize = info.getLength();

  // Construct the partial read request
  Protocol.ReadRequest.Builder builder =
      Protocol.ReadRequest.newBuilder().setBlockId(blockId).setPromote(promote);
  // Add UFS fallback options
  builder.setOpenUfsBlockOptions(options.getOpenUfsBlockOptions(blockId));

  boolean shortCircuit = Configuration.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_ENABLED);
  boolean sourceSupportsDomainSocket = NettyUtils.isDomainSocketSupported(dataSource);
  boolean sourceIsLocal = dataSourceType == BlockInStreamSource.LOCAL;

  // Short circuit
  if (sourceIsLocal && shortCircuit && !sourceSupportsDomainSocket) {
    LOG.debug("Creating short circuit input stream for block {} @ {}", blockId, dataSource);
    try {
      return createLocalBlockInStream(context, dataSource, blockId, blockSize, options);
    } catch (NotFoundException e) {
      // Failed to do short circuit read because the block is not available in Alluxio.
      // We will try to read via netty. So this exception is ignored.
      LOG.warn("Failed to create short circuit input stream for block {} @ {}. Falling back to "
          + "network transfer", blockId, dataSource);
    }
  }

  // Netty
  LOG.debug("Creating netty input stream for block {} @ {} from client {} reading through {}",
      blockId, dataSource, NetworkAddressUtils.getClientHostName(), dataSource);
  return createNettyBlockInStream(context, dataSource, dataSourceType, builder.buildPartial(),
      blockSize, options);
}
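The read path above tries the short-circuit stream first and only falls back to Netty when the block is not found locally. A minimal sketch of that fallback pattern, stripped of the Alluxio types, might look like the following. `FallbackReadSketch`, `BlockSource`, and `BlockNotFoundException` are hypothetical names invented for illustration and are not part of Alluxio.

```java
import java.io.IOException;

/** Illustrative only: a "try local first, fall back to network" read. */
final class FallbackReadSketch {
  /** Hypothetical stand-in for anything that can return a block's bytes. */
  interface BlockSource {
    byte[] read(long blockId) throws IOException;
  }

  /** Hypothetical stand-in for "the block is not available locally". */
  static final class BlockNotFoundException extends IOException {
    BlockNotFoundException(String message) {
      super(message);
    }
  }

  static byte[] read(long blockId, boolean preferLocal, BlockSource local, BlockSource network)
      throws IOException {
    if (preferLocal) {
      try {
        return local.read(blockId);
      } catch (BlockNotFoundException e) {
        // Only the "not found" case is swallowed; any other IOException propagates.
      }
    }
    // Either the local path was not eligible or it reported the block as missing.
    return network.read(blockId);
  }
}
```

Only the "not found" case triggers the fallback here, mirroring the `catch (NotFoundException e)` in the listing; other I/O errors still reach the caller.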
In the output stream code, the logic that chooses between the two implementations is:
/**
 * @param context the file system context
 * @param blockId the block ID
 * @param blockSize the block size in bytes
 * @param address the Alluxio worker address
 * @param options the out stream options
 * @return the {@link PacketWriter} instance
 */
public static PacketWriter create(FileSystemContext context, long blockId, long blockSize,
    WorkerNetAddress address, OutStreamOptions options) throws IOException {
  if (CommonUtils.isLocalHost(address)
      && Configuration.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_ENABLED)
      && !NettyUtils.isDomainSocketSupported(address)) {
    LOG.debug("Creating short circuit output stream for block {} @ {}", blockId, address);
    return LocalFilePacketWriter.create(context, address, blockId, options);
  } else {
    LOG.debug("Creating netty output stream for block {} @ {} from client {}", blockId, address,
        NetworkAddressUtils.getClientHostName());
    return NettyPacketWriter
        .create(context, address, blockId, blockSize, Protocol.RequestType.ALLUXIO_BLOCK,
            options);
  }
}
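Both listings share the same three-part condition: the worker must be local, short circuit must be enabled, and the worker must not support domain sockets. As a rough sketch, the decision can be isolated into a pure function so the combinations are easy to check. `TransportChooser` and its `Transport` enum are hypothetical names for illustration, not Alluxio classes.

```java
/** Illustrative only: the three-flag transport decision as a pure function. */
final class TransportChooser {
  enum Transport { SHORT_CIRCUIT, NETTY }

  static Transport choose(boolean workerIsLocal, boolean shortCircuitEnabled,
      boolean domainSocketSupported) {
    // Short circuit is chosen only when all three conditions hold.
    if (workerIsLocal && shortCircuitEnabled && !domainSocketSupported) {
      return Transport.SHORT_CIRCUIT;
    }
    // Everything else, including a local worker with domain sockets, uses Netty.
    return Transport.NETTY;
  }

  public static void main(String[] args) {
    System.out.println(choose(true, true, false));  // prints SHORT_CIRCUIT
    System.out.println(choose(true, true, true));   // prints NETTY
    System.out.println(choose(false, true, false)); // prints NETTY
  }
}
```

Note that a local worker with domain socket support still takes the Netty branch, which is what the `!NettyUtils.isDomainSocketSupported(...)` term in both listings implies.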
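With short circuit, block creation and removal still go over the network through Netty.
The actual block content, however, is written directly to the local ramdisk, which avoids Netty network transfer for the data itself.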
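To make the idea of bypassing the network for block data concrete, here is a minimal sketch that writes and reads block bytes through a plain local file with `FileChannel`. It assumes the block lives in a single file at a path the client can access; `LocalBlockFileSketch` and its methods are hypothetical and do not reproduce Alluxio's LocalFilePacketWriter.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Illustrative only: block data moved through a local file instead of a socket. */
final class LocalBlockFileSketch {
  static void writeBlock(Path blockFile, byte[] data) throws IOException {
    try (FileChannel channel = FileChannel.open(blockFile, StandardOpenOption.CREATE,
        StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
      ByteBuffer buffer = ByteBuffer.wrap(data);
      // A single write call may be partial, so loop until the buffer is drained.
      while (buffer.hasRemaining()) {
        channel.write(buffer);
      }
    }
  }

  static byte[] readBlock(Path blockFile) throws IOException {
    try (FileChannel channel = FileChannel.open(blockFile, StandardOpenOption.READ)) {
      // Assumes the block fits in one int-sized buffer, which is fine for a sketch.
      ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());
      while (buffer.hasRemaining() && channel.read(buffer) >= 0) {
        // Keep reading until the buffer is full or end of file is reached.
      }
      return buffer.array();
    }
  }
}
```

In this sketch the only work crossing a process boundary is the file system call; the control messages for creating or removing a block, which the text says still use Netty, are left out.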
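The figure below shows the call flow of the Netty version client together with LocalFileBlockWriter and LocalFileBlockReader.
The figure below shows the server-side calls of the Netty version.
The figure below shows the client call flow of the Netty version.
The figure below shows the server-side calls of the Netty version.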
The next post will cover the functionality related to BlockWorker.