Spark入門階段一之掃盲筆記

介紹

spark是分佈式並行數據處理框架
與mapreduce的區別:
mapreduce一般將中間結果放在hdfs上,spark是基於內存並行大數據框架,中間結果放在內存,對於迭代數據spark效率更高,mapreduce老是消耗大量時間排序,而有些場景不須要排序,spark能夠避免沒必要要的排序所帶來的開銷,spark是一張有向無環圖,spark支持scala,python,java等
適用範圍:
spark更適合於迭代雲端比較多的ml和dm運算,由於spark裏面有rdd的抽象概念,spark比hadoop更通用,spark提供的數據集操做類型有不少,不像hadoop只提供map和reduce倆種操做,好比map,filter,flatmapt,sample,groupbykey,reducebykey,union,join,cogroup,mapvalues,sort,partionby等多種操做類型,spark
把這些操做稱爲transformations,同時還提供count,collect,reduce,lookup,save等多種action操做。這些多種多樣的數據集操做類型,給開發上層應用的用戶提供了方便,各個處理節點之間的通訊模型不在像hadoop那樣就是惟一的data shuffle一種模式,用戶能夠明明,物化,控制中間結果的存儲,分區等,能夠說編程模型比hadoop更靈活。 html

spark是基於內存的迭代計算框架,使用與須要屢次操做特定數據集的應用場合,須要反覆操做的次數越多,所須要讀取的數據量越大,受益越大,數據量小可是計算密集度較大的場合,受益就相對較小. 不過因爲rdd的特性,spark不適用那種一部細粒度更新狀態的應用,例如web服務的存儲或者增量的web爬蟲和索引,就是對於那種增量修改的應用模型不合適。 java

spark和hadoop的結合:
spark能夠直接對hdfs進行數據的讀寫,一樣支持spark on yarn。spark能夠與mapreduce運行於同集羣中,共享存儲資源與計算,數據倉庫shark實現上借用hive,幾乎和hive徹底兼容。 node

四種spark運行模式,local模型用於測試開發,standlone 獨立集羣模式,spark on yarn spark在yarn上 ,spark on mesos spark在mesos上。python

應用:
企業大數據應用: 1,count 平均值 2.分類,對比 3.趨勢,統計分析 4,精準預測 人工智能
行業大數據案例:電商,傳媒,能源,交通mysql

spark生態系統介紹:
spark 能夠很容易和yarn結合,直接調用HDFS、Hbase上面的數據,和hadoop結合。
spark核心部分分爲RDD。Spark SQL、Spark Streaming、MLlib、GraphX、Spark R等核心組件解決了不少的大數據問題 程序員

Spark分爲driver和executor,driver提交做業,executor是application早worknode上的進程,運行task,driver對應爲sparkcontext。Spark的RDD操做有transformation、action。Transformation對RDD進行依賴包裝,RDD所對應的依賴都進行DAG的構建並保存,在worknode掛掉以後除了經過備份恢復還能夠經過元數據對其保存的依賴再計算一次獲得。看成業提交也就是調用runJob時,spark會根據RDD構建DAG圖,提交給DAGScheduler,這個DAGScheduler是在SparkContext建立時一同初始化的,他會對做業進行調度處理。當依賴圖構建好之後,從action開始進行解析,每個操做做爲一個task,每遇到shuffle就切割成爲一個taskSet,並把數據輸出到磁盤,若是不是shuffle數據還在內存中存儲。就這樣再往前推動,直到沒有算子,而後運行從前面開始,若是沒有action的算子在這裏不會執行,直到遇到action爲止纔開始運行,這就造成了spark的懶加載,taskset提交給TaskSheduler生成TaskSetManager而且提交給Executor運行,運行結束後反饋給DAGScheduler完成一個taskSet,以後再提交下一個,當TaskSet運行失敗時就返回DAGScheduler並從新再次建立。一個job裏面可能有多個TaskSet,一個application可能包含多個job。web

一、shark介紹:
shark基本上就是spark的框架基礎上提供和hive同樣的hivesql命令接口,爲了最大程度的保持和hive的兼容性,shark使用hive的api來實現query parsing和logic plan generation,最後的physicalplan execution階段用spark代替hadoop mapreduce,用過配置shark參數,shark能夠自動在內存中緩存特定的rdd,實現數據重用,進而加快特定數據集的檢索,同時,shark經過udf用戶自定義函數實現特定的數據分析學習算法,使得sql數據查詢和運算分析能結合在一塊兒,最大化rdd的重複使用。算法

二、spark streaming介紹:
Spark Streaming 是 Spark 提供的對實時數據進行流式計算的組件,通常與kafka結合,基本的原理是將stream數據分紅小的時間片斷,以相似batch批量處理的方式來處理這些小部分數據。spark streaming構建在spark上,一方面是由於spark的低延遲執行引擎能夠用於實時計算,此外小批量的處理方式使得他能夠同時兼容批量和實時數據處理的邏輯和算法,方便了一些須要歷史數據和實時數據聯合分析的特定應用場景。
Spark Streaming也有一個StreamingContext,其核心是DStream,是經過以組時間序列上的連續RDD來組成的,包含一個有Time做爲key、RDD做爲value的結構體,每個RDD都包含特定時間間隔的數據流,能夠經過persist將其持久化。在接受不斷的數據流後,在blockGenerator中維護一個隊列,將流數據放到隊列中,等處理時間間隔到來後將其中的全部數據合併成爲一個RDD(這一間隔中的數據)。其做業提交和spark類似,只不過在提交時拿到DStream內部的RDD併產生Job提交,RDD在action觸發以後,將job提交給jobManager中的JobQueue,又jobScheduler調度,JobScheduler將job提交到spark的job調度器,而後將job轉換成爲大量的任務分發給spark集羣執行。sql

三、Graphx
主要用於圖的計算。核心算法有PageRank、SVD奇異矩陣、TriangleConut等。shell

四、Spark SQL
是Spark新推出的交互式大數據SQL技術。把sql語句翻譯成Spark上的RDD操做能夠支持Hive、Json等類型的數據。

五、Spark R
經過R語言調用spark,目前不會擁有像Scala或者java那樣普遍的API,Spark經過RDD類提供Spark API,而且容許用戶使用R交互式方式在集羣中運行任務。同時集成了MLlib機器學習類庫。

