Spark學習筆記java
Spark簡介node
spark 能夠很容易和yarn結合,直接調用HDFS、Hbase上面的數據,和hadoop結合。配置很容易。算法
spark發展迅猛,框架比hadoop更加靈活實用。減小了延時處理,提升性能效率實用靈活性。也能夠與hadoop切實相互結合。sql
spark核心部分分爲RDD。Spark SQL、Spark Streaming、MLlib、GraphX、Spark R等核心組件解決了不少的大數據問題,其完美的框架日受歡迎。其相應的生態環境包括zepplin等可視化方面,正日益壯大。大型公司爭相實用spark來代替原有hadoop上相應的功能模塊。Spark讀寫過程不像hadoop溢出寫入磁盤,都是基於內存,所以速度很快。另外DAG做業調度系統的寬窄依賴讓Spark速度提升。數據庫
Spark核心組成編程
1、RDD數組
是彈性分佈式數據集,徹底彈性的,若是數據丟失一部分還能夠重建。有自動容錯、位置感知調度和可伸縮性,經過數據檢查點和記錄數據更新金象容錯性檢查。經過SparkContext.textFile()加載文件變成RDD,而後經過transformation構建新的RDD,經過action將RDD存儲到外部系統。緩存
RDD使用延遲加載,也就是懶加載,只有當用到的時候才加載數據。若是加載存儲全部的中間過程會浪費空間。所以要延遲加載。一旦spark看到整個變換鏈,他能夠計算僅需的結果數據,若是下面的函數不須要數據那麼數據也不會再加載。轉換RDD是惰性的,只有在動做中才可使用它們。網絡
Spark分爲driver和executor,driver提交做業,executor是application早worknode上的進程,運行task,driver對應爲sparkcontext。Spark的RDD操做有transformation、action。Transformation對RDD進行依賴包裝,RDD所對應的依賴都進行DAG的構建並保存,在worknode掛掉以後除了經過備份恢復還能夠經過元數據對其保存的依賴再計算一次獲得。看成業提交也就是調用runJob時,spark會根據RDD構建DAG圖,提交給DAGScheduler,這個DAGScheduler是在SparkContext建立時一同初始化的,他會對做業進行調度處理。當依賴圖構建好之後,從action開始進行解析,每個操做做爲一個task,每遇到shuffle就切割成爲一個taskSet,並把數據輸出到磁盤,若是不是shuffle數據還在內存中存儲。就這樣再往前推動,直到沒有算子,而後運行從前面開始,若是沒有action的算子在這裏不會執行,直到遇到action爲止纔開始運行,這就造成了spark的懶加載,taskset提交給TaskSheduler生成TaskSetManager而且提交給Executor運行,運行結束後反饋給DAGScheduler完成一個taskSet,以後再提交下一個,當TaskSet運行失敗時就返回DAGScheduler並從新再次建立。一個job裏面可能有多個TaskSet,一個application可能包含多個job。app
二、Spark Streaming
經過對kafka數據讀取,將Stream數據分紅小的時間片斷(幾秒),以相似batch批處理的方式來處理這一部分小數據,每一個時間片生成一個RDD,有高效的容錯性,對小批量數據能夠兼容批量實時數據處理的邏輯算法,用一些歷史數據和實時數據聯合進行分析,好比分類算法等。也能夠對小批量的stream進行mapreduce、join等操做,而保證其實時性。針對數據流時間要求不到毫秒級的工程性問題均可以。
Spark Streaming也有一個StreamingContext,其核心是DStream,是經過以組時間序列上的連續RDD來組成的,包含一個有Time做爲key、RDD做爲value的結構體,每個RDD都包含特定時間間隔的數據流,能夠經過persist將其持久化。在接受不斷的數據流後,在blockGenerator中維護一個隊列,將流數據放到隊列中,等處理時間間隔到來後將其中的全部數據合併成爲一個RDD(這一間隔中的數據)。其做業提交和spark類似,只不過在提交時拿到DStream內部的RDD併產生Job提交,RDD在action觸發以後,將job提交給jobManager中的JobQueue,又jobScheduler調度,JobScheduler將job提交到spark的job調度器,而後將job轉換成爲大量的任務分發給spark集羣執行。Job從outputStream中生成的,而後觸發反向回溯執行DStreamDAG。在流數據處理的過程當中,通常節點失效的處理比離線數據要複雜。Spark streamin在1.3以後能夠週期性的將DStream寫入HDFS,同時將offset也進行存儲,避免寫到zk。一旦主節點失效,會經過checkpoint的方式讀取以前的數據。當worknode節點失效,若是HDFS或文件做爲輸入源那Spark會根據依賴關係從新計算數據,若是是基於Kafka、Flume等網絡數據源spark會將手機的數據源在集羣中的不一樣節點進行備份,一旦有一個工做節點失效,系統可以根據另外一份還存在的數據從新計算,可是若是接受節點失效會丟失一部分數據,同時接受線程會在其餘的節點上從新啓動並接受數據。
三、Graphx
主要用於圖的計算。核心算法有PageRank、SVD奇異矩陣、TriangleConut等。
四、Spark SQL
是Spark新推出的交互式大數據SQL技術。把sql語句翻譯成Spark上的RDD操做能夠支持Hive、Json等類型的數據。
五、Spark R
經過R語言調用spark,目前不會擁有像Scala或者java那樣普遍的API,Spark經過RDD類提供Spark API,而且容許用戶使用R交互式方式在集羣中運行任務。同時集成了MLlib機器學習類庫。
六、MLBase
從上到下包括了MLOptimizer(給使用者)、MLI(給算法使用者)、MLlib(給算法開發者)、Spark。也能夠直接使用MLlib。ML Optimizer,一個優化機器學習選擇更合適的算法和相關參數的模塊,還有MLI進行特徵抽取和高級ML編程 抽象算法實現API平臺,MLlib分佈式機器學習庫,能夠不斷擴充算法。MLRuntime基於spark計算框架,將Spark的分佈式計算應用到機器學習領域。MLBase提供了一個簡單的聲明方法指定機器學習任務,而且動態地選擇最優的學習算法。
七、Tachyon
高容錯的分佈式文件系統。宣稱其性能是HDFS的3000多倍。有相似java的接口,也實現了HDFS接口,因此Spark和MR程序不須要任何的修改就能夠運行。目前支持HDFS、S3等。
八、Spark算子
一、Map。對原數據進行處理,相似於遍歷操做,轉換成MappedRDD,原分區不變。
二、flatMap。將原來的RDD中的每個元素經過函數轉換成新的元素,將RDD的每一個集合中的元素合併成一個集合。好比一個元素裏面多個list,經過這個函數都合併成一個大的list,最經典的就是wordcount中將每一行元素進行分詞之後成爲,經過flapMap變成一個個的單詞,line.flapMap(_.split(「 」)).map((_,1))若是經過map就會將一行的單詞變成一個list。
三、mapPartitions。對每一個分區進行迭代,生成MapPartitionsRDD。
四、Union。是將兩個RDD合併成一個。使用這個函數要保證兩個RDD元素的數據類型相同,返回的RDD的數據類型和被合併的RDD數據類型相同。
五、Filter。其功能是對元素進行過濾,對每一個元素調用f函數,返回值爲true的元素就保留在RDD中。
六、Distinct。對RDD中元素進行去重操做。
七、Subtract。對RDD1中取出RDD1與RDD2交集中的全部元素。
八、Sample。對RDD中的集合內元素進行採樣,第一個參數withReplacement是true表示有放回取樣,false表示無放回。第二個參數表示比例,第三個參數是隨機種子。如data.sample(true, 0.3,new Random().nextInt())。
九、takeSample。和sample用法相同,只不第二個參數換成了個數。返回也不是RDD,而是collect。
十、Cache。將RDD緩存到內存中。至關於persist(MEMORY_ONLY)。能夠經過參數設置緩存和運行內存之間的比例,若是數據量大於cache內存則會丟失。
十一、Persist。裏面參數能夠選擇DISK_ONLY/MEMORY_ONLY/MEMORY_AND_DISK等,其中的MEMORY_AND_DISK當緩存空間滿了後自動溢出到磁盤。
十二、MapValues。針對KV數據,對數據中的value進行map操做,而不對key進行處理。
1三、reduceByKey。針對KV數據將相同key的value聚合到一塊兒。與groupByKey不一樣,會進行一個相似mapreduce中的combine操做,減小相應的數據IO操做,加快效率。若是想進行一些非疊加操做,咱們能夠將value組合成字符串或其餘格式將相同key的value組合在一塊兒,再經過迭代,組合的數據拆開操做。
1四、partitionBy。能夠將RDD進行分區,從新生成一個ShuffleRDD,進行一個shuffle操做,對後面進行頻繁的shuffle操做能夠加快效率。
1五、randomSplit。對RDD進行隨機切分。如data.randomSplit(new double[]{0.7, 0.3})返回一個RDD的數組。
1六、Cogroup。對兩個RDD中的KV元素,每一個RDD中相同key中的元素分別聚合成一個集合。與reduceByKey不一樣的是針對兩個RDD中相同的key的元素進行合併。
1七、Join。至關於inner join。對兩個須要鏈接的RDD進行cogroup,而後對每一個key下面的list進行笛卡爾積的操做,輸出兩兩相交的兩個集合做爲value。 至關於sql中where a.key=b.key。
1八、leftOutJoin,rightOutJoin。在數據庫中左鏈接以左表爲座標將表中全部的數據列出來,右面不存在的用null填充。在這裏面對join的基礎上判斷左側的RDD元素是不是空,若是是空則填充。右鏈接則相反。
1九、saveAsTestFile。將數據輸出到HDFS的指定目錄。
20、saveAsObjectFile。寫入HDFS爲SequenceFile格式。
2一、Collect、collectAsMap。將RDD轉換成list或者Map。結果以List或者HashMap的方式輸出。
2二、Count。對RDD的元素進行統計,返回個數。
2三、Top(k)。返回最大的k個元素,返回List的形式。
2四、Take返回數據的前k個元素。
2五、takeOrdered。返回數據的最小的k個元素,並在返回中保持元素的順序。
9、Tips
1、RDD.repartition(n)能夠在最初對RDD進行分區操做,這個操做其實是一個shuffle,可能比較耗時,可是若是以後的action比較多的話,能夠減小下面操做的時間。其中的n值看cpu的個數,通常大於2倍cpu,小於1000。
2、Action不可以太多,每一次的action都會將以上的taskset劃分一個job,這樣當job增多,而其中task並不釋放,會佔用更多的內存,使得gc拉低效率。
3、在shuffle前面進行一個過濾,減小shuffle數據,而且過濾掉null值,以及空值。
4、groupBy儘可能經過reduceBy替代。reduceBy會在work節點作一次reduce,在總體進行reduce,至關於作了一次hadoop中的combine操做,而combine操做和reduceBy邏輯一致,這個groupBy不能保證。
5、作join的時候,儘可能用小RDD去join大RDD,用大RDD去join超大的RDD。
6、避免collect的使用。由於collect若是數據集超大的時候,會經過各個work進行收集,io增多,拉低性能,所以當數據集很大時要save到HDFS。
7、RDD若是後面使用迭代,建議cache,可是必定要估計好數據的大小,避免比cache設定的內存還要大,若是大過內存就會刪除以前存儲的cache,可能致使計算錯誤,若是想要徹底的存儲可使用persist(MEMORY_AND_DISK),由於cache就是persist(MEMORY_ONLY)。
8、設置spark.cleaner.ttl,定時清除task,由於job的緣由可能會緩存不少執行過去的task,因此定時回收可能避免集中gc操做拉低性能。
9、適當pre-partition,經過partitionBy()設定,每次partitionBy會生成一個RDD。