Spark

Spark 2.x管理與開發html

==========Spark Core==========

1、什麼是Spark?(官網:http://spark.apache.org

1、什麼是Spark

 

個人翻譯:Spark是一個針對大規模數據處理的快速通用引擎。java

 

Spark是一種快速、通用、可擴展的大數據分析引擎,2009年誕生於加州大學伯克利分校AMPLab2010年開源,20136月成爲Apache孵化項目,20142月成爲Apache頂級項目。目前,Spark生態系統已經發展成爲一個包含多個子項目的集合,其中包含SparkSQLSpark StreamingGraphXMLlib等子項目,Spark是基於內存計算的大數據並行計算框架。Spark基於內存計算,提升了在大數據環境下數據處理的實時性,同時保證了高容錯性和高可伸縮性,容許用戶將Spark部署在大量廉價硬件之上,造成集羣。Spark獲得了衆多大數據公司的支持,這些公司包括HortonworksIBMIntelClouderaMapRPivotal、百度、阿里、騰訊、京東、攜程、優酷土豆。當前百度的Spark已應用於鳳巢、大搜索、直達號、百度大數據等業務;阿里利用GraphX構建了大規模的圖計算和圖挖掘系統,實現了不少生產系統的推薦算法;騰訊Spark集羣達到8000臺的規模,是當前已知的世界上最大的Spark集羣。mysql

2、爲何要學習Spark

*HadoopMapReduce計算模型存在的問題:es6

學習過HadoopMapReduce的學員都知道,MapReduce的核心是Shuffle(洗牌)。在整個Shuffle的過程當中,至少會產生6次的I/O。下圖是咱們在講MapReduce的時候,畫的Shuffle的過程。web

中間結果輸出:基於MapReduce的計算引擎一般會將中間結果輸出到磁盤上,進行存儲和容錯。另外,當一些查詢(如:Hive)翻譯到MapReduce任務時,每每會產生多個Stage(階段),而這些串聯的Stage又依賴於底層文件系統(如HDFS)來存儲每個Stage的輸出結果,而I/O的效率每每較低,從而影響了MapReduce的運行速度。算法

 

*Spark的最大特色:基於內存sql

SparkMapReduce的替代方案,並且兼容HDFSHive,可融入Hadoop的生態系統,以彌補MapReduce的不足。shell

 

3Spark的特色:快、易用、通用、兼容性

*)快數據庫

HadoopMapReduce相比,Spark基於內存的運算速度要快100倍以上,即便,Spark基於硬盤的運算也要快10倍。Spark實現了高效的DAG執行引擎,從而能夠經過內存來高效處理數據流。apache

 

 

*)易用

Spark支持JavaPythonScalaAPI,還支持超過80種高級算法,使用戶能夠快速構建不一樣的應用。並且Spark支持交互式的PythonScalashell,能夠很是方便地在這些shell中使用Spark集羣來驗證解決問題的方法。

 

 

*)通用

Spark提供了統一的解決方案。Spark能夠用於批處理、交互式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。這些不一樣類型的處理均可以在同一個應用中無縫使用。Spark統一的解決方案很是具備吸引力,畢竟任何公司都想用統一的平臺去處理遇到的問題,減小開發和維護的人力成本和部署平臺的物力成本。

另外Spark還能夠很好的融入Hadoop的體系結構中能夠直接操做HDFS,並提供Hive on SparkPig on Spark的框架集成Hadoop

 

*)兼容性

Spark能夠很是方便地與其餘的開源產品進行融合。好比,Spark可使用HadoopYARNApache Mesos做爲它的資源管理和調度器,器,而且能夠處理全部Hadoop支持的數據,包括HDFSHBaseCassandra等。這對於已經部署Hadoop集羣的用戶特別重要,由於不須要作任何數據遷移就可使用Spark的強大處理能力。Spark也能夠不依賴於第三方的資源管理和調度器,它實現了Standalone做爲其內置的資源管理和調度框架,這樣進一步下降了Spark的使用門檻,使得全部人均可以很是容易地部署和使用Spark。此外,Spark還提供了在EC2上部署StandaloneSpark集羣的工具。

 

 

 

 

 

2、Spark的體系結構與安裝部署

1Spark集羣的體系結構

官方的一張圖:

 

我本身的一張圖:

 

2Spark的安裝與部署

Spark的安裝部署方式有如下幾種模式:

l Standalone

l YARN

l Mesos

l Amazon EC2

 

 

 

 

*Spark Standalone僞分佈的部署

配置文件:conf/spark-env.sh

n export JAVA_HOME=/root/training/jdk1.7.0_75

n export SPARK_MASTER_HOST=spark81

n export SPARK_MASTER_PORT=7077

n 下面的能夠不寫,默認

n export SPARK_WORKER_CORES=1

n export SPARK_WORKER_MEMORY=1024m

配置文件:conf/slave

n spark81

 

*Spark Standalone全分佈的部署

配置文件:conf/spark-env.sh

n export JAVA_HOME=/root/training/jdk1.7.0_75

n export SPARK_MASTER_HOST=spark82

n export SPARK_MASTER_PORT=7077

n 下面的能夠不寫,默認

n export SPARK_WORKER_CORES=1

n export SPARK_WORKER_MEMORY=1024m

 

配置文件:conf/slave

n spark83

n spark84

 

*)啓動Spark集羣:start-all.sh

 

 

3Spark HA的實現

*)基於文件系統的單點恢復

主要用於開發或測試環境。當spark提供目錄保存spark Applicationworker的註冊信息,並將他們的恢復狀態寫入該目錄中,這時,一旦Master發生故障,就能夠經過從新啓動Master進程(sbin/start-master.sh),恢復已運行的spark Applicationworker的註冊信息。

基於文件系統的單點恢復,主要是在spark-en.sh裏對SPARK_DAEMON_JAVA_OPTS設置

配置參數

參考值

spark.deploy.recoveryMode

設置爲FILESYSTEM開啓單點恢復功能,默認值:NONE

spark.deploy.recoveryDirectory

Spark 保存恢復狀態的目錄

參考:

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/training/spark-2.1.0-bin-hadoop2.7/recovery"

 

 

測試:

1、在spark82上啓動Spark集羣

2、在spark83上啓動spark shell

MASTER=spark://spark82:7077 spark-shell

3、在spark82上中止master

stop-master.sh

4、觀察spark83上的輸出:

 

5、在spark82上重啓master

start-master.sh

 

*)基於ZookeeperStandby Masters

ZooKeeper提供了一個Leader Election機制,利用這個機制能夠保證雖然集羣存在多個Master,可是隻有一個是Active的,其餘的都是Standby。當ActiveMaster出現故障時,另外的一個Standby Master會被選舉出來。因爲集羣的信息,包括WorkerDriverApplication的信息都已經持久化到ZooKeeper,所以在切換的過程當中只會影響新Job的提交,對於正在進行的Job沒有任何的影響。加入ZooKeeper的集羣總體架構以下圖所示。

 


配置參數

參考值

spark.deploy.recoveryMode

設置爲ZOOKEEPER開啓單點恢復功能,默認值:NONE

spark.deploy.zookeeper.url

ZooKeeper集羣的地址

spark.deploy.zookeeper.dir

Spark信息在ZK中的保存目錄,默認:/spark

l 參考:

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata12:2181,bigdata13:2181,bigdata14:2181 -Dspark.deploy.zookeeper.dir=/spark"

 

l 另外:每一個節點上,須要將如下兩行註釋掉。

 

 

l ZooKeeper中保存的信息

 

 

 

 

