ShuffleExternalSorter 外部排序器在Shuffle過程當中的設計思路剖析-Spark商業環境實戰

本套系列博客從真實商業環境抽取案例進行總結和分享,並給出Spark源碼解讀及商業實戰指導,請持續關注本套博客。版權聲明:本套Spark源碼解讀及商業實戰歸做者(秦凱新)全部,禁止轉載,歡迎學習。算法

Spark商業環境實戰及調優進階系列

1 ShuffleExternalSorter 外部排序器

1.1 ShuffleExternalSorter 外部排序器江湖地位

ShuffleExternalSorter和 ExternalSorter 外部排序器功能相似,可是也有不一樣的地方。不過在詳細剖析ShuffleExternalSorter以前,咱們先看看ShuffleExternalSorter在下圖中所處的位置。能夠看到最終的調用方是unsafeShuffleWriter。在下一節,我會詳細剖析UnsafeShuffleWriter。數組

1.2 ShuffleExternalSorter 外部排序器不同凡響的特點

相同點:

  • ShuffleExternalSorter與ExternalSorter都是將記錄插入到內存中。

不一樣點:

  • ExternalSorter除了將數據存入內存中,還會進行聚合操做,ShuffleExternalSorter沒有聚合功能。
  • ShuffleExternalSorter使用的是Tungsten緩存(所以多是JVM堆緩存,也多是操做系統的內存)
  • 溢出前排序操做:ExternalSorter是按照分區ID和key進行排序實現,ShuffleExternalSorter除了按照分區ID的排序外,也有基於基數排序(Radix Sort)的實現。
  • ShuffleExternalSorter沒有了applendOnlyMapz這種數據結構。

1.2 ShuffleExternalSorter 主要成員

  • ShuffleInMemorySorter :用於在內存中對插入的記錄進行排序,算法仍是TimSort。緩存

  • spills :溢出文件的元數據信息列表。數據結構

  • numElementsForSpillThreshold :磁盤溢出的元素數量。能夠經過spark.shuffle.spill.numElementsForceSpillThreshold屬性來進行配置,默認是1M架構

  • taskMemoryManager:app

  • allocatedPages:已經分配的Page列表(即MemoryBlock)列表框架

    * Memory pages that hold the records being sorted. The pages in this list are freed when
     * spilling, although in principle we could recycle these pages across spills (on the other hand,
     * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
     * itself)。
    複製代碼

1.3 ShuffleExternalSorter insertRecord 代碼欣賞

  • 數據溢出,經過inMemSorter.numRecords() >= numElementsForSpillThreshold來判斷,若知足直接溢出操做。oop

  • growPointerArrayIfNecessary:進行空間檢查和數據容量擴容。post

  • acquireNewPageIfNecessary:進行空間檢查,若不知足申請新page。學習

  • Platform.copyMemory:將數據拷貝到Page所表明的的內存塊中。

  • inMemSorter.insertRecord:將記錄的元數據存到內部的長整型數組中,便於排序。其中高24位是存儲分區ID,中間13位爲存儲頁號,低27位存儲偏移量。

    Write a record to the shuffle sorter.
      
      public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
          throws IOException {
          // for tests
          assert(inMemSorter != null);
          
          if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {    <= 神來之筆
          
            logger.info("Spilling data because number of spilledRecords crossed the threshold " +
              numElementsForSpillThreshold);
            spill();
          }
      
          growPointerArrayIfNecessary();                                  <= 神來之筆
          // Need 4 bytes to store the record length.
          final int required = length + 4;
          
          acquireNewPageIfNecessary(required);
      
          assert(currentPage != null);
          final Object base = currentPage.getBaseObject();
          final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
          Platform.putInt(base, pageCursor, length);
          pageCursor += 4;
          Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);  <= 神來之筆
          pageCursor += length;
          inMemSorter.insertRecord(recordAddress, partitionId);      <= 神來之筆,排序後寫入內存。
        }
    複製代碼

1.3 ShuffleExternalSorter spill 代碼欣賞

  • writeSortedFile:做用在於將內存中的記錄排序後輸出到磁盤中,排序規則有兩種: 一種:對分區ID進行排序。二種是採用基數排序(Radix Sort)

    public long spill(long size, MemoryConsumer trigger) throws IOException {
          if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {
            return 0L;
          }
          logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
            Thread.currentThread().getId(),
            Utils.bytesToString(getMemoryUsage()),
            spills.size(),
            spills.size() > 1 ? " times" : " time");
      
          writeSortedFile(false);                              <= 神來之筆
          final long spillSize = freeMemory();
          inMemSorter.reset();
          
          // Reset the in-memory sorter's pointer array only after freeing up the memory pages
          holding the records. Otherwise, if the task is over allocated memory, then without freeing the memory pages, we might not be able to get memory for the pointer array.
          
          taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
          return spillSize;
        }
    複製代碼

2 最後

本篇須要挖掘的點還有不少,鑑於可參考的資料太少,只能暫時到此結束,後續會繼續完善

秦凱新 於深圳

相關文章
相關標籤/搜索