你真知道如何高效用mapPartitions嗎?


1. mappartition簡介typescript


首先,說到mapPartitions你們確定想到的是map和MapPartitions的對比。你們都知道mapPartition算子是使用一個函數針對分區計算的,函數參數是一個迭代器。而map只針對每條數據調用的,因此存在訪問外部數據庫等狀況時mapParititons更加高效。
mapPartitions函數:
 /** * Return a new RDD by applying a function to each partition of this RDD. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */ def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter), preservesPartitioning) }
有代碼可知mapPartitions的函數參數是傳入一個迭代器,返回值是另外一個迭代器。
map函數:
 /** * Return a new RDD by applying a function to all elements of this RDD. */ def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF)) }
map函數就是將rdd的元素由T類型轉化爲U類型。
綜上可知,map和foreach這類的是針對一個元素調用一次咱們的函數,也便是咱們的函數參數是單個元素,假如函數內部存在數據庫連接、文件等的建立及關閉,那麼會致使處理每一個元素時建立一次連接或者句柄,致使性能底下,不少初學者犯過這種毛病。
而foreachpartition/mapPartitions是針對每一個分區調用一次咱們的函數,也便是咱們函數傳入的參數是整個分區數據的迭代器,這樣避免了建立過多的臨時連接等,提高了性能。
下面的例子都是1-20這20個數字,通過map完成a*3的轉換:
val a = sc.parallelize(1 to 20, 2)
def mapTerFunc(a : Int) : Int = {a*3}
val mapResult = a.map(mapTerFunc)
println(mapResult.collect().mkString(","))
結果
  
    
  
  
   
   
            
   
   
    
      
    
    
     
     
              
     
     

3. mappartitions低效用法數據庫


3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60
你們一般的作法都是申請一個迭代器buffer,將處理後的數據加入迭代器buffer,而後返回迭代器。以下面的demo。
val a = sc.parallelize(1 to 20, 2) def terFunc(iter: Iterator[Int]) : Iterator[Int] = {  var res = List[Int]()  while (iter.hasNext) {  val cur = iter.next; res.::= (cur*3) ; } res.iterator}
val result = a.mapPartitions(terFunc)println(result.collect().mkString(","))
結果亂序了,由於個人list是無序的,能夠使用LinkList:
30,27,24,21,18,15,12,9,6,3,60,57,54,51,48,45,42,39,36,33

4. mappartitions高效用法數組


注意,3中的例子,會在mappartition執行期間,在內存中定義一個數組而且將緩存全部的數據。假如數據集比較大,內存不足,會致使內存溢出,任務失敗。對於這樣的案例,Spark的RDD不支持像mapreduce那些有上下文的寫方法。其實,浪尖有個方法是無需緩存數據的,那就是自定義一個迭代器類。以下例:
  
    
  
  
   
   
            
   
   
    
      
    
    
     
     
              
     
     
class CustomIterator(iter: Iterator[Int]) extends Iterator[Int] {  def hasNext : Boolean = {   iter.hasNext }      def next : Int= {        val cur = iter.next cur*3  }}  
val result = a.mapPartitions(v => new CustomIterator(v))println(result.collect().mkString(","))
結果:
   
     
   
   
    
    
             
    
    
3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60
其實,主要問題就是返回值類型和參數類型要一致,那麼不一致咋辦呢?
歡迎留言~
推薦閱讀:
27.scala的類型推斷
嚐嚐鮮|Spark 3.1自適應執行計劃
從 PageRank Example 談 Spark 應用程序調優

本文分享自微信公衆號 - 浪尖聊大數據(bigdatatip)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。緩存

相關文章
相關標籤/搜索