六、MLBase
從上到下包括了MLOptimizer(給使用者)、MLI(給算法使用者)、MLlib(給算法開發者)、Spark。也能夠直接使用MLlib。ML Optimizer,一個優化機器學習選擇更合適的算法和相關參數的模塊,還有MLI進行特徵抽取和高級ML編程 抽象算法實現API平臺,MLlib分佈式機器學習庫,能夠不斷擴充算法。MLRuntime基於spark計算框架,將Spark的分佈式計算應用到機器學習領域。MLBase提供了一個簡單的聲明方法指定機器學習任務,而且動態地選擇最優的學習算法。

七、Tachyon
高容錯的分佈式文件系統。宣稱其性能是HDFS的3000多倍。有相似java的接口,也實現了HDFS接口,因此Spark和MR程序不須要任何的修改就能夠運行。目前支持HDFS、S3等。

什麼是rdd:

rdd是spark最基本,也是最根本的數據抽象,RDD表示分佈在多個計算節點上的能夠並行操做的元素集合,rdd是隻讀的,分區記錄的集合。
rdd支持兩種操做,1,轉換從現有的數據集建立一個新的數據集,2,動做 在數據集上運行計算後,返回一個值給驅動程序,例如,map就是一種轉換,他將數據集每個元素都傳遞給函數,並返回一個新的分佈數據集表示結果,另外一個方面,reduce是一個動做,經過一些函數將全部的元組疊加起來,並將結果返回給driver程序,spark中的全部轉換都有惰性的,也就是說,他們並不會直接計算結果,相反的,他們只是記住應用哦個到基礎數據集上的這些轉換動做,例如,咱們能夠實現,經過map建立的一個新數據集,並在reduce使用,最終只返回reduce的結果給driver,而不是整個大的新數據集。默認狀況下,每一個轉換過的rdd都會在你在他之上執行一個動做時被從新計算,不過,你也可使用persist方法,持久話一個rdd在內存中,在這種狀況下,spark將會在集羣中,保存相關元素,下次你查詢這個rdd是,他將能更快訪問,在磁盤上持久化數據集,或在集羣間賦值數據集也是支持的。除了這些操做外,用戶還能夠請求將rdd緩存起來,並且,用戶還能夠經過partitioner類獲取rdd的分區順序,而後將另外一個rdd按照一樣的方式分區。

如何操做rdd?
一、如何獲取rdd 1,從共享的文件系統獲取,hdfs,2.經過已存在的rdd轉換 3.將已存在的scala集合並行化,經過調用sparkcontext的parallelize方法實現 4.改變現有rdd的之久性,rdd是懶散,短暫的
二、操做rdd的倆個動做,1,actions:對數據集計算後返回一個數值value給驅動程序,例如redue將數據集的全部元素用某個函數聚合後,將最終結果返回給程序,2.transformation 根據數據集建立一個新的數據集,計算後返回一個新rdd;例如map將數據的每一個元素講過某個函數計算後,返回一個姓的分佈式數據集。

actions具體內容:

  • reduce(func)經過函數func彙集數據集中全部元素,func函數接受2個參數,返回一個值,這個函數必須是關聯性的,確保能夠被正確的併發執行。

  • collect() 在driver的程序中,以數組的形式,返回數據集的全部元素,這一般會在使用filter或者其餘操做後,返回一個縱溝小的數據本身在使用,直接將整個rdd集coloect返回,極可能會讓driver程序oom。

  • count() 返回數據集的元素個數

  • take(n) 返回一個數組,用數據集的前n個元素組成,注意,這個操做目前並不是在多個節點上,並行執行,而是driver程序所在機制,單機計算全部的元素:注;gateway的內存壓力會增大,須要謹慎使用

  • first()返回數據集的第一個元素

  • saveAsTextFile(path) 將數據集的元素,以txtfile的形式,保存到本地文件系統,hdfs或者其餘hadoop支持的文件系統,spark將會調用每一個元素的tostring方法,並將他轉換成文件中一行文本。

  • saveAsSequenceFile(path)將數據集的元素,以sequencefile的格式,到指定的目錄下,本地系統,hdfs或者其餘hadoop支持的文件系統,rdd的元組必須有key-value對組成,並都實現了hadoop的- writable接口或隱式能夠轉換爲wirtable

  • foreach(func)在數據集的每一個元素上,運行函數func,這一般用於更新一個累加器變量,或者和外部存儲系統作交互。直接使用 rdd.foreach(println) 在local模式下是可行的,可是在cluster模式下是不行的,必需要執行collect()方法,將全部的數據拉取到本地,而後執行foreach()操做。若是是數據量比較小的話可使用take方法,rdd.take(100).foreach(println)

transformation具體內容:

  • map(func) 返回一個新的分佈式數據集,有每一個原元素通過func函數轉換後組成

  • filter(func) 返回一個新的數據集,有通過func函數後返回值爲true的原元素組成

  • flatmap(func)相似於map 可是每個輸入元素,會被映射0到多個輸出元素,所以func函數的返回值是一個seq,而不是單一元素

  • sample(withReplacement,frac,seed) 給定的隨機種子seed,隨機抽樣出數量爲frac的數據

  • union(otherdataset)返回一個新的數據集,由原數據集和參數聯合而成

  • intersection : 只返回兩個RDD中都有的元素,intersecton()在運行時會去掉全部重複的元素(單個RDD內重複元素也會一塊兒移除)。 須要經過網絡混洗來發現共有數據。

  • distinct : 生成一個只包含不一樣元素的新RDD。須要注意:distinct() 操做的開銷很大,由於它須要將全部數據經過網絡進行混洗(shuffle),以確保每一個元素只有一份。

  • subtract : 接受另外一個RDD做爲參數,返回一個由只存在在第一個RDD而不存在第二個RDD中的全部元素組成的RDD。 須要數據混洗。

  • cartesian : 返回全部可能的(a,b)對,其中a是源RDD中的元素,b是另外一個RDD中的元素。

  • groupbykey(【num tasks】)在一個有kv對組成的數據集上調用,返回一個k,seq【v】對的數據集,注意,默認狀況下,使用8個並行任務進行分組,你能夠傳入num task可選參數,根絕數據量設置不一樣數目的task

  • reducebykey(func,【num tasks】)在一個kv對的數據集上使用,返回一個kv的數據集,key相同的值都被使用指定的reduce函數聚合在一塊兒,和groupbykey相似,任務個數是第二個參數來配置

  • join(otherdataset,【num tasks】)在類型kev和kw類型的數據集上調用,返回一個k(v w)對,每一個key中全部元素都在一塊兒的數據集

  • groupwith(otherdataset,【num tasks】)在類型爲kv和kw類型的數據集上調用,返回一個數據集,組成元組爲k seq【v】seq[w]tuples ,這個在其餘框架稱爲cogroup

  • cartesian(otherdataset) 笛卡兒積,但在數據集t和u調用是,返回一個tu對的數據集,全部元素交互進行笛卡兒積。

