Spark存儲體系底層架構剖析-Spark商業環境實戰

本套系列博客從真實商業環境抽取案例進行總結和分享,並給出Spark源碼解讀及商業實戰指導,請持續關注本套博客。版權聲明:本套Spark源碼解讀及商業實戰歸做者(秦凱新)全部,禁止轉載,歡迎學習。算法

Spark商業環境實戰及調優進階系列

1. Spark存儲體系組件關係解釋

BlockInfoManger 主要提供讀寫鎖控制,層級僅僅位於BlockManger之下,一般Spark讀寫操做都先調用BlockManger,而後諮詢BlockInfoManger是否存在鎖競爭,而後纔會調用DiskStore和MemStore,進而調用DiskBlockManger來肯定數據與位置映射,或者調用 MemoryManger來肯定內存池的軟邊界和內存使用申請。緩存

1.1 Driver 與 Executor 與 SparkEnv 與 BlockManger 組件關係:

Driver與 Executor 組件各自擁有任務執行的SparkEnv環境,而每個SparkEnv 中都有一個BlockManger負責存儲服務,做爲高層抽象,BlockManger 之間須要經過 RPCEnv,ShuffleClient,及BlocakTransferService相互通信。架構

1.1 BlockInfoManger 與 BlockInfo 共享鎖和排它鎖讀寫控制關係:

BlockInfo中具備讀寫鎖的標誌,經過標誌能夠判斷是否進行寫控制框架

val NO_WRITER: Long = -1
  val NON_TASK_WRITER: Long = -1024
  
 * The task attempt id of the task which currently holds the write lock for this block, or
 * [[BlockInfo.NON_TASK_WRITER]] if the write lock is held by non-task code, or
 * [[BlockInfo.NO_WRITER]] if this block is not locked for writing.
 
 def writerTask: Long = _writerTask
 def writerTask_=(t: Long): Unit = {
 _writerTask = t
    checkInvariants()
複製代碼

BlockInfoManager具備BlockId與BlockInfo的映射關係以及任務id與BlockId的鎖映射:dom

private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]  
 
 *Tracks the set of blocks that each task has locked for writing.
 private[this] val writeLocksByTask = new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
                                       with mutable.MultiMap[TaskAttemptId, BlockId]
 
 *Tracks the set of blocks that each task has locked for reading, along with the number of times
 *that a block has been locked (since our read locks are re-entrant).
 private[this] val readLocksByTask =
 new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]
複製代碼

1.3 DiskBlockManager 與 DiskStore 組件關係:

能夠看到DiskStore內部會調用DiskBlockManager來肯定Block的讀寫位置:函數

  • 如下是DiskStore的抽象寫操做,須要傳入FileOutputStream => Unit高階函數:oop

    def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
      if (contains(blockId)) {
      throw new IllegalStateException(s"Block $blockId is already present in the disk store")
      }
      logDebug(s"Attempting to put block $blockId")
      val startTime = System.currentTimeMillis
      
      val file = diskManager.getFile(blockId)
      
      val fileOutputStream = new FileOutputStream(file)
      var threwException: Boolean = true
      try {
          writeFunc(fileOutputStream)
          threwException = false
      } finally {
       try {
          Closeables.close(fileOutputStream, threwException)
       } finally {
       if (threwException) {
        remove(blockId)
              }
          }
      }
      val finishTime = System.currentTimeMillis
      logDebug("Block %s stored as %s file on disk in %d ms".format(
      file.getName,
      Utils.bytesToString(file.length()),
      finishTime - startTime))
      }
    複製代碼
  • 如下是DiskStore的讀操做,調用DiskBlockManager來獲取數據位置:post

    def getBytes(blockId: BlockId): ChunkedByteBuffer = {
      
      val file = diskManager.getFile(blockId.name)
     
      val channel = new RandomAccessFile(file, "r").getChannel
      Utils.tryWithSafeFinally {
    * For small files, directly read rather than memory map
      if (file.length < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(file.length.toInt)
      channel.position(0)
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException("Reached EOF before filling buffer\n" +
            s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
        }
      }
      buf.flip()
      new ChunkedByteBuffer(buf)
      } else {
      new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
          }
      } {
      channel.close()
       }
      }
    複製代碼

1.3 MemManager 與 MemStore 與 MemoryPool 組件關係:

在這裏要強調的是:第一代大數據框架hadoop只將內存做爲計算資源,而Spark不只將內存做爲計算資源外,還將內存的一部分歸入存儲體系:學習

  • 內存池模型 :邏輯上分爲堆內存和堆外內存,而後堆內存(或堆外內存)內部又分爲StorageMemoryPool和ExecutionMemoryPool。
  • MemManager是抽象的,定義了內存管理器的接口規範,方便之後擴展,好比:老版的StaticMemoryManager和新版的UnifiedMemoryManager.
  • MemStore 依賴於UnifiedMemoryManager進行內存的申請和軟邊界變化或內存釋放。
  • MemStore 內部同時負責存儲真實的對象,好比內部成員變量:entries ,創建了內存中的BlockId與MemoryEntry(Block的內存的形式)之間的映射。
  • MemStore 內部的「佔座」行爲,如:內部變量offHeapUnrollMemoryMap 和onHeapUnrollMemoryMap。

1.4 BlockManagerMaster 與 BlockManager 組件關係:

  • BlockManagerMaster的做用就是對存在於Dirver或Executor上的BlockManger進行統一管理,這簡直是代理行爲,由於他持有BlockManagerMasterEndpointREf,進而和BlockManagerMasterEndpoint進行通信。

2. Spark存儲體系組件BlockTransferServic傳輸服務

未完待續大數據

3. 總結

存儲體系是Spark的基石,我爭取把每一塊細微的知識點進行剖析,和大部分博客不一樣的是,我會盡可能採用最平實的語言,畢竟技術就是一層窗戶紙。

秦凱新 20181031 凌晨

相關文章
相關標籤/搜索