Spark學習之編程進階總結(二)

5、基於分區進行操做

  基於分區對數據進行操做可讓咱們避免爲每一個數據元素進行重複的配置工做。諸如打開數據庫鏈接或建立隨機數生成器等操做,都是咱們應當儘可能避免爲每一個元素都配置一次的工做。Spark 提供基於分區的 map 和 foreach ,讓你的部分代碼只對 RDD 的每一個分區運行一次,這樣能夠幫助下降這些操做的代價。算法

  當基於分區操做 RDD 時,Spark 會爲函數提供該分區中的元素的迭代器。返回值方面,也返回一個迭代器。除 mapPartitions() 外,Spark 還有一些別的基於分區的操做符,列在了表中。數據庫

  

一、mapPartitions

  與map相似,不一樣點是map是對RDD的裏的每個元素進行操做,而mapPartitions是對每個分區的數據(迭代器)進行操做,具體能夠看上面的表格。下面同時用map和mapPartitions實現WordCount,看一下mapPartitions的用法以及與map的區別。apache

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf


object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")  // 設置日誌顯示級別

    val input = sc.parallelize(Seq("Spark Hive hadoop", "Hadoop Hbase Hive Hbase", "Java Scala Spark"))
    val words = input.flatMap(line => line.split(" "))
    val counts = words.map(word => (word, 1)).reduceByKey { (x, y) => x + y }
    println(counts.collect().mkString(","))
    val counts1 = words.mapPartitions(it => it.map(word => (word, 1))).reduceByKey { (x, y) => x + y }
    println(counts1.collect().mkString(","))

  }
}

  

二、mapPartitionsWithIndex

  和mapPartitions同樣,只是多了一個分區的序號,下面的代碼實現了將Rdd的元素數字n變爲(分區序號,n*n)。編程

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf


object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")  // 設置日誌顯示級別

    val rdd = sc.parallelize(1 to 10, 5) // 5 表明分區數
    val res = rdd.mapPartitionsWithIndex((index, it) => {
      it.map(n => (index, n * n))
    })
    println(res.collect().mkString(" "))

  }
}

  

三、foreachPartitions

  foreachPartitions和foreach相似,不一樣點也是foreachPartitions基於分區進行操做的。函數

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf


object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")  // 設置日誌顯示級別
    val rdd = sc.parallelize(1 to 10, 5) // 5 表明分區數
    rdd.foreachPartition(it => it.foreach(println))

  }
}

  

6、與外部程序間的管道

  Spark 提供了一種通用機制,能夠將數據經過管道傳給用其餘語言編寫的程序,好比 R 語言腳本。oop

  Spark 在 RDD 上提供 pipe() 方法。Spark 的 pipe() 方法可讓咱們使用任意一種語言實現 Spark 做業中的部分邏輯,只要它能讀寫 Unix 標準流就行。經過 pipe() ,你能夠將 RDD 中的各元素從標準輸入流中以字符串形式讀出,並對這些元素執行任何你須要的操做,而後把結果以字符串的形式寫入標準輸出——這個過程就是 RDD 的轉化操做過程。這種接口和編程模型有較大的侷限性,可是有時候這偏偏是你想要的,好比在 map 或filter 操做中使用某些語言原生的函數。 測試

  有時候,因爲你已經寫好並測試好了一些很複雜的軟件,因此會但願把 RDD 中的內容經過管道交給這些外部程序或者腳原本進行處理並重用。不少數據科學家都用 R寫好的代碼 ,能夠經過pipe() 與 R 程序進行交互。大數據

7、數值RDD的操做

  Spark 的數值操做是經過流式算法實現的,容許以每次一個元素的方式構建出模型。這些統計數據都會在調用 stats() 時經過一次遍歷數據計算出來,並以 StatsCounter 對象返回。表列出了 StatsCounter 上的可用方法。spa

  

  若是你只想計算這些統計數據中的一個,也能夠直接對 RDD 調用對應的方法,好比 rdd.mean() 或者 rdd.sum() 。scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf


object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")  // 設置日誌顯示級別
    val rdd = sc.parallelize(List(1,2,3,4))
    val res = rdd.stats
    println(res.count)  // 4  統計元素個數
    println(res.mean)   // 2.5 平均值
    println(res.sum)   // 10 總和
    println(res.max)   //  4 最大值
    println(res.min)   //  1 最小值
    println(res.variance)  // 1.25 方差
    println(res.sampleVariance) //1.667 採樣方差
    println(res.stdev)  // 1.11803 標準差
    println(res.sampleStdev)  //1.29099 採樣標準差
  }
}

  

  這篇博文主要來自《Spark快速大數據分析》這本書裏面的第六章,內容有刪減,還有關於本書的一些代碼的實驗結果。

相關文章
相關標籤/搜索