持久化(緩存)

  • persist()

  • cache()

基本開發思路

每一個saprk應用都有一個驅動器程序來發起集羣上的各類並行操做。驅動器程序經過一個SparkContext對象來訪問Spark。這個對象表明對計算集羣的一個鏈接。
一旦有了SparkContext,你就能夠用它來建立RDD。要執行這些操做,啓動器程序通常要管理多個執行器(executor)節點。
能夠先經過SparkConf對象來配置你的應用,而後基於這個SparkConf建立一個SparkContext對象。
建立SparkConf的基本方法,傳遞兩個參數:
一、集羣URL:告訴Spark如何鏈接到集羣上。
二、應用名:當鏈接到一個集羣式,這個值能夠幫助你在集羣管理器的用戶界面中找到你的應用。

關閉Spark:調用SparkContext的stop()方法。或直接退出應用。(system.exit(0)/sys.exit())
在Spark中,對數據的全部操做不外乎是: 建立RDD、 轉化已有的RDD、調用RDD操做進行求值
Spark中的RDD是一個不可變的分佈式對象集合。每一個RDD都被分爲多個分區,這些分區運行在集羣中的不一樣節點上。
當咱們調用一個新的行動操做時,整個RDD都會從頭開始計算。要避免這種行爲,用戶能夠將中間結果持久化。

demo(Python版)

一、初始化sparkcontext

from pyspark import SparkConf, SparkContxt
conf = SparkConf().setMaster("local").setAppName("my app")
sc = SparkContext(conf=conf)

# 關閉鏈接
sc.stop()

二、RDD編程

# 從文件讀取數據
line = sc.textFile("README.md")
# parallelize 方法
line = sc.parallelize(['pandas','i like pandas'])


inputRDD = sc.textFile('log.txt')
errRDD = inputRDD.filter(lambda x:'error' in x)
warnRDD = inputRDD.filter(lambda x:'warning' in x)
bindRDD = errRDD.union(warnRDD)


bindRDD.count()
bindRDD.take(10)
# 返回所有數據集
bindRDD.collect()


# lambda 函數
word = rdd.filter(lambda s:'python' in s)
# def 定義的函數
def containsErr(s):
    return 'error' in s
word = rdd.filter(containsErr)

2.一、RDD常見轉換操做
以 rdd={1,2,3,3} 爲例的轉換操做

# 將函數應用與RDD中的每一個元素,將返回值構建新的RDD
rdd.map(x => x+1)

# 將函數應用用RDD中的每一個元素,將返回的迭代器的全部內容構成新的RDD。一般用於切分單詞。
rdd.flatMap(x=>x.to(3))  --> {1,2,3,2,3,3,3})

# 返回一個由經過傳給filter()的函數的元素組成的RDD
rdd.filter(x=>x!=1)  -->  {2,3,4}

# 去重
rdd.distinct()  -->  {1,2,3}

sample(withReplacement,fraction,[seed])
# 對RDD進行採樣,以及是否替換
rdd.sample(false,0.5)   -->  非肯定的

以{1,2,3}和{3,4,5}的RDD轉換操做

# 求並集
rdd.union(other)  --> {1,2,3,4,5}

# 求交集
rdd.intersection(other)  --> {3}

# 移除一個RDD中的內容,至關於減去一個交集
rdd.subtract(other)  -->  {1,2}

# 與另外一個RDD的笛卡爾積
rdd.cartesian(other)  --> {(1,3),(1,4)...(3,5)}

2.二、RDD常見行動操做
以{1,2,3,3}爲列說明常見行動操做

# 返回RDD中全部的元素
rdd.collect()  --> {1,2,3,3}

# 計數
rdd.count()

# 各元素在RDD中出現的次數
rdd.countByValue()   --> {(1,1),(2,1),(3,2)}

take(num)
# 返回前n元素

top(n)
# 排序後的前n個元素

# 按照指定順序,從rdd中返回前n個元素
rdd.takeOrdered(2)(myOrdering)   --> {3,3}

takeSample(withReplacement,num,[seed])
# 從RDD中返回任意一些元素
rdd.takeSample(false,1)  --> 非肯定的

# 並行整合rdd中全部的數據,好比sum
rdd.reduce((x,y)=>x+y)  --> 9

fold(zero)(func)
# 和reduce()同樣,可是須要提供初始值
rdd.fold(0)((x,y)=>x+y) --> 9

aggregate(zeroValue)(seqOp,combOp)
# 和reduce相似,可是一般返回不一樣類型的函數
aggregate((0,0))((x,y)=>(x._1+y,x._2+1),
                 (x,y)=>(x._1+y._1,x._2+y._2) )  --> 9

# 對RDD中的每一個元素使用給定的函數
rdd.foreach(func)

2.三、持久化緩存

from pyspark.storage import StorageLvel
rdd.presist(StoragLevel.DISK_ONLY)

RDD.cache()

# 緩存的級別
# MEMORY_ONLY
# MEMORY_ONLY_SER
# MEMORY_AND_DISK  # 若是內存放不下,則溢出寫到磁盤上
# MEMORY_AND_DISK_SER  # 若是內存放不下,則溢出寫到磁盤上,在內存中存放序列化後的數據
# DISK_ONLY

# 移除緩存
RDD.unpersist()

三、鍵值對操做

# 以{(1,2),(3,4),(3,6)}爲例

# 合併具備形同鍵的值
rdd.reduceByKey((x,y)=>x+y)  -->{(1,2),(3,10)}

# 對具備相同鍵的值分組
rdd.groupByKey()  --->{(1,[2]),(3,[4,6])}

combineByKey(createCombiner,mergeValue,mergeComBiners,partitioner)
# 使用不一樣的返回類型合併具備相同鍵的值。有多個參數分別對應聚合操做的各個階段,於是很是適合用來解釋聚合操做各個階段的功能劃分。
# 下面是求每一個鍵的平均值
sumCount=num.combineByKey((lambda x:(x,1)),
                           (lambda x,y:(x[0]+y,x[1]+1)),
                           (lambda x,y:(x[0+y[0],x[1]+y[1]])))