3、執行Spark Demo程序

1、執行Spark Example程序

(*)示例程序:$SPARK_HOME/examples/jars/spark-examples_2.11-2.1.0.jar

*)全部的示例程序:$EXAMPLE_HOME/examples/src/main

 JavaScala等等等

*Demo:蒙特卡羅求PI

命令:

spark-submit --master spark://spark81:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.1.0.jar 100

2、使用Spark Shell

spark-shellSpark自帶的交互式Shell程序,方便用戶進行交互式編程,用戶能夠在該命令行下用scala編寫spark程序。

*)啓動Spark Shellspark-shell

也可使用如下參數:

參數說明:

--master spark://spark81:7077 指定Master的地址

--executor-memory 2g 指定每一個worker可用內存爲2G

--total-executor-cores 2 指定整個集羣使用的cup核數爲2

 

例如:

spark-shell --master spark://spark81:7077 --executor-memory 2g --total-executor-cores 2

 

*)注意:

若是啓動spark shell時沒有指定master地址,可是也能夠正常啓動spark shell和執行spark shell中的程序,實際上是啓動了sparklocal模式,該模式僅在本機啓動一個進程,沒有與集羣創建聯繫。

請注意local模式和集羣模式的日誌區別:

 

*)在Spark Shell中編寫WordCount程序

程序以下:

sc.textFile("hdfs://192.168.88.111:9000/data/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")

 

說明:

l scSparkContext對象,該對象時提交spark程序的入口

l textFile("hdfs://192.168.88.111:9000/data/data.txt")hdfs中讀取數據

l flatMap(_.split(" "))map在壓平

l map((_,1))將單詞和1構成元組

l reduceByKey(_+_)按照key進行reduce,並將value累加

l saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")將結果寫入到hdfs

3、在IDEA中編寫WordCount程序

