Spark運行架構:sql
Spark運行架構包括集羣資源管理器(Cluster Manager)、運行做業任務的工做節點(Worker Node)、每一個應用的任務控制節點(Driver)和每一個工做節點上負責具體任務的執行進程(Executor)

與Hadoop MapReduce計算框架相比,Spark所採用的Executor有兩個優勢:
一是利用多線程來執行具體的任務(Hadoop MapReduce採用的是進程模型),減小任務的啓動開銷;
二是Executor中有一個BlockManager存儲模塊,會將內存和磁盤共同做爲存儲設備,當須要多輪迭代計算時,能夠將中間結果存儲到這個存儲模塊裏,下次須要時,就能夠直接讀該存儲模塊裏的數據,而不須要讀寫到HDFS等文件系統裏,於是有效減小了IO開銷;或者在交互式查詢場景下,預先將表緩存到該存儲系統上,從而能夠提升讀寫IO性能。
在Spark中,一個應用(Application)由一個任務控制節點(Driver)和若干個做業(Job)構成,一個做業由多個階段(Stage)構成,一個階段由多個任務(Task)組成。當執行一個應用時,任務控制節點會向集羣管理器(Cluster Manager)申請資源,啓動Executor,並向Executor發送應用程序代碼和文件,而後在Executor上執行任務,運行結束後,執行結果會返回給任務控制節點,或者寫到HDFS或者其餘數據庫中。

Spark的基本運行流程以下:
(1)當一個Spark應用被提交時,首先須要爲這個應用構建起基本的運行環境,即由任務控制節點(Driver)建立一個SparkContext,由SparkContext負責和資源管理器(Cluster Manager)的通訊以及進行資源的申請、任務的分配和監控等。SparkContext會向資源管理器註冊並申請運行Executor的資源;
(2)資源管理器爲Executor分配資源,並啓動Executor進程,Executor運行狀況將隨着「心跳」發送到資源管理器上;
(3)SparkContext根據RDD的依賴關係構建DAG圖,DAG圖提交給DAG調度器(DAGScheduler)進行解析,將DAG圖分解成多個「階段」(每一個階段都是一個任務集),而且計算出各個階段之間的依賴關係,而後把一個個「任務集」提交給底層的任務調度器(TaskScheduler)進行處理;Executor向SparkContext申請任務,任務調度器將任務分發給Executor運行,同時,SparkContext將應用程序代碼發放給Executor;
(4)任務在Executor上運行,把執行結果反饋給任務調度器,而後反饋給DAG調度器,運行完畢後寫入數據並釋放全部資源。

Spark運行架構具備如下特色:
(1)每一個應用都有本身專屬的Executor進程,而且該進程在應用運行期間一直駐留。Executor進程以多線程的方式運行任務,減小了多進程任務頻繁的啓動開銷,使得任務執行變得很是高效和可靠;
(2)Spark運行過程與資源管理器無關,只要可以獲取Executor進程並保持通訊便可;
(3)Executor上有一個BlockManager存儲模塊,相似於鍵值存儲系統(把內存和磁盤共同做爲存儲設備),在處理迭代計算任務時,不須要把中間結果寫入到HDFS等文件系統,而是直接放在這個存儲系統上,後續有須要時就能夠直接讀取;在交互式查詢場景下,也能夠把表提早緩存到這個存儲系統上,提升讀寫IO性能;
(4)任務採用了數據本地性和推測執行等優化機制。數據本地性是儘可能將計算移到數據所在的節點上進行,即「計算向數據靠攏」,由於移動計算比移動數據所佔的網絡資源要少得多。並且,Spark採用了延時調度機制,能夠在更大的程度上實現執行過程優化。好比,擁有數據的節點當前正被其餘的任務佔用,那麼,在這種狀況下是否須要將數據移動到其餘的空閒節點呢?
答案是不必定。由於,若是通過預測發現當前節點結束當前任務的時間要比移動數據的時間還要少,那麼,調度就會等待,直到當前節點可用。
Spark部署模式:
三種模式:
standalone,Spark on Mesos,Spark on YARN
S:不須要依賴其餘系統
M:資源調度管理框架,Spark充分支持,官方推薦
Y:與Hadoop統一部署,資源管理和調度依賴YARN,分佈式存儲依賴HDFS
用Spark架構同時知足批處理和流處理:

