https://blog.csdn.net/rlnLo2pNEfx9c/article/details/78738084html
http://dblab.xmu.edu.cn/blog/1264/ 廈門大學數據庫實驗室java
元素:一行爲一個元素mysql
例:flatMap(x => (x to 5)) 元素行拆分,可用於切分單詞git
reduce() 兩兩操做 同構github
fold() 同reduce 可是有初始值web
aggregate() :算法
它先聚合每個分區裏的元素,而後將全部結果返回回來,再用一個給定的conbine方法以及給定的初始值zero value進行聚合。sql
def aggregate [U: ClassTag] (zeroValue: U) (seqOp: (U,T)=>U,combOp: (U,U)=>U):U數據庫
分區如何計算?apache
惰性求值:
rdd的轉化操做都是惰性求值的。調用行動操做前不會開始計算。
持久化:
持久化有內存,硬盤。
鍵值對操做:
歸約
組合
操做模式:原始rdd-》二元組rdd
逐行掃描:key不變,對值進行計算。能夠進行單行操做,也能夠兩兩相加,或者分組。
例: flatMapValues(x => (x to 5)) 符號化?
兩個rdd
(求鍵值rdd中鍵的平均值)
操做模式:鍵值rdd -> 值轉換(擴展成二元組)-> reduce (值相加,值擴展相加)- >
2018-07-13:
基本概念:
交互式查詢:
http://www.voidcn.com/article/p-tglsuazy-kc.html
內存計算:相比hadoop的批處理,spark更多的把數據放到內存中。Spark可看 作基於內存的Map-Reduce實現
流式計算的典型範式之一是不肯定數據速率的事件流流入系統
https://blog.csdn.net/jiyiqinlovexx/article/details/27403761
迭代式計算:
https://www.cnblogs.com/wei-li/p/Spark.html
7月15日
spark streaming 即時處理
離散化流,時間區間的概念,
離散化流 DStream 支持轉換保存
連續批次,時間片。
2019年2月2日:
計劃:
教程,原理,實踐。
2020-04-25:
spark 緩存存到哪兒了? 數據不均衡問題怎麼引發的? yarn調度原理流程以及注意? spark內存模型? rdd內的元素類型必須同樣嗎? driver端程序,與excutor端程序區別?
1、spark的計算和存儲:
hadoop的存儲,spark本身的計算處理。
2、spark特性
80個高層次的操做符互助查詢。
除了map reduce,還支持sql,流數據,機器學習,圖形算法
3、數據集抽象RDD(彈性分佈式數據集)
RDD兩種建立方式:內部已有的RDD,或者外部文件系統(HDFS,HBase,Hadoop的輸入格式)
MapReduce之間共享數據只能外部共享,共享效率比較慢!
map的中間結果會放到hdfs上。
關注 spark技術分享,擼spark源碼 玩spark最佳實踐
1、安裝
spark 安裝包 以及能夠擴展的組件?:
最小安裝:
YARN須要安裝:
hdfs如何掛接:
2、教程中的例子
一、內部,外部讀取數據的例子
二、多個map串聯
三、如何找到緩存的RDD數據集
1、侷限
1.僅支持Map和Reduce兩種操做;
2.處理效率低效;不適合迭代計算(如機器學習、圖計算等),交互式處理(數據挖掘)和流失處理(日誌分析)
3.Map中間結果須要寫磁盤,Reduce寫HDFS,多個MR之間經過HDFS交換數據;
4.任務調度和啓動開銷大;
5.沒法充分利用內存;(與MR產生時代有關,MR出現時內存價格比較高,採用磁盤存儲代價小)
6.Map端和Reduce端均須要排序;
2.MapReduce編程不夠靈活。(比較Scala函數式編程而言)
3.框架多樣化[採用一種框架技術(Spark)同時實現批處理、流式計算、交互式計算]:
1.批處理:MapReduce、Hive、Pig;
2.流式計算:Storm
3.交互式計算:Impala
http://www.javashuo.com/article/p-nccakrfs-cz.html
嵌入在 spark 中的 key-value型分佈式存儲系統
Block :
是Spark storage模塊中最小的單位。
會存儲到Memory 、Disk。
Block是Partition的基礎。
RDD是由不一樣的partition組成的,也是由不一樣的block組成的。
Block Interal:
默認是200ms,推薦最小50ms。
Receiver接收,切分紅Block,一個週期batch會有Block數量=> RDD的分區Partition數量和Task數量
Task數量 = batch週期間隔 / block間隔 ,而後再與Spark Core數量進行比較
http://www.javashuo.com/article/p-fxcvxczz-bq.html
Application、SparkSession、SparkContext之間具備包含關係,而且是1對1的關係。
SparkSession 是 Spark 2.0 版本引入的新入口。
1.分佈在集羣中的只讀對象集合(由多個Partition 構成);
2.能夠存儲在磁盤或內存中(多種存儲級別);
3.經過並行「轉換」操做構造; transformations(map filter)
4.失效後自動重構;
5.RDD基本操做(operator)
二、Transformation具體內容
......
三、actions具體內容
........
四、算子分類
transformations 算子
action算子
接口定義方式不一樣:
Transformation: RDD[X]-->RDD[y]
Action:RDD[x]-->Z (Z不是一個RDD,多是一個基本類型,數組等)
惰性執行:
Transformation:只會記錄RDD轉化關係,並不會觸發計算
Action:是觸發程序執行(分佈式)的算子。
http://www.cnblogs.com/beiyi888/p/9802249.html
Transformation:表明的是轉化操做就是咱們的計算流程,返回是RDD[T],能夠是一個鏈式的轉化,而且是延遲觸發的。
Action:表明是一個具體的行爲,返回的值非RDD類型,能夠一個object,或者是一個數值,也能夠爲Unit表明無返回值,而且action會當即觸發job的執行。
https://www.cnblogs.com/qingyunzong/p/8987065.html
分區:
相同的key的數據存儲到了相同的節點,減小了網絡傳輸問題。
分區只對鍵值對的數據計算纔有幫助。
如何設置分區:
分區多意味着任務多
分區少可能會致使節點沒有平均分配到數據,利用不充分
分區少還可能致使處理的數據多,節點內存不夠用
合理的分區數:
通常合理的分區數設置爲總核數的2~3倍
總核數=一個executor的cores * executor個數
減小了通訊開銷
分區個數:儘可能等於集羣中的CPU核心數量:假設CPU核心與分區數據一對一執行
本地模式:能夠
輸入,輸出都有分區嗎?
hdfs part 文件跟分區有關係?
https://zhuanlan.zhihu.com/p/50752866
job定義:程序中遇到一個action算子的時候,就會提交一個job
spark stage 定義:一個job一般包含一個或多個stage。job須要在分區之間進行數據交互,那麼一個新的stage將會產生。
shuffe定義:分區之間的數據交互其實就是shuffle, 下一個stage的執行首先要去拉取上一個stage的數據(shuffle read操做),保存在本身的節點上,就會增長網絡通訊和IO。
spark與有向無環圖:
若是一個父RDD的數據只進入到一個子RDD,好比map、union等操做,稱之爲narrow dependency(窄依賴)。不然,就會造成wide dependency( 寬依賴),通常也成爲shuffle依賴,好比groupByKey等操做。
https://www.zhihu.com/question/33270495
file->block 原始文件拆分紅塊:
一對多 (inputformat?)
block->inputSplit 塊合併成一個輸入分片:
多對一
inputSplit不能跨文件?
inputSplit->task:
一一對應
集羣->節點
一對多
節點->executor:
一對多
core 是虛擬的core,不是物理機器的cpu core,此處的core能夠理解爲executor的一個工做線 程。
executor可使用多個core。
executor->task
一對多
一個由多個task組成
task併發 = executor數據 * 每一個executor核數
task->partition
一對一
一個任務生成一個partition
目標RDD->partition
由多個partition組成
https://blog.csdn.net/lp284558195/article/details/80931818
sprak的DAG調度程序,把用戶的應用程序先拆分紅大的stage,再分紅小的task。
driver:
用戶程序轉爲任務,把邏輯圖轉爲物理執行計劃。
跟蹤excutor運行狀態,
調度任務task給executor,
UI展現任務執行狀況。
executor是工做進程:
負責運行任務,
返回結果給driver
一個executor能夠運行多個task
一個executor能夠同時運行最多 核心數個任務
http://www.javashuo.com/article/p-hwigawhe-kp.html
一、意義:加快速度,數據共享?
當持久化一個 RDD 時,每一個節點的其它分區均可以使用 RDD 在內存中進行計算,在該數據上的其餘 action 操做將直接使用內存中的數據。這樣會讓之後的 action 操做計算速度加快(一般運行速度會加速 10 倍)。
緩存迭代算法和快速的交互式使用的重要工具
二、存儲級別
默認的策略:
若是內存空間不夠,部分數據分區將再也不緩存,在每次須要用到這些數據時從新進行計算
三、cahce方法 與 persit方法
cache()調用的persist(),是使用默認存儲級別的快捷設置方法
http://www.cnblogs.com/luogankun/p/3801047.html
序列化好處缺點:
序列化後的對象存放在內存中,佔用的內存少,可是用時須要反序列化,會消耗CPU;
默認的方式(原生存儲,cpu效率高,讀取快):
spark默認存儲策略爲MEMORY_ONLY:只緩存到內存而且以原生方式存(反序列化)一個副本;
http://www.cnblogs.com/BYRans/p/5003029.html
rdd使用舉例:舉例建立,操做,
dataFrame是從Rdd擴展:
能夠生成rdd;
提供對類sql的支持:
篩選,合併,從新入庫;
能夠理解關係數據庫的一張表;
對java api支持相比sacala 有限。
https://blog.csdn.net/u012965373/article/details/80847543
計算節點數和每一個計算節點核數,決定了最大任務數
提升並行度的方法(調優的方法)就是提升資源利用效率:在最少的時間內把任務運行完成。
一個executor的核心數決定了最大同時運行的任務數
一個分區的結果由一個任務生成
excutor故障恢復
如何處理的,很好的
driver故障恢復
todo
集羣管理
Spark yarn 管理方式:
問題一:如何與yarn集成在一塊兒?
spark-1.6.1-bin-hadoop2.6:
進程:
Master 與 Worker(Slave)
在yarn上啓動sprak 應用程序的兩種部署模式?
客戶端模式:
driver運行在客戶端進程中
集羣模式:
dirver運行在 yarn applicationmaster中,與客戶端進程是分離的
在yarn上的提交任務?
./bin/spark-submit --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ --driver-memory 4g \ --executor-memory 2g \ --executor-cores 1 \ --queue thequeue \ examples/jars/spark-examples*.jar \ 10
Spark mesos 管理方式:
問題一:如何安裝mesos?
Cluster Manager:
standalong 模式:
Spark manager
mesos 模式:
mesos manager
用戶權限
?
實時大數據分析、風控預警、實時預測、金融交易等諸多業務場景領域。
時延要求低的場景;
批量(或者說離線)處理對於以上業務需求不能勝任;
數據量大;
過去開發的統計數據計算,也算成一個簡單的流失計算!
流式處理框架特色:
無界的數據集;
進行接二連三的處理,聚合,分析的過程;
延遲須要儘量的低,時效性高,數據具備時效性;
https://help.aliyun.com/knowledge_detail/62440.html
http://www.cnblogs.com/yaohaitao/p/5703288.html
性能健壯性不及storm,可是吞吐量比storm高。
spark是一個生態,和Spark Core、Spark SQL無縫整合。
spark中間數據能夠直接批處理、交互式查詢;
https://www.cnblogs.com/jins-note/p/9513448.html
好像沒有明顯區別。
http://lxw1234.com/archives/2015/05/217.htm
組件:
flume-ng-core
spark-streaming-flume
https://spark.apache.org/docs/2.2.0/cluster-overview.html
https://spark.apache.org/docs/2.2.1/configuration.html
應用程序配置:
三種來源:
命令行
SparkConf
spark-defaults.conf
如何查看:
經過web ui 查看,4040端口,「Environment」 tab。
https://spark.apache.org/docs/2.2.1/monitoring.html
https://www.w3cschool.cn/spark/y27pgozt.html
基於spark rdd數據結構。
增長一個時間的概念,定時多久去數據源取一次數據,建立一個RDD。
建立:
//SparkContext建立
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(1))
jvm中同一時間只有一個StreamingContext處於活躍狀態
關閉前一個StreamingContext才能建立下一個StreamingContext。
http://www.javashuo.com/article/p-dqcxvkzc-kz.html
一、數據接收並行度調優
建立多個DStream和Receive
合理設置 Batch Interval 和 Block Interval
二、數據處理並行度調優
task任務序列化 ?
啓動合理個數的task,reduce有些計算能夠指定並行度
三、數據序列化調優
數據格式優化,減小數據序列化開銷。
使用Kyro序列化類庫,該類庫效率高。
根據數據量選擇合適的序列話級別:小量數據能夠不序列化。
四、batch interval 週期優化
UI上的batch處理時間與週期進行比較。處理時間長考慮縮短處理時間或者增長週期時長。
時間至關最好。數據充分利用處理能力
五、內存調優
stream儘可能不須要放到硬盤上,畢竟stream的目的就是實時。
觀測內存夠用,不須要花費太長時間的GC,影響程序正常運行時間
來源:
從源中獲取的輸入流
輸入流經過轉換算子生成的處理後的數據流
特色:
連續
肯定時間間隔
與RDD:
由一系列連續的rdd組成
與Receiver:
每個輸入流DStream和一個Receiver對象相關聯
核佔用問題:
receiver佔用一個單獨的核心。
數據源類型:
Spark Streaming擁有兩類數據源:
基本源(Basic sources):
文件系統、套接字鏈接、Akka的actor等
高級源(Advanced sources):
Kafka,Flume,Kinesis,Twitter等
窗口計算:
參數:
窗口長度(windowDuration):窗口的持續時間
滑動的時間間隔(slideDuration):窗口操做執行的時間間隔。
注意:這兩個參數必須是源DStream的批時間間隔的倍數
常見問題:
窗口比較大的狀況如何處理?
輸出操做:
foreachRDD:
建立鏈接:
在每個rdd中建立 (序列化錯誤,鏈接對象在機器間不能傳送)
dstream.foreachRDD(rdd => { 建立鏈接
在每個元素中建立 (耗費資源)
dstream.foreachRDD(rdd => { rdd.foreach(record => { 建立鏈接
在每個rdd分區中建立 right!
dstream.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { 建立鏈接
轉換:
UpdateStateByKey
Transform
有狀態:
跨多個批,跨rdd的
Spark Streaming Checkpointing:
包括:
元數據,配置信息,操做集合,未完成的批,
什麼時候必須開啓checkpoint:
使用有狀態的transformation。
好比:updateStateByKey,reduceByKeyAndWindow
從運行應用程序的driver的故障中恢復過來。
使用元數據恢復處理信息。
怎樣配置checkpoint:
在容錯可靠的文件系統中設置checkpoint目錄:
getOrCreate建立StreamContext上下文。
應用程序的基礎設置設置driver重啓。
設置checkpoint間隔時間5-10秒。
http://vishnuviswanath.com/spark_structured_streaming.html Spark 結構化流之旅
https://ohmycloud.github.io/2018/11/27/a-tour-of-spark-structured-streaming/ A Tour of Spark Structured Streaming
https://www.infoq.cn/article/UEOq-ezu4ImwHxGiDFc8 是時候放棄 Spark Streaming,轉向 Structured Streaming 了 (TO READ !)
https://github.com/lw-lin/CoolplaySpark/blob/master/Structured%20Streaming%20源碼解析系列/4.2%20Structured%20Streaming%20之%20Watermark%20解析.md 解決了什麼問題(TO EXPLORE)
用戶定義邏輯計劃,交給spark進行計劃的優化,執行。
偏移量追蹤+狀態管理
保證一次,不用使用者去維護sink去重邏輯。
EventTime: 事件被產生的時間,
ProcessingTime:事件被系統處理的時間
IngestionTime:事件被放入系統的時間
參考: https://towardsdatascience.com/watermarking-in-spark-structured-streaming-9e164f373e9
定義
withWatermark(eventTime: String, delayThreshold: String): Dataset[T]
第一個參數:消息時間列,必須跟聚合groupBy的列同樣
第二個參數:閾值,有效消息的閾值單位能夠是秒,以窗口的開始時間爲基準
丟棄與更新
處理數據的時候,消息數據的範圍是兩次處理時間範圍。
可是收到的消息的eventTime有多是任什麼時候候,或者更老。
因此處理的時候要從全部消息中經過eventTime進行篩選,是滾動篩選的,
在這批數據中找到最大的eventTime,eventTIme-業務定義的更晚的時間值=本次會處理消息最小的時間(水印)。
這批數據纔是真的業務處理的數據。
若是一個事件落在水印內,更新;若是是更舊的數據,丟棄。
檢查消息是否被保留: max eventTime — delayThreshold > T
T : 當前窗口開始的時刻。
delayThreshold : 用戶設置的水印閾值
max eventTime: 系統處理全部事件的最大時刻.
一個窗口正在處理,若是有一個新事件進來,而且這個新事件的發生時間沒有超過期間區間(窗口開始時間+水印閾值),則更新查詢!
不然丟棄該事件。
舉例:
接收了一個10:00:07秒產生的消息, 水印是5秒, 窗口開始10:00:00 ,超過10:00:05,因此該消息被丟棄
注意:
原來的聚合狀態必須保存。(可是內存會變大)
窗口除了有時間特徵外,還有分組,使用groupBy實現。
事件產生的時間戳用來標識屬於哪一個窗口。
翻轉窗口Tumbling Window
不重疊,連續。
一個事件只屬於一個窗口。
屬於一種特殊的滑動窗口。
滑動窗口 Sliding Window
兩個窗口會重疊。
一個事件可能會屬於多個窗口。
舉例:
收集過去4秒汽車的平均速度?
代碼:
//a tumbling window of size 4 seconds
val aggregates = cars
.groupBy(window($"timestamp","4 seconds"), $"carId")
.agg(avg("speed").alias("speed"))
//a sliding window of size 4 seconds that slides every 2 seconds can be created using cars.groupBy(window($"timestamp","4 seconds","2 seconds"), $"carId")
代碼輸出:
Batch 1
[2018-01-21 00:50:00, 2018-01-21 00:50:04] car1 75.0
Batch 2
.....
spark1.x時代的產物,演化到了spark2.0可能就沒用了
做用: 構建複雜機器學習工做流應用。 是ML Lib的補充。 步驟: 一、定義stage 指標提取和轉換模型訓練等。 主要是Transformer 和 Estimator。 具體功能是:據集處理轉化,模型訓練,參數設置或數據預測 二、建立pipleline 組合器stage 三、獲得piplelineModel 具體應用 參考: https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice5/
子類是Transformer 和 Estimator
主要是用來把 一個 DataFrame 轉換成另外一個 DataFrame 子類有Model類。 api: http://spark.apache.org/docs/2.1.3/api/java/org/apache/spark/ml/Transformer.html
評估器或適配器,主要是經過DataFrame訓練獲得一個Model(是Transformer的子類)。 子類有Predictor類。 api: http://spark.apache.org/docs/2.1.3/api/java/org/apache/spark/ml/Estimator.html
類關係: 父類是Predictor,父類的父類是Estimator 內部方法: 方法fit,裏面調用的是train訓練數據的方法。 返回: 訓練結束會返回一個RandomForestClassificationModel 模型。 預測: 返回的model能夠用來Transformer預測。
https://blog.csdn.net/xuejianbest/article/details/90769557 Spark提供的基礎特徵處理類之VectorAssembler和VectorIndexer
foreachPartition VS foreach
https://my.oschina.net/dongtianxi/blog/745908
用foreachPartition替代foreach,有助於性能的提升
coalesce VS repartition
http://www.javashuo.com/article/p-pahjohwo-mr.html
寫入mysql
https://bit1129.iteye.com/blog/2186405
最佳實踐:
應該在worker打開連接,能夠調用forEachPartition,在函數裏打開連接
一個分區打開一個數據庫連接,不要打開太多,減小連接數
儘可能使用批量
寫失敗的狀況,如何應對? 彈性
寫大數據集註意網絡超時
考慮使用數據庫鏈接池(一個分區一個連接夠了?就不須要鏈接池了吧)
連接共享數據庫?
explode
https://blog.csdn.net/macanv/article/details/78297150
df = df.select(functions.explode(functions.col("data"))).toDF("key", "value");
explode
http://www.javashuo.com/article/p-nemqtjow-gu.html
val dfScore = df.select(df("name"),explode(df("myScore"))).toDF("name","myScore")
參考
https://www.cnblogs.com/code2one/p/9872241.html#collaborative-filtering-with-alternating-least-squares Spark之MLlib
als
https://spark.apache.org/docs/latest/ml-collaborative-filtering.html 官方文章
https://blog.csdn.net/u011239443/article/details/51752904 深刻理解Spark ML:基於ALS矩陣分解的協同過濾算法與源碼分析
評估
http://www.javashuo.com/article/p-cfwmdueu-em.html spark機器學習庫評估指標總結
內存溢出
spark 序列化
任務序列話
Core rdd 序列化
默認持久化級別是MEMORY_ONLY
stream rdd 序列話
默認級別: MEMORY_ONLY_SER,序列化佔用空間更小,減小GC
與hadoop的關係:
spark 使用了hadoop的存儲 調度 管理的平臺。
spark 惟一作的是替代hadoop的計算部分,也就是MapReduce部分。
spark與hadoop版本一致問題:
spark會跟着hadoop的版本走,hadoop是基礎。
spark發佈的時候會附帶着hadoop一塊兒打包發佈。
pyspark庫:
pyspark通過py4J經過nativeSocket的方式轉換成jvm可一運行的指令。
pyspark每每跟着spark運行環境版本一塊兒會發佈一個配套的包。
scala庫能夠先不用安裝spark:有一種方法不用安裝spark環境只須要引入一些庫就能夠寫spark程序:使用scala庫。