(*)須要的jar包:$SPARK_HOME/jars/*.jar

*)建立Scala Project,並建立Scala Object、或者Java Class

*)書寫源代碼,並打成jar包,上傳到Linux

==========================Scala版本==========================

 

 

 

 

*)運行程序:

spark-submit --master spark://spark81:7077 --class mydemo.WordCount jars/wc.jar hdfs://192.168.88.111:9000/data/data.txt hdfs://192.168.88.111:9000/output/spark/wc1

 

 

====================Java版本(直接輸出在屏幕)====================

 

 

 

*)運行程序:

spark-submit --master spark://spark81:7077 --class mydemo.JavaWordCount jars/wc.jar hdfs://192.168.88.111:9000/data/data.txt

4、Spark運行機制及原理分析

1WordCount執行的流程分析

 

須要看源碼一步步看。

 

 

2Spark提交任務的流程

 

 

 

 

5、Spark的算子

1RDD基礎

 

  • 什麼是RDD

RDDResilient Distributed Dataset)叫作彈性分佈式數據集,是Spark中最基本的數據抽象,它表明一個不可變、可分區、裏面的元素可並行計算的集合。RDD具備數據流模型的特色:自動容錯、位置感知性調度和可伸縮性。RDD容許用戶在執行多個查詢時顯式地將工做集緩存在內存中,後續的查詢可以重用工做集,這極大地提高了查詢速度。

 

  • RDD的屬性(源碼中的一段話)

 

² 一組分片(Partition,即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Core的數目。

² 一個計算每一個分區的函數SparkRDD的計算是以分片爲單位的,每一個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不須要保存每次計算的結果。

² RDD之間的依賴關係RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。

² 一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有對於於key-valueRDD,纔會有Partitioner,非key-valueRDDParititioner的值是NonePartitioner函數不但決定了RDD自己的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。

² 一個列表,存儲存取每一個Partition的優先位置(preferred location)。對於一個HDFS文件來講,這個列表保存的就是每一個Partition所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。

 

 

 

 

RDD的建立方式

  • 經過外部的數據文件建立,如HDFS

val rdd1 = sc.textFile(「hdfs://192.168.88.111:9000/data/data.txt」)

 

  • 經過sc.parallelize進行建立

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

 

  • RDD的類型:TransformationAction

 

RDD的基本原理

 

 

 

 

2Transformation

RDD中的全部轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動做。只有當發生一個要求返回結果給Driver的動做時,這些轉換纔會真正運行。這種設計讓Spark更加有效率地運行。

轉換

含義

map(func)

返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成

filter(func)

返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成

flatMap(func)

相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素)

mapPartitions(func)

相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲TRDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]

mapPartitionsWithIndex(func)

相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲TRDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U]

sample(withReplacement, fraction, seed)

根據fraction指定的比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子

union(otherDataset)

對源RDD和參數RDD求並集後返回一個新的RDD

intersection(otherDataset)

對源RDD和參數RDD求交集後返回一個新的RDD

distinct([numTasks]))

對源RDD進行去重後返回一個新的RDD

groupByKey([numTasks])

在一個(K,V)RDD上調用,返回一個(K, Iterator[V])RDD

reduceByKey(func, [numTasks])

在一個(K,V)RDD上調用,返回一個(K,V)RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,與groupByKey相似,reduce任務的個數能夠經過第二個可選的參數來設置

aggregateByKey(zeroValue)(seqOp,combOp,[numTasks])

 

sortByKey([ascending], [numTasks])

在一個(K,V)RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)RDD

sortBy(func,[ascending], [numTasks])

與sortByKey相似,可是更靈活

join(otherDataset, [numTasks])

在類型爲(K,V)(K,W)RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))RDD

cogroup(otherDataset, [numTasks])

在類型爲(K,V)(K,W)RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD

cartesian(otherDataset)

笛卡爾積

pipe(command, [envVars])

 

coalesce(numPartitions)

 

repartition(numPartitions)

 

repartitionAndSortWithinPartitions(partitioner)

 

 

 

3Action

動做

含義

reduce(func)

經過func函數彙集RDD中的全部元素,這個功能必須是課交換且可並聯的

collect()

在驅動程序中,以數組的形式返回數據集的全部元素

count()

返回RDD的元素個數

first()

返回RDD的第一個元素(相似於take(1)

take(n)

返回一個由數據集的前n個元素組成的數組

takeSample(withReplacement,num, [seed])

返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子

takeOrdered(n[ordering])

 

saveAsTextFile(path)

將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本

saveAsSequenceFile(path

將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統。

saveAsObjectFile(path

 

countByKey()

針對(K,V)類型的RDD,返回一個(K,Int)map,表示每個key對應的元素個數。

foreach(func)

在數據集的每個元素上,運行函數func進行更新。

4RDD的緩存機制

RDD經過persist方法或cache方法能夠將前面的計算結果緩存,可是並非這兩個方法被調用時當即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用。

 

經過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。

 

緩存有可能丟失,或者存儲存儲於內存的數據因爲內存不足而被刪除,RDD的緩存容錯機制保證了即便緩存丟失也能保證計算的正確執行。經過基於RDD的一系列轉換,丟失的數據會被重算,因爲RDD的各個Partition是相對獨立的,所以只須要計算丟失的部分便可,並不須要重算所有Partition

Demo示例:

 

經過UI進行監控:

 

 

 

5RDDCheckpoint(檢查點)機制:容錯機制

檢查點(本質是經過將RDD寫入Disk作檢查點)是爲了經過lineage(血統)作容錯的輔助,lineage過長會形成容錯成本太高,這樣就不如在中間階段作檢查點容錯,若是以後有節點出現問題而丟失分區,從作檢查點的RDD開始重作Lineage,就會減小開銷。

設置checkpoint的目錄,能夠是本地的文件夾、也能夠是HDFS。通常是在具備容錯能力,高可靠的文件系統上(好比HDFS, S3)設置一個檢查點路徑,用於保存檢查點數據。

分別舉例說明:

本地目錄

注意:這種模式,須要將spark-shell運行在本地模式上

 

 

HDFS的目錄

注意:這種模式,須要將spark-shell運行在集羣模式上

 

 

源碼中的一段話

 

6RDD的依賴關係和Spark任務中的Stage

RDD的依賴關係

RDD和它依賴的父RDDs)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。

 

 

  • 窄依賴指的是每個父RDDPartition最多被子RDD的一個Partition使用

總結:窄依賴咱們形象的比喻爲獨生子女

 

  • 寬依賴指的是多個子RDDPartition會依賴同一個父RDDPartition

總結:窄依賴咱們形象的比喻爲超生

 

 

 

 

 

 

 

 

 

Spark任務中的Stage

DAG(Directed Acyclic Graph)叫作有向無環圖,原始的RDD經過一系列的轉換就就造成了DAG,根據RDD之間的依賴關係的不一樣將DAG劃分紅不一樣的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,因爲有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,所以寬依賴是劃分Stage的依據

 

7RDD基礎練習

  • 練習1

//經過並行化生成rdd

val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))

//rdd1裏的每個元素乘2而後排序

val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)

//過濾出大於等於十的元素

val rdd3 = rdd2.filter(_ >= 10)

//將元素以數組的方式在客戶端顯示

rdd3.collect

 

  • 練習2

val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))

//rdd1裏面的每個元素先切分在壓平

val rdd2 = rdd1.flatMap(_.split(' '))

rdd2.collect

 

  • 練習3

val rdd1 = sc.parallelize(List(5, 6, 4, 3))

val rdd2 = sc.parallelize(List(1, 2, 3, 4))

//求並集

val rdd3 = rdd1.union(rdd2)

//求交集

val rdd4 = rdd1.intersection(rdd2)

//去重

rdd3.distinct.collect

rdd4.collect

 

  • 練習4

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

//jion

val rdd3 = rdd1.join(rdd2)

rdd3.collect

//求並集

val rdd4 = rdd1 union rdd2

//key進行分組

rdd4.groupByKey

rdd4.collect

 

 

 

  • 練習5

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

//cogroup

val rdd3 = rdd1.cogroup(rdd2)

//注意cogroupgroupByKey的區別

rdd3.collect

 

  • 練習6

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))

//reduce聚合

val rdd2 = rdd1.reduce(_ + _)

rdd2.collect

 

  • 練習7

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))

val rdd3 = rdd1.union(rdd2)

//key進行聚合

val rdd4 = rdd3.reduceByKey(_ + _)

rdd4.collect

//value的降序排序

val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))

rdd5.collect

 

 

 

 

 

 

6、Spark RDD的高級算子

1mapPartitionsWithIndex

把每一個partition中的分區號和對應的值拿出來

 

u 接收一個函數參數:

l 第一個參數:分區號

l 第二個參數:分區中的元素

u 示例:將每一個分區中的元素和分區號打印出來。

l val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)

 

建立一個函數返回RDD中的每一個分區號和元素:

def func1(index:Int, iter:Iterator[Int]):Iterator[String] ={

   iter.toList.map( x => "[PartID:" + index + ", value=" + x + "]" ).iterator

}

調用:rdd1.mapPartitionsWithIndex(func1).collect

 

2aggregate

先對局部聚合,再對全局聚合

 

 

 

 

 

 

 

示例:val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)

u 查看每一個分區中的元素:

 

將每一個分區中的最大值求和,注意:初始值是0

 

若是初始值時候10,則結果爲:30

 

若是是求和,注意:初始值是0

 

若是初始值是10,則結果是:45

 

u 一個字符串的例子:

val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)

修改一下剛纔的查看分區元素的函數

def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {

  iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator

}

兩個分區中的元素:

[partID:0, val: a], [partID:0, val: b], [partID:0, val: c],

[partID:1, val: d], [partID:1, val: e], [partID:1, val: f]

 

運行結果:

 

u 更復雜一點的例子

val rdd3 = sc.parallelize(List("12","23","345","4567"),2)

rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)

結果多是:」24」,也多是:」42」

 

val rdd4 = sc.parallelize(List("12","23","345",""),2)

rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

結果是:」10」,也多是」01」,

緣由:注意有個初始值」」,其長度0,而後0.toString變成字符串

 

val rdd5 = sc.parallelize(List("12","23","","345"),2)

rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

結果是:」11」,緣由同上。

 

3aggregateByKey

n 準備數據:

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

def func3(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {

  iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator

}

 

n 兩個分區中的元素:

 

n 示例:

l 將每一個分區中的動物最多的個數求和

scala> pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect

res69: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))

 

l 將每種動物個數求和

scala> pairRDD.aggregateByKey(0)(_+_, _ + _).collect

res71: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))

    

    這個例子也可使用:reduceByKey

scala> pairRDD.reduceByKey(_+_).collect

res73: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))

 

 

 

 

4coalescerepartition

都是將RDD中的分區進行重分區。

區別是:coalesce默認不會進行shufflefalse);而repartition會進行shuffletrue),即:會將數據真正經過網絡進行重分區。

n 示例:

def func4(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {

  iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator

}

 

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)

 

下面兩句話是等價的:

val rdd2 = rdd1.repartition(3)

val rdd3 = rdd1.coalesce(3,true) --->若是是false,查看RDDlength依然是2

5、其餘高級算子

參考:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

 

 

 

 

 

 

 

 

7、Spark基礎編程案例

1、案例一:求網站的訪問量

l Tomcat的訪問日誌

 

 

 

l 求出訪問量最高的兩個網頁

n 要求顯示:網頁名稱、訪問量

 

 

 

2、案例二:建立自定義分區

根據jsp文件的名字,將各自的訪問日誌放入到不一樣的分區文件中,以下:

n 生成的分區文件

 

例如:part-00000文件中的內容:只包含了web.jsp的訪問日誌

 

 

 

3、案例三:訪問數據庫

RDD的數據保存到Oracle數據庫中

 

調用:

 

 

使用JdbcRDD:執行SQL語句

 

JdbcRDD參數說明:

參數名稱

類型

說明

sc

org.apache.spark.SparkContext

Spark Context對象

getConnection

scala.Function0[java.sql.Connection]

獲得一個數據庫Connection

sql

scala.Predef.String

執行的SQL語句

lowerBound

scala.Long

下邊界值,即:SQL的第一個參數

upperBound

scala.Long

上邊界值,即:SQL的第二個參數

numPartitions

scala.Int

分區的個數,即:啓動多少個Executor

mapRow

scala.Function1[java.sql.ResultSet, T]

獲得的結果集

 

JdbcRDD的缺點:從上面的參數說明能夠看出,JdbcRDD有如下兩個缺點:

1.執行的SQL必須有兩個參數,並類型都是Long

2.獲得的結果是ResultSet,即:只支持select操做

 

 

 

===========Spark SQL===========

1、Spark SQL基礎

1Spark SQL簡介

 

 

Spark SQLSpark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫作DataFrame而且做爲分佈式SQL查詢引擎的做用。

 

爲何要學習Spark SQL咱們已經學習了Hive,它是將Hive SQL轉換成MapReduce而後提交到集羣上執行,大大簡化了編寫MapReduce的程序的複雜性,因爲MapReduce這種計算模型執行效率比較慢。因此Spark SQL的應運而生,它是將Spark SQL轉換成RDD,而後提交到集羣執行,執行效率很是快!同時Spark SQL也支持從Hive中讀取數據。

 

Spark SQL的特色:

l 容易整合(集成)

 

 

l 統一的數據訪問方式

 

兼容Hive

 

l 標準的數據鏈接

 

2、基本概念:DatasetsDataFrames

u DataFrame

DataFrame是組織成命名列的數據集。它在概念上等同於關係數據庫中的,但在底層具備更豐富的優化。DataFrames能夠從各類來源構建,

例如:

l 結構化數據文件

l hive中的表

外部數據庫或現有RDDs

DataFrame API支持的語言有ScalaJavaPythonR

 

從上圖能夠看出,DataFrame多了數據的結構信息,schemaRDD是分佈式的 Java對象的集合。DataFrame是分佈式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子之外,更重要的特色是提高執行效率、減小數據讀取以及執行計劃的優化

 

u Datasets

Dataset是數據的分佈式集合。Dataset是在Spark 1.6中添加的一個新接口,是DataFrame之上更高一級的抽象。它提供了RDD的優勢(強類型化,使用強大的lambda函數的能力)以及Spark SQL優化後的執行引擎的優勢。一個Dataset 能夠從JVM對象構造,而後使用函數轉換(mapflatMapfilter等)去操做。 Dataset API 支持ScalaJavaPython不支持Dataset API

3、測試數據

使用員工表的數據,並已經將其保存到了HDFS上。

 

4、建立DataFrames

*)經過Case Class建立DataFrames

① 定義case class(至關於表的結構:Schema

注意:因爲mgrcomm列中包含null值,簡單起見,將對應的case class類型定義爲String

 

 

② HDFS上的數據讀入RDD,並將RDDcase Class關聯

 

 

③ RDD轉換成DataFrames

 

④ 經過DataFrames查詢數據

 

*)使用SparkSession

① 什麼是SparkSession

Apache Spark 2.0引入了SparkSession,其爲用戶提供了一個統一的切入點來使用Spark的各項功能,而且容許用戶經過它調用DataFrameDataset相關API來編寫Spark程序。最重要的是,它減小了用戶須要瞭解的一些概念,使得咱們能夠很容易地與Spark交互。

2.0版本以前,與Spark交互以前必須先建立SparkConfSparkContext然而在Spark 2.0中,咱們能夠經過SparkSession來實現一樣的功能,而不須要顯式地建立SparkConf, SparkContext 以及 SQLContext,由於這些對象已經封裝在SparkSession中。

 

② 建立StructType,來定義Schema結構信息

 

 

注意,須要:import org.apache.spark.sql.types._

 

③ 讀入數據而且切分數據

 

④ RDD中的數據映射成Row

 

注意,須要:import org.apache.spark.sql.Row

⑤ 建立DataFrames

val df = spark.createDataFrame(rowRDD,myschema)

 

再舉一個例子,使用JSon文件來建立DataFame

① 源文件:$SPARK_HOME/examples/src/main/resources/people.json

② val df = spark.read.json("源文件")

③ 查看數據和Schema信息

 

5DataFrame操做

DataFrame操做也稱爲無類型的Dataset操做

 

*)查詢全部的員工姓名

 

 

*)查詢全部的員工姓名和薪水,並給薪水加100塊錢

 

 

*)查詢工資大於2000的員工

 

 

*)求每一個部門的員工人數

 

完整的例子,請參考:

http://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset 

 

*)在DataFrame中使用SQL語句

① 將DataFrame註冊成表(視圖):df.createOrReplaceTempView("emp")

② 執行查詢:spark.sql("select * from emp").show

              spark.sql("select * from emp where deptno=10").show

              spark.sql("select deptno,sum(sal) from emp group by deptno").show

 

6Global Temporary View

上面使用的是一個在Session生命週期中的臨時views。在Spark SQL中,若是你想擁有一個臨時的view,並想在不一樣的Session中共享,並且在application的運行週期內可用,那麼就須要建立一個全局的臨時view。並記得使用的時候加上global_temp做爲前綴來引用它,由於全局的臨時view是綁定到系統保留的數據庫global_temp上。

① 建立一個普通的view和一個全局的view

df.createOrReplaceTempView("emp1")

df.createGlobalTempView("emp2")

 

② 在當前會話中執行查詢,都可查詢出結果。

spark.sql("select * from emp1").show

spark.sql("select * from global_temp.emp2").show

 

③ 開啓一個新的會話,執行一樣的查詢

spark.newSession.sql("select * from emp1").show     (運行出錯)

spark.newSession.sql("select * from global_temp.emp2").show

 

7、建立Datasets

DataFrame的引入,可讓Spark更好的處理結構數據的計算,但其中一個主要的問題是:缺少編譯時類型安全。爲了解決這個問題,Spark採用新的Dataset API (DataFrame API的類型擴展)

 

 

Dataset是一個分佈式的數據收集器。這是在Spark1.6以後新加的一個接口,兼顧了RDD的優勢(強類型,可使用功能強大的lambda)以及Spark SQL的執行器高效性的優勢。因此能夠把DataFrames當作是一種特殊的Datasets,即:Dataset(Row)

 

*)建立DataSet,方式一:使用序列

1、定義case class

    case class MyData(a:Int,b:String)

 

2、生成序列,並建立DataSet

   val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS

 

3、查看結果

 

 

*)建立DataSet,方式二:使用JSON數據

1、定義case class

             case class Person(name: String, gender: String)

 

2、經過JSON數據生成DataFrame

             val df = spark.read.json(sc.parallelize("""{"gender": "Male", "name": "Tom"}""" :: Nil))

 

3、將DataFrame轉成DataSet

               df.as[Person].show

               df.as[Person].collect

 

*)建立DataSet,方式三:使用HDFS數據

1、讀取HDFS數據,並建立DataSet

                val linesDS = spark.read.text("hdfs://hadoop111:9000/data/data.txt").as[String]

 

2、對DataSet進行操做:分詞後,查詢長度大於3的單詞

                val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)

                words.show

                words.collect

 

            3、執行WordCount程序

             val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count

             result.show

             排序:result.orderBy($"value").show

 

8Datasets的操做案例

 

 

*)使用emp.json 生成DataFrame

val empDF = spark.read.json("/root/resources/emp.json")

            查詢工資大於3000的員工

            empDF.where($"sal" >= 3000).show

 

 

*)建立case class

case class Emp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)

 

*)生成DataSets,並查詢數據

     val empDS = empDF.as[Emp]

 

     查詢工資大於3000的員工

     empDS.filter(_.sal > 3000).show

 

     查看10號部門的員工

     empDS.filter(_.deptno == 10).show

 

*)多表查詢

1、建立部門表

val deptRDD=sc.textFile("/root/temp/dept.csv").map(_.split(","))

case class Dept(deptno:Int,dname:String,loc:String)

val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS

 

2、建立員工表

case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)

val empRDD = sc.textFile("/root/temp/emp.csv").map(_.split(","))

val empDS = empRDD.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS

 

3、執行多表查詢:等值連接

    val result = deptDS.join(empDS,"deptno")

    

    另外一種寫法:注意有三個等號

    val result = deptDS.joinWith(empDS,deptDS("deptno")=== empDS("deptno"))

    joinWithjoin的區別是鏈接後的新Datasetschema會不同

 

*)查看執行計劃:result.explain

 

2、使用數據源 

1、通用的Load/Save函數

*)什麼是parquet文件?

Parquet是列式存儲格式的一種文件類型,列式存儲有如下的核心:

能夠跳過不符合條件的數據,只讀取須要的數據,下降IO數據量。

壓縮編碼能夠下降磁盤存儲空間。因爲同一列的數據類型是同樣的,可使用更高效的壓縮編碼(例如Run Length EncodingDelta Encoding)進一步節約存儲空間。

l 只讀取須要的列,支持向量運算,可以獲取更好的掃描性能。

l Parquet格式是Spark SQL的默認數據源,可經過spark.sql.sources.default配置

 

*)通用的Load/Save函數

讀取Parquet文件

val usersDF = spark.read.load("/root/resources/users.parquet")

 

查詢Schema和數據

 

 

查詢用戶的name和喜好顏色,並保存

usersDF.select($"name",$"favorite_color").write.save("/root/result/parquet")

 

l 驗證結果

 

 

*)顯式指定文件格式:加載json格式

直接加載:val usersDF = spark.read.load("/root/resources/people.json")

                      會出錯

l val usersDF = spark.read.format("json").load("/root/resources/people.json")

 

 

*)存儲模式(Save Modes

能夠採用SaveMode執行存儲操做,SaveMode定義了對數據的處理模式。須要注意的是,這些保存模式不使用任何鎖定,不是原子操做。此外,當使用Overwrite方式執行時,在輸出新數據以前原數據就已經被刪除。SaveMode詳細介紹以下表:

 

Demo

l usersDF.select($"name").write.save("/root/result/parquet1")

--> 出錯:由於/root/result/parquet1已經存在

 

l usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")

 

*)將結果保存爲表

l usersDF.select($"name").write.saveAsTable("table1")

 

也能夠進行分區、分桶等操做:partitionBybucketBy

2Parquet文件

Parquet是一個列格式並且用於多個數據處理系統中。Spark SQL提供支持對於Parquet文件的讀寫,也就是自動保存原始數據的schema。當寫Parquet文件時,全部的列被自動轉化爲nullable,由於兼容性的緣故。

*)案例:

讀入json格式的數據,將其轉換成parquet格式,並建立相應的表來使用SQL進行查詢。

 

 

 

 

*Schema的合併:

Parquet支持Schema evolutionSchema演變,即:合併)。用戶能夠先定義一個簡單的Schema,而後逐漸的向Schema中增長列描述。經過這種方式,用戶能夠獲取多個有不一樣Schema但相互兼容的Parquet文件。

Demo:

 

 

3JSON Datasets

Spark SQL能自動解析JSON數據集的Schema,讀取JSON數據集爲DataFrame格式。讀取JSON數據集方法爲SQLContext.read().json()。該方法將String格式的RDDJSON文件轉換爲DataFrame

須要注意的是,這裏的JSON文件不是常規的JSON格式。JSON文件每一行必須包含一個獨立的、自知足有效的JSON對象。若是用多行描述一個JSON對象,會致使讀取出錯。讀取JSON數據集示例以下:

*Demo1:使用Spark自帶的示例文件 --> people.json 文件

定義路徑:

val path ="/root/resources/people.json"

 

讀取Json文件,生成DataFrame

val peopleDF = spark.read.json(path)

 

打印Schema結構信息:

peopleDF.printSchema()

 

建立臨時視圖:

peopleDF.createOrReplaceTempView("people")

 

執行查詢

spark.sql("SELECT name FROM people WHERE age=19").show

 

4、使用JDBC

Spark SQL一樣支持經過JDBC讀取其餘數據庫的數據做爲數據源。

Demo演示:使用Spark SQL讀取Oracle數據庫中的表。

啓動Spark Shell的時候,指定Oracle數據庫的驅動

spark-shell --master spark://spark81:7077            \\

         --jars /root/temp/ojdbc6.jar              \\

         --driver-class-path /root/temp/ojdbc6.jar

 

讀取Oracle數據庫中的數據

 

*)方式一:

            val oracleDF = spark.read.format("jdbc").

         option("url","jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com").

         option("dbtable","scott.emp").

         option("user","scott").

         option("password","tiger").

         load

 

*)方式二:

導入須要的類:

import java.util.Properties   

 

定義屬性:               

val oracleprops = new Properties()

oracleprops.setProperty("user","scott")

oracleprops.setProperty("password","tiger")

 

 

讀取數據:

val oracleEmpDF =

   spark.read.jdbc("jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com",

   "scott.emp",oracleprops)

 

注意:下面是讀取Oracle 10gWindows上)的步驟

 

5、使用Hive Table

首先,搭建好Hive的環境(須要Hadoop

配置Spark SQL支持Hive

只須要將如下文件拷貝到$SPARK_HOME/conf的目錄下,便可

u $HIVE_HOME/conf/hive-site.xml

u $HADOOP_CONF_DIR/core-site.xml

u $HADOOP_CONF_DIR/hdfs-site.xml

 

使用Spark Shell操做Hive

啓動Spark Shell的時候,須要使用--jars指定mysql的驅動程序

n 建立表

spark.sql("create table src (key INT, value STRING) row format delimited fields terminated by ','")

 

n 導入數據

spark.sql("load data local path '/root/temp/data.txt' into table src")

 

n 查詢數據

spark.sql("select * from src").show

 

使用spark-sql操做Hive

啓動spark-sql的時候,須要使用--jars指定mysql的驅動程序

操做Hive

u show tables;

u select * from ;

 

 

3、性能優化

1、在內存中緩存數據

性能調優主要是將數據放入內存中操做。經過spark.cacheTable("tableName")或者dataFrame.cache()。使用spark.uncacheTable("tableName")來從內存中去除table

Demo案例:

*)從Oracle數據庫中讀取數據,生成DataFrame

     val oracleDF = spark.read.format("jdbc")

        .option("url","jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com")

        .option("dbtable","scott.emp")

        .option("user","scott")

        .option("password","tiger").load

 

*)將DataFrame註冊成表:    oracleDF.registerTempTable("emp")

 

*)執行查詢,並經過Web Console監控執行的時間

                 spark.sql("select * from emp").show

 

*)將表進行緩存,並查詢兩次,並經過Web Console監控執行的時間

     spark.sqlContext.cacheTable("emp")

 

*)清空緩存:

      spark.sqlContext.cacheTable("emp")

      spark.sqlContext.clearCache

2、性能優化相關參數

l 將數據緩存到內存中的相關優化參數

n spark.sql.inMemoryColumnarStorage.compressed

默認爲 true

u Spark SQL 將會基於統計信息自動地爲每一列選擇一種壓縮編碼方式。

 

n spark.sql.inMemoryColumnarStorage.batchSize

默認值:10000

緩存批處理大小。緩存數據時, 較大的批處理大小能夠提升內存利用率和壓縮率,但同時也會帶來 OOMOut Of Memory)的風險。

 

l 其餘性能相關的配置選項(不過不推薦手動修改,可能在後續版本自動的自適應修改)

n spark.sql.files.maxPartitionBytes

默認值:128 MB

u 讀取文件時單個分區可容納的最大字節數

 

n spark.sql.files.openCostInBytes

默認值:4M

打開文件的估算成本, 按照同一時間可以掃描的字節數來測量。當往一個分區寫入多個文件的時候會使用。高估更好, 這樣的話小文件分區將比大文件分區更快 (先被調度)

 

l spark.sql.autoBroadcastJoinThreshold

默認值:10M

用於配置一個表在執行 join 操做時可以廣播給全部 worker 節點的最大字節大小。經過將這個值設置爲 -1 能夠禁用廣播。注意,當前數據統計僅支持已經運行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。

 

l spark.sql.shuffle.partitions

默認值:200

用於配置 join 或聚合操做混洗(shuffle)數據時使用的分區數。

 

4、在IDEA中開發Spark SQL程序

1、指定Schema格式

 

 

2、使用case class

 

3、就數據保存到數據庫

 

 

 

 

 

=======Spark Streaming=======

1、Spark Streaming基礎

1Spark Streaming簡介

Spark Streaming是核心Spark API的擴展,可實現可擴展、高吞吐量、可容錯的實時數據流處理。數據能夠從諸如KafkaFlumeKinesisTCP套接字等衆多來源獲取,而且可使用由高級函數(如mapreducejoinwindow)開發的複雜算法進行流數據處理。最後,處理後的數據能夠被推送到文件系統,數據庫和實時儀表板。並且,您還能夠在數據流上應用Spark提供的機器學習和圖處理算法。

 

2Spark Streaming的特色

 

 

 

 

3Spark Streaming的內部結構

在內部,它的工做原理以下。Spark Streaming接收實時輸入數據流,並將數據切分紅批,而後由Spark引擎對其進行處理,最後生成「批」形式的結果流。

 

Spark Streaming將連續的數據流抽象爲discretizedstreamDStream。在內部,DStream 由一個RDD序列表示。

4、第一個小案例:NetworkWordCount

1)因爲在本案例中須要使用netcat網絡工具,因此須要先安裝。

 

rpm -iUv ~/netcat-0.6.1-1.i386.rpm

 

2)啓動netcat數據流服務器,並監聽端口:1234

命令:nc -l -p 9999

服務器端:

 

 

3)啓動客戶端

bin/run-example streaming.NetworkWordCount localhost 1234

 

客戶端:

 

(必定注意):若是要執行本例,必須確保機器cpu核數大於2

 

5、開發本身的NetworkWordCount

 

 

 

(必定注意):

val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")

 

官方的解釋:

 

 

 

 

 

 

 

2、Spark Streaming進階

1StreamingContext對象詳解

初始化StreamingContext

方式一:從SparkConf對象中建立

 

 

從一個現有的SparkContext實例中建立

 

l 程序中的幾點說明:

n appName參數是應用程序在集羣UI上顯示的名稱。

n masterSparkMesosYARN集羣的URL,或者一個特殊的「local [*]」字符串來讓程序以本地模式運行。

當在集羣上運行程序時,不須要在程序中硬編碼master參數,而是使用spark-submit提交應用程序並將masterURL以腳本參數的形式傳入。可是,對於本地測試和單元測試,您能夠經過「local[*]」來運行Spark Streaming程序(請確保本地系統中的cpu核心數夠用)。

n StreamingContext會內在的建立一個SparkContext的實例(全部Spark功能的起始點),你能夠經過ssc.sparkContext訪問到這個實例。

n 批處理的時間窗口長度必須根據應用程序的延遲要求和可用的集羣資源進行設置。

 

請務必記住如下幾點:

一旦一個StreamingContextt開始運做,就不能設置或添加新的流計算。

n 一旦一個上下文被中止,它將沒法從新啓動。

同一時刻,一個JVM中只能有一個StreamingContext處於活動狀態。

n StreamingContext上的stop()方法也會中止SparkContext。 要僅中止StreamingContext(保持SparkContext活躍),請將stop() 方法的可選參數stopSparkContext設置爲false

只要前一個StreamingContext在下一個StreamingContext被建立以前中止(不中止SparkContext),SparkContext就能夠被重用來建立多個StreamingContext

2、離散流(DStreams):Discretized Streams

l DiscretizedStreamDStream Spark Streaming對流式數據的基本抽象。它表示連續的數據流,這些連續的數據流能夠是從數據源接收的輸入數據流,也能夠是經過對輸入數據流執行轉換操做而生成的經處理的數據流。在內部,DStream由一系列連續的RDD表示,以下圖:

 

 

舉例分析:在以前的NetworkWordCount的例子中,咱們將一行行文本組成的流轉換爲單詞流,具體作法爲:將flatMap操做應用於名爲linesDStream中的每一個RDD上,以生成words DStreamRDD。以下圖所示:

 

可是DStream和RDD也有區別,下面畫圖說明:

 

 

 

3DStream中的轉換操做(transformation

 

最後兩個transformation算子須要重點介紹一下:

 

n transform(func)

經過RDD-to-RDD函數做用於源DStream中的各個RDD,能夠是任意的RDD操做,從而返回一個新的RDD

舉例:在NetworkWordCount中,也可使用transform來生成元組對

 

 

 

 

 

n updateStateByKey(func)

u 操做容許不斷用新信息更新它的同時保持任意狀態。

定義狀態-狀態能夠是任何的數據類型

定義狀態更新函數-怎樣利用更新前的狀態和從輸入流裏面獲取的新值更新狀態

重寫NetworkWordCount程序,累計每一個單詞出現的頻率(注意:累計)

 

 

u 輸出結果:

 

 

注意:若是在IDEA中,不想輸出log4j的日誌信息,能夠將log4j.properties文件(放在src的目錄下)的第一行改成:

log4j.rootCategory=ERROR, console

 

 

 

4、窗口操做

Spark Streaming還提供了窗口計算功能,容許您在數據的滑動窗口上應用轉換操做。下圖說明了滑動窗口的工做方式:

 

如圖所示,每當窗口滑過originalDStream時,落在窗口內的源RDD被組合並被執行操做以產生windowed DStreamRDD。在上面的例子中,操做應用於最近3個時間單位的數據,並以2個時間單位滑動。這代表任何窗口操做都須要指定兩個參數。

窗口長度(windowlength- 窗口的時間長度(上圖的示例中爲:3)。

滑動間隔(slidinginterval- 兩次相鄰的窗口操做的間隔(即每次滑動的時間長度)(上圖示例中爲:2)。

這兩個參數必須是源DStream的批間隔的倍數(上圖示例中爲:1)。

 

咱們以一個例子來講明窗口操做。 假設您但願對以前的單詞計數的示例進行擴展,每10秒鐘對過去30秒的數據進行wordcount。爲此,咱們必須在最近30秒的pairs DStream數據中對(word, 1)鍵值對應用reduceByKey操做。這是經過使用reduceByKeyAndWindow操做完成的。

 

 

一些常見的窗口操做以下表所示。全部這些操做都用到了上述兩個參數 - windowLengthslideInterval

u window(windowLength, slideInterval)

基於源DStream產生的窗口化的批數據計算一個新的DStream

 

u countByWindow(windowLength, slideInterval)

l 返回流中元素的一個滑動窗口數

 

u reduceByWindow(func, windowLength, slideInterval)

返回一個單元素流。利用函數func彙集滑動時間間隔的流的元素建立這個單元素流。函數必須是相關聯的以使計算可以正確的並行計算。

 

 

u reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

應用到一個(K,V)對組成的DStream上,返回一個由(K,V)對組成的新的DStream。每個key的值均由給定的reduce函數彙集起來。注意:在默認狀況下,這個算子利用了Spark默認的併發任務數去分組。你能夠用numTasks參數設置不一樣的任務數

 

u reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

上述reduceByKeyAndWindow() 的更高效的版本,其中使用前一窗口的reduce計算結果遞增地計算每一個窗口的reduce值。這是經過對進入滑動窗口的新數據進行reduce操做,以及「逆減(inverse reducing)」離開窗口的舊數據來完成的。一個例子是當窗口滑動時對鍵對應的值進行「一加一減」操做。可是,它僅適用於「可逆減函數(invertible reduce functions)」,即具備相應「反減」功能的減函數(做爲參數invFunc)。 像reduceByKeyAndWindow同樣,經過可選參數能夠配置reduce任務的數量。 請注意,使用此操做必須啓用檢查點。

 

u countByValueAndWindow(windowLength, slideInterval, [numTasks])

應用到一個(K,V)對組成的DStream上,返回一個由(K,V)對組成的新的DStream。每一個key的值都是它們在滑動窗口中出現的頻率。

5、輸入DStreams和接收器

輸入DStreams表示從數據源獲取輸入數據流的DStreams。在NetworkWordCount例子中,lines表示輸入DStream,它表明從netcat服務器獲取的數據流。每個輸入流DStream和一個Receiver對象相關聯,這個Receiver從源中獲取數據,並將數據存入內存中用於處理。

輸入DStreams表示從數據源獲取的原始數據流。Spark Streaming擁有兩類數據源:

基本源(Basic sources):這些源在StreamingContext API中直接可用。例如文件系統、套接字鏈接、Akkaactor

高級源(Advanced sources):這些源包括Kafka,Flume,Kinesis,Twitter等等。

 

下面經過具體的案例,詳細說明:

 

  • 文件流:經過監控文件系統的變化,如有新文件添加,則將它讀入並做爲數據流

須要注意的是:

① 這些文件具備相同的格式

② 這些文件經過原子移動或重命名文件的方式在dataDirectory建立

③ 若是在文件中追加內容,這些追加的新數據也不會被讀取。

 

注意:要演示成功,須要在原文件中編輯,而後拷貝一份。

 

 

 

  • RDD隊列流

使用streamingContext.queueStream(queueOfRDD)建立基於RDD隊列的DStream,用於調試Spark Streaming應用程序。

 

 

 

  • 套接字流:經過監聽Socket端口來接收數據

 

  

6DStreams的輸出操做

輸出操做容許DStream的操做推到如數據庫、文件系統等外部系統中。由於輸出操做其實是容許外部系統消費轉換後的數據,它們觸發的實際操做是DStream轉換。目前,定義了下面幾種輸出操做:

 

 

 

 

 

 

 

 

 

 

foreachRDD的設計模式

DStream.foreachRDD是一個強大的原語,發送數據到外部系統中。

 

  • 第一步:建立鏈接,將數據寫入外部數據庫(使用以前的NetworkWordCount,改寫以前輸出結果的部分,以下)

 

出現如下Exception

 

緣由是:Connection對象不是一個可被序列化的對象,不能RDD的每一個Worker上運行;即:Connection不能在RDD分佈式環境中的每一個分區上運行,由於不一樣的分區可能運行在不一樣的Worker上。因此須要在每一個RDD分區上單首創建Connection對象。

 

  • 第二步:在每一個RDD分區上單首創建Connection對象,以下:

 

 

7DataFrameSQL操做

咱們能夠很方便地使用DataFramesSQL操做來處理流數據。您必須使用當前的StreamingContext對應的SparkContext建立一個SparkSession。此外,必須這樣作的另外一個緣由是使得應用能夠在driver程序故障時得以從新啓動,這是經過建立一個能夠延遲實例化的單例SparkSession來實現的。

在下面的示例中,咱們使用DataFramesSQL來修改以前的wordcount示例並對單詞進行計數。咱們將每一個RDD轉換爲DataFrame,並註冊爲臨時表,而後在這張表上執行SQL查詢。

 

 

8、緩存/持久化

RDD相似,DStreams還容許開發人員將流數據保留在內存中。也就是說,在DStream上調用persist() 方法會自動將該DStream的每一個RDD保留在內存中。若是DStream中的數據將被屢次計算(例如,相同數據上執行多個操做),這個操做就會頗有用。對於基於窗口的操做,如reduceByWindowreduceByKeyAndWindow以及基於狀態的操做,如updateStateByKey,數據會默認進行持久化。 所以,基於窗口的操做生成的DStream會自動保存在內存中,而不須要開發人員調用persist()

 

對於經過網絡接收數據(例如KafkaFlumesockets等)的輸入流,默認持久化級別被設置爲將數據複製到兩個節點進行容錯。

 

請注意,與RDD不一樣,DStreams的默認持久化級別將數據序列化保存在內存中。

 

 

 

9、檢查點支持

流數據處理程序一般都是全天候運行,所以必須對應用中邏輯無關的故障(例如,系統故障,JVM崩潰等)具備彈性。爲了實現這一特性,Spark Streaming須要checkpoint足夠的信息到容錯存儲系統,以即可以從故障中恢復。

 

① 通常會對兩種類型的數據使用檢查點:

1) 元數據檢查點(Metadatacheckpointing - 將定義流計算的信息保存到容錯存儲中(如HDFS)。這用於從運行streaming程序的driver程序的節點的故障中恢復。元數據包括如下幾種:

配置(Configuration- 用於建立streaming應用程序的配置信息。

l DStream操做(DStream operations- 定義streaming應用程序的DStream操做集合。

不完整的batchIncomplete batches- jobs還在隊列中但還沒有完成的batch

 

2) 數據檢查點(Datacheckpointing- 將生成的RDD保存到可靠的存儲層。對於一些須要將多個批次之間的數據進行組合的stateful變換操做,設置數據檢查點是必需的。在這些轉換操做中,當前生成的RDD依賴於先前批次的RDD,這致使依賴鏈的長度隨時間而不斷增長,由此也會致使基於血統機制的恢復時間無限增長。爲了不這種狀況,stateful轉換的中間RDD將按期設置檢查點並保存到到可靠的存儲層(例如HDFS)以切斷依賴關係鏈。

 

總而言之,元數據檢查點主要用於從driver程序故障中恢復,而數據或RDD檢查點在任何使用stateful轉換時是必需要有的。

 

② 什麼時候啓用檢查點:

對於具備如下任一要求的應用程序,必須啓用檢查點:

1) 使用狀態轉:若是在應用程序中使用updateStateByKeyreduceByKeyAndWindow(具備逆函數),則必須提供檢查點目錄以容許按期保存RDD檢查點。

2) 從運行應用程序的driver程序的故障中恢復:元數據檢查點用於使用進度信息進行恢復。

 

③ 如何配置檢查點:

能夠經過在一些可容錯、高可靠的文件系統(例如,HDFSS3等)中設置保存檢查點信息的目錄來啓用檢查點。這是經過使用streamingContext.checkpoint(checkpointDirectory)完成的。設置檢查點後,您就可使用上述的有狀態轉換操做。此外,若是要使應用程序從驅動程序故障中恢復,您應該重寫streaming應用程序以使程序具備如下行爲:

1) 當程序第一次啓動時,它將建立一個新的StreamingContext,設置好全部流數據源,而後調用start()方法。

2) 當程序在失敗後從新啓動時,它將從checkpoint目錄中的檢查點數據從新建立一個StreamingContext

使用StreamingContext.getOrCreate能夠簡化此行爲

 

 

④ 改寫以前的WordCount程序,使得每次計算的結果和狀態都保存到檢查點目錄下

 

 

 

經過查看HDFS中的信息,能夠看到相關的檢查點信息,以下:

 

 

 

 

 

 

 

 

 

3、高級數據源

1Spark Streaming接收Flume數據

 

基於FlumePush模式

Flume被用於在Flume agents之間推送數據.在這種方式下,Spark Streaming能夠很方便的創建一個receiver,起到一個Avro agent的做用.Flume能夠將數據推送到改receiver.

  • 第一步:Flume的配置文件

 

 

  • 第二步:Spark Streaming程序

 

 

 

  • 第三步:注意除了須要使用Flumelibjar包之外,還須要如下jar包:

 

 

  • 第四步:測試

啓動Spark Streaming程序

啓動Flume

拷貝日誌文件到/root/training/logs目錄

n 觀察輸出,採集到數據

 

 

基於Custom SinkPull模式

不一樣於Flume直接將數據推送到Spark Streaming中,第二種模式經過如下條件運行一個正常的Flume sinkFlume將數據推送到sink中,而且數據保持buffered狀態。Spark Streaming使用一個可靠的Flume接收器和轉換器從sink拉取數據。只要當數據被接收而且被Spark Streaming備份後,轉換器才運行成功。

這樣,與第一種模式相比,保證了很好的健壯性和容錯能力。然而,這種模式須要爲Flume配置一個正常的sink

如下爲配置步驟:

 

  • 第一步:Flume的配置文件

 

 

 

 

  • 第二步:Spark Streaming程序

 

 

  • 第三步:須要的jar

Sparkjar包拷貝到Flumelib目錄下

下面的這個jar包也須要拷貝到Flumelib目錄下,同時加入IDEA工程的classpath

 

 

 

  • 第四步:測試

啓動Flume

IDEA中啓動FlumeLogPull

將測試數據拷貝到/root/training/logs

觀察IDEA中的輸出

 

 

 

 

 

2Spark Streaming接收Kafka數據

Apache Kafka是一種高吞吐量的分佈式發佈訂閱消息系統。

 

 

搭建ZooKeeperStandalone):

*)配置/root/training/zookeeper-3.4.10/conf/zoo.cfg文件

dataDir=/root/training/zookeeper-3.4.10/tmp

server.1=spark81:2888:3888

 

*)在/root/training/zookeeper-3.4.10/tmp目錄下建立一個myid的空文件

echo 1 > /root/training/zookeeper-3.4.6/tmp/myid

 

搭建Kafka環境(單機單broker):

*)修改server.properties文件

 

*)啓動Kafka

bin/kafka-server-start.sh config/server.properties &

 

 出現如下錯誤:

 

     須要修改bin/kafka-run-class.sh文件,將這個選項註釋掉。

 

 

 

*)測試Kafka

建立Topic

bin/kafka-topics.sh --create --zookeeper spark81:2181 -replication-factor 1 --partitions 3 --topic mydemo1

 

l 發送消息

bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1

 

l 接收消息

bin/kafka-console-consumer.sh --zookeeper spark81:2181 --topic mydemo1

 

搭建Spark StreamingKafka的集成開發環境

因爲Spark StreamingKafka集成的時候,依賴的jar包比較多,並且還會產生衝突。強烈建議使用Maven的方式來搭建項目工程

下面是依賴的pom.xml文件:

    

 

基於Receiver的方式

這個方法使用了Receivers來接收數據。Receivers的實現使用到Kafka高層次的消費者API。對於全部的Receivers,接收到的數據將會保存在Spark executors中,而後由Spark Streaming啓動的Job來處理這些數據。

 

 

  • 開發Spark StreamingKafka Receivers

 

 

 

 

 

  • 測試

啓動Kafka消息的生產者

bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1

 

IDEA中啓動任務,接收Kafka消息

 

 

直接讀取方式

和基於Receiver接收數據不同,這種方式按期地從Kafkatopic+partition中查詢最新的偏移量,再根據定義的偏移量範圍在每一個batch裏面處理數據。看成業須要處理的數據來臨時,spark經過調用Kafka的簡單消費者API讀取必定範圍的數據。

 

 

  • 開發Spark Streaming的程序

 

 

  • 測試

啓動Kafka消息的生產者

bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1

 

IDEA中啓動任務,接收Kafka消息

 

 

 

 

 

 

4、性能優化

1、減小批數據的執行時間

Spark中有幾個優化能夠減小批處理的時間:

① 數據接收的並行水平

經過網絡(kafkaflumesocket)接收數據須要這些數據反序列化並被保存到Spark中。若是數據接收成爲系統的瓶頸,就要考慮並行地接收數據。注意,每一個輸入DStream建立一個receiver(運行在worker機器上)接收單個數據流。建立多個輸入DStream並配置它們能夠從源中接收不一樣分區的數據流,從而實現多數據流接收。例如,接收兩個topic數據的單個輸入DStream能夠被切分爲兩個kafka輸入流,每一個接收一個topic。這將在兩個worker上運行兩個receiver,所以容許數據並行接收,提升總體的吞吐量。多個DStream能夠被合併生成單個DStream,這樣運用在單個輸入DStreamtransformation操做能夠運用在合併的DStream上。

 

 

 

② 數據處理的並行水平

若是運行在計算stage上的併發任務數不足夠大,就不會充分利用集羣的資源。默認的併發任務數經過配置屬性來肯定spark.default.parallelism

 

③ 數據序列化

能夠經過改變序列化格式來減小數據序列化的開銷。在流式傳輸的狀況下,有兩種類型的數據會被序列化:

l 輸入數據

由流操做生成的持久RDD

在上述兩種狀況下,使用Kryo序列化格式能夠減小CPU和內存開銷。

 

 

 

 

 

 

 

2、設置正確的批容量

爲了Spark Streaming應用程序可以在集羣中穩定運行,系統應該可以以足夠的速度處理接收的數據(即處理速度應該大於或等於接收數據的速度)。這能夠經過流的網絡UI觀察獲得。批處理時間應該小於批間隔時間。

 

根據流計算的性質,批間隔時間可能顯著的影響數據處理速率,這個速率能夠經過應用程序維持。能夠考慮WordCountNetwork這個例子,對於一個特定的數據處理速率,系統可能能夠每2秒打印一次單詞計數(批間隔時間爲2秒),但沒法每500毫秒打印一次單詞計數。因此,爲了在生產環境中維持指望的數據處理速率,就應該設置合適的批間隔時間(即批數據的容量)

 

找出正確的批容量的一個好的辦法是用一個保守的批間隔時間(5-10,秒)和低數據速率來測試你的應用程序。

3、內存調優

在這一節,咱們重點介紹幾個強烈推薦的自定義選項,它們能夠減小Spark Streaming應用程序垃圾回收的相關暫停,得到更穩定的批處理時間。

Default persistence level of DStreamsRDDs不一樣的是,默認的持久化級別是序列化數據到內存中(DStreamStorageLevel.MEMORY_ONLY_SERRDDStorageLevel.MEMORY_ONLY)。即便保存數據爲序列化形態會增長序列化/反序列化的開銷,可是能夠明顯的減小垃圾回收的暫停。

 

Clearing persistent RDDs默認狀況下,經過Spark內置策略(LUR),Spark Streaming生成的持久化RDD將會從內存中清理掉。若是spark.cleaner.ttl已經設置了,比這個時間存在更老的持久化RDD將會被定時的清理掉。正如前面提到的那樣,這個值須要根據Spark Streaming應用程序的操做當心設置。然而,能夠設置配置選項spark.streaming.unpersisttrue來更智能的去持久化(unpersistRDD。這個配置使系統找出那些不須要常常保有的RDD,而後去持久化它們。這能夠減小Spark RDD的內存使用,也可能改善垃圾回收的行爲。

 

Concurrent garbage collector使用併發的標記-清除垃圾回收能夠進一步減小垃圾回收的暫停時間。儘管併發的垃圾回收會減小系統的總體吞吐量,可是仍然推薦使用它以得到更穩定的批處理時間。

相關文章
相關標籤/搜索