Spark核心 -- RDD
RDD設計背景:
提供一個抽象數據架構,避免中間結果存儲
RDD概念:
分佈式對象集合,本質上是隻讀的分區記錄集合
RDD操做:行動(返回非RDD),轉換(返回RDD)
典型執行過程:
1.RDD讀入外部數據
2.進行一系列「轉換」操做,產生不一樣的RDD
3.最後一個RDD通過「行動」,輸出到外部數據源
「行動」纔會真正發生計算,而「轉換」是記錄相互之間的依賴關係
當「行動」要進行輸出時,Spark根據RDD的依賴關係生成DAG,從起點開始計算
創建的「轉換」而後生成DAG圖稱爲一個「血緣關係」
血緣關係鏈接起來的RDD操做實現管道化,這就保證了中途不須要保存數據,而是直接管道式流入下一步
具體例子
val sc= new SparkContext(「spark://localhost:7077」,」Hello World」, 「YOUR_SPARK_HOME」,」YOUR_APP_JAR」)
// 建立SC對象
val fileRDD = sc.textFile(「hdfs://192.168.0.103:9000/examplefile」)
// 從HDFS讀取數據建立一個RDD
val filterRDD = fileRDD.filter(_.contains(「Hello World」))
// 「轉換」獲得一個新的RDD
filterRDD.cache()
// 採用cache接口把RDD保存在內存中,進行持久化
filterRDD.count()
// 「行動」,計算包含元素個數
以上執行流程:
1.建立sc對象
2.從外部數據源(HDFS)讀取並建立fileRDD對象
3.構建fileRDD和filterRDD的依賴關係,造成DAG圖(轉換軌跡)
4.觸發計算,把結果持久化到內存
RDD特性:
1.高效的容錯性,只須要記錄粗粒度的轉換,不需細粒度的日誌
2.中間結果持久化到內存
3.能夠存放Java對象
RDD依賴關係:
分爲窄依賴(一父對應一子分區,多子可對應一父)與寬依賴(一父對應多子)
窄依賴對應協同劃分(key落在同一子分區),寬依賴對應非協同劃分
這種依賴設計具備容錯性,加快了執行速度
窄依賴的恢復更高效,Spark還有數據檢查點和記錄日誌,在恢復時不需從頭開始
階段劃分:
在DAG反向解析,遇到窄依賴才把當前RDD加入當前階段
DAG劃分爲多個階段,每一個階段時任務集合,任務調度器把任務集合分配到executor
總體執行框圖:

兩種RDD建立方法:
1.經過外部數據集。本地、HDFS文件系統、HBase等外部數據源
2.調用SparkContext的parallelize方法,在Driver的存在集合上建立
準備:
打開hadoop的hdfs和spark
./sbin/start-dfs.sh
./bin/spark-shell
1.textFile()方法:
把文件的URL做爲參數,也就是地址
Val lines = sc.textFile(…)
生成的是一個String類型RDD,也就是RDD[String]
輸入參數能夠是文件名、目錄和壓縮文件
能夠輸入第二個參數來指定分區數,默認爲每一個block建立一個分區
2.經過並行集合建立RDD
parallelize方法
讀入數組/整數array,獲得RDD[Int]
RDD操做具體解釋:
轉換、行動
轉換:惰性求值,只記錄轉換軌跡
* filter(func):篩選出知足函數func的元素,並返回一個新的數據集
* map(func):將每一個元素傳遞到函數func中,並將結果返回爲一個新的數據集
* flatMap(func):與map()類似,但每一個輸入元素均可以映射到0或多個輸出結果
* groupByKey():應用於(K,V)鍵值對的數據集時,返回一個新的(K, Iterable)形式的數據集
* reduceByKey(func):應用於(K,V)鍵值對的數據集時,返回一個新的(K, V)形式的數據集,其中的每一個值是將每一個key傳遞到函數func中進行聚合
行動:真正的計算
* count() 返回數據集中的元素個數
* collect() 以數組的形式返回數據集中的全部元素
* first() 返回數據集中的第一個元素
* take(n) 以數組的形式返回數據集中的前n個元素
* reduce(func) 經過函數func(輸入兩個參數並返回一個值)聚合數據集中的元素
* foreach(func) 將數據集中的每一個元素傳遞到函數func中運行
惰性機制:經過map切分,接着才reduce處理
Filter 操做
RDD.filter()會遍歷RDD中每行文本,並執行括號內的匿名函數
RDD.filter().count()
括號內可填入line => line.contains(「」) 這種Lamda表達式
執行表達式前,把當前行賦值給line,再執行後面的邏輯,而後放入結果集
等全部行執行完,獲得結果集,最後才執行count行動操做
map與reduce操做
RDD.map()把每行都傳遞到括號內函數
Lines.map(line => line.split(「 「).size) 對每一行line先進行切分,獲得集合RDD[Array[String]],再求出集合中的個數,因此此步轉換成Int類型的RDD[Int]
Reduce.((a, b) => if (a > b) a else b)
計算出RDD[Int]中的最大值,每次比較中保留較大的數,並用到下一次的比較中,也就是後續每次比較只取一個新的數
持久化
因爲spark的惰性,每次行動都會從頭開始算,因此避免計算重複,引入持久化(緩存)
persist() 方法可用來標記RDD持久化,到第一次行動時,就會使標記的RDD真正持久化,持久化後可重複使用
用cache()方法會調用persist(MEMORY_ONLY),只使用內存。而persist(MEMORY_AND_DISK)還會在內存不足時放在硬盤上
分區
RDD分區原則:分區的個數儘可能等於集羣中的CPU核心(core)數目
用spark.default.parallelism 這個參數來配置默認分區數目
從HDFS讀取文件,分區數爲文件分片數
打印
rdd.foreach(println)或者rdd.map(println)
在集羣中操做時,用collect()方法,把全部worker節點的RDD都抓到Driver Program中才能把全部元素打印出,但可能會致使內存溢出
可用take()方法打印部分元素
經常使用的鍵值對RDD轉換操做:
包括reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等
reduceByKey(func)
功能:使用func函數合併具備相同鍵的值
reduceByKey((a, b) => a + b) 就是把相同鍵的值都加起來,因此a,b都表明值
groupByKey()
功能:對具備相同鍵的值進行分組,如有(hey, 1),(hey, 2) 則會生成(hey, (1, 2))
Keys/values
功能:返回鍵值對的鍵/值
sortedByKey()
功能:返回根據鍵排序的RDD
mapValues(func)
功能:只對value進行函數處理
join
就是鏈接,包括內連接(join)、左外鏈接(leftOuterJoin)、右外鏈接(rightOuterJoin)等
把兩個pairRDD的相同鍵對應的值,鏈接在一塊兒
好比第一個RDD有(「spark」, 1)、(「spark」, 2),第二個RDD有(「spark」, 「fast」),那麼join的結果是(spark, (1, fast))、(spark, (2, fast))
實例:
rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect()
求相同鍵對應值的平均數
1.第一步創建(x, 1),記錄鍵值對個數
2.把值和個數分別相加
3.把總值和總個數相除
Spark中的共享變量
Spark中的另外一個抽象:共享變量
知足多個任務之間、人物與控制節點之間共享變量
兩種類型:廣播變量(broadcast)、累加器(accumulators)
廣播變量:
在每臺機器上緩存一個只讀變量
val broadcastV = SparkContext.broadcast(v)
從一個普通變量v來建立一個廣播變量,在集羣中可免去重複分發v,一旦建立後,v的值不能修改
累加器:
用來實現計數器(counter)和求和(sum)
用SparkContext.longAccumulator(名稱)或SparkContext.doubleAccumulator()建立
用add方法把數值累加到累加器中,而後要經過控制節點使用value方法讀取
val accu = sc.longAccumulator
sc.parallelize(Array).foreach(x => accum.add(x))
accum.value
Spark SQL簡介 -- 核心爲DataFrame
Spark SQL優化了Shark(Hive on Spark)
僅依賴了HiveQL解析和Hive元數據
由Catalyst(函數式關係查詢優化框架)負責執行計劃
增長了SchemaRDD,後來變成了DataFrame
Spark SQL支持的數據格式和編程語言(上面爲語言 下面爲數據格式):

Spark能實現從MySQL到DataFrame的轉化,支持SQL查詢
DataFrame與RDD的區別:
RDD是分佈式的JAVA對象的集合,對具體對象的結構不瞭解
DataFrame以RDD爲基礎,是分佈式的Row對象集合,提供了詳細的schema(結構信息),清楚每一個列的信息
DataFrame也和RDD同樣有「惰性」機制,生成DAG圖到最後才計算
獲得DataFrame的兩種方法:
直接建立:
在json文件中讀取數據並建立DataFrame(記得先開了Hadoop再開Spark-shell!!)
1.導入org.apache.spark.sql.SparkSession
2.建立對象,builder()、getOrCreate()
3.使支持RDD轉換爲DataFrames及後續sql操做 import spark.implicits._
4.讀取文件 read,獲得DataFrames
5.各類經常使用操做 show、printSchema(打印模式)、select(選擇某些列)、filter(過濾)、groupBy(分組)、sort(排序 desc倒序)、select as(重命名)後面幾個操做都要接show()
RDD轉換:
兩種方法:1.利用反射推斷RDD的schema 2.使用編程接口構造一個schema
1、利用反射推斷RDD,定義case class,隱式轉成DataFrame:
1.導包:org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
org.apache.spark.sql.sql.Encoder
spark.implicits._ 支持RDD隱式轉成DataFrame
2.建立case class
3.sc對象,textFile()導入文件,map()轉換文件並定義DF,toDF()建立了DataFrame
4.註冊臨時表,DF.createOrReplaceTempView(「」)
5.用spark.sql()引入sql語句可從臨時表生成另外一DataFrame,最終經過Show()顯示
2、沒法提早定義case class時,採用編程方式定義RDD模式:
1.導包:org.apache.spark.sql.types._
org.apache.spark.sql.Row
2.導入文件生成RDD(sc的textFile)
3.定義模式Schema字符串
4.根據字符串生成模式:split()、map()方法生成StructField,而後StructType()轉化成schema
5.生成rowRDD,而後經過rowRDD和schema用createDataFrame()方法創建關係,並生成DataFrame