Spark探究

學習素材

https://blog.csdn.net/rlnLo2pNEfx9c/article/details/78738084html

http://dblab.xmu.edu.cn/blog/1264/ 廈門大學數據庫實驗室java

雜碎問題

元素:一行爲一個元素mysql

例:flatMap(x => (x to 5)) 元素行拆分,可用於切分單詞git

reduce() 兩兩操做 同構github

fold() 同reduce 可是有初始值web

aggregate() :算法

它先聚合每個分區裏的元素,而後將全部結果返回回來,再用一個給定的conbine方法以及給定的初始值zero value進行聚合。sql

def aggregate [U: ClassTag] (zeroValue: U) (seqOp: (U,T)=>U,combOp: (U,U)=>U):U數據庫

分區如何計算?apache

惰性求值:

rdd的轉化操做都是惰性求值的。調用行動操做前不會開始計算。

持久化:

持久化有內存,硬盤。

鍵值對操做:

歸約

組合

操做模式:原始rdd-》二元組rdd

逐行掃描:key不變,對值進行計算。能夠進行單行操做,也能夠兩兩相加,或者分組。

例: flatMapValues(x => (x to 5)) 符號化?

兩個rdd

(求鍵值rdd中鍵的平均值)

操做模式:鍵值rdd -> 值轉換(擴展成二元組)-> reduce (值相加,值擴展相加)- >

2018-07-13:

基本概念:

交互式查詢:

http://www.voidcn.com/article/p-tglsuazy-kc.html

內存計算:相比hadoop的批處理,spark更多的把數據放到內存中。Spark可看 作基於內存的Map-Reduce實現

流式計算的典型範式之一是不肯定數據速率的事件流流入系統

https://blog.csdn.net/jiyiqinlovexx/article/details/27403761

迭代式計算:

https://www.cnblogs.com/wei-li/p/Spark.html

7月15日

spark streaming 即時處理

離散化流,時間區間的概念,

離散化流 DStream 支持轉換保存

連續批次,時間片。

2019年2月2日:

計劃:

教程,原理,實踐。

2020-04-25:

spark 緩存存到哪兒了?

數據不均衡問題怎麼引發的?

yarn調度原理流程以及注意?
  

spark內存模型?

rdd內的元素類型必須同樣嗎?


driver端程序,與excutor端程序區別?

教程

https://www.yiibai.com/spark/

1、spark的計算和存儲:

hadoop的存儲,spark本身的計算處理。

2、spark特性

80個高層次的操做符互助查詢。

除了map reduce,還支持sql,流數據,機器學習,圖形算法

3、數據集抽象RDD(彈性分佈式數據集)

RDD兩種建立方式:內部已有的RDD,或者外部文件系統(HDFS,HBase,Hadoop的輸入格式)

MapReduce之間共享數據只能外部共享,共享效率比較慢!

map的中間結果會放到hdfs上。

http://spark.coolplayer.net

關注 spark技術分享,擼spark源碼 玩spark最佳實踐

實踐:

1、安裝

spark 安裝包 以及能夠擴展的組件?:

最小安裝:

YARN須要安裝:

hdfs如何掛接:

2、教程中的例子

一、內部,外部讀取數據的例子

二、多個map串聯

三、如何找到緩存的RDD數據集

mapReduce侷限性:

1、侷限

1.僅支持Map和Reduce兩種操做;

    2.處理效率低效;不適合迭代計算(如機器學習、圖計算等),交互式處理(數據挖掘)和流失處理(日誌分析)

    3.Map中間結果須要寫磁盤,Reduce寫HDFS,多個MR之間經過HDFS交換數據;

    4.任務調度和啓動開銷大;

    5.沒法充分利用內存;(與MR產生時代有關,MR出現時內存價格比較高,採用磁盤存儲代價小)

    6.Map端和Reduce端均須要排序;

  2.MapReduce編程不夠靈活。(比較Scala函數式編程而言)

  3.框架多樣化[採用一種框架技術(Spark)同時實現批處理、流式計算、交互式計算]:

    1.批處理:MapReduce、Hive、Pig;

  2.流式計算:Storm

    3.交互式計算:Impala 

