接着上一篇,本篇,咱們分析一下實現磁盤存儲的功能類DiskStore,這個類相對簡單。在正式展開以前,我以爲有必要大概分析一下BlockManager的背景,或者說它的運行環境,運行的做用範圍。Blockmanager這個類其實在運行時的每一個節點都會有一個實例(包括driver和executor進程),由於不管是driver端進行廣播變量的建立,仍是executor端shuffle過程當中寫shuffle塊,或者是任務運行時結果太大須要經過BlockManager傳輸,或者是RDD的緩存,其實在每一個運行節點上都會經過Blockmanager來管理程序內部對於本地的內存和磁盤的讀寫,因此綜上,我想表達的核心意思就是每一個進程(driver和executor)都有一Blockmanager實例,而這些Blockmanager實例是經過BlockManagerId類來進行惟一區分的,BlockManagerId其實是對進程物理位置的封裝。java
首先咱們來看一個最經常使用的寫入方法緩存
def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = { // 經過DiskBlockManager對象檢查這個blockId對應的文件名的文件是否存在 if (contains(blockId)) { throw new IllegalStateException(s"Block $blockId is already present in the disk store") } logDebug(s"Attempting to put block $blockId") val startTime = System.currentTimeMillis // 經過DiskBlockManager獲取一個文件用於寫入數據 val file = diskManager.getFile(blockId) // 用CountingWritableChannel包裝一下,以便於記錄寫入的字節數 val out = new CountingWritableChannel(openForWrite(file)) var threwException: Boolean = true try { writeFunc(out) // 關鍵步驟,記錄到內部的map結構中 blockSizes.put(blockId, out.getCount) threwException = false } finally { try { out.close() } catch { case ioe: IOException => if (!threwException) { threwException = true throw ioe } } finally { if (threwException) { remove(blockId) } } } val finishTime = System.currentTimeMillis logDebug("Block %s stored as %s file on disk in %d ms".format( file.getName, Utils.bytesToString(file.length()), finishTime - startTime)) }
這個方法很簡單,沒什麼好說的,可是調用了一個比較重要的類DiskBlockManager,這個類的功能就是對磁盤上的目錄和文件進行管理,會在磁盤上按照必定規則建立一些目錄和子目錄,在分配文件名時也會盡可能均勻第分配在這些目錄和子目錄下。app
這個方法就不說了,簡單處理一下直接調用put方法。ide
def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = { put(blockId) { channel => bytes.writeFully(channel) } }
咱們來看一下這個方法,首先經過DiskBlockManager獲取對應的文件名,而後將其包裝成一個BlockData對象,分爲加密和不加密兩種。加密
def getBytes(blockId: BlockId): BlockData = { val file = diskManager.getFile(blockId.name) val blockSize = getSize(blockId) securityManager.getIOEncryptionKey() match { case Some(key) => // Encrypted blocks cannot be memory mapped; return a special object that does decryption // and provides InputStream / FileRegion implementations for reading the data. new EncryptedBlockData(file, blockSize, conf, key) case _ => // 看一下DiskBlockData new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize) } }
這個類做爲磁盤文件的包裝類,主要功能是提供了幾個方便的接口,將磁盤文件中的數據讀取出來並生成緩衝對象。
這個類中有兩個重要的方法toChunkedByteBuffer和toByteBuffer,toByteBuffer就不說了,調用ReadableByteChannel.read(ByteBuffer dst)方法讀取文件數據,咱們看一下toChunkedByteBufferspa
這個方法也很簡單,在數據量比較大的時候,因爲每次申請的內存塊大小有限制maxMemoryMapBytes,因此須要切分紅多個塊code
override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = { // Utils.tryWithResource調用保證在使用完資源後關閉資源 // 基本等同於java中的try{}finally{} Utils.tryWithResource(open()) { channel => var remaining = blockSize val chunks = new ListBuffer[ByteBuffer]() while (remaining > 0) { // 這裏取剩餘大小和maxMemoryMapBytes的較小值, // 也就是說每次申請的內存塊大小不超過maxMemoryMapBytes val chunkSize = math.min(remaining, maxMemoryMapBytes) val chunk = allocator(chunkSize.toInt) remaining -= chunkSize JavaUtils.readFully(channel, chunk) chunk.flip() chunks += chunk } new ChunkedByteBuffer(chunks.toArray) } }
這個類以前也分析過,主要是用來管理spark運行過程當中寫入的一些臨時文件,以及目錄的管理。orm
首先會根據參數配置建立本地目錄(能夠是逗號分隔的多個目錄),參數的優先順序是:若是是運行在yarn上,則會使用yarn參數LOCAL_DIRS配置的本地目錄;不然獲取環境變量SPARK_LOCAL_DIRS的值;不然獲取spark.local.dir參數的值;最後若是都沒有配置,那麼就用java系統參數java.io.tmpdir的值做爲臨時目錄。對象
其次,關於文件在目錄之間分配的問題,使用文件名的hash值對目錄數量取餘的方法來儘可能將文件均勻地分配到不一樣的目錄下。接口
另一點要說的是文件名的命名規則,是根據不一樣做用的Block來區別命名的,例如RDD緩存寫入的block的id就是RDDBlockId,它的文件名拼接規則是"rdd_" + rddId + "_" + splitIndex