浪院長 浪尖聊大數據 mysql
有人說spark的代碼不優雅,這個浪尖就忍不了了。實際上,說spark代碼不優雅的主要是對scala不熟悉,spark代碼我以爲仍是很讚的,最值得閱讀的大數據框架之一。
今天這篇文章不是爲了爭辯Spark 代碼優雅與否,主要是講一下理解了spark源碼以後咱們能使用的一些小技巧吧。
spark 使用的時候,總有些需求比較另類吧,好比有球友問過這樣一個需求:sql
浪尖,我想要在driver端獲取executor執行task返回的結果,好比task是個規則引擎,我想知道每條規則命中了幾條數據,請問這個怎麼作呢?
這個是否是很騷氣,也很常見,按理說你輸出以後,在mysql裏跑條sql就好了,可是這個每每顯的比較麻煩。並且有時候,在 driver可能還要用到這些數據呢?具體該怎麼作呢?數據庫
大部分的想法估計是collect方法,那麼用collect如何實現呢?你們本身能夠考慮一下,我只能告訴你不簡單,不如輸出到數據庫裏,而後driver端寫sql分析一下。apache
還有一種考慮就是使用自定義累加器。這樣就能夠在executor端將結果累加而後在driver端使用,不過具體實現也是很麻煩。你們也能夠本身琢磨一下下~數組
那麼,浪尖就給你們介紹一個比較經常使用也比較騷的操做吧。框架
其實,這種操做咱們最早想到的應該是count函數,由於他就是將task的返回值返回到driver端,而後進行聚合的。咱們能夠從idea count函數點擊進去,能夠看到...elasticsearch
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
也便是sparkcontext的runJob方法。
Utils.getIteratorSize _這個方法主要是計算每一個iterator的元素個數,也便是每一個分區的元素個數,返回值就是元素個數:ide
/** * Counts the number of elements of an iterator using a while loop rather than calling * [[scala.collection.Iterator#size]] because it uses a for loop, which is slightly slower * in the current version of Scala. */ def getIteratorSize[T](iterator: Iterator[T]): Long = { var count = 0L while (iterator.hasNext) { count += 1L iterator.next() } count }
而後就是runJob返回的是一個數組,每一個數組的元素就是咱們task執行函數的返回值,而後調用sum就獲得咱們的統計值了。函數
那麼咱們徹底能夠藉助這個思路實現咱們開頭的目標。浪尖在這裏直接上案例了:oop
import org.apache.spark.{SparkConf, SparkContext, TaskContext} import org.elasticsearch.hadoop.cfg.ConfigurationOptions object es2sparkRunJob { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName) conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1") conf.set(ConfigurationOptions.ES_PORT, "9200") conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true") conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true") conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false") conf.set("es.write.rest.error.handlers", "ignoreConflict") conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler") val sc = new SparkContext(conf) import org.elasticsearch.spark._ val rdd = sc.esJsonRDD("posts").repartition(10) rdd.count() val func = (itr : Iterator[(String,String)]) => { var count = 0 itr.foreach(each=>{ count += 1 }) (TaskContext.getPartitionId(),count) } val res = sc.runJob(rdd,func) res.foreach(println) sc.stop() } }
例子中driver端獲取的就是每一個task處理的數據量。效率高,並且操做靈活高效~是否是很騷氣~~