Spark核心概(未看)

http://www.javashuo.com/article/p-nccakrfs-cz.html

BlockManager

嵌入在 spark 中的 key-value型分佈式存儲系統

Block :

是Spark storage模塊中最小的單位。

會存儲到Memory 、Disk。

Block是Partition的基礎。

RDD是由不一樣的partition組成的,也是由不一樣的block組成的。

Block Interal:

默認是200ms,推薦最小50ms。

Receiver接收,切分紅Block,一個週期batch會有Block數量=> RDD的分區Partition數量和Task數量

Task數量 = batch週期間隔 / block間隔 ,而後再與Spark Core數量進行比較

SparkSession

http://www.javashuo.com/article/p-fxcvxczz-bq.html

Application、SparkSession、SparkContext之間具備包含關係,而且是1對1的關係。

SparkSession 是 Spark 2.0 版本引入的新入口。

RDD

1.分佈在集羣中的只讀對象集合(由多個Partition 構成);

    2.能夠存儲在磁盤或內存中(多種存儲級別);

    3.經過並行「轉換」操做構造; transformations(map filter)

    4.失效後自動重構;

    5.RDD基本操做(operator)

二、Transformation具體內容

......

三、actions具體內容

........

四、算子分類

transformations 算子

  1. Value數據類型的Transformation算子,這種變換並不觸發提交做業,針對處理的數據項是Value型的數據。
  2. Key-Value數據類型的Transfromation算子,這種變換並不觸發提交做業,針對處理的數據項是Key-Value型的數據對。

action算子

  1. Action算子,這類算子會觸發SparkContext提交Job做業。

接口定義方式不一樣:

          Transformation: RDD[X]-->RDD[y]

          Action:RDD[x]-->Z (Z不是一個RDD,多是一個基本類型,數組等)

惰性執行:

          Transformation:只會記錄RDD轉化關係,並不會觸發計算

          Action:是觸發程序執行(分佈式)的算子。

Transformation和Action區別:

http://www.cnblogs.com/beiyi888/p/9802249.html

Transformation:表明的是轉化操做就是咱們的計算流程,返回是RDD[T],能夠是一個鏈式的轉化,而且是延遲觸發的。 

Action:表明是一個具體的行爲,返回的值非RDD類型,能夠一個object,或者是一個數值,也能夠爲Unit表明無返回值,而且action會當即觸發job的執行。

spark分區:

https://www.cnblogs.com/qingyunzong/p/8987065.html

分區:

相同的key的數據存儲到了相同的節點,減小了網絡傳輸問題。

分區只對鍵值對的數據計算纔有幫助。

如何設置分區:

分區多意味着任務多

分區少可能會致使節點沒有平均分配到數據,利用不充分

分區少還可能致使處理的數據多,節點內存不夠用

合理的分區數:

通常合理的分區數設置爲總核數的2~3倍

總核數=一個executor的cores * executor個數 

spark分區做用

減小了通訊開銷

分區個數:儘可能等於集羣中的CPU核心數量:假設CPU核心與分區數據一對一執行

本地模式:能夠

輸入,輸出都有分區嗎?

hdfs part 文件跟分區有關係?

job,stage與shuffe:

https://zhuanlan.zhihu.com/p/50752866

job定義:程序中遇到一個action算子的時候,就會提交一個job

spark stage 定義:一個job一般包含一個或多個stage。job須要在分區之間進行數據交互,那麼一個新的stage將會產生。

shuffe定義:分區之間的數據交互其實就是shuffle, 下一個stage的執行首先要去拉取上一個stage的數據(shuffle read操做),保存在本身的節點上,就會增長網絡通訊和IO。

spark與有向無環圖:

narrow dependency(窄依賴) 與 wide dependency( 寬依賴)

若是一個父RDD的數據只進入到一個子RDD,好比map、union等操做,稱之爲narrow dependency(窄依賴)。不然,就會造成wide dependency( 寬依賴),通常也成爲shuffle依賴,好比groupByKey等操做。

spark從原始文件到生成最終partion文件【名詞】:

https://www.zhihu.com/question/33270495

