關於累加器,前面我也寫了一篇博客,順便粘貼這兒,對比學習,Spark學習之編程進階總結(一)。Spark 2.0系列引入了一個更加簡單和更高性能的累加器API,如在1.X版本中能夠這樣使用累加器:html
val sparkSession = SparkSession.builder().master("local").appName("wordcount").getOrCreate() val sc = sparkSession.sparkContext sc.setLogLevel("WARN") // Spark1.x版本 定義累加器,這裏直接使用SparkContext內置的累加器 val accum = sc.accumulator(0, "My Accumulator") sc.parallelize(Array(1,2,3,4)).foreach(x=>accum+=x) // 獲取累加器的值,Executor上面只能對累加器進行累加操做,只要Driver才能讀取累加器的值 // Driver讀取值的時候會把各個Executor上存儲的本地累加器的值加起來,這裏的結果是10 println(accum) // 輸出10
在Spark 2.X版本里使用SparkContext裏內置的累加器:sql
// 與Spark1.x不一樣的是,須要指定累加器的類型,目前SparkContext有Long類型和Double類型 // 能夠直接使用,不須要指定初始值 val accum1 = sc.longAccumulator("My Accumulator") sc.parallelize(Array(1,2,3,4)).foreach(x=>accum1.add(x)) println(accum1.value) // 輸出10
只使用SparkContext裏內置的累加器功能確定不能知足略微複雜的業務類型,此時咱們就能夠自定義累加器。這裏給出Spark2.X自定義累加器的參考代碼apache
在Scala中有一種方式來自定義累加器,用戶只須要繼承Accumulable,就能夠把元素和返回值定義爲不一樣的類型,這樣咱們就能夠完成添加操做(如往Int類型的List裏添加整數,此時元素爲Int類型,而返回類型爲List)。編程
在Spark 2.X中加入一個新的抽象類—AccumulatorV2,繼承這個類要實現如下幾種方法:
add方法:指定元素相加操做。
copy方法:指定對自定義累加器的複製操做。
isZero方法:返回該累加器的值是否爲「零」。
merge方法:合併兩個相同類型的累加器。
reset方法:重置累加器。
value方法:返回累加器當前的值。
重寫這幾種方法以後,只需實例化自定義累加器,並連同累加器名字一塊兒傳給sparkContext.register方法。
下面簡單實現一個把字符串合併爲數組的累加器:數組
// 首先要繼承AccumulatorV2,並指定輸入爲String類型,輸出爲ArrayBuffer[String] class MyAccumulator extends AccumulatorV2[String,ArrayBuffer[String]]{ // 設置累加器的結果,類型爲ArrayBuffer[String] private var result = ArrayBuffer[String]() // 判斷累加器當前值是否爲零值,這裏咱們指定若是result的size爲0,則累加器當前值是零值 override def isZero:Boolean = this.result.size == 0 // copy方法設置爲新建本累加器,並把result賦給新的累加器 override def copy():AccumulatorV2[String,ArrayBuffer[String]] = { val newAccum = new MyAccumulator newAccum.result = this.result newAccum } // reset 方法設置爲把result設置爲新的ArrayBuffer override def reset():Unit = this.result = new ArrayBuffer[String]() // add 方法把傳進來的字符串添加到result中 override def add(v:String):Unit = this.result+=v // merge 方法把兩個累加器的result合併起來 override def merge(other:AccumulatorV2[String,ArrayBuffer[String]]):Unit={ result.++=:(other.value) } // value 方法返回result override def value:ArrayBuffer[String] = this.result }
// 接着在main方法裏使用累加器 val Myaccum = new MyAccumulator() // 向SparkContext註冊累加器 sc.register(Myaccum) // 把 a b c d添加進累加器的result數組並打印出來 sc.parallelize(Array("a","b","c","d")).foreach(x=>Myaccum.add(x)) println(Myaccum.value) // 輸出ArrayBuffer(a, b, c, d)
關於Streaming我之前也有一篇博客,這裏粘貼出來對比學習。Spark學習之Spark Streaming。這裏寫一個程序來監聽網絡端口發來的內容,而後進行WordCount。網絡
第一步:建立程序入口SparkSession,並引入spark.implicits來容許Scalaobject隱式轉換爲DataFrame。
第二步:建立流。配置從socket讀取流數據,地址和端口爲localhost:9999
第三步:進行單詞統計。這裏lines是DataFrame,使用as[String]給它定義類型轉換爲DataSet。以後在DataSet裏進行單詞統計。
第四步:建立查詢句柄,定義打印結果方式並啓動程序。這裏使用writeStream方法,輸出模式爲所有輸出到控制檯。app
import org.apache.spark.sql.SparkSession object StreamingTest { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local").appName("StructuredNetworkCount").getOrCreate() val sc = spark.sparkContext sc.setLogLevel("WARN") // 上面變量必定也得爲spark import spark.implicits._ val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load() val words = lines.as[String].flatMap(_.split(" ")) val wordcount = words.groupBy("value").count() val query = wordcount.writeStream.outputMode("complete").format("console").start() // 調用awaitTermination方法來防止程序在處理數據時中止 query.awaitTermination() } }
運行上面的程序首先nc軟件要先運行起來,鏈接成功後再輸入下面的字符串。socket
而後控制檯打印的結果爲:ide
Streaming的關鍵思想如圖所示:把數據流視做一張數據不斷增長的表,這樣用戶就能夠基於這張表進行數據處理,就好像使用批處理來處理靜態數據同樣,但實際上Spark底層是把新數據不斷地增量添加到這張無界的表的下一行中。post
Structured Streaming共有3種輸出模式,這3種模式都只適用於某些類型的查詢:
(1)CompleteMode:完整模式。整個更新的結果表將被寫入外部存儲器。由存儲鏈接器決定如何處理整張表的寫入。聚合操做以及聚合以後的排序操做支持這種模式。
(2)AppendMode:附加模式。只有自上次觸發執行後在結果表中附加的新行會被寫入外部存儲器。這僅適用於結果表中的現有行不會更改的查詢,如select、where、map、flatMap、filter、join等操做支持這種模式。
(3)UpdateMode:更新模式(這個模式將在之後的版本中實現)。只有自上次觸發執行後在結果表中更新的行將被寫入外部存儲器(不輸出未更改的行)。
這篇博文主要來自《Spark大數據商業實戰三部曲》這本書裏面的第二章,內容有刪減,還有本書的一些代碼的實驗結果。