A cache can do a great deal for us; the most immediate benefit is cutting out unnecessary data requests and operations. HDFS, too, has a complete caching mechanism, but few people use or even know about it, because the relevant configuration item is rarely touched and the feature is disabled by default. Which configuration item is it? A detailed analysis is given later in this article.
Questions About the HDFS Cache

Why open this topic with questions? After working through the whole HDFS cache mechanism, I found its internal logic rather convoluted; a straight top-down analysis would not read well, so leading with questions makes a better guide. Some terminology first: the objects being cached here are block replicas; a block that should be cached is called a CacheBlock, and a block that must go from the cached state back to the uncached state is called an UnCacheBlock. The questions are these:

1. What does "caching a block" actually do, and at what layer is it implemented?
2. What lifecycle states can a cached block be in? Surely more than just Cached and UnCached?
3. Which situations trigger caching a block or uncaching a block?
4. How are the target blocks to cache or uncache determined, and on what basis?
Let's now unveil the answers one by one, from top to bottom; by the end, everything should click into place.
Caching Blocks at the Physical Level

"Caching a block at the physical level" sounds like an odd phrase. The HDFS source explains it as follows:
Manages caching for an FsDatasetImpl by using the mmap(2) and mlock(2) system calls to lock blocks into memory.
Roughly: it uses the mmap(2) and mlock(2) system calls to lock blocks into memory. If you have never touched low-level operating-system programming, mmap and mlock may be unfamiliar, so here is a brief introduction, taking mmap as the example. mmap is a memory-mapping call; Baidu Baike describes it as follows:
mmap maps a file or other object into memory. The file is mapped onto a number of pages; if the file size is not an exact multiple of the page size, the unused remainder of the last page is zeroed. mmap is widely used for memory mapping from user space.
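For intuition, here is a minimal, runnable Java sketch of the same idea using the JDK's FileChannel.map, which is exactly the API HDFS calls below (the path /tmp/blk_demo is hypothetical and assumed to be an existing, non-empty file):

```java
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class MmapDemo {
    public static void main(String[] args) throws Exception {
        try (RandomAccessFile raf = new RandomAccessFile("/tmp/blk_demo", "r");
             FileChannel ch = raf.getChannel()) {
            // Map the whole file into the process address space, read-only.
            MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
            // Reads now go through the memory mapping, not read() syscalls.
            byte first = buf.get(0);
            System.out.println("first byte = " + first);
        }
    }
}
```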
We mainly care about the first half of that description, mapping a file or other object into memory, because that sentence is reflected directly in the code:
```java
/**
 * Load the block.
 *
 * mmap and mlock the block, and then verify its checksum.
 *
 * @param length The current length of the block.
 * @param blockIn The block input stream. Should be positioned at the
 *                start. The caller must close this.
 * @param metaIn The meta file input stream. Should be positioned at
 *               the start. The caller must close this.
 * @param blockFileName The block file name, for logging purposes.
 *
 * @return The Mappable block.
 */
public static MappableBlock load(long length, FileInputStream blockIn,
    FileInputStream metaIn, String blockFileName) throws IOException {
  MappableBlock mappableBlock = null;
  MappedByteBuffer mmap = null;
  FileChannel blockChannel = null;
  try {
    blockChannel = blockIn.getChannel();
    if (blockChannel == null) {
      throw new IOException("Block InputStream has no FileChannel.");
    }
    mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
    NativeIO.POSIX.getCacheManipulator().mlock(blockFileName, mmap, length);
    verifyChecksum(length, metaIn, blockChannel, blockFileName);
    mappableBlock = new MappableBlock(mmap, length);
  } finally {
    IOUtils.closeQuietly(blockChannel);
    if (mappableBlock == null) {
      if (mmap != null) {
        NativeIO.POSIX.munmap(mmap); // unmapping also unlocks
      }
    }
  }
  return mappableBlock;
}
```
In the code above, the blockChannel object (a FileChannel underneath) is mapped into memory. That is, of course, the lowest-level operation; what we really care about is how the upper layers of HDFS drive it.
Lifecycle States of a Cached Block

A cached block's lifecycle consists of more than the two states Cached and UnCached. The FsDatasetCache class defines the states explicitly:
```java
private enum State {
  /**
   * The MappableBlock is in the process of being cached.
   */
  CACHING,

  /**
   * The MappableBlock was in the process of being cached, but it was
   * cancelled. Only the FsDatasetCache#WorkerTask can remove cancelled
   * MappableBlock objects.
   */
  CACHING_CANCELLED,

  /**
   * The MappableBlock is in the cache.
   */
  CACHED,

  /**
   * The MappableBlock is in the process of uncaching.
   */
  UNCACHING;

  /**
   * Whether we should advertise this block as cached to the NameNode and
   * clients.
   */
  public boolean shouldAdvertise() {
    return (this == CACHED);
  }
}
```
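To make the lifecycle concrete, here is my own summary (not Hadoop code) of the transitions that cacheBlock, uncacheBlock and the worker tasks perform on these states:

```java
import java.util.EnumSet;
import java.util.Map;

public class StateTransitions {
    enum State { CACHING, CACHING_CANCELLED, CACHED, UNCACHING }

    static final Map<State, EnumSet<State>> NEXT = Map.of(
        // cacheBlock() inserts an entry in CACHING; the CachingTask promotes
        // it to CACHED, or uncacheBlock() flips it to CACHING_CANCELLED first.
        State.CACHING, EnumSet.of(State.CACHED, State.CACHING_CANCELLED),
        // A cancelled entry is only removed, by the worker that was caching it.
        State.CACHING_CANCELLED, EnumSet.noneOf(State.class),
        // uncacheBlock() moves a CACHED entry to UNCACHING.
        State.CACHED, EnumSet.of(State.UNCACHING),
        // The UncachingTask unmaps the block and removes the entry.
        State.UNCACHING, EnumSet.noneOf(State.class)
    );

    public static void main(String[] args) {
        NEXT.forEach((from, to) -> System.out.println(from + " -> " + to));
    }
}
```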
This state is recorded in the Value objects that are actually stored:
```java
/**
 * MappableBlocks that we know about.
 */
private static final class Value {
  final State state;
  final MappableBlock mappableBlock;

  Value(MappableBlock mappableBlock, State state) {
    this.mappableBlock = mappableBlock;
    this.state = state;
  }
}
```
The stored Value together with the block forms the key-value structure so typical of caches:
```java
/**
 * Stores MappableBlock objects and the states they're in.
 */
private final HashMap<ExtendedBlockId, Value> mappableBlockMap =
    new HashMap<ExtendedBlockId, Value>();
```
You might wonder why the 64-bit blockId is not used directly as the key, and ExtendedBlockId is used instead. The reason is block pools: today's HDFS supports multiple namespaces (federation), so the same block id can exist in more than one pool. The (ExtendedBlockId, Value) entries are inserted and removed by the cacheBlock and uncacheBlock methods.
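As a hedged illustration of that point, here is a simplified stand-in of my own (not the real ExtendedBlockId): the cache key must combine the pool id with the block id so that equal block ids from different namespaces remain distinct map keys:

```java
import java.util.Objects;

// Simplified stand-in for org.apache.hadoop.hdfs.ExtendedBlockId: under
// federation the same 64-bit block id can exist in two block pools, so the
// key must be (blockPoolId, blockId), not blockId alone.
final class SimpleBlockKey {
    private final long blockId;
    private final String blockPoolId;

    SimpleBlockKey(long blockId, String blockPoolId) {
        this.blockId = blockId;
        this.blockPoolId = blockPoolId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleBlockKey)) return false;
        SimpleBlockKey other = (SimpleBlockKey) o;
        return blockId == other.blockId && blockPoolId.equals(other.blockPoolId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(blockId, blockPoolId);
    }
}
```

Back in the real code, the insertion happens in cacheBlock, which first records a CACHING placeholder: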
```java
/**
 * Attempt to begin caching a block.
 */
synchronized void cacheBlock(long blockId, String bpid,
    String blockFileName, long length, long genstamp,
    Executor volumeExecutor) {
  ...
  mappableBlockMap.put(key, new Value(null, State.CACHING));
  volumeExecutor.execute(
      new CachingTask(key, blockFileName, length, genstamp));
  LOG.debug("Initiating caching for Block with id {}, pool {}",
      blockId, bpid);
}
```
The real work is then handed to a CachingTask and executed asynchronously. Likewise, the uncacheBlock method defers its work to an asynchronous thread:
```java
synchronized void uncacheBlock(String bpid, long blockId) {
  ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
  Value prevValue = mappableBlockMap.get(key);
  boolean deferred = false;
  ...
  switch (prevValue.state) {
  case CACHING:
    LOG.debug("Cancelling caching for block with id {}, pool {}.",
        blockId, bpid);
    mappableBlockMap.put(key,
        new Value(prevValue.mappableBlock, State.CACHING_CANCELLED));
    break;
  case CACHED:
    mappableBlockMap.put(key,
        new Value(prevValue.mappableBlock, State.UNCACHING));
    if (deferred) {
      LOG.debug("{} is anchored, and can't be uncached now. Scheduling it " +
          "for uncaching in {} ",
          key, DurationFormatUtils.formatDurationHMS(revocationPollingMs));
      deferredUncachingExecutor.schedule(
          new UncachingTask(key, revocationMs),
          revocationPollingMs, TimeUnit.MILLISECONDS);
    } else {
      LOG.debug("{} has been scheduled for immediate uncaching.", key);
      uncachingExecutor.execute(new UncachingTask(key, 0));
    }
    break;
  default:
    LOG.debug("Block with id {}, pool {} does not need to be uncached, "
        + "because it is in state {}.", blockId, bpid, prevValue.state);
    numBlocksFailedToUncache.incrementAndGet();
    break;
  }
}
```
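To see why the CACHING_CANCELLED state exists at all, here is a minimal, self-contained toy model of this asynchronous handshake (my own code, not Hadoop's): the synchronous call only records intent, and the worker thread decides what the entry becomes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model of FsDatasetCache's asynchronous caching handshake: cacheBlock()
// records a CACHING placeholder, the worker flips it to CACHED, and an
// uncache request arriving mid-flight turns it into CACHING_CANCELLED.
public class CachingHandshakeDemo {
    enum State { CACHING, CACHING_CANCELLED, CACHED, UNCACHING }

    private final Map<Long, State> blocks = new HashMap<>();
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    synchronized void cacheBlock(long blockId) {
        blocks.put(blockId, State.CACHING);
        worker.execute(() -> finishCaching(blockId));
    }

    synchronized void uncacheBlock(long blockId) {
        State s = blocks.get(blockId);
        if (s == State.CACHING) {
            blocks.put(blockId, State.CACHING_CANCELLED); // cancel in-flight caching
        } else if (s == State.CACHED) {
            blocks.put(blockId, State.UNCACHING);
            // the real code schedules an UncachingTask here
        }
    }

    private synchronized void finishCaching(long blockId) {
        // Only advertise the block as cached if nobody cancelled us meanwhile.
        if (blocks.get(blockId) == State.CACHING) {
            blocks.put(blockId, State.CACHED);
        } else {
            blocks.remove(blockId); // cancelled: the worker cleans up
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CachingHandshakeDemo demo = new CachingHandshakeDemo();
        demo.cacheBlock(1001L);
        demo.worker.shutdown();
        demo.worker.awaitTermination(1, TimeUnit.SECONDS);
        synchronized (demo) {
            System.out.println("block 1001 -> " + demo.blocks.get(1001L));
        }
    }
}
```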
CacheBlock and UnCacheBlock Trigger Scenarios

Following the order of the questions above, the third question is: which situations trigger the caching of a block? Again the answer splits into two cases, cacheBlock and unCacheBlock.
Tracing the callers of cacheBlock in an IDE via the open-call-hierarchy view reveals its upper-level call chain, as the figure shows.
The bottom-most frame makes clear that the call ultimately originates from a reply command in the NameNode's heartbeat handling. Picking one of the methods to inspect:
```java
private boolean processCommandFromActive(DatanodeCommand cmd,
    BPServiceActor actor) throws IOException {
  final BlockCommand bcmd =
      cmd instanceof BlockCommand ? (BlockCommand)cmd : null;
  final BlockIdCommand blockIdCmd =
      cmd instanceof BlockIdCommand ? (BlockIdCommand)cmd : null;
  switch(cmd.getAction()) {
  ...
  case DatanodeProtocol.DNA_CACHE:
    LOG.info("DatanodeCommand action: DNA_CACHE for " +
        blockIdCmd.getBlockPoolId() + " of [" +
        blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
    dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(),
        blockIdCmd.getBlockIds());
    break;
  ...
  default:
    LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
  }
  return true;
}
```
Does the uncache action likewise come only from a NameNode reply command? Not quite. Here is the corresponding call-hierarchy diagram:
As the figure shows, the callers fall into three categories; in the Hadoop source these are, roughly:

1. FsDatasetImpl#append: a block about to be appended to is uncached first, because the mapped copy would become stale.
2. FsDatasetImpl#invalidate: a block being deleted is uncached before its files are removed.
3. FsDatasetImpl#uncache: the handler for the NameNode's DNA_UNCACHE reply command.
The last category is the same situation as in the previous subsection, driven by a NameNode reply command; the rationale behind the first two is easy to understand, since a block that is about to change or disappear must not stay locked in memory.
Determining the Target Cache Blocks

This question essentially boils down to where the blockId arrays in the NameNode's cacheBlock and unCacheBlock reply commands come from, namely the block pool id and blockIds in the line below:
```java
dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
```
blockIdCmd is a command in the NameNode's heartbeat reply, so there must be a reply-building step that constructs it. It can be located directly in the DatanodeManager#handleHeartbeat command-handling method:
```java
if (shouldSendCachingCommands &&
    ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
        timeBetweenResendingCachingDirectivesMs)) {
  // build the command for the blocks to cache
  DatanodeCommand pendingCacheCommand = getCacheCommand(
      nodeinfo.getPendingCached(), nodeinfo,
      DatanodeProtocol.DNA_CACHE, blockPoolId);
  if (pendingCacheCommand != null) {
    cmds.add(pendingCacheCommand);
    sendingCachingCommands = true;
  }
  // build the command for the blocks to uncache
  DatanodeCommand pendingUncacheCommand = getCacheCommand(
      nodeinfo.getPendingUncached(), nodeinfo,
      DatanodeProtocol.DNA_UNCACHE, blockPoolId);
  if (pendingUncacheCommand != null) {
    cmds.add(pendingUncacheCommand);
    sendingCachingCommands = true;
  }
  if (sendingCachingCommands) {
    nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
  }
}
```
This shows that the cacheBlock and unCacheBlock commands are fed from nodeinfo's pendingCached and pendingUncached lists; the underlying variables are these:
```java
/**
 * The blocks which we know are cached on this datanode.
 * This list is updated by periodic cache reports.
 */
private final CachedBlocksList cached =
    new CachedBlocksList(this, CachedBlocksList.Type.CACHED);

/**
 * The blocks which we want to uncache on this DataNode.
 */
private final CachedBlocksList pendingUncached =
    new CachedBlocksList(this, CachedBlocksList.Type.PENDING_UNCACHED);
```
We are getting close to the target: find the direct manipulator of these CachedBlocksList objects and we will understand how the blocks to cache and uncache are determined. Following the callers further up leads to the true operating class, CacheReplicationMonitor, whose purpose is:
Scans the namesystem, scheduling blocks to be cached as appropriate.
Some additional explanation: CacheReplicationMonitor holds its own authoritative list of the cached blocks in the system, applies its internal caching rules to add and remove CacheBlocks, and writes the results into the pendingCached and pendingUncached lists mentioned earlier; the NameNode then packs that pending information into its reply commands. Two questions arise here:

1. How does the system-wide CacheBlock list that CacheReplicationMonitor holds get updated?
2. By what rules does it decide which blocks go onto the pendingCached and pendingUncached lists?
The first question is taken up in the next subsection; here we concentrate on the second. The answer lies in the rescanCachedBlockMap method. Since its logic is rather involved, the method's own documentation sums it up well:
Scan through the cached block map. Any blocks which are under-replicated should be assigned new Datanodes. Blocks that are over-replicated should be removed from Datanodes.
This lays down two basic principles:

1. A block with fewer cached replicas than requested should be assigned additional DataNodes to cache it.
2. A block cached on more DataNodes than requested should have the surplus cached replicas removed.
Thought through carefully, this strategy is quite clever: cache extra copies of under-replicated blocks (a cached copy effectively serves as one more replica) and evict the redundant cached copies of over-replicated ones, as the sketch below illustrates.
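This is a hypothetical, heavily condensed rendering of the per-block decision (names are simplified and mine; the real rescanCachedBlockMap also weighs cache pool limits, stale DataNodes, and already-pending work):

```java
import java.util.ArrayList;
import java.util.List;

public class RescanSketch {
    static void rescanOneBlock(int neededCached,
                               List<String> cachedOn,      // DNs caching it now
                               List<String> candidates,    // DNs holding the replica
                               List<String> pendingCached,
                               List<String> pendingUncached) {
        int numCached = cachedOn.size();
        if (numCached < neededCached) {
            // Under-replicated in cache: schedule extra DataNodes to cache it.
            for (String dn : candidates) {
                if (numCached + pendingCached.size() >= neededCached) break;
                if (!cachedOn.contains(dn)) pendingCached.add(dn);
            }
        } else if (numCached > neededCached) {
            // Over-replicated in cache: schedule the surplus copies for uncaching.
            pendingUncached.addAll(cachedOn.subList(neededCached, numCached));
        }
    }

    public static void main(String[] args) {
        List<String> pendingCached = new ArrayList<>();
        List<String> pendingUncached = new ArrayList<>();
        rescanOneBlock(2, new ArrayList<>(List.of("dn1")),
                List.of("dn1", "dn2", "dn3"), pendingCached, pendingUncached);
        System.out.println(pendingCached);   // [dn2]
        System.out.println(pendingUncached); // []
    }
}
```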
How the System's CacheBlock List Is Updated

The previous section raised the question of how the system-wide CacheBlock list held by CacheReplicationMonitor is updated; this list is the basis from which the pendingCached and pendingUncached information is produced. Since it is held globally, there must be a report-and-feedback process, and it likewise lives right next to the heartbeat-processing code:
```java
...
List<DatanodeCommand> cmds = blockReport();
processCommand(cmds == null ? null :
    cmds.toArray(new DatanodeCommand[cmds.size()]));

// report the cached blocks
DatanodeCommand cmd = cacheReport();
processCommand(new DatanodeCommand[]{ cmd });
...
```
繼續調用到下面這行操做
```java
// get the blockIds of the blocks cached on this datanode
List<Long> blockIds = dn.getFSDataset().getCacheReport(bpid);
```
which finally calls back into FsDatasetCache's getCachedBlocks method. At this point you can see that a closed loop has formed: the executor of the cache operations is also the reporter of the cache state. CacheReplicationMonitor is an internal member of the CacheManager object and obtains the latest CacheBlock information from it. A sketch of the report-side filter follows.
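As a hedged sketch (my own toy code, reusing the State enum from earlier): only entries whose state says shouldAdvertise(), i.e. CACHED, make it into the report, so half-cached or half-uncached blocks are never announced to the NameNode:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CacheReportSketch {
    enum State {
        CACHING, CACHING_CANCELLED, CACHED, UNCACHING;
        boolean shouldAdvertise() { return this == CACHED; }
    }

    // Report only fully cached blocks, mirroring the shouldAdvertise() filter.
    static List<Long> getCachedBlocks(Map<Long, State> mappableBlockMap) {
        List<Long> blocks = new ArrayList<>();
        for (Map.Entry<Long, State> e : mappableBlockMap.entrySet()) {
            if (e.getValue().shouldAdvertise()) {
                blocks.add(e.getKey());
            }
        }
        return blocks;
    }

    public static void main(String[] args) {
        Map<Long, State> m = new HashMap<>();
        m.put(1L, State.CACHED);
        m.put(2L, State.CACHING);   // in flight: not advertised
        m.put(3L, State.UNCACHING); // going away: not advertised
        System.out.println(getCachedBlocks(m)); // prints [1]
    }
}
```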
How CacheBlocks Are Used

Looking back now, the question of how cached blocks are used becomes simple: they come into play when a short-circuit read requests file descriptors. The isCached flag below tells the ShortCircuitRegistry whether the client's slot may be anchored; an anchored replica is mlocked and was checksum-verified when cached, which is what lets clients skip checksum re-verification on such reads.
```java
public void requestShortCircuitFds(final ExtendedBlock blk,
    final Token<BlockTokenIdentifier> token, SlotId slotId,
    int maxVersion, boolean supportsReceiptVerification)
    throws IOException {
  ...
  if (slotId != null) {
    boolean isCached = datanode.data.
        isCached(blk.getBlockPoolId(), blk.getBlockId());
    datanode.shortCircuitRegistry.registerSlot(
        ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
    registeredSlotId = slotId;
  }
  fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
  Preconditions.checkState(fis != null);
  bld.setStatus(SUCCESS);
  bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
  ...
```
Finally, looking at the whole call flow end to end, the closed loop in the middle stands out clearly.
After this long tour of the principles and internals of HDFS caching, the configuration item that governs the feature deserves a proper introduction. Here it is, description included:
```xml
<property>
  <name>dfs.datanode.max.locked.memory</name>
  <value>0</value>
  <description>
    The amount of memory in bytes to use for caching of block replicas in
    memory on the datanode. The datanode's maximum locked memory soft ulimit
    (RLIMIT_MEMLOCK) must be set to at least this value, else the datanode
    will abort on startup. By default, this parameter is set to 0, which
    disables in-memory caching. If the native libraries are not available
    to the DataNode, this configuration has no effect.
  </description>
</property>
```
Reading this, your first reaction is probably also that the item's name does not quite match what the feature does and feels a little ambiguous; a name like dfs.datanode.max.cache.memory would be easier to understand. This item controls the variable shown below.
The value is first read in the FsDatasetCache constructor:
```java
this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory();
```
and is then used to cap the usedBytes counter:
```java
long reserve(long count) {
  // Round the request up to the OS page size, since mmap works in pages.
  count = rounder.round(count);
  while (true) {
    long cur = usedBytes.get();
    long next = cur + count;
    if (next > maxBytes) {
      return -1; // would exceed dfs.datanode.max.locked.memory
    }
    // Lock-free update: retry if another thread changed usedBytes meanwhile.
    if (usedBytes.compareAndSet(cur, next)) {
      return next;
    }
  }
}
```
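The reservation pattern is worth a standalone look. Below is a runnable toy version of my own (the real method also rounds the request up to the page size first, which this demo skips):

```java
import java.util.concurrent.atomic.AtomicLong;

// Standalone illustration of the lock-free reservation above: compare-and-set
// retries make concurrent reserve() calls safe without a mutex, and maxBytes
// is the ceiling fed by dfs.datanode.max.locked.memory.
public class ReserveDemo {
    private final AtomicLong usedBytes = new AtomicLong(0);
    private final long maxBytes;

    ReserveDemo(long maxBytes) { this.maxBytes = maxBytes; }

    long reserve(long count) {
        while (true) {
            long cur = usedBytes.get();
            long next = cur + count;
            if (next > maxBytes) {
                return -1; // cache memory budget exhausted
            }
            if (usedBytes.compareAndSet(cur, next)) {
                return next; // we own the reservation
            }
            // another thread won the race; re-read and retry
        }
    }

    public static void main(String[] args) {
        ReserveDemo demo = new ReserveDemo(1024);
        System.out.println(demo.reserve(512));  // 512
        System.out.println(demo.reserve(512));  // 1024
        System.out.println(demo.reserve(1));    // -1: over budget
    }
}
```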
This feature is off by default; changing the value of this item turns it on, and it can bring a considerable boost to HDFS read performance. An example follows.
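For instance, to let each DataNode lock up to 2 GB of block data in memory (the value is illustrative; the DataNode user's memlock ulimit must be at least this large, and the native hadoop library must be present):

```xml
<property>
  <name>dfs.datanode.max.locked.memory</name>
  <value>2147483648</value>
</property>
```

With the feature on, cache targets are declared through the centralized cache management CLI, e.g. `hdfs cacheadmin -addPool <pool>` followed by `hdfs cacheadmin -addDirective -path <path> -pool <pool>`; those directives are what CacheReplicationMonitor scans when scheduling blocks to cache.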