file->block 原始文件拆分紅塊:

一對多 (inputformat?)

block->inputSplit 塊合併成一個輸入分片:

多對一

inputSplit不能跨文件?

inputSplit->task:

一一對應

集羣->節點

一對多

節點->executor:

一對多

core 是虛擬的core,不是物理機器的cpu core,此處的core能夠理解爲executor的一個工做線 程。

executor可使用多個core。

executor->task

一對多

一個由多個task組成

task併發 = executor數據 * 每一個executor核數

task->partition

一對一

一個任務生成一個partition

目標RDD->partition

由多個partition組成

spark driver 與 executor 區別:

https://blog.csdn.net/lp284558195/article/details/80931818

sprak的DAG調度程序,把用戶的應用程序先拆分紅大的stage,再分紅小的task。

driver:

用戶程序轉爲任務,把邏輯圖轉爲物理執行計劃。

跟蹤excutor運行狀態,

調度任務task給executor,

UI展現任務執行狀況。

executor是工做進程:

負責運行任務,

返回結果給driver

一個executor能夠運行多個task

一個executor能夠同時運行最多 核心數個任務

rdd持久化(緩存)的意義:

http://www.javashuo.com/article/p-hwigawhe-kp.html

一、意義:加快速度,數據共享?

當持久化一個 RDD 時,每一個節點的其它分區均可以使用 RDD 在內存中進行計算,在該數據上的其餘 action 操做將直接使用內存中的數據。這樣會讓之後的 action 操做計算速度加快(一般運行速度會加速 10 倍)。

緩存迭代算法和快速的交互式使用的重要工具

二、存儲級別

默認的策略:

若是內存空間不夠,部分數據分區將再也不緩存,在每次須要用到這些數據時從新進行計算

三、cahce方法 與 persit方法

cache()調用的persist(),是使用默認存儲級別的快捷設置方法 

RDD緩存策略

http://www.cnblogs.com/luogankun/p/3801047.html

序列化好處缺點:

序列化後的對象存放在內存中,佔用的內存少,可是用時須要反序列化,會消耗CPU;

默認的方式(原生存儲,cpu效率高,讀取快):

spark默認存儲策略爲MEMORY_ONLY:只緩存到內存而且以原生方式存(反序列化)一個副本;

RDD

http://www.cnblogs.com/BYRans/p/5003029.html

rdd使用舉例:舉例建立,操做,

DataFrame :

dataFrame是從Rdd擴展:

能夠生成rdd;

提供對類sql的支持:

篩選,合併,從新入庫;

能夠理解關係數據庫的一張表;

對java api支持相比sacala 有限。

DataSet

並行度,核心數,任務數,分區數

https://blog.csdn.net/u012965373/article/details/80847543

計算節點數和每一個計算節點核數,決定了最大任務數

提升並行度的方法(調優的方法)就是提升資源利用效率:在最少的時間內把任務運行完成。

一個executor的核心數決定了最大同時運行的任務數

一個分區的結果由一個任務生成

shuffe與效率

故障恢復

excutor故障恢復

如何處理的,很好的

driver故障恢復

todo

SPARK 管理

集羣管理

Spark yarn 管理方式:

問題一:如何與yarn集成在一塊兒?

spark-1.6.1-bin-hadoop2.6:

進程:

Master 與 Worker(Slave)

在yarn上啓動sprak 應用程序的兩種部署模式?

客戶端模式:

driver運行在客戶端進程中

集羣模式:

dirver運行在 yarn applicationmaster中,與客戶端進程是分離的

在yarn上的提交任務?

./bin/spark-submit --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ --driver-memory 4g \ --executor-memory 2g \ --executor-cores 1 \ --queue thequeue \ examples/jars/spark-examples*.jar \ 10

Spark mesos 管理方式:

問題一:如何安裝mesos?

Cluster Manager:

standalong 模式:

Spark manager

mesos 模式:

mesos manager

用戶權限

使用場景

流式處理的場景:

實時大數據分析、風控預警、實時預測、金融交易等諸多業務場景領域。

時延要求低的場景;

批量(或者說離線)處理對於以上業務需求不能勝任;

數據量大;

