Spark課堂筆記html
Spark生態圈:
Spark Core : RDD(彈性分佈式數據集)
Spark SQL
Spark Streamingjava
Spark MLLib:協同過濾,ALS,邏輯迴歸等等 --> 機器學習
Spark Graphx : 圖計算node
重點在前三章mysql
-----------------Spark Core------------------------
1、什麼是Spark?特色?
https://spark.apache.org/
Apache Spark™ is a unified analytics engine for large-scale data processing.
特色:快、易用、通用性、兼容性(徹底兼容Hadoop)
快:快100倍(Hadoop 3 以前)
易用:支持多種語言開發
通用性:生態系統全。
易用性:兼容Hadoop
spark 取代 Hadoopes6
2、安裝和部署Spark、Spark 的 HA算法
一、spark體系結構
Spark的運行方式
Yarn
Standalone:本機調試(demo)
Worker:從節點。每一個服務器上,資源和任務的管理者。只負責管理一個節點。
執行過程:
一個Worker 有多個 Executor。 Executor是任務的執行者,按階段(stage)劃分任務。————> RDD
客戶端:Driver Program 提交任務到集羣中。
一、spark-submit
二、spark-shell sql
二、spark的搭建
(1)準備工做:JDK 配置主機名 免密碼登陸
(2)僞分佈式模式
在一臺虛擬機上模擬分佈式環境(Master和Worker在一個節點上)
export JAVA_HOME=/usr/java/jdk1.8.0_201
export SPARK_MASTER_HOST=node3
export SPARK_MASTER_PORT=7077
(3)全分佈式環境
修改slave文件 拷貝到其餘兩臺服務器 啓動
三、Spark的 HA
回顧HA;
(*)HDFS Yarn Hbase Spark 主從結構
(*)單點故障
(1)基於文件目錄的單點恢復
(*)本質:仍是隻有一個主節點Master,建立了一個恢復目錄,保存集羣狀態和任務的信息。
當Master掛掉,從新啓動時,會從恢復目錄下讀取狀態信息,恢復出來原來的狀態
用途:用於開發和測試,生產用zookeeper
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM
-Dspark.deploy.recoveryDirectory=/usr/local/spark-2.1.0-bin-hadoop2.7/recovery"
(2)基於Zookeeper :和Hadoop相似
(*)複習一下zookeeper:
至關於一個數據庫,把一些信息存放在zookeeper中,好比集羣的信息。
數據同步功能,選舉功能,分佈式鎖功能
數據同步:給一個節點中寫入數據,能夠同步到其餘節點
選舉:Zookeeper中存在不一樣的角色,Leader Follower。若是Leader掛掉,從新選舉Leader
分佈式鎖:秒殺。以目錄節點的方式來保存數據。
修改 spark-env.sh
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=node3:2181,node4:2181,node5:2181
-Dspark.deploy.zookeeper.dir=/spark"
同步到其餘兩臺服務器。
在node3 start-all node3 master node4 Worker node5 Worker
在node4 start-master node3 master node4 master(standby) node4 Worker node5 Worker
在node3上kill master
node4 master(Active) node4 Worker node5 Worker
在網頁http://192.168.109.134:8080/ 能夠看到相應信息
3、執行Spark的任務:兩個工具
一、spark-submit:用於提交Spark的任務
任務:jar。
舉例:蒙特卡洛求PI(圓周率)。
./spark-submit --master spark://node3:7077 --class
--class指明主程序的名字
/usr/local/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --master spark://node3:7077
--class org.apache.spark.examples.SparkPi
/usr/local/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.0.jar 100
二、spark-shell 至關於REPL
做爲一個獨立的Application運行
兩種模式:
(1)本地模式
spark-shell 後面不接任何參數,表明本地模式
Spark context available as 'sc' (master = local[*], app id = local-1554038459298).
sc 是 SparkContext 對象名。 local[*] 表明本地模式,不提交到集羣中運行。
(2)集羣模式
./spark-submit --master spark://node3:7077 提交到集羣中運行
Spark context available as 'sc' (master = spark://node3:7077, app id = app-20190331212447-0000).
master = spark://node3:7077
Spark session available as 'spark'
Spark Session 是 2.0 之後提供的,利用 SparkSession 能夠訪問spark全部組件。
示例:WordCount程序
(*)處理本地文件,把結果打印到屏幕上
scala> sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.collect
res0: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))
(*)處理HDFS文件,結果保存在hdfs上
sc.textFile("hdfs://node1:8020/tmp_files/test_WordCount.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile("hdfs://node1:8020/output/0331/test_WordCount")
-rw-r--r-- 3 root supergroup 0 2019-03-31 21:43 /output/0331/test_WordCount/_SUCCESS
-rw-r--r-- 3 root supergroup 40 2019-03-31 21:43 /output/0331/test_WordCount/part-00000
-rw-r--r-- 3 root supergroup 31 2019-03-31 21:43 /output/0331/test_WordCount/part-00001
_SUCCESS 表明程序執行成功
part-00000 part-00001 結果文件,分區。裏面內容不重複。
(*)單步運行WordCount ----> RDD
scala> val rdd1 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[12] at textFile at <console>:24shell
scala> 1+1
res2: Int = 2數據庫
scala> rdd1.collect
res3: Array[String] = Array(I love Beijing, I love China, Beijing is the capital of China)apache
scala> val rdd2 = rdd1.flatMap(_.split(" "))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:26
scala> rdd2.collect
res4: Array[String] = Array(I, love, Beijing, I, love, China, Beijing, is, the, capital, of, China)
scala> val rdd3 = rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at map at <console>:28
scala> rdd3.collect
res5: Array[(String, Int)] = Array((I,1), (love,1), (Beijing,1), (I,1), (love,1), (China,1), (Beijing,1), (is,1), (the,1), (capital,1), (of,1), (China,1))
scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[15] at reduceByKey at <console>:30
scala> rdd4.collect
res6: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))
RDD 彈性分佈式數據集
(1)依賴關係 : 寬依賴和窄依賴
(2)算子:
函數:
Transformation : 延時計算 map flatMap textFile
Action : 當即觸發計算 collect
說明:scala複習
(*)flatten:把嵌套的結果展開
scala> List(List(2,4,6,8,10),List(1,3,5,7,9)).flatten
res21: List[Int] = List(2, 4, 6, 8, 10, 1, 3, 5, 7, 9)
(*)flatmap : 至關於一個 map + flatten
scala> var myList = List(List(2,4,6,8,10),List(1,3,5,7,9))
myList: List[List[Int]] = List(List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9))
scala> myList.flatMap(x=>x.map(_*2))
res22: List[Int] = List(4, 8, 12, 16, 20, 2, 6, 10, 14, 18)
myList.flatMap(x=>x.map(_*2))
執行過程:
一、將 List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9) 調用 map(_*2) 方法。x 表明一個List
二、flatten
三、在IDE中開發scala版本和Java版本的WorkCount。
(1)scala版本的WordCount
新建一個工程,把jar引入到工程中。
export jar 點擊下一步下一步,不須要設置main class
把jar上傳到服務器上。
spark-submit --master spark://node3:7077
--class day1025.MyWordCount
/usr/local/tmp_files/Demo1.jar
hdfs://node2:8020/tmp_files/test_WordCount.txt
hdfs://node2:8020/output/1025/demo1
(2)java版本的WordCount
./spark-submit --master spark://node3:7077 --class day0330.JavaWordCount /usr/local/tmp_files/Demo2.jar
4、分析Spark的任務流程
一、分析WordCount程序處理過程
見圖片
二、Spark調度任務的過程
提交到及羣衆運行任務時,spark執行任務調度。
見圖片
5、RDD和RDD特性、RDD的算子
一、RDD:彈性分佈式數據集
(*)Spark中最基本的數據抽象。
(*)RDD的特性
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
一、是一組分區。
RDD由分區組成,每一個分區運行在不一樣的Worker上,經過這種方式來實現分佈式計算。
* - A function for computing each split
在RDD中,提供算子處理每一個分區中的數據
* - A list of dependencies on other RDDs
RDD存在依賴關係:寬依賴和窄依賴。
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
能夠自定義分區規則來建立RDD
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
優先選擇離文件位置近的節點來執行
如何建立RDD?
(1)經過SparkContext.parallelize方法來建立
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:29
scala> rdd1.partitions.length
res35: Int = 3
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:29
scala> rdd1.partitions.length
res36: Int = 2
(2)經過外部數據源來建立
sc.textFile()
scala> val rdd2 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
rdd2: org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[35] at textFile at <console>:29
二、 算子
(1)Transformation
map(func):至關於for循環,返回一個新的RDD
filter(func):過濾
flatMap(func):flat+map 壓平
mapPartitions(func):對RDD中的每一個分區進行操做
mapPartitionsWithIndex(func):對RDD中的每一個分區進行操做,能夠取到分區號。
sample(withReplacement, fraction, seed):採樣
集合運算
union(otherDataset)
intersection(otherDataset)
distinct([numTasks])):去重
聚合操做:group by
groupByKey([numTasks])
reduceByKey(func, [numTasks])
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks])
排序
sortByKey([ascending], [numTasks])
sortBy(func,[ascending], [numTasks])
join(otherDataset, [numTasks])
cogroup(otherDataset, [numTasks])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
重分區:
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)
舉例:
一、建立一個RDD,每一個元素乘以2,再排序
scala> val rdd1 = sc.parallelize(Array(3,4,5,100,79,81,6,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at <console>:29
scala> val rdd2 = rdd1.map(_*2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at map at <console>:31
scala> rdd2.collect
res37: Array[Int] = Array(6, 8, 10, 200, 158, 162, 12, 16)
scala> rdd2.sortBy(x=>x,true).collect
res39: Array[Int] = Array(6, 8, 10, 12, 16, 158, 162, 200)
scala> rdd2.sortBy(x=>x,false).collect
res40: Array[Int] = Array(200, 162, 158, 16, 12, 10, 8, 6)
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true)
過濾出大於20的元素:
scala> val rdd3 = rdd2.filter(_>20)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[53] at filter at <console>:33
scala> rdd3.collect
res41: Array[Int] = Array(200, 158, 162)
二、字符串(字符)類型的RDD
scala> val rdd4 = sc.parallelize(Array("a b c","d e f","g h i"))
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[54] at parallelize at <console>:29
scala> rdd4.flatMap(_.split(" ")).collect
res42: Array[String] = Array(a, b, c, d, e, f, g, h, i)
三、RDD的集合運算:
scala> val rdd6 = sc.parallelize(List(1,2,3,6,7,8,9,100))
rdd6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:29
scala> val rdd7 = sc.parallelize(List(1,2,3,4))
rdd7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[57] at parallelize at <console>:29
scala> val rdd8 = rdd6.union(rdd7)
rdd8: org.apache.spark.rdd.RDD[Int] = UnionRDD[58] at union at <console>:33
scala> rdd8.collect
res43: Array[Int] = Array(1, 2, 3, 6, 7, 8, 9, 100, 1, 2, 3, 4)
scala> rdd8.distinct.collect
res44: Array[Int] = Array(100, 4, 8, 1, 9, 6, 2, 3, 7)
四、分組操做:reduceByKey
<key value>
scala> val rdd1 = sc.parallelize(List(("Tom",1000),("Andy",2000),("Lily",1500)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[62] at parallelize at <console>:29
scala> val rdd2 = sc.parallelize(List(("Andy",1000),("Tom",2000),("Mike",500)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[63] at parallelize at <console>:29
scala> val rdd3 = rdd1 union rdd2
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[64] at union at <console>:33
scala> rdd3.collect
res45: Array[(String, Int)] = Array((Tom,1000), (Andy,2000), (Lily,1500), (Andy,1000), (Tom,2000), (Mike,500))
scala> val rdd4= rdd3.groupByKey
rdd4: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[65] at groupByKey at <console>:35
scala> rdd4.collect
res46: Array[(String, Iterable[Int])] = Array(
(Tom,CompactBuffer(1000, 2000)),
(Andy,CompactBuffer(2000, 1000)),
(Mike,CompactBuffer(500)), (
Lily,CompactBuffer(1500)))
scala> rdd3.reduceByKey(_+_).collect
res47: Array[(String, Int)] = Array((Tom,3000), (Andy,3000), (Mike,500), (Lily,1500))
reduceByKey will provide much better performance.
官方不推薦使用 groupByKey 推薦使用 reduceByKey
五、cogroup
scala> val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[67] at parallelize at <console>:29
scala> val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[68] at parallelize at <console>:29
scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[70] at cogroup at <console>:33
scala> rdd3.collect
res48: Array[(String, (Iterable[Int], Iterable[Int]))] = Array(
(tom,(CompactBuffer(1, 2),CompactBuffer(1))),
(jerry,(CompactBuffer(3),CompactBuffer(2))),
(shuke,(CompactBuffer(),CompactBuffer(2))),
(kitty,(CompactBuffer(2),CompactBuffer())))
六、reduce操做(Action)
聚合操做
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at <console>:29
scala> rdd1.reduce(_+_)
res49: Int = 15
七、需求:按照value排序。
作法:
一、交換,把key 和 value交換,而後調用sortByKey方法
二、再次交換
scala> val rdd1 = sc.parallelize(List(("tom",1),("jerry",3),("ketty",2),("shuke",2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[72] at parallelize at <console>:29
scala> val rdd2 = sc.parallelize(List(("jerry",1),("tom",3),("shuke",5),("ketty",1)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[73] at parallelize at <console>:29
scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[74] at union at <console>:33
scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[75] at reduceByKey at <console>:35
scala> rdd4.collect
res50: Array[(String, Int)] = Array((tom,4), (jerry,4), (shuke,7), (ketty,3))
scala> val rdd5 = rdd4.map(t=>(t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1))
rdd5: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[80] at map at <console>:37
scala> rdd5.collect
res51: Array[(String, Int)] = Array((shuke,7), (tom,4), (jerry,4), (ketty,3))
(2)Action
reduce(func)
collect()
count()
first()
take(n)
takeSample(withReplacement,num, [seed])
takeOrdered(n, [ordering])
saveAsTextFile(path)
saveAsSequenceFile(path)
saveAsObjectFile(path)
countByKey()
foreach(func):與map相似,沒有返回值。
三、特性:
(1)RDD的緩存機制
(*)做用:提升性能
(*)使用:標識RDD能夠被緩存 persist cache
(*)能夠緩存的位置:
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()
舉例:測試數據,92萬條
scala> val rdd1 = sc.textFile("hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt MapPartitionsRDD[82] at textFile at <console>:29
scala> rdd1.count --> 直接出發計算
res52: Long = 923452
scala> rdd1.cache --> 標識RDD能夠被緩存,不會觸發計算
res53: rdd1.type = hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt MapPartitionsRDD[82] at textFile at <console>:29
scala> rdd1.count --> 和第一步同樣,觸發計算,可是,把結果進行緩存
res54: Long = 923452
scala> rdd1.count --> 從緩存中直接讀出結果
res55: Long = 923452
(2)RDD的容錯機制:經過檢查點來實現。
/**
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
* directory set with `SparkContext#setCheckpointDir` and all references to its parent
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
(*)複習檢查點:HDFS中的檢查點:有SecondaryNamenode來實現日誌的合併。
(*)RDD的檢查點:容錯
概念:血統 Lineage
理解:表示任務執行的生命週期。
WordCount textFile ---> redceByKey
若是血統越長,越容易出錯。
假若有檢查點,能夠從最近的一個檢查點開始,日後面計算。不用重頭計算。
(*)RDD檢查點的類型:
(1)基於本地目錄:須要將Spark shell 或者任務運行在本地模式上(setMaster("local"))
開發和測試
(2)HDFS目錄:用於生產。
sc.setCheckPointDir(目錄)
舉例:設置檢查點
scala> var rdd1 = sc.textFile("hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt MapPartitionsRDD[1] at textFile at <console>:24
設置檢查點目錄:
scala> sc.setCheckpointDir("hdfs://192.168.109.131:8020/sparkckpt")
標識rdd1能夠執行檢查點操做
scala> rdd1.checkpoint
scala> rdd1.count
res2: Long = 923452
(3)依賴關係:寬依賴,窄依賴。
劃分任務執行的stage
見講義。
6、RDD的高級算子
一、mapPartitionsWithIndex:對RDD中的每一個分區(帶有下標)進行操做,下標用index表示
經過這個算子,咱們能夠獲取分區號。
def mapPartitionsWithIndex[U](
f: (Int, Iterator[T]) ⇒ Iterator[U],
preservesPartitioning: Boolean = false)(
implicit arg0: ClassTag[U]): RDD[U]
參數:f是個函數參數 f 中第一個參數是Int,表明分區號,第二個Iterator[T]表明分區中的元素
舉例:把分區中的元素,包括分區號,都打印出來。
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> def fun1(index:Int, iter:Iterator[Int]) : Iterator[String] = {
| iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
| }
fun1: (index: Int, iter: Iterator[Int])Iterator[String]
scala> rdd1.mapPartitions
mapPartitions mapPartitionsWithIndex
scala> rdd1.mapPartitionsWithIndex(fun1).collect
res3: Array[String] = Array(
[partId : 0 , value = 1 ], [partId : 0 , value = 2 ],
[partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ],
[partId : 2 , value = 6 ], [partId : 2 , value = 7 ], [partId : 2 , value = 8 ])
二、aggregate:聚合操做。相似於分組。
(*)先對局部進行聚合操做,再對全局進行聚合操做。
調用聚合操做
scala> val rdd2 = sc.parallelize(List(1,2,3,4,5),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> rdd2.mapPartitionsWithIndex(fun1).collect
res4: Array[String] = Array(
[partId : 0 , value = 1 ], [partId : 0 , value = 2 ],
[partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ])
scala> import scala.math._
import scala.math._
scala> rdd2.aggregate(0)(max(_,_),_+_)
res6: Int = 7
說明:aggregate
(0) 初始值是 0
(max(_,_) 局部操做的函數
, _+_ 全局操做的函數
)
scala> rdd2.aggregate(100)(max(_,_),_+_)
res8: Int = 300
分析結果:初始值是100,表明每一個分區多了一個100
全局操做,也多了一個100
100+100+100 = 300
對RDD中的元素進行求和
一、RDD.map
二、聚合操做
scala> rdd2.aggregate(0)(_+_,_+_)
res9: Int = 15
MapReduce Combiner
scala> rdd2.aggregate(10)(_+_,_+_)
res10: Int = 45
(*)對字符串操做
scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:27
scala> rdd2.aggregate("")(_+_,_+_)
res11: String = abcdef
scala> rdd2.aggregate("*")(_+_,_+_)
res12: String = **def*abc
結果分析:
一、*abc *def
二、**def*abc
(*)複雜的例子:
一、
scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:27
scala> def fun1(index:Int, iter:Iterator[String]) : Iterator[String] = {
| iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
| }
scala> rdd3.mapPartitionsWithIndex(fun1).collect
res17: Array[String] = Array(
[partId : 0 , value = 12 ], [partId : 0 , value = 23 ],
[partId : 1 , value = 345 ], [partId : 1 , value = 4567 ])
scala> rdd3.aggregate("")((x,y)=> math.max(x.length,y.length).toString,(x,y)=>x+y)
res13: String = 42
執行過程:
第一個分區:
第一次比較: "" "12" 長度最大值 2 2-->"2"
第二次比較: 「2」 「23」 長度最大值 2 2-->"2"
第二個分區:
第一次比較: "" "345" 長度最大值 3 3-->"3"
第二次比較: 「3」 「4567」 長度最大值 4 4-->"4"
二、
rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
scala> rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res18: String = 11
執行過程:
第一個分區:
第一次比較: "" "12" 長度最小值 0 0-->"0"
第二次比較: 「0」 「23」 長度最小值 1 1-->"1"
第二個分區:
第一次比較: "" "345" 長度最小值 0 0-->"0"
第二次比較: 「0」 「4567」 長度最小值 1 1-->"1"
val rdd3 = sc.parallelize(List("12","23","345",""),2)
rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
scala> val rdd3 = sc.parallelize(List("12","23","345",""),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:27
scala> rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res19: String = 10
scala> rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res20: String = 01
三、aggregateByKey:相似於aggregate,區別:操做的是 key value 的數據類型。
scala> def fun3(index:Int, iter:Iterator[(String,Int)]) : Iterator[String] = {
| iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
| }
scala> pairRDD.mapPartitionsWithIndex(fun3).collect
res22: Array[String] = Array(
[partId : 0 , value = (cat,2) ], [partId : 0 , value = (cat,5) ], [partId : 0 , value = (mouse,4) ],
[partId : 1 , value = (cat,12) ], [partId : 1 , value = (dog,12) ], [partId : 1 , value = (mouse,2) ])
(1)將每一個動物園(分區)中,動物數最多的動物,進行求和
動物園0
[partId : 0 , value = (cat,2) ], [partId : 0 , value = (cat,5) ], [partId : 0 , value = (mouse,4) ],
動物園1
[partId : 1 , value = (cat,12) ], [partId : 1 , value = (dog,12) ], [partId : 1 , value = (mouse,2) ])
pairRDD.aggregateByKey(0)(math.max(_,_),_+_)
scala> pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect
res24: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
(2)將全部動物求和
pairRDD.aggregateByKey(0)(_+_,_+_).collect
scala> pairRDD.reduceByKey(_+_).collect
res27: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
aggregateByKey效率更高。
四、 coalesce與repartition
與分區有關
都是對RDD進行重分區。
區別:
coalesce 默認不會進行Shuffle 默認 false 如需修改分區,需置爲true
repartition 會進行Shuffle
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:27
scala> val rdd2 = rdd1.repartition(3)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[21] at repartition at <console>:29
scala> rdd2.partitions.length
res28: Int = 3
scala> val rdd3 = rdd1.coalescse(3,true)
<console>:29: error: value coalescse is not a member of org.apache.spark.rdd.RDD[Int]
val rdd3 = rdd1.coalescse(3,true)
^
scala> val rdd3 = rdd1.coalesce(3,true)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[25] at coalesce at <console>:29
scala> rdd3.partitions.length
res29: Int = 3
scala> val rdd4 = rdd1.coalesce(4)
rdd4: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[26] at coalesce at <console>:29
scala> rdd4.partitions.length
res30: Int = 2
五、其餘高級算子
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
7、編程案例
(1)分析日誌
需求:找到訪問量最高的兩個網頁
(*)第一步:對網頁的訪問量求和
(*)第二步:排序,降序
(2)建立自定義分區
(3)使用JDBCRDD 操做數據庫
(4)操做數據庫:把結果存放到數據庫中
IDE
-----------------Spark SQL------------------------
相似於Hive
1、Spark SQL 基礎
一、什麼是Spark SQL
Spark SQL is Apache Spark's module for working with structured data.
Spark SQL 是spark 的一個模塊。來處理 結構化 的數據
不能處理非結構化的數據
特色:
一、容易集成
不須要單獨安裝。
二、統一的數據訪問方式
結構化數據的類型:JDBC JSon Hive parquer文件 均可以做爲Spark SQL 的數據源
對接多種數據源,且使用方式相似
三、徹底兼容hive
把Hive中的數據,讀取到Spark SQL中運行。
四、支持標準的數據鏈接
JDBC
二、爲何學習Spark SQL
執行效率比Hive高
hive 2.x 執行引擎可使用 Spark
三、核心概念:表(DataFrame DataSet)
mysql中的表:表結構、數據
DataFrame:Schema、RDD(數據)
DataSet 在spark1.6之後,對DataFrame作了一個封裝。
四、建立DataFrame
(*)測試數據:員工表、部門表
第一種方式:使用case class
一、定義Schema
樣本類來定義Schema。
case class 特色:
能夠支持模式匹配,使用case class創建表結構
7521, WARD, SALESMAN,7698, 1981/2/22, 1250, 500, 30
case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)
二、讀取文件
val lines = sc.textFile("/usr/local/tmp_files/emp.csv").map(_.split(","))
三、把每行數據,映射到Emp上。
val allEmp = lines.map(x => Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))
四、生成DataFrame
val df1 = allEmp.toDF
df1.show
第二種方式 使用Spark Session
(1)什麼是Spark Session
Spark session available as 'spark'.
2.0之後引入的統一訪問方式。能夠訪問全部的Spark組件。
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
(2)使用StructType來建立Schema
val struct =
StructType(
StructField("a", IntegerType, true) ::
StructField("b", LongType, false) ::
StructField("c", BooleanType, false) :: Nil)
case class Emp(empno:Int,
ename:String,
job:String,
mgr:Int,
hiredate:String,
sal:Int,
comm:Int,
deptno:Int)
val myschema = StructType(
List(
StructField("empno",DataTypes.IntegerType),
StructField("ename",DataTypes.StringType),
StructField("job",DataTypes.StringType),
StructField("mgr",DataTypes.IntegerType),
StructField("hiredate",DataTypes.StringType),
StructField("sal",DataTypes.IntegerType),
StructField("comm",DataTypes.IntegerType),
StructField("deptno",DataTypes.IntegerType),
))
import org.apache.spark.sql.types._
準備數據 RDD[Row]
val allEmp = lines.map(x => Row(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))
import org.apache.spark.sql.Row
val df2 = spark.createDataFrame(allEmp,myschema)
第三種方式
直接讀取一個帶格式的文件。
val df3 = spark.read 讀文件,默認是Parquet文件
val df3 = spark.read.json("/usr/local/tmp_files/people.json")
df3.show
val df4 = spark.read.format("json").load("/usr/local/tmp_files/people.json")
五、操做DataFrame
(1)DSL語句
mybatis Hibernate
df1.select($"ename",$"sal",$"sal"+100).show
$"sal" 能夠看作是一個變量。
查詢薪水大於2000的員工
df1.filter($"sal" > 2000).show
求每一個部門的員工人數
df1.groupBy($"deptno").count.show
select deptno,count(1) from emp group by deptno
(2)SQL語句
注意:不能直接執行SQL,須要生成一個視圖,再執行sql。
scala> df1.create
createGlobalTempView createOrReplaceTempView createTempView
通常用到 createOrReplaceTempView createTempView
視圖:相似於表,但不保存數據。
df1.createOrReplaceTempView("emp")
操做:
spark.sql("select * from emp").show
查詢薪水大於2000的員工
spark.sql("select * from emp where sal > 2000").show
求每一個部門的員工人數
spark.sql("select deptno,count(1) from emp group by deptno").show
(3)多表查詢
10,ACCOUNTING,NEW YORK
case class Dept(deptno:Int,dname:String,loc:String)
val lines = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(","))
val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))
df5.createOrReplaceTempView("dept")
spark.sql("select dname,ename from emp,dept where emp.deptno=dept.deptno").show
六、操做DataSet
跟DataFrame相似,是一套新的接口。高級的Dataframe
舉例:
(1)建立DataSet
一、使用序列來建立DataSet。
定義一個case class
case class MyData(a:Int,b:String)
生成序列,並建立DataSet
val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS
.toDS 生成DataSet
ds.show
二、使用JSON數據來建立DataSet
定義case class
case class Person(name:String,age:BigInt)
經過Json數據來生成DataFrame
val df = spark.read.format("json").load("/usr/local/tmp_files/people.json")
將DataFrame轉換成DataSet
df.as[Person].show
df.as[Person] 就是一個DataSet
三、使用其餘數據
RDD操做和DataFrame操做相結合 ---> DataSet
讀取數據,建立DataSet
val linesDS = spark.read.text("/usr/local/tmp_files/test_WordCount.txt").as[String]
對DataSet進行操做:
val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)
words.show
words.collect
執行一個WordCount程序
val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey( x => x._1).count
result.show
排序:
result.orderBy($"value").show
result.orderBy($"count(1)").show
(2)DataSet操做案例
使用emp.json 生成一個DataFrame
val empDF = spark.read.json("/usr/local/tmp_files/emp.json")
查詢工資大於3000的員工
empDF.where($"sal" >= 3000).show
建立case class
case class Emp(empno:BigInt,ename:String,job:String,mgr:String,hiredate:String,sal:BigInt,comm:String,deptno:BigInt)
生成DataSet
val empDS = empDF.as[Emp]
查詢工資大於3000的員工
empDS.filter(_.sal > 3000).show
查詢10號部門的員工
empDS.filter(_.deptno == 10).show
(3)多表查詢
一、建立部門表
val deptRDD = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(","))
case class Dept(deptno:Int,dname:String,loc:String)
val deptDS = deptRDD.map( x=> Dept(x(0).toInt,x(1),x(2))).toDS
二、建立員工表
case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)
val empRDD = sc.textFile("/usr/local/tmp_files/emp.csv").map(_.split(","))
7369,SMITH,CLERK,7902,1980/12/17,800,0,20
val empDS = empRDD.map(x=> Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt)).toDS
三、執行多表查詢:等值鏈接
val result = deptDS.join(empDS,"deptno")
val result1 = deptDS.joinWith(empDS, deptDS("deptno") === empDS("deptno") )
join 和 joinWith 區別:鏈接後schema不一樣
四、多表鏈接後再篩選
deptDS.join(empDS,"deptno").where("deptno == 10").show
七、Spark SQL 中的視圖
視圖是一個虛表,不存儲數據。
兩種類型:
一、普通視圖(本地視圖):只在當前Session中有效。createOrReplaceTempView createTempView
二、全局視圖: createGlobalTempView
在不一樣的Session中都有用 把全局視圖建立在命名空間中:global_temp中。相似於一個庫。
scala> df1.create
createGlobalTempView createOrReplaceTempView createTempView
舉例:
建立一個新session,讀取不到emp視圖
spark.newSession.sql("select * from emp")
如下兩種方式都可讀到 全局視圖 中的數據。
df1.createGlobalTempView("emp1")
spark.newSession.sql("select * from global_temp.emp1").show
spark.sql("select * from global_temp.emp1").show
2、使用數據源
在Spark SQL中,可使用各類各樣的數據源來操做。 結構化
一、使用load函數、save函數
load函數是加載數據,save是存儲數據。
注意:使用load 或 save時,默認是Parquet文件。列式存儲文件。
舉例:
讀取 users.parquet 文件
val userDF = spark.read.load("/usr/local/tmp_files/users.parquet")
userDF.printSchema
userDF.show
val userDF = spark.read.load("/usr/local/tmp_files/emp.json")
保存parquet文件
userDF.select($"name",$"favorite_color").write.save("/usr/local/tmp_files/parquet")
讀取剛剛寫入的文件:
val userDF1 = spark.read.load("/usr/local/tmp_files/parquet/part-00000-1ab4e661-32c6-441a-b320-79d")---> 不推薦
生產:
val userDF2 = spark.read.load("/usr/local/tmp_files/parquet")
讀json文件 必須format
val userDF = spark.read.format("json").load("/usr/local/tmp_files/emp.json")
val userDF3 = spark.read.json("/usr/local/tmp_files/emp.json")
關於save函數:
調用save函數的時候,能夠指定存儲模式,追加、覆蓋等等
userDF2.write.save("/usr/local/tmp_files/parquet")
userDF2.write.save("/usr/local/tmp_files/parquet")
org.apache.spark.sql.AnalysisException: path file:/usr/local/tmp_files/parquet already exists.;
save的時候覆蓋
userDF2.write.mode("overwrite").save("/usr/local/tmp_files/parquet")
將結果保存成表
userDF2.select($"name").write.saveAsTable("table1")
scala> userDF.select($"name").write.saveAsTable("table2")
scala> spark.sql("select * from table2").show
+------+
| name|
+------+
|Alyssa|
| Ben|
+------+
二、Parquet文件:列式存儲文件,是Spark SQL 默認的數據源
就是一個普通的文件
舉例:
一、把其餘文件,轉換成Parquet文件
調用save函數
把數據讀進來,再寫出去,就是Parquet文件。
val empDF = spark.read.json("/usr/local/tmp_files/emp.json")
empDF.write.mode("overwrite").save("/usr/local/tmp_files/parquet")
empDF.write.mode("overwrite").parquet("/usr/local/tmp_files/parquet")
val emp1 = spark.read.parquet("/usr/local/tmp_files/parquet")
emp1.createOrReplaceTempView("emp1")
spark.sql("select * from emp1")
二、支持Schema的合併
項目開始 表結構簡單 schema簡單
項目愈來愈大 schema愈來愈複雜
舉例:
經過RDD來建立DataFrame
val df1 = sc.makeRDD(1 to 5).map( i => (i,i*2)).toDF("single","double")
"single","double" 是表結構
df1.show
df1.write.mode("overwrite").save("/usr/local/tmp_files/test_table/key=1")
val df2 = sc.makeRDD(6 to 10).map( i => (i,i*3)).toDF("single","triple")
df2.show
df2.write.mode("overwrite").save("/usr/local/tmp_files/test_table/key=2")
合併兩個部分
val df3 = spark.read.parquet("/usr/local/tmp_files/test_table")
val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")
經過RDD來建立DataFrame
val df1 = sc.makeRDD(1 to 5).map( i => (i,i*2)).toDF("single","double")
"single","double" 是表結構
df1.show
df1.write.mode("overwrite").save("/usr/local/tmp_files/test_table/tzkt=1")
val df2 = sc.makeRDD(6 to 10).map( i => (i,i*3)).toDF("single","triple")
df2.show
df2.write.mode("overwrite").save("/usr/local/tmp_files/test_table/key=2")
合併兩個部分
val df3 = spark.read.parquet("/usr/local/tmp_files/test_table")
val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")
三、json文件
讀取Json文件,生成DataFrame
val peopleDF = spark.read.json("/usr/local/tmp_files/people.json")
peopleDF.printSchema
peopleDF.createOrReplaceTempView("peopleView")
spark.sql("select * from peopleView").show
Spark SQL 支持統一的訪問接口。對於不一樣的數據源,讀取進來,生成DataFrame後,操做徹底同樣。
四、JDBC
使用JDBC操做關係型數據庫,加載到Spark中進行分析和處理。
方式一:
val mysqlDF = spark.read.format("jdbc")
.option("url","jdbc:mysql://192.168.109.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8")
.option("user","root")
.option("password","123456")
.option("driver","com.mysql.jdbc.Driver")
.option("dbtable","emp").load
mysqlDF.show
方式二:
定義一個Properties類
import java.util.Properties
val mysqlProps = new Properties()
mysqlProps.setProperty("user","root")
mysqlProps.setProperty("password","123456")
val mysqlDF1 = spark.read.jdbc("jdbc:mysql://192.168.109.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8","emp",mysqlProps)
mysqlDF1.show
五、使用Hive
比較常見
(*)spark SQL 徹底兼容hive
(*)須要進行配置
拷貝一下文件到spark/conf目錄下:
Hive 配置文件: hive-site.xml
Hadoop 配置文件:core-site.xml hdfs-site.xml
配置好後,重啓spark
啓動Hadoop 與 hive
spark.sql("create table comany.emp_0410(empno Int,ename String,job String,mgr String,hiredate String,sal Int,comm String,deptno Int)row format delimited fields terminated by ','")
3、在IDE中開發Spark SQL
4、性能優化
與RDD相似
一、把內存中緩存表的數據
直接讀取內存的值,來提升性能。
RDD中如何緩存:
rdd.cache 或者 rdd.persist
在Spark SQL中,使用SparkSession.sqlContext.cacheTable
spark中全部context對象
一、sparkContext : SparkCore
二、sql Context : SparkSQL
三、Streaming Context :SparkStreaming
統一塊兒來:SparkSession
操做mysql,啓動spark shell 時,須要:
./bin/spark-shell --master spark://node3:7077 --jars /usr/local/tmp_files/mysql-connector-java-8.0.11.jar --driver-class-path /usr/local/tmp_files/mysql-connector-java-8.0.11.jar
val mysqlDF = spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://192.168.109.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","emp").load
mysqlDF.show
mysqlDF.createOrReplaceTempView("emp")
spark.sqlContext.cacheTable("emp") ----> 標識這張表能夠被緩存,數據尚未真正被緩存
spark.sql("select * from emp").show ----> 依然讀取mysql
spark.sql("select * from emp").show ----> 從緩存中讀取數據
spark.sqlContext.clearCache
清空緩存後,執行查詢,會觸發查詢mysql數據庫。
二、瞭解性能優化的相關參數:參考講義
-----------------Spark Streaming------------------------
流式計算框架,相似於Storm
經常使用的實時計算引擎(流式計算)
一、Apache Storm:真正的流式計算
二、Spark Streaming :嚴格上來講,不是真正的流式計算(實時計算)
把連續的流式數據,當成不連續的RDD
本質:是一個離散計算(不連續)
三、Apache Flink:真正的流式計算。與Spark Streaming相反。
把離散的數據,當成流式數據來處理
四、JStorm
1、Spark Streaming基礎
一、什麼是 Spark Streaming。
Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.
易於構建靈活的、高容錯的流式系統。
特色:
一、易用,已經集成到Spark中
二、容錯性:底層RDD,RDD自己具備容錯機制
三、支持多種語言:Java Scala Python
二、演示官方的Demo
往Spark Streaming中發送字符串,Spark 接收到之後,進行計數
使用消息服務器 netcat Linux自帶
yum install nc.x86_64
nc -l 1234
注意:總核心數 大於等於2。一個核心用於接收數據,另外一個用於處理數據
在netcat中寫入數據 Spark Streaming能夠取到
三、開發本身的NetWorkWordCount程序
和Spark Core相似
問題:Hello Hello
Hello World
如今現象:(Hello,2)
(Hello , 1) (World , 1)
能不能累加起來?保存記錄下之前的狀態?
經過Spark Streaming提供的算子來實現
2、高級特性
一、什麼是DStream?離散流
把連續的數據變成不連續的RDD
由於DStream的特性,致使,Spark Streaming不是真正的流式計算
二、重點算子講解
(1)updateStateByKey
默認狀況下,Spark Streaming不記錄以前的狀態,每次發數據,都會從0開始
如今使用本算子,實現累加操做。
(2)transform
三、窗口操做
窗口:對落在窗口內的數據進行處理,也是一個DStream,RDD
舉例:每10秒鐘把過去30秒的數據採集過來
注意:先啓動nc 再啓動程序 local[2]
四、集成Spark SQL : 使用SQL語句來處理流式數據
五、緩存和持久化:和RDD同樣
六、支持檢查點:和RDD同樣
3、數據源
Spark Streaming是一個流式計算引擎,就須要從外部數據源來接收數據
一、基本的數據源
文件流:監控文件系統的變化,若是文件有增長,讀取文件中的內容
但願Spark Streaming監控一個文件夾,若是有變化,則把變化採集過來
RDD隊列流:能夠從隊列中獲取數據
套接字流:socketTextStream
二、高級數據源
(1)Flume
Spark SQL 對接flume有多種方式:
push方式:flume將數據推送給Spark Streaming
custom sink 模式:比第一種有更好的健壯性和容錯性。使用這種方式,flume配置一個sink。
使用官方提供的spark sink組件
須要把 spark-streaming-flume-sink_2.10-2.1.0.jar 拷貝到flume lib下
須要把 spark-streaming-flume-sink_2.10-2.1.0.jar 拷貝到IDE的lib下添加到build path中
(2)Kafka
在講Kafka時,舉例。
4、性能優化的參數
性能優化:
spark submit的時候,程序報OOM錯誤
程序跑的很慢
方法:調整spark參數
conf.set...
-----------------Spark 調優------------------------
問題:只要會用就能夠,爲何還要精通內核源碼與調優?
Spark 性能優化概覽:
Spark的計算本質是,分佈式計算。
因此,Spark程序的性能可能由於集羣中的任何因素出現瓶頸:CPU、網絡帶寬、或者內存。
CPU、網絡帶寬,是運維來維護的。
聚焦點:內存。
若是內存可以容納下全部的數據,那就不須要調優了。
若是內存比較緊張,不足以放下全部數據(10億量級---500G),須要對內存的使用進行性能優化。
好比:使用某些方法減小內存的消耗。
Spark性能優化,主要針對在內存的使用調優。
Spark性能優化的技術:
一、使用高性能序列化類庫
二、優化數據結構
三、對於屢次使用的RDD進行持久化、checkpoint
四、持久化級別:MEMORY_ONLY ---> MEMORY_ONLY_SER 序列化
五、Java虛擬機垃圾回收調優
六、Shuffle調優,1.x版本中,90%的性能問題,都是因爲Shuffle致使的。
其餘性能優化:
一、提升並行度
二、廣播共享數據
等等。。。
1、診斷Spark內存使用
首先要看到內存使用狀況,才能進行鍼對性的優化。
一、內存花費:
(1)每一個Java對象,都有一個對象頭,佔用16字節,包含一些對象的元信息,好比指向他的類的指針。
若是對象自己很小,好比int,可是他的對象頭比對象本身還大。
(2)Java的String對象,會比他內存的原始數據,多出40個字節。
String內部使用的char數組來保存內部的字符串序列,而且還要保存諸如輸出長度之類的信息。
char使用的是UTF-16編碼,每一個字符會佔2個字節。好比,包含10個字符的String,2*10+40=60字節
(3)Java中的集合類型,好比HashMap和LinkedList,內部使用鏈表數據結構。
鏈表中的每一個數據,使用Entry對象包裝。
Entry對象,不光有對象頭,還有指向下一個Entry的指針,佔用8字節。
(4)元素類型爲原始數據類型(int),內部一般會使用原始數據類型的包裝類型(Integer)來存儲元素。
二、如何判斷Spark程序消耗內存狀況?
預估
(1)設置RDD的並行度。
兩種方法建立RDD,parallelize() textFile() 在這兩個方法中,傳入第二個參數,設置RDD的partition數量。
在SparkConfig中設置一個參數:
spark.default.parallelism
能夠統一設置這個application中全部RDD的partition數量
(2)將RDD緩存 cache()
(3)觀察日誌:driver日誌
/usr/local/spark-2.1.0-bin-hadoop2.7/work
19/04/13 22:01:05 INFO MemoryStore: Block rdd_3_1 stored as values in memory (estimated size 26.0 MB, free 339.9 MB)
19/04/13 22:01:06 INFO MemoryStore: Block rdd_3_0 stored as values in memory (estimated size 26.7 MB, free 313.2 MB)
(4)將這個內存信息相加,就是RDD內存佔用量。
2、使用高性能序列化類庫
一、數據序列化概述
數據序列化,就是將對象或者數據結構,轉換成特定的格式,使其可在網絡中傳輸,或存儲在內存或文件中。
反序列化,是相反的操做,將對象從序列化數據中還原出來。
序列化後的數據格式,能夠是二進制,xml,Json等任何格式。
對象、數據序列化的重點在於數據的交換與傳輸。
在任何分佈式系統中,序列化都是扮演着一個重要的角色。
若是使用的序列化技術,操做很慢,或者序列化後的數據量仍是很大,會讓分佈式系統應用程序性能降低不少。
因此,Spark性能優化的第一步,就是進行序列化的性能優化。
Spark自身默認會在一些地方對數據進行序列化,好比Shuffle。另外,咱們使用了外部數據(自定義類型),也要讓其課序列化。
Spark自己對序列化的便捷性和性能進行了取捨
默認狀況下:Spark傾向於序列化的便捷性,使用了Java自身提供的序列化機制,很方便使用。
可是,Java序列化機制性能不高,序列化速度慢,序列化後數據較大,比較佔用內存空間。
二、kryo
Spark支持使用kryo類庫來進行序列化。
速度快,佔用空間更小,比Java序列化數據佔用空間小10倍。
三、如何使用kryo序列化機制
(1)設置Spark conf
bin/spark-submit will also read configuration options from conf/spark-defaults.conf,
in which each line consists of a key and a value separated by whitespace. For example:
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
(2)使用kryo是,要求須要序列化的類,要提早註冊,以得到高性能
conf.registerKryoClasses(Array(classOf[Count],......))
四、kryo類庫的優化
(1)優化緩存大小
若是註冊的自定義類型,自己特別大(100個字段),會致使要序列化的對象太大。
此時須要對kyro自己進行優化。由於kryo內部的緩存,可能不能存放這麼大的class對象。
spark.kryoserializer.buffer.max 設置這個參數,將其調大。
(2)預先註冊自定義類型
雖然不註冊自定義類型,kryo也能夠正常工做,但會保存一份他的全限定類名,耗費內存。
推薦預先註冊要序列化的自定義類型。
3、優化數據結構
一、概述
要減小內存的消耗,除了使用高效的序列化類庫外,還要優化數據結構。
避免Java語法特性中所致使的額外內存開銷。
核心:優化算子函數內部使用到的局部數據或算子函數外部的數據。
目的:減小對內存的消耗和佔用。
二、如何作?
(1)優先使用數組以及字符串,而不是集合類。即:優先使用Array,而不是ArrayList、LinkedList、HashMap
使用int[] 會比List<Integer> 節省內存
(2)將對象轉換成字符串。
企業中,將HashMap、List這種數據,統一用String拼接成特殊格式的字符串
Map<Integer,Person> persons = new HashMap<Integer,Person>()
能夠優化爲:
"id:name,address"
String persons = "1:Andy,Beijing|2:Tom,Tianjin...."
(3)避免使用多層嵌套對象結構
舉例:
下面的例子很差,由於Teacher類的內部又嵌套了大量的小的Student對象
public class Teacher{ private .....; privage List<Student> students = new ArrayList()}
解決:轉換成字符串進行處理。
{"teacherId": 1, "students":[{"stuId":1.....},{}]}
(4)對於可以避免的場景,儘可能使用int代替String
雖然String比List效率高,但int類型佔用更少內存
好比:數據庫主鍵,id,推薦使用自增的id,而不是uuid
4、rdd.cache checkpoint
5、持久化級別:MEMORY_ONLY ---> MEMORY_ONLY_SER 序列化
6、Java虛擬機的調優
一、概述
若是在持久化RDD的時候,持久化了大量的數據,那麼Java虛擬機的垃圾回收就可能成爲一個瓶頸
Java虛擬機會按期進行垃圾回收,此時會追蹤全部Java對象,而且在垃圾回收時,找到那些已經再也不使用的對象。
清理舊對象,給新對象騰出空間。
垃圾回收的性能開銷,是與內存中的對象數量成正比。
在作Java虛擬機調優以前,必須先作好上面的調優工做,這樣纔有意義。
必須注意順序
二、Spark GC原理
見圖片
三、監測垃圾回收
咱們能夠進行監測,好比多久進行一次垃圾回收以及耗費的時間等等。
spark-submit腳本中,添加一個配置
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimesStamps"
注意:這個是輸出到worker日誌中,而不是driver日誌。
/usr/local/spark-2.1.0-bin-hadoop2.7/logs worker日誌
/usr/local/spark-2.1.0-bin-hadoop2.7/work driver日誌
四、優化Executor內存比例
目的:減小GC次數。
對於GC調優來講,最重要的就是調節,RDD的緩存佔用的內存空間 與 算子執行時建立對象所佔用的內存空間 的比例
對於默認狀況,Spark使用每一個Executor 60% 的內存空間來緩存RDD,在task運行期間所建立的對象,只有40%內存空間來存放。
使用:conf.set("spark.storage.memoryFraction",0.5)
五、Java GC 調優 (-)
7、shuffle原理
一、優化前
圖片
二、優化後
圖片
8、其餘調優
一、提升並行度
二、廣播共享數據
---------------Spark MLlib--------------------------
MLlib 是 Spark 能夠擴展的機器學習庫。
MLlib is Apache Spark's scalable machine learning library.
1、MLlib概述
MLlib 是 Spark 能夠擴展的機器學習庫。
Spark在機器學習方面具備得天獨厚的有事,有如下幾個緣由:
一、機器學習算法通常都有多個步驟迭代計算,須要在屢次迭代後,得到足夠小的偏差或者收斂纔會中止。
double wucha = 1.0
while(wucha>=0.00001){
建模 wucha -= 某個值
}
模型計算完畢
當迭代使用Hadoop的MapReduce計算框架時,每次都要讀寫硬盤以及任務啓動工做,致使很大的IO開銷。
而Spark基於內存的計算模型天生擅長迭代計算。只有在必要時,纔會讀寫硬盤。
因此Spark是機器學習比較理想的平臺。
二、通訊,Hadoop的MapReduce計算框架,經過heartbeat方式來進行通訊和傳遞數據,執行速度慢。
spark 有高效的 Akka 和 Netty 的通訊系統,通行效率高。
SPark MLlib 是Spark 對經常使用的機器學習算法的實現庫,同時包括相關測試和數據生成器。
2、什麼是機器學習?
一、機器學習的定義。
A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P,
if its performance at tasks in T, as measured by P, improves with experience E。
三個關鍵詞:算法、經驗、模型評價
在數據的基礎上,經過算法構建出模型,並進行評價
若是達到要求,則用該模型測試其餘數據
若是不達到要求,要調整算法來從新創建模型,再次進行評估
循環往復,知道得到滿意的經驗
應用:金融反欺詐、語音識別、天然語言處理、翻譯、模式識別、智能控制等等
二、基於大數據的機器學習
傳統的機器學習算法,因爲技術和單機存儲的現值,只能在少許數據上使用。
即,依賴於數據抽樣。
問題:很難作好隨機,致使學習的模型不許確。
在大數據上進行機器學習,直接處理全量數據並進行大量迭代計算。
Spark自己計算優點,適合機器學習。
另外 spark-shell pyspark 均可以提供及時查詢工具
三、MLlib
MLlib是Spark機器學習庫,簡化機器學習的工程實踐工做,方便擴展到更大規模。
集成了通用的學習算法:分類、迴歸、聚類、協同過濾、降維等等
另外,MLlib自己在Spark中,數據清洗、SQL、建模放在一塊兒。
3、線性迴歸
4、餘弦類似性
https://blog.csdn.net/u012160689/article/details/15341303
機率論課程:
可汗學院 機率論 英文 有中文字幕
------------Spark Graphx--------------------
1、Spark Graphx 是什麼?
一、是Spark 的一個模塊,主要用於進行以圖爲核心的計算,還有分佈式圖計算
二、Graphx 底層基於RDD計算,和RDD共用一種存儲形態。在展現形態上,能夠用數據集來表示,也能夠用圖來表示。
2、Spark GraphX 有哪些抽象?
一、頂點
RDD[(VertexId,VD)]表示
VertexId 表明了頂點的ID,是Long類型
VD 是頂點的屬性,能夠是任何類型
二、邊
RDD[Edge[ED]]表示
Edge表示一個邊
包含一個ED類型參數來設定屬性
另外,邊還包含了源頂點ID和目標頂點ID
三、三元組
三元組結構用RDD[EdgeTriplet[VD,ED]]表示
三元組包含一個邊、邊的屬性、源頂點ID、源頂點屬性、目標頂點ID、目標頂點屬性。
四、圖 Graph表示,經過頂點和邊來構建。