sumCount.map(lambda key,xy:(key,xy[0]/xy[1])).collectAaMap()

# 對pairRDD的每一個值應用一個函數而不改變鍵
rdd.mapVlues(x=>x+1)

# 對pairRDD的每一個值應用一個返回迭代器的函數,而後對返回的每一個元素都生成一個對應原鍵的鍵值對記錄,一般用於符號化
rdd.flatMapValues(x=>(x to 5))  -->{(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)}

# 返回一個僅含有鍵的RDD
rdd.keys()   ->{1,3,3}

# 返回一個僅包含值的RDD
rdd.values()   -->{2,4,6}

# 返回一個根據鍵排序的RDD
rdd.sortByKey(ascending=True)  -->{(1,2),(3,4),(3,6)}

3.一、兩個鍵值對RDD的轉換操做

# 以rdd={(1,2),(3,4),(3,6)} other={(3,9)} 爲例

# 刪除rdd中鍵與other中鍵相同的元素
rdd.subtracByKey(other)  --> {(1,2)}

# 對兩個rdd內連接
rdd.join(other)   --> {(3,(4,9)),(3,(6,9))}

# 對兩個rdd進行鏈接操做,確保第一個rdd中的鍵必須存在(右外連接)
rdd.rightOuterJoin(other)   --> {(3,(some(4),9)),(3,(some(6),9))}

# 對兩個rdd進行鏈接操做,確保第二個rdd中的鍵必須存在(左外鏈接)
rdd.leftOuterJoin(other)  --> {(1,(2,None)),(3,(4,some(9))),(3,(6,some(9)))}

# 將兩個rdd中擁有相同鍵的數據分組到一塊兒
rdd.congroup(other)  --> {(1,([2],[])),(3,([4,6],[9]))}

3.二、鍵值對Pair RDD的行動操做

# 以 rdd={(1,2),(3,4),(3,6)} 爲例

# 對每一個鍵對應的元素分別計數
rdd.countByKey()  --> {(1,1,),(3,2)})

# 將結果以映射表的形式返回,以便查詢
rdd.collectAsMap()  --> Map{(1,2),(2,6)}

# 返回給定鍵對應的全部值
rdd.lookup(3)   --> [4,6]

四、並行度調優
每一個rdd都有固定數目的分區,分區數決定了在rdd上執行操做的並行度。 大多數操做符都能接受第二個參數,用來指定分組結果或者聚合結果的rdd的分區數。
好比 sc.parallelize(data).reduceByKey(lambda x,y:x+y,10) 指定分區數10
查看分區數 rdd.partitions.size或rdd.getNumPartitions ,改變分區的方法repartition()

五、數據讀取與保存
讀取txt文件,輸入的每一行都會成爲RDD的一個元素。

# 讀取文件
input=sc.textFile("file:///home/holden/README.md")
# 保存文件
result.saveAsTextFile(outputFile)

讀取json