過去開發的統計數據計算,也算成一個簡單的流失計算!

流式處理框架特色:

無界的數據集;

進行接二連三的處理,聚合,分析的過程;

延遲須要儘量的低,時效性高,數據具備時效性;

實時計算與批量計算:

https://help.aliyun.com/knowledge_detail/62440.html

spark與storm對比:

http://www.cnblogs.com/yaohaitao/p/5703288.html

性能健壯性不及storm,可是吞吐量比storm高。

spark是一個生態,和Spark Core、Spark SQL無縫整合。

spark中間數據能夠直接批處理、交互式查詢;

Hive、Spark SQL、Impala比較:

https://www.cnblogs.com/jins-note/p/9513448.html

好像沒有明顯區別。

spark streaming + flume:

http://lxw1234.com/archives/2015/05/217.htm

組件:

flume-ng-core

spark-streaming-flume

spark架構:

https://spark.apache.org/docs/2.2.0/cluster-overview.html

image.png

spark配置:

https://spark.apache.org/docs/2.2.1/configuration.html

應用程序配置:

三種來源:

命令行

SparkConf

spark-defaults.conf

如何查看:

經過web ui 查看,4040端口,「Environment」 tab。

spark 監控:

https://spark.apache.org/docs/2.2.1/monitoring.html

spark streaming

參考

https://www.w3cschool.cn/spark/y27pgozt.html

概念

基於spark rdd數據結構。

增長一個時間的概念,定時多久去數據源取一次數據,建立一個RDD。

建立與關閉:

建立:

//SparkContext建立

import org.apache.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(1))

注意:

jvm中同一時間只有一個StreamingContext處於活躍狀態

關閉前一個StreamingContext才能建立下一個StreamingContext。

調優:

http://www.javashuo.com/article/p-dqcxvkzc-kz.html

一、數據接收並行度調優

建立多個DStream和Receive

合理設置 Batch Interval 和 Block Interval

二、數據處理並行度調優

task任務序列化 ?

啓動合理個數的task,reduce有些計算能夠指定並行度

三、數據序列化調優

數據格式優化,減小數據序列化開銷。

使用Kyro序列化類庫,該類庫效率高。

根據數據量選擇合適的序列話級別:小量數據能夠不序列化。

四、batch interval 週期優化

UI上的batch處理時間與週期進行比較。處理時間長考慮縮短處理時間或者增長週期時長。

時間至關最好。數據充分利用處理能力

五、內存調優

stream儘可能不須要放到硬盤上,畢竟stream的目的就是實時。

觀測內存夠用,不須要花費太長時間的GC,影響程序正常運行時間

離散流(DStreams):

來源:

從源中獲取的輸入流

輸入流經過轉換算子生成的處理後的數據流

特色:

連續

肯定時間間隔

與RDD:

由一系列連續的rdd組成

與Receiver:

每個輸入流DStream和一個Receiver對象相關聯

核佔用問題:

receiver佔用一個單獨的核心。

數據源類型:

Spark Streaming擁有兩類數據源:

基本源(Basic sources):

文件系統、套接字鏈接、Akka的actor等

高級源(Advanced sources):

Kafka,Flume,Kinesis,Twitter等

窗口計算:

參數:

窗口長度(windowDuration):窗口的持續時間

滑動的時間間隔(slideDuration):窗口操做執行的時間間隔。

注意:這兩個參數必須是源DStream的批時間間隔的倍數

常見問題:

窗口比較大的狀況如何處理?

輸出操做:

foreachRDD:

建立鏈接:

在每個rdd中建立 (序列化錯誤,鏈接對象在機器間不能傳送)

