重要|Spark driver端獲得executor返回值的方法

重要|Spark driver端獲得executor返回值的方法

浪院長 浪尖聊大數據 mysql

重要|Spark driver端獲得executor返回值的方法
有人說spark的代碼不優雅,這個浪尖就忍不了了。實際上,說spark代碼不優雅的主要是對scala不熟悉,spark代碼我以爲仍是很讚的,最值得閱讀的大數據框架之一。
今天這篇文章不是爲了爭辯Spark 代碼優雅與否,主要是講一下理解了spark源碼以後咱們能使用的一些小技巧吧。
spark 使用的時候,總有些需求比較另類吧,好比有球友問過這樣一個需求:sql

浪尖,我想要在driver端獲取executor執行task返回的結果,好比task是個規則引擎,我想知道每條規則命中了幾條數據,請問這個怎麼作呢?

這個是否是很騷氣,也很常見,按理說你輸出以後,在mysql裏跑條sql就好了,可是這個每每顯的比較麻煩。並且有時候,在 driver可能還要用到這些數據呢?具體該怎麼作呢?數據庫

大部分的想法估計是collect方法,那麼用collect如何實現呢?你們本身能夠考慮一下,我只能告訴你不簡單,不如輸出到數據庫裏,而後driver端寫sql分析一下。apache

還有一種考慮就是使用自定義累加器。這樣就能夠在executor端將結果累加而後在driver端使用,不過具體實現也是很麻煩。你們也能夠本身琢磨一下下~數組

那麼,浪尖就給你們介紹一個比較經常使用也比較騷的操做吧。框架

其實,這種操做咱們最早想到的應該是count函數,由於他就是將task的返回值返回到driver端,而後進行聚合的。咱們能夠從idea count函數點擊進去,能夠看到...elasticsearch

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

也便是sparkcontext的runJob方法。
Utils.getIteratorSize _這個方法主要是計算每一個iterator的元素個數,也便是每一個分區的元素個數,返回值就是元素個數:ide

/**
   * Counts the number of elements of an iterator using a while loop rather than calling
   * [[scala.collection.Iterator#size]] because it uses a for loop, which is slightly slower
   * in the current version of Scala.
   */
  def getIteratorSize[T](iterator: Iterator[T]): Long = {
    var count = 0L
    while (iterator.hasNext) {
      count += 1L
      iterator.next()
    }
    count
  }

而後就是runJob返回的是一個數組,每一個數組的元素就是咱們task執行函數的返回值,而後調用sum就獲得咱們的統計值了。函數

那麼咱們徹底能夠藉助這個思路實現咱們開頭的目標。浪尖在這裏直接上案例了:oop

import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

object es2sparkRunJob {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)

    conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1")
    conf.set(ConfigurationOptions.ES_PORT, "9200")
    conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
    conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
    conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
    conf.set("es.write.rest.error.handlers", "ignoreConflict")
    conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

    val sc = new SparkContext(conf)
    import org.elasticsearch.spark._

    val rdd = sc.esJsonRDD("posts").repartition(10)

    rdd.count()
    val func = (itr : Iterator[(String,String)]) => {
      var count = 0
      itr.foreach(each=>{
        count += 1
      })
      (TaskContext.getPartitionId(),count)
    }

    val res = sc.runJob(rdd,func)

    res.foreach(println)

    sc.stop()
  }
}

例子中driver端獲取的就是每一個task處理的數據量。效率高,並且操做靈活高效~是否是很騷氣~~

相關文章
相關標籤/搜索