# 將json文件的每一行假設爲一條記錄來處理
import json
data = input.map(lambda x:json.load(x))
# 寫
(data.filter(lambda x:x[lovesPandas"]).map(lambda x:json.dumps(x)).saveAsTextFile(outputFile))

讀取csv,一樣是將讀取的文本的每一行當作一條記錄

import csv
from io import StringIO
def loadRecord(line):
    """解析一行csv記錄"""
    input = StringIO(line)
    reader = csv.DictReader(input,filednames=["name","favouriteAnimal"])
    return reader.next()
input = sc.textFile(inputFile).map(loadRecord)

# 保存csv
def writeRecords(records):
    """寫出一些csv記錄"""
    output = StringIO()
    writer = csv.DictWriter(output,fieldnames=["name","favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)

讀取SequenceFile
Hadoop輸入輸出格式
關係型數據庫
HBase

六、Spark進階編程
6.一、兩種類型的共享變量

  • 累加器(qccumulator):用於對信息聚合,提供了將工做節點中的值聚合到驅動器程序中的簡單語法。

  • 廣播變量(broadcast variable):用來高效分發較大的對象,讓程序高效地向全部工做節點發送一個較大的值,以供一個或多個spark操做使用。

# 在python中累加空行,使用了累加器
file = sc.textFile(inputFile)
# 建立累加器並初始化爲0
blankLine=sc.accumulator(0)

def extractCallSigs(line):
    global blankLine  # 訪問全局變量
    if (line==""):
        blankLine+=1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(output)


# 使用廣播變量查詢國家
# 查詢rdd中呼叫號對應的位置,將呼號前綴讀取爲國家代碼來進行查詢
signPrefixes = sc.broadcast(loadCallSignTable())   # 廣播變量

def processSignCount(sign_count,signPrefixes):
    country=lookupCountry(sign_count[0],signPrefixes.value)
    count = sign_count[1]
    return (country,count)

countryContactCounts=(contactCounts.map(processSignCount).reduceByKey((lambda x,y:x+y)))
countryContactCounts.saveAsTextFile(output)

基於分區進行操做
spark提供基於分區的map和foreach,使部分代碼只對rdd的每一個分區運行一次,能夠幫助下降這些操做的代價。

# 按照分區執行的操做符

mapPartitions()
# 參數:該分區中元素的迭代器。返回:元素的迭代器
# 對於RRD[T]的函數簽名 :f:(iterator[T])  --> iterator[U]

mapPartitionsWithIndex()
# 參數:分區序號,以及每一個分區中的元素的迭代器。返回:元素的迭代器
# 對於RRD[T]的函數簽名 :f:(int,iterator[T])  --> iterator[U]

foreachPartitions()
# 參數:元素迭代器。返回:無
# 對於RRD[T]的函數簽名 :f:(iterator(T))  -->Unit

數值RDD的操做

count()
# RDD中元素個數
mean()
# 元素平均值
sum()
# 
max()
min()
variance()  # 方差
sampleVariance()  # 從採樣中計算出的方差
stdev()  # 標準差
sampleStdev()   # 採用的標準差

七、基於MLlib的機器學習

# 邏輯迴歸的垃圾郵件分類
from pyspark.mllib.regression import LabeldPoint
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.classification import LogisticRegressionWithSGD

spam=sc.textFile('spam.txt')
normal = sc.textFile('normal.txt')

# 建立一個HashingTF實例來把郵件文本映射爲包含10000個特徵的向量
tf=HashingTF(numFeatures=10000)
# 各郵件都切分爲單詞,每一個單詞映射爲一個特徵
spamFeatures = spam.map(lambda email: tf.transForm(email.split(' ')))
normalFeatures = normal.map(lambda email: tf.transform(email.split(' ')))

# 建立LabelPoint數據集分別存放陽性(垃圾郵件)和陰性(正常郵件)的例子
positiveExample = spamFeatures.map(lambda features:LabeldPoint(1,features))
negativeExamples = normalFeatures.map(lambda features:labeldPoint(0,features))
trainingData = positiveExample.union(negativeExample)
trainingData.cache()  # 由於邏輯迴歸是迭代算法,因此須要緩存訓練數據RDD

# 使用SGD算法
model = LogisticRegressionWithSGD.train(trainningData)

# 以陽性和陰性的例子分別測試。
# 首先用同樣的HashingTF特徵來獲得特徵向量,而後對該向量應用獲得的模型
posTest = tf.transform("O M G GET cheap stuff by sending money to ...".split(' '))
negTest = tf.transform("Hi Dad, i started studying spark the other ...".split(' '))
print( "predict for postive test example:%g" % model.predict(posTest))
print( "predict for negative test example:%g" % model.predict(negTest))

MLlib包含一些特有的數據類型,對於Scala和Java,它們位於org.apache.spark.mllib下,對於Python則是位於pyspark.mllib下。

入門:

spark有兩個重要的抽象:

  • RDD,分佈式彈性數據集,他是一個跨越多個節點的分佈式集合。

  • 另外一個抽象是共享變量。spark支持兩種類型的共享變量:一個是廣播(broadcast variables)他能夠緩存一個值在集羣的各個節點。另外一個是累加器(accumulators)他只能執行累加的操做,好比能夠作計數器和求和。

初始化 Spark
在一個Spark程序中要作的第一件事就是建立一個SparkContext對象來告訴Spark如何鏈接一個集羣。爲了建立SparkContext,你首先須要建立一個SparkConf對象,這個對象會包含你的應用的一些相關信息。這個一般是經過下面的構造器來實現的:
new SparkContext(master, appName, [sparkHome], [jars])
參數說明:

  • master:用於指定所鏈接的 Spark 或者 Mesos 集羣的 URL。

  • appName :應用的名稱,將會在集羣的 Web 監控 UI 中顯示。

  • sparkHome:可選,你的集羣機器上 Spark 的安裝路徑(全部機器上路徑必須一致)。

  • jars:可選,在本地機器上的 JAR 文件列表,其中包括你應用的代碼以及任何的依賴,Spark 將會把他們部署到全部的集羣結點上。
    在 python 中初始化,示例代碼以下:

//conf = SparkContext("local", "Hello Spark")
conf = SparkConf().setAppName("Hello Spark").setMaster("local")
sc = SparkContext(conf=conf)

說明:若是部署到集羣,在分佈式模式下運行,最後兩個參數是必須的,第一個參數能夠是如下任一種形式:
Master URL 含義

  • local 默認值,使用一個 Worker 線程本地化運行(徹底不併行)

  • local[N] 使用 N 個 Worker 線程本地化運行,N 爲 * 時,表示使用系統中全部核

  • local[N,M] 第一個表明的是用到的核個數;第二個參數表明的是允許該做業失敗M次

  • spark://HOST:PORT 鏈接到指定的 Spark 單機版集羣 master 進程所在的主機和端口

  • mesos://HOST:PORT 鏈接到指定的 Mesos 集羣。host 參數是Moses master的hostname。端口默認是5050
    若是你在一個集羣上運行 spark-shell,則 master 參數默認爲 local。在實際使用中,當你在集羣中運行你的程序,你通常不會把 master 參數寫死在代碼中,而是經過用 spark-submit 運行程序來得到這個參數。可是,在本地測試以及單元測試時,你仍須要自行傳入 local 來運行Spark程序。

運行代碼有幾種方式,一是經過 spark-shell 來運行 scala 代碼,一是編寫 java 代碼並打成包以 spark on yarn 方式運行,還有一種是經過 PySpark 來運行 python 代碼。

在 spark-shell 和 PySpark 命令行中,一個特殊的集成在解釋器裏的 SparkContext 變量已經創建好了,變量名叫作 sc,建立你本身的 SparkContext 不會起做用。

<dependencies>
 <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.1.1</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
  </dependency>
</dependencies>

建立一個簡單的spark程序:

public class SimpleApp {
  public static void main(String[] args) {
      // 文件路徑
      String logFile = "/home/wm/apps/spark-1.4.0-bin-hadoop2.6/README.md";
      SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local");
      JavaSparkContext sc = new JavaSparkContext(conf);
      JavaRDD<String> logData = sc.textFile(logFile).cache();
      @SuppressWarnings("serial")
      long numAs = logData.filter(new Function<String, Boolean>() {
          public Boolean call(String s) throws Exception {
              return s.contains("a");
          }

      }).count();
      @SuppressWarnings("serial")
      long numBs = logData.filter(new Function<String, Boolean>() {

          public Boolean call(String s) throws Exception {
              return s.contains("b");
          }

      }).count();
      System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
      sc.close();
  }
}

Spark的核心就是圍繞着RDD,它是一個自動容錯的分佈式數據集合。他有兩種方式建立,第一種就是在驅動程序中對一個集合進行並行化。第二種是來源於一個外部的存儲系統。好比:共享系統、HDFS、HBase或者任何提供任何Hadoop 輸入格式的數據源。

第一種:Parallelized Collections 建立這個集合須要調用那個JavaSparkContext的parallelize方法來初始化一個已經存在的集合。

List<Integer> data = Arrays.asList(1,2,3,4,5);
JavaRDD<Iteger> distData = sc.parallelize(data);

這就建立了一個並行的集合,在這個集合上能夠執行 distData.reduce((a, b) -> a + b)
在並行數組中一個很重要的參數是partitions,它來描述數組被切割的數據集數量。Spark會在每個partitions上運行任務,這個partitions會被spark自動設置,通常都是集羣中每一個CPU上運行2-4partitions,可是也能夠本身設置,能夠經過parallelize (e.g. sc.parallelize(data, 10)),在有些地方把partitions成爲 slices。

第二種:External Datasets

JavaRDD distFile = sc.textFile("data.txt");

textFile也能夠設置partitions參數,通常都是一個block一個partitions,可是也能夠本身設置,本身設置必需要不能少於block的數量。
針對Hadoop的其餘輸入格式,你能用這個JavaSparkContext.hadoopRDD方法,你須要設置JobConf和輸入格式的類。也可使用JavaSparkContext.newAPIHadoopRDD針對輸入格式是基於「new」的MapReduceAPI

demo(python) 分析 Nginx 日誌中狀態碼出現次數

先將測試數據上傳到 hdfs:
$ hadoop fs -put access.log
而後,編寫一個 python 文件,保存爲 SimpleApp.py:

from pyspark import SparkContext

logFile = "access.log"

sc = SparkContext("local", "Simple App")

rdd = sc.textFile(logFile).cache()

counts = rdd.map(lambda line: line.split()[8]).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey(lambda x: x) 

# This is just a demo on how to bring all the sorted data back to a single node.  
# In reality, we wouldn't want to collect all the data to the driver node.
output = counts.collect()  
for (word, count) in output:  
    print "%s: %i" % (word, count)  

counts.saveAsTextFile("/data/result")

sc.stop()

接下來,運行下面代碼:

$ spark-submit  --master local[4]   SimpleApp.py

demo(java) 統計單詞出現次數

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

demo (java) 讀取HDFS中的數據,並簡單分析,最後結果寫入mysql數據庫中。

<dependency> <!-- Spark dependency -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.11</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.13</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>

因爲須要讀取HDFS中的數據,因此須要hadoop-client文件

在main函數中首先建立JavaSparkcontext對象。

SparkConf conf = new SparkConf().setAppName("FindError");
JavaSparkContext sc = new JavaSparkContext(conf);
/**
* 
* 列出指定目錄中的文件,這裏的文件是不包括子目錄的。
* @param pathOfDirectory
*     目錄路徑
* @return
* @throws IOException 
*/
public static String[] findFilePathFromDir(String dst) throws IOException {
  Set<String> filePathSet = new HashSet<String>();
  String[] result = null;
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(URI.create(dst), conf);
  FileStatus fileList[] = fs.listStatus(new Path(dst));
  int size = fileList.length;
  for (int i = 0; i < size; i++) {
      filePathSet.add(fileList[i].getPath().toString());
  }
  if (filePathSet.size() > 0) {
      result = new String[filePathSet.size()];
      int i = 0;
      for (String str : filePathSet) {
          result[i++] = str;
      }
  }
  fs.close();
  return result;
}

依次遍歷文件路徑併爲每一個文件建立一個新的RDD而後計算出這個文件中包涵ERROR字符串的行數。

Map<String, Long> result = new HashMap<String, Long>();
if (filePaths != null) {
  for (String path : filePaths) {
      result.put(path, sc.textFile(path).filter(new Function<String, Boolean>() {

          public Boolean call(String line) throws Exception {
              return line.contains("ERROR");
          }

      }).count());
  }
}

將results中的數據寫入mysql中

/**
* 將結果寫入mysql中
* @param result
* @throws Exception 
*/
public static void wirteResultToMysql(Map<String, Long> result) throws Exception {
  String DBDRIVER = "com.mysql.jdbc.Driver";  
  //鏈接地址是由各個數據庫生產商單獨提供的,因此須要單獨記住  
  String DBURL = "jdbc:mysql://ip:3306/test";  
  //鏈接數據庫的用戶名  
  String DBUSER = "root";  
  //鏈接數據庫的密碼  
  String DBPASS = "root";
  Connection con = null; //表示數據庫的鏈接對象  
  PreparedStatement pstmt = null; //表示數據庫更新操做  
  String sql = "insert into aaa values(?,?)";  
  Class.forName(DBDRIVER); //一、使用CLASS 類加載驅動程序  
  con = DriverManager.getConnection(DBURL,DBUSER,DBPASS); //二、鏈接數據庫  
  pstmt = con.prepareStatement(sql); //使用預處理的方式建立對象  
  if (result != null) {
      for (String str : result.keySet()) {
          pstmt.setString(1, str);
          pstmt.setLong(2, result.get(str));
          pstmt.addBatch();
      }
  }
  //pstmt.executeUpdate(); //執行SQL 語句,更新數據庫  
  pstmt.executeBatch();
  pstmt.close();  
  con.close(); // 四、關閉數據庫  
}

共享變量

一般狀況下,當一個函數傳遞給一個在遠程集羣節點上運行的Spark操做(好比map和reduce)時,Spark會對涉及到的變量的全部副本執行這個函數。這些變量會被複制到每一個機器上,並且這個過程不會被反饋給驅動程序。一般狀況下,在任務之間讀寫共享變量是很低效的。可是,Spark仍然提供了有限的兩種共享變量類型用於常見的使用場景:廣播變量和累加器。
一、廣播變量
廣播變量容許程序員在每臺機器上保持一個只讀變量的緩存而不是將一個變量的拷貝傳遞給各個任務。它們能夠被使用,好比,給每個節點傳遞一份大輸入數據集的拷貝是很低效的。Spark 試圖使用高效的廣播算法來分佈廣播變量,以此來下降通訊花銷。 能夠經過 SparkContext.broadcast(v) 來從變量 v 建立一個廣播變量。這個廣播變量是 v 的一個包裝,同時它的值能夠功過調用 value 方法來得到。如下的代碼展現了這一點:

broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value
[1, 2, 3]

在廣播變量被建立以後,在全部函數中都應當使用它來代替原來的變量v,這樣就能夠保證v在節點之間只被傳遞一次。另外,v變量在被廣播以後不該該再被修改了,這樣能夠確保每個節點上儲存的廣播變量的一致性(若是這個變量後來又被傳輸給一個新的節點)。

二、累加器
累加器是在一個相關過程當中只能被」累加」的變量,對這個變量的操做能夠有效地被並行化。它們能夠被用於實現計數器(就像在MapReduce過程當中)或求和運算。Spark原生支持對數字類型的累加器,程序員也能夠爲其餘新的類型添加支持。累加器被以一個名字建立以後,會在Spark的UI中顯示出來。這有助於瞭解計算的累進過程(注意:目前Python中不支持這個特性)。

能夠經過SparkContext.accumulator(v)來從變量v建立一個累加器。在集羣中運行的任務隨後可使用add方法或+=操做符(在Scala和Python中)來向這個累加器中累加值。可是,他們不能讀取累加器中的值。只有驅動程序能夠讀取累加器中的值,經過累加器的value方法。

如下的代碼展現了向一個累加器中累加數組元素的過程:

accum = sc.accumulator(0)
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
10

這段代碼利用了累加器對 int 類型的內建支持,程序員能夠經過繼承 AccumulatorParam 類來建立本身想要的類型支持。AccumulatorParam 的接口提供了兩個方法:zero用於爲你的數據類型提供零值;addInPlace 用於計算兩個值得和。好比,假設咱們有一個 Vector類表示數學中的向量,咱們能夠這樣寫:

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return Vector.zeros(initialValue.size)

    def addInPlace(self, v1, v2):
        v1 += v2
        return v1

# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())

累加器的更新操做只會被運行一次,Spark 提供了保證,每一個任務中對累加器的更新操做都只會被運行一次。好比,重啓一個任務不會再次更新累加器。在轉化過程當中,用戶應該留意每一個任務的更新操做在任務或做業從新運算時是否被執行了超過一次。

累加器不會改變Spark 的惰性求值模型。若是累加器在對RDD的操做中被更新了,它們的值只會在啓動操做中做爲 RDD 計算過程當中的一部分被更新。因此,在一個懶惰的轉化操做中調用累加器的更新,並無法保證會被及時運行。 下面的代碼段展現了這一點:

accum = sc.accumulator(0)
data.map(lambda x => acc.add(x); f(x))
// 這裏,accum任然是0,由於沒有action算子,因此map也不會進行實際的計算

任務的提交以及Standalone集羣模式的部署

參考官方文檔:http://spark.apache.org/docs/...
spark-submit
首先須要打包代碼,若是你的代碼須要依賴其餘的包環境則須要單獨的打包這些依賴,應爲cluster會將全部依賴的jar包分發到各個節點上進行使用。推薦的方法是將依賴包和程序都統一的打成一個包,這樣就能夠直接使用spark-submit方法來運行,具體的pom.xml配置以下:

<dependencies>
    <dependency> <!-- Spark dependency -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.11</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.13</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <!-- 使用1.7 jdk進行編譯 -->
                <source>1.7</source>
                <target>1.7</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.5.5</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

spark && hadoop 的scope值都設置爲provided
在服務器上提交的命令以下:

./bin/spark-submit \
  --class <main-class>
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

spark-submit 能夠加載一個配置文件,默認是加載在conf/spark-defaults.conf

單元測試

Spark對全部常見的單元測試框架提供友好的支持。你只須要在測試中建立一個SparkContext對象,而後吧master URL設爲local,運行測試操做,最後調用 SparkContext.stop() 來中止測試。注意,必定要在 finally 代碼塊或者單元測試框架的 tearDown方法裏調用SparkContext.stop(),由於Spark不支持同一程序中有多個SparkContext對象同時運行。

部署

一、Spark Standalone Mode
除了運行在Mesos和YARN集羣以外,spark也提供了簡單的獨立部署模式。能夠經過手動的啓動master和worker,也能夠經過spark提供的啓動腳原本啓動。獨立部署也能夠經過運行在一個機器上,進行測試。
爲了安裝你須要放置一個編譯好的spark版本到每一個機器上。
啓動集羣有兩種方式,一種是手動啓動,另外一種是經過啓動腳本啓動。
1.一、手動啓動spark集羣
啓動一個獨立的master可使用以下的命令:
./sbin/start-master.sh
一旦啓動能夠經過訪問:http://localhost:8080端口訪問master
可使用以下的命令來使worker節點鏈接到master上:
./sbin/start-slave.sh <worker#> <master-spark-URL>
worker在加入到master後能夠訪問master的http://localhost:8080,能夠看到被加入的worker節點的信息。
在啓動master和worker的時候能夠帶上參數進行設置,參數的列表以下:其中比較重要的是:
-c CORES, 這個是指定多少個cpu分配給spark使用,默認是所有cpu
-m MEM,這個是指定多少的內存分配給spark使用,默認是所有的內存的減去1g的操做系統內存所有分配給spark使用。通常的格式是1000M or 2G
-d DIR, 這個指定spark任務的日誌輸出目錄。
–properties-file FILE 指定spark指定加載的配置文件的路徑默認是: conf/spark-defaults.conf

1.二、腳本方式部署
經過spark的部署腳本部署首先須要在spark的主目錄下建立一個conf/slaves的文件,這個文件中每一行表明一個worker的hostname.須要注意的是,master訪問worker節點是經過SSH訪問的,因此須要master經過ssh無密碼的登陸到worker,不然須要設置一個 SPARK_SSH_FOREGROUND的環境變量,這個變量的值就是每一個worker的密碼

而後能夠經過spark安裝目錄下的sbin/….sh文件進行啓動, 若是要啓動和中止master和slave可使用:
sbin/start-all.sh
sbin/stop-all.sh
注意的是這些腳本必須是在master機器上執行

同時你能夠經過配置集羣的 conf/spark-env.sh文件來進一步配置集羣的環境。可是這也文件須要經過拷貝conf/spark-env.sh.template文件來建立,而且須要把這個文件拷貝到全部的worker節點上。
其中: SPARK_MASTER_OPTS && SPARK_WORKER_OPTS 兩個配置項比較複雜。
經過在SparkContext構造器中傳入spark://IP:PORT這個來啓用這個集羣。同時能夠在交互式的方式啓動腳本中使用:./bin/spark-shell –master spark://IP:PORT 來啓動集羣執行。
獨立部署模式的集羣如今只是簡單的支持FIFO調度。 爲了容許多個併發用戶,能夠經過SparkConf設置每一個應用程序須要的資源的最大數。默認狀況下,它會請求使用集羣的所有的核,而這只是同時運行一個應用程序纔回有意義。

val conf = new SparkConf()
             .setMaster(...)
             .setAppName(...)
             .set("spark.cores.max", "10")
val sc = new SparkContext(conf)

除了能夠在程序中指定你也能夠在spark-env.sh中設置默認的值,export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"

二、spark的高可用設置
spark的高可用設置有兩種,一種是經過Zookeeper來實現,另外一種是經過本地文件系統來實現。

2.一、使用ZooKeeper備份master,利用zookeeper提供的領導選舉和狀態保存,你可讓更多的master鏈接到zookeepre實例。一個將會被選舉爲leader其餘的則會保存備份他的狀態。若是master死掉,zookeeper能夠選舉一個新的leader,整個過程須要1到2分鐘的時間,可是這個過程只會對新的任務調度有影響。爲了使用這種方式須要的配置項爲:SPARK_DAEMON_JAVA_OPTS,這個配置項有三個配置信息:spark.deploy.recoveryMode/spark.deploy.zookeeper.url/spark.deploy.zookeeper.dir

2.二、使用本地文件系統來恢復該節點。爲了使用這種方式須要的配置項爲:SPARK_DAEMON_JAVA_OPTS,這個配置項有兩個配置信息:spark.deploy.recoveryMode、spark.deploy.recoveryDirectory

Spark架構與原理


Spark架構採用了分佈式計算中的Master-Slave模型。Master是對應集羣中的含有Master進程的節點,Slave是集羣中含有Worker進程的節點。Master做爲整個集羣的控制器,負責整個集羣的正常運行;Worker至關因而計算節點,接收主節點命令與進行狀態彙報;Executor負責任務的執行;Cluster做爲用戶的客戶端負責提交應用,Driver負責控制一個應用的執行。
Spark集羣部署後,須要在主節點和從節點分別啓動Master進程和Woker進程,對整個集羣進行控制。在一個Spark應用的執行過程當中,Driver和Worker是兩個重要角色。Driver程序是應用邏輯執行的起點,負責做業的調度,即Task任務的分發,而多個Worker用來管理計算節點和建立Executor並行處理任務。在執行階段,Driver會將Task和Task所依賴的
file和jar序列化後傳遞給對應的Worker機器,同時Exucutor對相應數據分區的任務進行處理。
下面詳細介紹Spark的架構中的基本組件。

  • ClusterManager:在Standalone模式中即爲Master(主節點),控制整個集羣,監控Worker。在YARN模式中爲資源管理器。

  • Worker:從節點,負責控制計算節點,啓動Executor或Driver。在YARN模式中爲NodeManager,負責計算節點的控制。
    Spark總體流程爲:Client提交應用,Master找到一個Worker啓動Driver,Driver向Master或者資源管理器申請資源,以後將應用轉化爲RDD Graph,再由DAGScheduler將RDD Graph轉化爲Stage的有向無環圖提交給TaskScheduler,由TaskScheduler提交任務給Executor執行。在任務執行過程當中,其餘組件協同工做,確保整個應用順利進行。

計算模型

  • Application:應用。能夠認爲是屢次批量計算組合起來的過程,在物理上能夠表現爲你寫的程序包+部署配置。應用的概念相似於計算機中的程序,它只是一個藍本,尚沒有運行起來。

  • RDD:Resilient Distributed Datasets,彈性分佈式數據集。RDD便是計算模型裏的一個概念,也是你編程時用到的一種類。一個RDD能夠認爲是spark在執行分佈式計算時的 一批相同來源、相同結構、相同用途的數據集,這個數據集可能被切割成多個分區,分佈在不一樣的機器上,不管如何,這個數據集被稱爲一個RDD。在編程 時,RDD對象就對應了這個數據集,而且RDD對象被看成一個數據操做的基本單位。好比,對某個RDD對象進行map操做,其實就至關於將數據集中的每一個 分區的每一條數據進行了map映射。

  • Partition:分區。一個RDD在物理上被切割成多個數據子集,分佈在不一樣的機器上。每一個數據子集叫一個分區。

  • RDD Graph:RDD組成的DAG(有向無環圖)。RDD是不可變的,一個RDD通過某種操做後,會生成一個新的RDD。這樣說來,一個 Application中的程序,其內容基本上都是對各類RDD的操做,從源RDD,通過各類計算,產生中間RDD,最後生成你想要的RDD並輸出。這個 過程當中的各個RDD,會構成一個有向無環圖。

  • Lineage:血統。RDD這個概念自己包含了這種信息「由哪一個父類RDD通過哪一種操做獲得」。因此某個RDD能夠經過不斷尋找父類,找到最原始的那個RDD。這條繼承路徑就認爲是RDD的血統。

  • Job:從Application和RDD Graph的概念能夠知道,一個應用每每對應了一個RDD Graph。這個應用在準備被spark集羣運行前,實際上就是會生成一個或多個RDD Graph結構,而一個RDD Graph,又能夠生成一個或多個Job。一個Job能夠認爲就是會最終輸出一個結果RDD(後面會介紹,實際上這是action操做)的一條由RDD組 織而成的計算,在Application生成的RDD Graph上表現爲一個子圖。Job在spark裏應用裏也是一個被調度的單位。

  • 寬依賴:RDD生成另外一個RDD時,各個兩個父子RDD間分區的對應關係,被叫作RDD間依賴。寬依賴就是子RDD的某個分區,依賴父RDD的所有分區。

  • 窄依賴:窄依賴就是子RDD的某個分區,只依賴常數個父RDD的分區。寬窄依賴的區別以下圖所示。

  • Stage:Stage能夠理解爲完成一個Job的不一樣階段。一個Job被劃分爲多個Stage,每一個Stage又包含了對多個RDD的多個操做。一個Stage裏,通常包含了一個寬依賴操做,或者多個窄依賴操做。
    窄依賴是指前一個rdd計算能出一個惟一的rdd,好比map或者filter等;寬依賴則是指多個rdd生成一個或者多個rdd的操做,好比groupbykey reducebykey等,這種寬依賴一般會進行shuffle。

  • 算子:父子RDD間的某種操做,被叫某種算子。好比下面會介紹的map,filter,groupByKey等。算子可從多個維度分類,以後再介紹。

  • Task:一個分區對應一個Task。實際上一個Task就是在一個Stage範圍內,某個Executor所要執行的算子。

  • TaskSet:一個Stage範圍內,全部相同的Task被稱爲一個TaskSet。

  • DAGScheduler:DAGScheduler用於根據RDD DAG切分Stage,並維護各個Stage的前後依賴關係,至關於完成了一個Job內的不一樣Stage間的調度策略。

  • TasksetManager:管理一個TaskSet,並決定了這個TaskSet中各個Task的分發策略。

  • TaskScheduler:執行實際的Task分發操做。

SparkUI、History Server:

SparkUI: 4044
History Server:18080
怎麼看?http://www.cnblogs.com/xing90...

參考

http://blog.csdn.net/qq_26562...
http://blog.csdn.net/suzyu123...
http://www.cnblogs.com/helloc...
http://blog.csdn.net/suzyu123...
http://www.jianshu.com/nb/340...
http://www.cnblogs.com/ainima...
http://www.chinahadoop.cn/gro...
https://yq.aliyun.com/article...
http://ifeve.com/category/spa...

相關文章
相關標籤/搜索