dstream.foreachRDD(rdd => { 建立鏈接

在每個元素中建立 (耗費資源)

dstream.foreachRDD(rdd => { rdd.foreach(record => { 建立鏈接

在每個rdd分區中建立 right!

dstream.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { 建立鏈接

轉換:

UpdateStateByKey

Transform

有狀態:

跨多個批,跨rdd的

Spark Streaming Checkpointing:

包括:

元數據,配置信息,操做集合,未完成的批,

什麼時候必須開啓checkpoint:

使用有狀態的transformation。

好比:updateStateByKey,reduceByKeyAndWindow

從運行應用程序的driver的故障中恢復過來。

使用元數據恢復處理信息。

怎樣配置checkpoint:

在容錯可靠的文件系統中設置checkpoint目錄:

getOrCreate建立StreamContext上下文。

應用程序的基礎設置設置driver重啓。

設置checkpoint間隔時間5-10秒。

Structured Streaming

參考:

http://vishnuviswanath.com/spark_structured_streaming.html Spark 結構化流之旅

https://ohmycloud.github.io/2018/11/27/a-tour-of-spark-structured-streaming/ A Tour of Spark Structured Streaming

https://www.infoq.cn/article/UEOq-ezu4ImwHxGiDFc8 是時候放棄 Spark Streaming,轉向 Structured Streaming 了 (TO READ !)

https://github.com/lw-lin/CoolplaySpark/blob/master/Structured%20Streaming%20源碼解析系列/4.2%20Structured%20Streaming%20之%20Watermark%20解析.md 解決了什麼問題(TO EXPLORE)

原理

用戶定義邏輯計劃,交給spark進行計劃的優化,執行。

偏移量追蹤+狀態管理

保證一次,不用使用者去維護sink去重邏輯。

幾個Time

EventTime: 事件被產生的時間,

ProcessingTime:事件被系統處理的時間

IngestionTime:事件被放入系統的時間

Watermark水印

參考: https://towardsdatascience.com/watermarking-in-spark-structured-streaming-9e164f373e9

https://github.com/lw-lin/CoolplaySpark/blob/master/Structured%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97/4.2%20Structured%20Streaming%20%E4%B9%8B%20Watermark%20%E8%A7%A3%E6%9E%90.md 官方圖

定義

withWatermark(eventTime: String, delayThreshold: String): Dataset[T]

第一個參數:消息時間列,必須跟聚合groupBy的列同樣

第二個參數:閾值,有效消息的閾值單位能夠是秒,以窗口的開始時間爲基準

丟棄與更新

處理數據的時候,消息數據的範圍是兩次處理時間範圍。

可是收到的消息的eventTime有多是任什麼時候候,或者更老。

因此處理的時候要從全部消息中經過eventTime進行篩選,是滾動篩選的,

在這批數據中找到最大的eventTime,eventTIme-業務定義的更晚的時間值=本次會處理消息最小的時間(水印)。

這批數據纔是真的業務處理的數據。

若是一個事件落在水印內,更新;若是是更舊的數據,丟棄。

檢查消息是否被保留: max eventTime — delayThreshold > T

T : 當前窗口開始的時刻。

delayThreshold : 用戶設置的水印閾值

max eventTime: 系統處理全部事件的最大時刻.

一個窗口正在處理,若是有一個新事件進來,而且這個新事件的發生時間沒有超過期間區間(窗口開始時間+水印閾值),則更新查詢!

不然丟棄該事件。

舉例:

接收了一個10:00:07秒產生的消息, 水印是5秒, 窗口開始10:00:00 ,超過10:00:05,因此該消息被丟棄

注意:

原來的聚合狀態必須保存。(可是內存會變大)

窗口

窗口除了有時間特徵外,還有分組,使用groupBy實現。

事件產生的時間戳用來標識屬於哪一個窗口。

翻轉窗口Tumbling Window

不重疊,連續。

一個事件只屬於一個窗口。

屬於一種特殊的滑動窗口。

滑動窗口 Sliding Window

兩個窗口會重疊。

一個事件可能會屬於多個窗口。

舉例:

收集過去4秒汽車的平均速度?

代碼:

//a tumbling window of size 4 seconds

val aggregates = cars

.groupBy(window($"timestamp","4 seconds"), $"carId")

.agg(avg("speed").alias("speed"))

//a sliding window of size 4 seconds that slides every 2 seconds can be created using cars.groupBy(window($"timestamp","4 seconds","2 seconds"), $"carId")

代碼輸出:

Batch 1

[2018-01-21 00:50:00, 2018-01-21 00:50:04] car1 75.0

Batch 2

.....

SQLContext HiveContext

spark1.x時代的產物,演化到了spark2.0可能就沒用了

機器學習庫

Pipleline

做用:
    構建複雜機器學習工做流應用。
    
    是ML Lib的補充。
步驟:
    一、定義stage
        指標提取和轉換模型訓練等。
        
        主要是Transformer 和 Estimator。
        
        具體功能是:據集處理轉化,模型訓練,參數設置或數據預測
    二、建立pipleline
        組合器stage
    三、獲得piplelineModel
        具體應用
參考:
    https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice5/

PipelineStage

子類是Transformer  和 Estimator

Transformer

主要是用來把 一個 DataFrame 轉換成另外一個 DataFrame

子類有Model類。

api:
                http://spark.apache.org/docs/2.1.3/api/java/org/apache/spark/ml/Transformer.html

Estimator

評估器或適配器,主要是經過DataFrame訓練獲得一個Model(是Transformer的子類)。

子類有Predictor類。

api:
    http://spark.apache.org/docs/2.1.3/api/java/org/apache/spark/ml/Estimator.html

RandomForestClassifier例子

類關係:
    父類是Predictor,父類的父類是Estimator

內部方法:
    方法fit,裏面調用的是train訓練數據的方法。

返回:
    訓練結束會返回一個RandomForestClassificationModel 模型。

預測:
    返回的model能夠用來Transformer預測。

特徵處理

https://blog.csdn.net/xuejianbest/article/details/90769557
Spark提供的基礎特徵處理類之VectorAssembler和VectorIndexer

代碼

rdd

foreachPartition VS foreach

https://my.oschina.net/dongtianxi/blog/745908

用foreachPartition替代foreach,有助於性能的提升

coalesce VS repartition

http://www.javashuo.com/article/p-pahjohwo-mr.html

寫入mysql

https://bit1129.iteye.com/blog/2186405

最佳實踐:

應該在worker打開連接,能夠調用forEachPartition,在函數裏打開連接

一個分區打開一個數據庫連接,不要打開太多,減小連接數

儘可能使用批量

寫失敗的狀況,如何應對? 彈性

寫大數據集註意網絡超時

考慮使用數據庫鏈接池(一個分區一個連接夠了?就不須要鏈接池了吧)

連接共享數據庫?

dataset

explode

https://blog.csdn.net/macanv/article/details/78297150

df = df.select(functions.explode(functions.col("data"))).toDF("key", "value");

dataFrame

explode

http://www.javashuo.com/article/p-nemqtjow-gu.html

val dfScore = df.select(df("name"),explode(df("myScore"))).toDF("name","myScore")

ml

參考

https://www.cnblogs.com/code2one/p/9872241.html#collaborative-filtering-with-alternating-least-squares Spark之MLlib

als

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala 官方例子

https://spark.apache.org/docs/latest/ml-collaborative-filtering.html 官方文章

https://blog.csdn.net/u011239443/article/details/51752904 深刻理解Spark ML:基於ALS矩陣分解的協同過濾算法與源碼分析

評估

http://www.javashuo.com/article/p-cfwmdueu-em.html spark機器學習庫評估指標總結

其它

內存溢出

spark 序列化

任務序列話

Core rdd 序列化

默認持久化級別是MEMORY_ONLY

stream rdd 序列話

默認級別: MEMORY_ONLY_SER,序列化佔用空間更小,減小GC

多語言庫以及版本

與hadoop的關係:
spark 使用了hadoop的存儲 調度 管理的平臺。
spark 惟一作的是替代hadoop的計算部分,也就是MapReduce部分。

spark與hadoop版本一致問題:
spark會跟着hadoop的版本走,hadoop是基礎。
spark發佈的時候會附帶着hadoop一塊兒打包發佈。

pyspark庫:
pyspark通過py4J經過nativeSocket的方式轉換成jvm可一運行的指令。
pyspark每每跟着spark運行環境版本一塊兒會發佈一個配套的包。

scala庫能夠先不用安裝spark:有一種方法不用安裝spark環境只須要引入一些庫就能夠寫spark程序:使用scala庫。

相關文章
相關標籤/搜索