Scala是一種多範式的編程語言,其設計的初衷是要集成面向對象編程和函數式編程的各類特性。Scala運行於Java平臺(Java虛擬機),併兼容現有的Java程序。它也能運行於CLDC配置的Java ME中。目前還有另外一.NET平臺的實現,不過該版本更新有些滯後。Scala的編譯模型(獨立編譯,動態類加載)與Java和C#同樣,因此Scala代碼能夠調用Java類庫(對於.NET實現則可調用.NET類庫)。Scala包括編譯器和類庫,以及BSD許可證發佈。html
學習Scala編程語言,爲後續學習Spark奠基基礎。java
l 安裝JDKmysql
l 下載Scala:http://www.scala-lang.org/download/es6
l 安裝Scala:web
n 設置環境變量:SCALA_HOME和PATH路徑算法
l 驗證Scala sql
l REPL(Read Evaluate Print Loop):命令行shell
l IDE:圖形開發工具數據庫
n The Scala IDE (Based on Eclipse):http://scala-ide.org/apache
n IntelliJ IDEA with Scala plugin:http://www.jetbrains.com/idea/download/
n Netbeans IDE with the Scala plugin
注意:在Scala中,任何數據都是對象。例如:
① 數值類型:Byte,Short,Int,Long,Float,Double
l Byte: 8位有符號數字,從-128 到 127
l Short: 16位有符號數據,從-32768 到 32767
l Int: 32位有符號數據
l Long: 64位有符號數據
例如:
val a:Byte = 10
a+10
獲得:res9: Int = 20
這裏的res9是新生成變量的名字
val b:Short = 20
a+b
注意:在Scala中,定義變量能夠不指定類型,由於Scala會進行類型的自動推導。
② 字符類型和字符串類型:Char和String
對於字符串,在Scala中能夠進行插值操做。
注意:前面有個s;至關於執行:"My Name is " + s1
③ Unit類型:至關於Java中的void類型
④ Nothing類型:通常表示在執行過程當中,產生了Exception
例如,咱們定義一個函數以下:
l 使用val和var申明變量
例如:scala> val answer = 8 * 3 + 2
能夠在後續表達式中使用這些名稱
l val:定義的值實際是一個常量
要申明其值可變的變量:var
注意:能夠不用顯式指定變量的類型,Scala會進行自動的類型推到
l 可使用Scala的預約義函數
例如:求兩個值的最大值
l 也可使用def關鍵字自定義函數
語法:
示例:
Scala的if/else語法結構和Java或C++同樣。
不過,在Scala中,if/else是表達式,有值,這個值就是跟在if或else以後的表達式的值。
Scala擁有與Java和C++相同的while和do循環
Scala中,可使用for和foreach進行迭代
注意:
(*) <- 表示Scala中的generator,即:提取符
(*)第三種寫法是第二種寫法的簡寫
在上面的案例中,咱們將list集合中的每一個元素轉換成了大寫,而且使用yield關鍵字生成了一個新的集合。
注意:在上面的例子中,foreach接收了另外一個函數(println)做爲值
l Call By Value:對函數實參求值,且僅求一次
l Call By Name:函數實參每次在函數體內被用到時都會求值
咱們來分析一下,上面兩個調用執行的過程:
一份複雜一點的例子:
l 默認參數
l 代名參數
l 可變參數
當val被申明爲lazy時,它的初始化將被推遲,直到咱們首次對它取值。
一個更爲複雜一點的例子:讀取文件:
Scala異常的工做機制和Java或者C++同樣。直接使用throw關鍵字拋出異常。
使用try...catch...finally來捕獲和處理異常:
Scala數組的類型:
l 定長數組:使用關鍵字Array
l 變長數組:使用關鍵字ArrayBuffer
l 遍歷數組
l Scala數組的經常使用操做
l Scala的多維數組
l 和Java同樣,多維數組是經過數組的數組來實現的。
l 也能夠建立不規則的數組,每一行的長度各不相同。
映射就是Map集合,由一個(key,value)組成。
-> 操做符用來建立
例如:
val scores = Map(「Alice」 -> 10,」Bob」 -> 3,」Cindy」 -> 8)
映射的類型分爲:不可變Map和可變Map
映射的操做
l 獲取映射中的值
l 更新映射中的值(必須是可變Map)
l 迭代映射
元組是不一樣類型的值的彙集。
例如:val t = (1, 3.14, "Fred") // 類型爲Tuple3[Int, Double, java.lang.String]
這裏:Tuple是類型,3是表示元組中有三個元素。
元組的訪問和遍歷:
注意:要遍歷Tuple中的元素,須要首先生成對應的迭代器。不能直接使用for或者foreach。
把數據及對數據的操做方法放在一塊兒,做爲一個相互依存的總體——對象
面向對象的三大特徵:
u 封裝
u 繼承
u 多態
簡單類和無參方法:
案例:注意沒有class前面沒有public關鍵字修飾。
若是要開發main方法,須要將main方法定義在該類的伴生對象中,即:object對象中,(後續作詳細的討論)。
l 當定義屬性是private時候,scala會自動爲其生成對應的get和set方法
private var stuName:String = "Tom"
l 定義屬性:private var money:Int = 1000 但願money只有get方法,沒有set方法??
l private[this]的用法:該屬性只屬於該對象私有,就不會生成對應的set和get方法。若是這樣,就不能直接調用,例如:s1.stuName ---> 錯誤
咱們能夠在一個類的內部在定義一個類,以下:咱們在Student類中,再定義了一個Course類用於保存學生選修的課程。
開發一個測試程序進行測試:
類的構造器分爲:主構造器、輔助構造器
l 主構造器:和類的聲明結合在一塊兒,只能有一個主構造器
Student4(val stuName:String,val stuAge:Int)
(1) 定義類的主構造器:兩個參數
(2) 聲明瞭兩個屬性:stuName和stuAge 和對應的get和set方法
l 輔助構造器:能夠有多個輔助構造器,經過關鍵字this來實現
Scala沒有靜態的修飾符,但Object對象下的成員都是靜態的 ,如有同名的class,這其做爲它的伴生類。在Object中通常能夠爲伴生類作一些初始化等操做。
下面是Java中的靜態塊的例子。在這個例子中,咱們對JDBC進行了初始化。
而Scala中的Object就至關於Java中靜態塊。
Object對象的應用
u 單例對象
u 使用應用程序對象:能夠省略main方法;須要從父類App繼承。
遇到以下形式的表達式時,apply方法就會被調用:
Object(參數1,參數2,......,參數N)
一般,這樣一個apply方法返回的是伴生類的對象;其做用是爲了省略new關鍵字
Object的apply方法舉例:
Scala和Java同樣,使用extends關鍵字擴展類。
l 案例一:Employee類繼承Person類
l 案例二:在子類中重寫父類的方法
l 案例三:使用匿名子類
l 案例四:使用抽象類。抽象類中包含抽象方法,抽象類只能用來繼承。
l 案例五:使用抽象字段。抽象字段就是一個沒有初始值的字段
trait就是抽象類。trait跟抽象類最大的區別:trait支持多重繼承
十、包的使用
Scala的包和Java中的包或者C++中的命名空間的目的是相同的:管理大型程序中的名稱。
Scala中包的定義和使用:
u 包的定義
u 包的引入:Scala中依然使用import做爲引用包的關鍵字,例如
u 並且Scala中的import能夠寫在任意地方(Java中,import寫在最前面)
包能夠包含類、對象和特質,但不能包含函數或者變量的定義。很不幸,這是Java虛擬機的侷限。
把工具函數或者常量添加到包而不是某個Utils對象,這是更加合理的作法。Scala中,包對象的出現正是爲了解決這個侷限。
Scala中的包對象:常量,變量,方法,類,對象,trait(特質),包
l 讀取行
l 讀取字符
其實這裏的source就指向了這個文件中的每一個字符。
l 從URL或其餘源讀取:注意指定字符集UTF-8
l 讀取二進制文件:Scala中並不支持直接讀取二進制,但能夠經過調用Java的InputStream來進行讀入。
l 寫入文本文件
在Scala中,函數是「頭等公民」,就和數字同樣。能夠在變量中存放函數,即:將函數做爲變量的值(值函數)。
示例1:
(*)首先,定義一個最普通的函數
(*)再定義一個高階函數
(*)分析這個高階函數調用的過程
示例2:
在這個例子中,首先定義了一個普通的函數mytest,而後定義了一個高階函數myFunction;myFunction接收三個參數:第一個f是一個函數參數,第二個是x,第三個是y。而f是一個函數參數,自己接收兩個Int的參數,返回一個Int的值。
就是函數的嵌套,即:在一個函數定義中,包含另一個函數的定義;而且在內函數中能夠訪問外函數中的變量。
測試上面的函數:
柯里化函數(Curried Function)是把具備多個參數的函數轉換爲一條函數鏈,每一個節點上是單一參數。
一個簡單的例子:
示例1:
示例2:
示例3:
示例4:
示例5:
在這個例子中,能夠被2整除的被分到一個分區;不能被2整除的被分到另外一個分區。
示例6:
示例7:
示例8:
在這個例子中,分爲兩步:
(1)將(1,2,3)和(4,5,6)這兩個集合合併成一個集合
(2)再對每一個元素乘以2
l 可變集合
l 不可變集合:
n 集合從不改變,所以能夠安全地共享其引用。
n 甚至是在一個多線程的應用程序當中也沒問題。
集合的操做:
l 不可變列表(List)
不可變列表的相關操做:
l 可變列表(LinkedList):scala.collection.mutable
經常使用的序列有:Vector和Range
u Vector是ArrayBuffer的不可變版本,是一個帶下標的序列
u Range表示一個整數序列
l 集Set是不重複元素的集合
l 和列表不一樣,集並不保留元素插入的順序。默認以Hash集實現
示例1:建立集
示例2:集的操做
Scala有一個強大的模式匹配機制,能夠應用在不少場合:
u switch語句
u 類型檢查
Scala還提供了樣本類(case class),對模式匹配進行了優化
模式匹配示例:
l 更好的switch
l Scala的守衛
l 模式匹配中的變量
l 類型模式
l 匹配數組和列表
簡單的來講,Scala的case class就是在普通的類定義前加case這個關鍵字,而後你能夠對這些類來模式匹配。
case class帶來的最大的好處是它們支持模式識別。
首先,回顧一下前面的模式匹配:
其次,若是咱們想判斷一個對象是不是某個類的對象,跟Java同樣可使用isInstanceOf
下面這個好像有點問題
最後,在Scala中有一種更簡單的方式來判斷,就是case class
注意:須要在class前面使用case關鍵字。
和Java或者C++同樣,類和特質能夠帶類型參數。在Scala中,使用方括號來定義類型參數
測試程序:
函數和方法也能夠帶類型參數。和泛型類同樣,咱們須要把類型參數放在方法名以後。
注意:這裏的ClassTag是必須的,表示運行時的一些信息,好比類型。
類型的上界和下界,是用來定義類型變量的範圍。它們的含義以下:
l S <: T
這是類型上界的定義。也就是S必須是類型T的子類(或自己,本身也能夠認爲是本身的子類。
u U >: T
這是類型下界的定義。也就是U必須是類型T的父類(或自己,本身也能夠認爲是本身的父類)。
l 一個簡單的例子:
l 一個複雜一點的例子(上界):
l 再來看一個例子:
它比 <: 適用的範圍更廣,除了全部的子類型,還容許隱式轉換過去的類型。用 <% 表示。儘可能使用視圖界定,來取代泛型的上界,由於適用的範圍更加普遍。
示例:
l 上面寫過的一個列子。這裏因爲T的上界是String,當咱們傳遞100和200的時候,就會出現類型不匹配。
l 可是100和200是能夠轉成字符串的,因此咱們可使用視圖界定讓addTwoString方法接收更普遍的數據類型,即:字符串及其子類、能夠轉換成字符串的類型。
注意:使用的是 <%
l 但實際運行的時候,會出現錯誤:
這是由於:Scala並無定義如何將Int轉換成String的規則,因此要使用視圖界定,咱們就必須建立轉換的規則。
l 建立轉換規則
l 運行成功
l 協變:
Scala的類或特徵的範型定義中,若是在類型參數前面加入+符號,就可使類或特徵變爲協變了。
u 逆變:
在類或特徵的定義中,在類型參數以前加上一個-符號,就可定義逆變範型類和特徵了。
總結一下:Scala的協變:泛型變量的值能夠是自己類型或者其子類的類型
Scala的逆變:泛型變量的值能夠是自己類型或者其父類的類型
所謂隱式轉換函數指的是以implicit關鍵字申明的帶有單個參數的函數。
l 前面講視圖界定時候的一個例子:
l 再舉一個例子:咱們把Fruit對象轉換成了Monkey對象
使用implicit申明的函數參數叫作隱式參數。咱們也可使用隱式參數實現隱式的轉換
所謂隱式類: 就是對類增長implicit 限定的類,其做用主要是對類的功能增強!
翻譯:Spark是一個針對大規模數據處理的快速通用引擎。
Spark是一種快速、通用、可擴展的大數據分析引擎,2009年誕生於加州大學伯克利分校AMPLab,2010年開源,2013年6月成爲Apache孵化項目,2014年2月成爲Apache頂級項目。目前,Spark生態系統已經發展成爲一個包含多個子項目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子項目,Spark是基於內存計算的大數據並行計算框架。Spark基於內存計算,提升了在大數據環境下數據處理的實時性,同時保證了高容錯性和高可伸縮性,容許用戶將Spark部署在大量廉價硬件之上,造成集羣。Spark獲得了衆多大數據公司的支持,這些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、騰訊、京東、攜程、優酷土豆。當前百度的Spark已應用於鳳巢、大搜索、直達號、百度大數據等業務;阿里利用GraphX構建了大規模的圖計算和圖挖掘系統,實現了不少生產系統的推薦算法;騰訊Spark集羣達到8000臺的規模,是當前已知的世界上最大的Spark集羣。
(*)Hadoop的MapReduce計算模型存在的問題:
學習過Hadoop的MapReduce的學員都知道,MapReduce的核心是Shuffle(洗牌)。在整個Shuffle的過程當中,至少會產生6次的I/O。下圖是咱們在講MapReduce的時候,畫的Shuffle的過程。
中間結果輸出:基於MapReduce的計算引擎一般會將中間結果輸出到磁盤上,進行存儲和容錯。另外,當一些查詢(如:Hive)翻譯到MapReduce任務時,每每會產生多個Stage(階段),而這些串聯的Stage又依賴於底層文件系統(如HDFS)來存儲每個Stage的輸出結果,而I/O的效率每每較低,從而影響了MapReduce的運行速度。
(*)Spark的最大特色:基於內存
Spark是MapReduce的替代方案,並且兼容HDFS、Hive,可融入Hadoop的生態系統,以彌補MapReduce的不足。
(*)快
與Hadoop的MapReduce相比,Spark基於內存的運算速度要快100倍以上,即便,Spark基於硬盤的運算也要快10倍。Spark實現了高效的DAG執行引擎,從而能夠經過內存來高效處理數據流。
(*)易用
Spark支持Java、Python和Scala的API,還支持超過80種高級算法,使用戶能夠快速構建不一樣的應用。並且Spark支持交互式的Python和Scala的shell,能夠很是方便地在這些shell中使用Spark集羣來驗證解決問題的方法。
(*)通用
Spark提供了統一的解決方案。Spark能夠用於批處理、交互式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。這些不一樣類型的處理均可以在同一個應用中無縫使用。Spark統一的解決方案很是具備吸引力,畢竟任何公司都想用統一的平臺去處理遇到的問題,減小開發和維護的人力成本和部署平臺的物力成本。
另外Spark還能夠很好的融入Hadoop的體系結構中能夠直接操做HDFS,並提供Hive on Spark、Pig on Spark的框架集成Hadoop。
(*)兼容性
Spark能夠很是方便地與其餘的開源產品進行融合。好比,Spark可使用Hadoop的YARN和Apache Mesos做爲它的資源管理和調度器,器,而且能夠處理全部Hadoop支持的數據,包括HDFS、HBase和Cassandra等。這對於已經部署Hadoop集羣的用戶特別重要,由於不須要作任何數據遷移就可使用Spark的強大處理能力。Spark也能夠不依賴於第三方的資源管理和調度器,它實現了Standalone做爲其內置的資源管理和調度框架,這樣進一步下降了Spark的使用門檻,使得全部人均可以很是容易地部署和使用Spark。此外,Spark還提供了在EC2上部署Standalone的Spark集羣的工具。
官方的一張圖:
本身的一張圖:
Spark的安裝部署方式有如下幾種模式:
l Standalone
l YARN
l Mesos
l Amazon EC2
(*)Spark Standalone僞分佈的部署
l 配置文件:conf/spark-env.sh
n export JAVA_HOME=/root/training/jdk1.7.0_75
n export SPARK_MASTER_HOST=spark81
n export SPARK_MASTER_PORT=7077
n 下面的能夠不寫,默認
n export SPARK_WORKER_CORES=1
n export SPARK_WORKER_MEMORY=1024m
l 配置文件:conf/slave
n spark81
(*)Spark Standalone全分佈的部署
l 配置文件:conf/spark-env.sh
n export JAVA_HOME=/root/training/jdk1.7.0_75
n export SPARK_MASTER_HOST=spark82
n export SPARK_MASTER_PORT=7077
n 下面的能夠不寫,默認
n export SPARK_WORKER_CORES=1
n export SPARK_WORKER_MEMORY=1024m
l 配置文件:conf/slave
n spark83
n spark84
(*)啓動Spark集羣:start-all.sh
(*)基於文件系統的單點恢復
主要用於開發或測試環境。當spark提供目錄保存spark Application和worker的註冊信息,並將他們的恢復狀態寫入該目錄中,這時,一旦Master發生故障,就能夠經過從新啓動Master進程(sbin/start-master.sh),恢復已運行的spark Application和worker的註冊信息。
基於文件系統的單點恢復,主要是在spark-en.sh裏對SPARK_DAEMON_JAVA_OPTS設置
配置參數 |
參考值 |
spark.deploy.recoveryMode |
設置爲FILESYSTEM開啓單點恢復功能,默認值:NONE |
spark.deploy.recoveryDirectory |
Spark 保存恢復狀態的目錄 |
參考:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/training/spark-2.1.0-bin-hadoop2.7/recovery"
測試:
一、在spark82上啓動Spark集羣
二、在spark83上啓動spark shell
MASTER=spark://spark82:7077 spark-shell
三、在spark82上中止master
stop-master.sh
四、觀察spark83上的輸出:
五、在spark82上重啓master
start-master.sh
(*)基於Zookeeper的Standby Masters
ZooKeeper提供了一個Leader Election機制,利用這個機制能夠保證雖然集羣存在多個Master,可是隻有一個是Active的,其餘的都是Standby。當Active的Master出現故障時,另外的一個Standby Master會被選舉出來。因爲集羣的信息,包括Worker, Driver和Application的信息都已經持久化到ZooKeeper,所以在切換的過程當中只會影響新Job的提交,對於正在進行的Job沒有任何的影響。加入ZooKeeper的集羣總體架構以下圖所示。
配置參數 |
參考值 |
spark.deploy.recoveryMode |
設置爲ZOOKEEPER開啓單點恢復功能,默認值:NONE |
spark.deploy.zookeeper.url |
ZooKeeper集羣的地址 |
spark.deploy.zookeeper.dir |
Spark信息在ZK中的保存目錄,默認:/spark |
l 參考:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata12:2181,bigdata13:2181,bigdata14:2181 -Dspark.deploy.zookeeper.dir=/spark"
l 另外:每一個節點上,須要將如下兩行註釋掉。
l ZooKeeper中保存的信息
(*)示例程序:$SPARK_HOME/examples/jars/spark-examples_2.11-2.1.0.jar
(*)全部的示例程序:$EXAMPLE_HOME/examples/src/main有Java、Scala等等
(*)Demo:蒙特卡羅求PI
命令:
spark-submit --master spark://spark81:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.1.0.jar 100
spark-shell是Spark自帶的交互式Shell程序,方便用戶進行交互式編程,用戶能夠在該命令行下用scala編寫spark程序。
(*)啓動Spark Shell:spark-shell
也可使用如下參數:
參數說明:
--master spark://spark81:7077 指定Master的地址
--executor-memory 2g 指定每一個worker可用內存爲2G
--total-executor-cores 2 指定整個集羣使用的cup核數爲2個
例如:
spark-shell --master spark://spark81:7077 --executor-memory 2g --total-executor-cores 2
(*)注意:
若是啓動spark shell時沒有指定master地址,可是也能夠正常啓動spark shell和執行spark shell中的程序,實際上是啓動了spark的local模式,該模式僅在本機啓動一個進程,沒有與集羣創建聯繫。
請注意local模式和集羣模式的日誌區別:
(*)在Spark Shell中編寫WordCount程序
程序以下:
sc.textFile("hdfs://192.168.88.111:9000/data/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")
說明:
l sc是SparkContext對象,該對象時提交spark程序的入口
l textFile("hdfs://192.168.88.111:9000/data/data.txt")是hdfs中讀取數據
l flatMap(_.split(" "))先map在壓平
l map((_,1))將單詞和1構成元組
l reduceByKey(_+_)按照key進行reduce,並將value累加
l saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")將結果寫入到hdfs中
(*)須要的jar包:$SPARK_HOME/jars/*.jar
(*)建立Scala Project,並建立Scala Object、或者Java Class
(*)書寫源代碼,並打成jar包,上傳到Linux
==========================Scala版本==========================
(*)運行程序:
spark-submit --master spark://spark81:7077 --class mydemo.WordCount jars/wc.jar hdfs://192.168.88.111:9000/data/data.txt hdfs://192.168.88.111:9000/output/spark/wc1
====================Java版本(直接輸出在屏幕)====================
(*)運行程序:
spark-submit --master spark://spark81:7077 --class mydemo.JavaWordCount jars/wc.jar hdfs://192.168.88.111:9000/data/data.txt
====================Python版本(直接輸出在屏幕)====================
(*)運行程序:
等等
須要看源碼一步步看。
RDD(Resilient Distributed Dataset)叫作彈性分佈式數據集,是Spark中最基本的數據抽象,它表明一個不可變、可分區、裏面的元素可並行計算的集合。RDD具備數據流模型的特色:自動容錯、位置感知性調度和可伸縮性。RDD容許用戶在執行多個查詢時顯式地將工做集緩存在內存中,後續的查詢可以重用工做集,這極大地提高了查詢速度。
² 一組分片(Partition),即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Core的數目。
² 一個計算每一個分區的函數。Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不須要保存每次計算的結果。
² RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。
² 一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有對於於key-value的RDD,纔會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD自己的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
² 一個列表,存儲存取每一個Partition的優先位置(preferred location)。對於一個HDFS文件來講,這個列表保存的就是每一個Partition所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。
u RDD的建立方式
val rdd1 = sc.textFile(「hdfs://192.168.88.111:9000/data/data.txt」)
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
u RDD的基本原理
RDD中的全部轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動做。只有當發生一個要求返回結果給Driver的動做時,這些轉換纔會真正運行。這種設計讓Spark更加有效率地運行。
轉換 |
含義 |
map(func) |
返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成 |
filter(func) |
返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成 |
flatMap(func) |
相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素) |
mapPartitions(func) |
相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) |
相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲T的RDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) |
根據fraction指定的比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 |
union(otherDataset) |
對源RDD和參數RDD求並集後返回一個新的RDD |
intersection(otherDataset) |
對源RDD和參數RDD求交集後返回一個新的RDD |
distinct([numTasks])) |
對源RDD進行去重後返回一個新的RDD |
groupByKey([numTasks]) |
在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) |
在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,與groupByKey相似,reduce任務的個數能夠經過第二個可選的參數來設置 |
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks]) |
|
sortByKey([ascending], [numTasks]) |
在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) |
與sortByKey相似,可是更靈活 |
join(otherDataset, [numTasks]) |
在類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) |
在類型爲(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD |
cartesian(otherDataset) |
笛卡爾積 |
pipe(command, [envVars]) |
|
coalesce(numPartitions) |
|
repartition(numPartitions) |
|
repartitionAndSortWithinPartitions(partitioner) |
|
動做 |
含義 |
reduce(func) |
經過func函數彙集RDD中的全部元素,這個功能必須是課交換且可並聯的 |
collect() |
在驅動程序中,以數組的形式返回數據集的全部元素 |
count() |
返回RDD的元素個數 |
first() |
返回RDD的第一個元素(相似於take(1)) |
take(n) |
返回一個由數據集的前n個元素組成的數組 |
takeSample(withReplacement,num, [seed]) |
返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子 |
takeOrdered(n, [ordering]) |
|
saveAsTextFile(path) |
將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本 |
saveAsSequenceFile(path) |
將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統。 |
saveAsObjectFile(path) |
|
countByKey() |
針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每個key對應的元素個數。 |
foreach(func) |
在數據集的每個元素上,運行函數func進行更新。 |
Spark算子示例:
一、RDD的建立方式
經過外部的數據文件建立,如HDFS
val rdd1 = sc.textFile(「hdfs://192.168.88.111:9000/data/data.txt」)
經過sc.parallelize進行建立
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
二、RDD的算子
(1)Transformation算子:
(*)map(func)算子: 將輸入的每一個元素重寫組合成一個元組
val rdd2 = rdd1.map((_,"*"))
乘以10
val rdd2 = rdd1.map((_ * 10))
val rdd2 = rdd1.map((x:Int) = x + 10)
(*)filter(func):返回一個新的RDD,該RDD是通過func運算後返回true的元素
val rdd3 = rdd1.filter(_ > 5)
(*)flatMap(func) 壓平操做
val books = sc.parallelize(List("Hadoop","Hive","HDFS"))
books.flatMap(_.toList).collect
結果:res18: Array[Char] = Array(H, a, d, o, o, p, H, i, v, e, H, D, F, S)
val sen = sc.parallelize(List("I love Beijing","I love China","Beijing is the capital of China"))
(*)union(otherDataset):並集運算,注意類型要一致
val rdd4 = sc.parallelize(List(5,6,4,7))
val rdd5 = sc.parallelize(List(1,2,3,4))
val rdd6 = rdd4.union(rdd5)
(*)intersection(otherDataset):交集
val rdd7 = rdd5.intersection(rdd4)
(*)distinct([numTasks])):去掉重複數據
val rdd8 = sc.parallelize(List(5,6,4,7,5,5,5))
rdd8.distinct.collect
(*)groupByKey([numTasks]) :對於一個<k,v>的RDD,按照Key進行分組
val rdd = sc.parallelize(Array(("I",1),("love",2),("I",3)))
rdd.groupByKey.collect
結果:res38: Array[(String, Iterable[Int])] = Array((love,CompactBuffer(2)), (I,CompactBuffer(1, 3)))
複雜一點的例子:
val sen = sc.parallelize(List("I love Beijing","I love China","Beijing is the capital of China"))
sen.flatMap(_.split(" ")).map((_,1)).groupByKey.collect
(*)reduceByKey(func, [numTasks]):相似於groupByKey,區別是reduceByKey會有一個combiner的過程對每一個分區上的數據先作一次合併
畫圖說明,因此效率更高
(*)cartesian笛卡爾積
val rdd1 = sc.parallelize(List("tom", "jerry"))
val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"))
val rdd3 = rdd1.cartesian(rdd2)
(2)Action算子:
val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
(*)collect
rdd1.collect
(*)reduce
val rdd2 = rdd1.reduce(_+_)
(*)count
rdd1.count
(*)top
rdd1.top(2)
(*)take
rdd1.take(2)
(*)first(similer to take(1))
rdd1.first
(*)takeOrdered
rdd1.takeOrdered(3)
RDD經過persist方法或cache方法能夠將前面的計算結果緩存,可是並非這兩個方法被調用時當即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用。
經過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。
緩存有可能丟失,或者存儲存儲於內存的數據因爲內存不足而被刪除,RDD的緩存容錯機制保證了即便緩存丟失也能保證計算的正確執行。經過基於RDD的一系列轉換,丟失的數據會被重算,因爲RDD的各個Partition是相對獨立的,所以只須要計算丟失的部分便可,並不須要重算所有Partition。
l Demo示例:
l 經過UI進行監控:
檢查點(本質是經過將RDD寫入Disk作檢查點)是爲了經過lineage(血統)作容錯的輔助,lineage過長會形成容錯成本太高,這樣就不如在中間階段作檢查點容錯,若是以後有節點出現問題而丟失分區,從作檢查點的RDD開始重作Lineage,就會減小開銷。
設置checkpoint的目錄,能夠是本地的文件夾、也能夠是HDFS。通常是在具備容錯能力,高可靠的文件系統上(好比HDFS, S3等)設置一個檢查點路徑,用於保存檢查點數據。
分別舉例說明:
l 本地目錄
注意:這種模式,須要將spark-shell運行在本地模式上
l HDFS的目錄
注意:這種模式,須要將spark-shell運行在集羣模式上
l 源碼中的一段話
l RDD的依賴關係
RDD和它依賴的父RDD(s)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
總結:窄依賴咱們形象的比喻爲獨生子女
總結:窄依賴咱們形象的比喻爲超生
l Spark任務中的Stage
DAG(Directed Acyclic Graph)叫作有向無環圖,原始的RDD經過一系列的轉換就就造成了DAG,根據RDD之間的依賴關係的不一樣將DAG劃分紅不一樣的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,因爲有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,所以寬依賴是劃分Stage的依據。
//經過並行化生成rdd
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//對rdd1裏的每個元素乘2而後排序
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
//過濾出大於等於十的元素
val rdd3 = rdd2.filter(_ >= 10)
//將元素以數組的方式在客戶端顯示
rdd3.collect
val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
//將rdd1裏面的每個元素先切分在壓平
val rdd2 = rdd1.flatMap(_.split(' '))
rdd2.collect
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求並集
val rdd3 = rdd1.union(rdd2)
//求交集
val rdd4 = rdd1.intersection(rdd2)
//去重
rdd3.distinct.collect
rdd4.collect
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求jion
val rdd3 = rdd1.join(rdd2)
rdd3.collect
//求並集
val rdd4 = rdd1 union rdd2
//按key進行分組
rdd4.groupByKey
rdd4.collect
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//cogroup
val rdd3 = rdd1.cogroup(rdd2)
//注意cogroup與groupByKey的區別
rdd3.collect
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce聚合
val rdd2 = rdd1.reduce(_ + _)
rdd2.collect
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//按key進行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect
把每一個partition中的分區號和對應的值拿出來
u 接收一個函數參數:
l 第一個參數:分區號
l 第二個參數:分區中的元素
u 示例:將每一個分區中的元素和分區號打印出來。
l val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
l 建立一個函數返回RDD中的每一個分區號和元素:
def func1(index:Int, iter:Iterator[Int]):Iterator[String] ={
iter.toList.map( x => "[PartID:" + index + ", value=" + x + "]" ).iterator
}
l 調用:rdd1.mapPartitionsWithIndex(func1).collect
先對局部聚合,再對全局聚合
示例:val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
u 查看每一個分區中的元素:
u 將每一個分區中的最大值求和,注意:初始值是0;
若是初始值時候10,則結果爲:30
u 若是是求和,注意:初始值是0:
若是初始值是10,則結果是:45
u 一個字符串的例子:
val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
修改一下剛纔的查看分區元素的函數
def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
兩個分區中的元素:
[partID:0, val: a], [partID:0, val: b], [partID:0, val: c],
[partID:1, val: d], [partID:1, val: e], [partID:1, val: f]
運行結果:
u 更復雜一點的例子
val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
結果多是:」24」,也多是:」42」
val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
結果是:」10」,也多是」01」,
緣由:注意有個初始值」」,其長度0,而後0.toString變成字符串
val rdd5 = sc.parallelize(List("12","23","","345"),2)
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
結果是:」11」,緣由同上。
n 準備數據:
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def func3(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
n 兩個分區中的元素:
n 示例:
l 將每一個分區中的動物最多的個數求和
scala> pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res69: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
l 將每種動物個數求和
scala> pairRDD.aggregateByKey(0)(_+_, _ + _).collect
res71: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
這個例子也可使用:reduceByKey
scala> pairRDD.reduceByKey(_+_).collect
res73: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
四、coalesce與repartition
n 都是將RDD中的分區進行重分區。
n 區別是:coalesce默認不會進行shuffle(false);而repartition會進行shuffle(true),即:會將數據真正經過網絡進行重分區。
n 示例:
def func4(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
下面兩句話是等價的:
val rdd2 = rdd1.repartition(3)
val rdd3 = rdd1.coalesce(3,true) --->若是是false,查看RDD的length依然是2
參考:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
l Tomcat的訪問日誌
l 求出訪問量最高的兩個網頁
n 要求顯示:網頁名稱、訪問量
l 根據jsp文件的名字,將各自的訪問日誌放入到不一樣的分區文件中,以下:
n 生成的分區文件
n 例如:part-00000文件中的內容:只包含了web.jsp的訪問日誌
l 將RDD的數據保存到Oracle數據庫中
調用:
l 使用JdbcRDD:執行SQL語句
JdbcRDD參數說明:
參數名稱 |
類型 |
說明 |
sc |
org.apache.spark.SparkContext |
Spark Context對象 |
getConnection |
scala.Function0[java.sql.Connection] |
獲得一個數據庫Connection |
sql |
scala.Predef.String |
執行的SQL語句 |
lowerBound |
scala.Long |
下邊界值,即:SQL的第一個參數 |
upperBound |
scala.Long |
上邊界值,即:SQL的第二個參數 |
numPartitions |
scala.Int |
分區的個數,即:啓動多少個Executor |
mapRow |
scala.Function1[java.sql.ResultSet, T] |
獲得的結果集 |
JdbcRDD的缺點:從上面的參數說明能夠看出,JdbcRDD有如下兩個缺點:
1.執行的SQL必須有兩個參數,並類型都是Long
2.獲得的結果是ResultSet,即:只支持select操做
Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫作DataFrame而且做爲分佈式SQL查詢引擎的做用。
爲何要學習Spark SQL?咱們已經學習了Hive,它是將Hive SQL轉換成MapReduce而後提交到集羣上執行,大大簡化了編寫MapReduce的程序的複雜性,因爲MapReduce這種計算模型執行效率比較慢。因此Spark SQL的應運而生,它是將Spark SQL轉換成RDD,而後提交到集羣執行,執行效率很是快!同時Spark SQL也支持從Hive中讀取數據。
Spark SQL的特色:
l 容易整合(集成)
l 統一的數據訪問方式
l 兼容Hive
l 標準的數據鏈接
u DataFrame
DataFrame是組織成命名列的數據集。它在概念上等同於關係數據庫中的表,但在底層具備更豐富的優化。DataFrames能夠從各類來源構建,
例如:
l 結構化數據文件
l hive中的表
l 外部數據庫或現有RDDs
DataFrame API支持的語言有Scala,Java,Python和R。
從上圖能夠看出,DataFrame多了數據的結構信息,即schema。RDD是分佈式的 Java對象的集合。DataFrame是分佈式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子之外,更重要的特色是提高執行效率、減小數據讀取以及執行計劃的優化
u Datasets
Dataset是數據的分佈式集合。Dataset是在Spark 1.6中添加的一個新接口,是DataFrame之上更高一級的抽象。它提供了RDD的優勢(強類型化,使用強大的lambda函數的能力)以及Spark SQL優化後的執行引擎的優勢。一個Dataset 能夠從JVM對象構造,而後使用函數轉換(map, flatMap,filter等)去操做。 Dataset API 支持Scala和Java。 Python不支持Dataset API。
使用員工表的數據,並已經將其保存到了HDFS上。
(*)經過Case Class建立DataFrames
① 定義case class(至關於表的結構:Schema)
注意:因爲mgr和comm列中包含null值,簡單起見,將對應的case class類型定義爲String
② 將HDFS上的數據讀入RDD,並將RDD與case Class關聯
③ 將RDD轉換成DataFrames
④ 經過DataFrames查詢數據
(*)使用SparkSession
① 什麼是SparkSession
Apache Spark 2.0引入了SparkSession,其爲用戶提供了一個統一的切入點來使用Spark的各項功能,而且容許用戶經過它調用DataFrame和Dataset相關API來編寫Spark程序。最重要的是,它減小了用戶須要瞭解的一些概念,使得咱們能夠很容易地與Spark交互。
在2.0版本以前,與Spark交互以前必須先建立SparkConf和SparkContext。然而在Spark 2.0中,咱們能夠經過SparkSession來實現一樣的功能,而不須要顯式地建立SparkConf, SparkContext 以及 SQLContext,由於這些對象已經封裝在SparkSession中。
② 建立StructType,來定義Schema結構信息
注意,須要:import org.apache.spark.sql.types._
③ 讀入數據而且切分數據
④ 將RDD中的數據映射成Row
注意,須要:import org.apache.spark.sql.Row
⑤ 建立DataFrames
val df = spark.createDataFrame(rowRDD,myschema)
再舉一個例子,使用JSon文件來建立DataFame
① 源文件:$SPARK_HOME/examples/src/main/resources/people.json
② val df = spark.read.json("源文件")
③ 查看數據和Schema信息
DataFrame操做也稱爲無類型的Dataset操做
(*)查詢全部的員工姓名
(*)查詢全部的員工姓名和薪水,並給薪水加100塊錢
(*)查詢工資大於2000的員工
(*)求每一個部門的員工人數
完整的例子,請參考:
http://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset
(*)在DataFrame中使用SQL語句
① 將DataFrame註冊成表(視圖):df.createOrReplaceTempView("emp")
② 執行查詢:spark.sql("select * from emp").show
spark.sql("select * from emp where deptno=10").show
spark.sql("select deptno,sum(sal) from emp group by deptno").show
上面使用的是一個在Session生命週期中的臨時views。在Spark SQL中,若是你想擁有一個臨時的view,並想在不一樣的Session中共享,並且在application的運行週期內可用,那麼就須要建立一個全局的臨時view。並記得使用的時候加上global_temp做爲前綴來引用它,由於全局的臨時view是綁定到系統保留的數據庫global_temp上。
① 建立一個普通的view和一個全局的view
df.createOrReplaceTempView("emp1")
df.createGlobalTempView("emp2")
② 在當前會話中執行查詢,都可查詢出結果。
spark.sql("select * from emp1").show
spark.sql("select * from global_temp.emp2").show
③ 開啓一個新的會話,執行一樣的查詢
spark.newSession.sql("select * from emp1").show (運行出錯)
spark.newSession.sql("select * from global_temp.emp2").show
DataFrame的引入,可讓Spark更好的處理結構數據的計算,但其中一個主要的問題是:缺少編譯時類型安全。爲了解決這個問題,Spark採用新的Dataset API (DataFrame API的類型擴展)。
Dataset是一個分佈式的數據收集器。這是在Spark1.6以後新加的一個接口,兼顧了RDD的優勢(強類型,可使用功能強大的lambda)以及Spark SQL的執行器高效性的優勢。因此能夠把DataFrames當作是一種特殊的Datasets,即:Dataset(Row)
(*)建立DataSet,方式一:使用序列
一、定義case class
case class MyData(a:Int,b:String)
二、生成序列,並建立DataSet
val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS
三、查看結果
(*)建立DataSet,方式二:使用JSON數據
一、定義case class
case class Person(name: String, gender: String)
二、經過JSON數據生成DataFrame
val df = spark.read.json(sc.parallelize("""{"gender": "Male", "name": "Tom"}""" :: Nil))
三、將DataFrame轉成DataSet
df.as[Person].show
df.as[Person].collect
(*)建立DataSet,方式三:使用HDFS數據
一、讀取HDFS數據,並建立DataSet
val linesDS = spark.read.text("hdfs://hadoop111:9000/data/data.txt").as[String]
二、對DataSet進行操做:分詞後,查詢長度大於3的單詞
val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)
words.show
words.collect
三、執行WordCount程序
val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count
result.show
排序:result.orderBy($"value").show
(*)使用emp.json 生成DataFrame
val empDF = spark.read.json("/root/resources/emp.json")
查詢工資大於3000的員工
empDF.where($"sal" >= 3000).show
(*)建立case class
case class Emp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)
(*)生成DataSets,並查詢數據
val empDS = empDF.as[Emp]
查詢工資大於3000的員工
empDS.filter(_.sal > 3000).show
查看10號部門的員工
empDS.filter(_.deptno == 10).show
(*)多表查詢
一、建立部門表
val deptRDD=sc.textFile("/root/temp/dept.csv").map(_.split(","))
case class Dept(deptno:Int,dname:String,loc:String)
val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS
二、建立員工表
case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)
val empRDD = sc.textFile("/root/temp/emp.csv").map(_.split(","))
val empDS = empRDD.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS
三、執行多表查詢:等值連接
val result = deptDS.join(empDS,"deptno")
另外一種寫法:注意有三個等號
val result = deptDS.joinWith(empDS,deptDS("deptno")=== empDS("deptno"))
joinWith和join的區別是鏈接後的新Dataset的schema會不同
(*)查看執行計劃:result.explain
(*)什麼是parquet文件?
Parquet是列式存儲格式的一種文件類型,列式存儲有如下的核心:
l 能夠跳過不符合條件的數據,只讀取須要的數據,下降IO數據量。
l 壓縮編碼能夠下降磁盤存儲空間。因爲同一列的數據類型是同樣的,可使用更高效的壓縮編碼(例如Run Length Encoding和Delta Encoding)進一步節約存儲空間。
l 只讀取須要的列,支持向量運算,可以獲取更好的掃描性能。
l Parquet格式是Spark SQL的默認數據源,可經過spark.sql.sources.default配置
(*)通用的Load/Save函數
l 讀取Parquet文件
val usersDF = spark.read.load("/root/resources/users.parquet")
l 查詢Schema和數據
l 查詢用戶的name和喜好顏色,並保存
usersDF.select($"name",$"favorite_color").write.save("/root/result/parquet")
l 驗證結果
(*)顯式指定文件格式:加載json格式
l 直接加載:val usersDF = spark.read.load("/root/resources/people.json") 會出錯
l val usersDF = spark.read.format("json").load("/root/resources/people.json")
(*)存儲模式(Save Modes)
能夠採用SaveMode執行存儲操做,SaveMode定義了對數據的處理模式。須要注意的是,這些保存模式不使用任何鎖定,不是原子操做。此外,當使用Overwrite方式執行時,在輸出新數據以前原數據就已經被刪除。SaveMode詳細介紹以下表:
Demo:
l usersDF.select($"name").write.save("/root/result/parquet1")
--> 出錯:由於/root/result/parquet1已經存在
l usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")
(*)將結果保存爲表
l usersDF.select($"name").write.saveAsTable("table1")
也能夠進行分區、分桶等操做:partitionBy、bucketBy
Parquet是一個列格式並且用於多個數據處理系統中。Spark SQL提供支持對於Parquet文件的讀寫,也就是自動保存原始數據的schema。當寫Parquet文件時,全部的列被自動轉化爲nullable,由於兼容性的緣故。
(*)案例:
讀入json格式的數據,將其轉換成parquet格式,並建立相應的表來使用SQL進行查詢。
(*)Schema的合併:
Parquet支持Schema evolution(Schema演變,即:合併)。用戶能夠先定義一個簡單的Schema,而後逐漸的向Schema中增長列描述。經過這種方式,用戶能夠獲取多個有不一樣Schema但相互兼容的Parquet文件。
Demo:
Schema的合併
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("/root/myresult/test_table/key=1")
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("/root/myresult/test_table/key=2")
val df3 = spark.read.option("mergeSchema", "true").parquet("/root/myresult/test_table/")
df3.printSchema()
Spark SQL能自動解析JSON數據集的Schema,讀取JSON數據集爲DataFrame格式。讀取JSON數據集方法爲SQLContext.read().json()。該方法將String格式的RDD或JSON文件轉換爲DataFrame。
須要注意的是,這裏的JSON文件不是常規的JSON格式。JSON文件每一行必須包含一個獨立的、自知足有效的JSON對象。若是用多行描述一個JSON對象,會致使讀取出錯。讀取JSON數據集示例以下:
(*)Demo1:使用Spark自帶的示例文件 --> people.json 文件
定義路徑:
val path ="/root/resources/people.json"
讀取Json文件,生成DataFrame:
val peopleDF = spark.read.json(path)
打印Schema結構信息:
peopleDF.printSchema()
建立臨時視圖:
peopleDF.createOrReplaceTempView("people")
執行查詢
spark.sql("SELECT name FROM people WHERE age=19").show
Spark SQL一樣支持經過JDBC讀取其餘數據庫的數據做爲數據源。
Demo演示:使用Spark SQL讀取Oracle數據庫中的表。
l 啓動Spark Shell的時候,指定Oracle數據庫的驅動
spark-shell --master spark://spark81:7077 \\
--jars /root/temp/ojdbc6.jar \\
--driver-class-path /root/temp/ojdbc6.jar
l 讀取Oracle數據庫中的數據
(*)方式一:
val oracleDF = spark.read.format("jdbc").
option("url","jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com").
option("dbtable","scott.emp").
option("user","scott").
option("password","tiger").
load
(*)方式二:
導入須要的類:
import java.util.Properties
定義屬性:
val oracleprops = new Properties()
oracleprops.setProperty("user","scott")
oracleprops.setProperty("password","tiger")
讀取數據:
val oracleEmpDF =spark.read.jdbc("jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com",
"scott.emp",oracleprops)
注意:下面是讀取Oracle 10g(Windows上)的步驟
l 首先,搭建好Hive的環境(須要Hadoop)
l 配置Spark SQL支持Hive
n 只須要將如下文件拷貝到$SPARK_HOME/conf的目錄下,便可
u $HIVE_HOME/conf/hive-site.xml
u $HADOOP_CONF_DIR/core-site.xml
u $HADOOP_CONF_DIR/hdfs-site.xml
l 使用Spark Shell操做Hive
n 啓動Spark Shell的時候,須要使用--jars指定mysql的驅動程序
n 建立表
spark.sql("create table src (key INT, value STRING) row format delimited fields terminated by ','")
n 導入數據
spark.sql("load data local inpath '/root/temp/data.txt' into table src")
n 查詢數據
spark.sql("select * from src").show
l 使用spark-sql操做Hive
n 啓動spark-sql的時候,須要使用--jars指定mysql的驅動程序
n 操做Hive
u spark.sql("show tables").show
u spark.sql("select * from emp1").show
性能調優主要是將數據放入內存中操做。經過spark.cacheTable("tableName")或者dataFrame.cache()。使用spark.uncacheTable("tableName")來從內存中去除table。
u Demo案例:
(*)從Oracle數據庫中讀取數據,生成DataFrame
val oracleDF = spark.read.format("jdbc")
.option("url","jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com")
.option("dbtable","scott.emp")
.option("user","scott")
.option("password","tiger").load
(*)將DataFrame註冊成表: oracleDF.registerTempTable("emp")
(*)執行查詢,並經過Web Console監控執行的時間
spark.sql("select * from emp").show
(*)將表進行緩存,並查詢兩次,並經過Web Console監控執行的時間
spark.sqlContext.cacheTable("emp")
(*)清空緩存:
spark.sqlContext.cacheTable("emp")
spark.sqlContext.clearCache
l 將數據緩存到內存中的相關優化參數
n spark.sql.inMemoryColumnarStorage.compressed
u 默認爲 true
u Spark SQL 將會基於統計信息自動地爲每一列選擇一種壓縮編碼方式。
n spark.sql.inMemoryColumnarStorage.batchSize
u 默認值:10000
u 緩存批處理大小。緩存數據時, 較大的批處理大小能夠提升內存利用率和壓縮率,但同時也會帶來 OOM(Out Of Memory)的風險。
l 其餘性能相關的配置選項(不過不推薦手動修改,可能在後續版本自動的自適應修改)
n spark.sql.files.maxPartitionBytes
u 默認值:128 MB
u 讀取文件時單個分區可容納的最大字節數
n spark.sql.files.openCostInBytes
u 默認值:4M
u 打開文件的估算成本, 按照同一時間可以掃描的字節數來測量。當往一個分區寫入多個文件的時候會使用。高估更好, 這樣的話小文件分區將比大文件分區更快 (先被調度)。
l spark.sql.autoBroadcastJoinThreshold
n 默認值:10M
n 用於配置一個表在執行 join 操做時可以廣播給全部 worker 節點的最大字節大小。經過將這個值設置爲 -1 能夠禁用廣播。注意,當前數據統計僅支持已經運行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。
l spark.sql.shuffle.partitions
n 默認值:200
n 用於配置 join 或聚合操做混洗(shuffle)數據時使用的分區數。
Spark Streaming是核心Spark API的擴展,可實現可擴展、高吞吐量、可容錯的實時數據流處理。數據能夠從諸如Kafka,Flume,Kinesis或TCP套接字等衆多來源獲取,而且可使用由高級函數(如map,reduce,join和window)開發的複雜算法進行流數據處理。最後,處理後的數據能夠被推送到文件系統,數據庫和實時儀表板。並且,您還能夠在數據流上應用Spark提供的機器學習和圖處理算法。
在內部,它的工做原理以下。Spark Streaming接收實時輸入數據流,並將數據切分紅批,而後由Spark引擎對其進行處理,最後生成「批」形式的結果流。
Spark Streaming將連續的數據流抽象爲discretizedstream或DStream。在內部,DStream 由一個RDD序列表示。
(1)因爲在本案例中須要使用netcat網絡工具,因此須要先安裝。
rpm -iUv ~/netcat-0.6.1-1.i386.rpm
(2)啓動netcat數據流服務器,並監聽端口:1234
命令:nc -l -p 9999
服務器端:
(3)啓動客戶端
bin/run-example streaming.NetworkWordCount localhost 1234
客戶端:
(必定注意):若是要執行本例,必須確保機器cpu核數大於2
(必定注意):
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
官方的解釋:
l 初始化StreamingContext
n 方式一:從SparkConf對象中建立
n 從一個現有的SparkContext實例中建立
l 程序中的幾點說明:
n appName參數是應用程序在集羣UI上顯示的名稱。
n master是Spark,Mesos或YARN集羣的URL,或者一個特殊的「local [*]」字符串來讓程序以本地模式運行。
n 當在集羣上運行程序時,不須要在程序中硬編碼master參數,而是使用spark-submit提交應用程序並將master的URL以腳本參數的形式傳入。可是,對於本地測試和單元測試,您能夠經過「local[*]」來運行Spark Streaming程序(請確保本地系統中的cpu核心數夠用)。
n StreamingContext會內在的建立一個SparkContext的實例(全部Spark功能的起始點),你能夠經過ssc.sparkContext訪問到這個實例。
n 批處理的時間窗口長度必須根據應用程序的延遲要求和可用的集羣資源進行設置。
l 請務必記住如下幾點:
n 一旦一個StreamingContextt開始運做,就不能設置或添加新的流計算。
n 一旦一個上下文被中止,它將沒法從新啓動。
n 同一時刻,一個JVM中只能有一個StreamingContext處於活動狀態。
n StreamingContext上的stop()方法也會中止SparkContext。 要僅中止StreamingContext(保持SparkContext活躍),請將stop() 方法的可選參數stopSparkContext設置爲false。
n 只要前一個StreamingContext在下一個StreamingContext被建立以前中止(不中止SparkContext),SparkContext就能夠被重用來建立多個StreamingContext。
l DiscretizedStream或DStream 是Spark Streaming對流式數據的基本抽象。它表示連續的數據流,這些連續的數據流能夠是從數據源接收的輸入數據流,也能夠是經過對輸入數據流執行轉換操做而生成的經處理的數據流。在內部,DStream由一系列連續的RDD表示,以下圖:
l 舉例分析:在以前的NetworkWordCount的例子中,咱們將一行行文本組成的流轉換爲單詞流,具體作法爲:將flatMap操做應用於名爲lines的 DStream中的每一個RDD上,以生成words DStream的RDD。以下圖所示:
可是DStream和RDD也有區別,下面畫圖說明:
最後兩個transformation算子須要重點介紹一下:
n transform(func)
u 經過RDD-to-RDD函數做用於源DStream中的各個RDD,能夠是任意的RDD操做,從而返回一個新的RDD
u 舉例:在NetworkWordCount中,也可使用transform來生成元組對
n updateStateByKey(func)
u 操做容許不斷用新信息更新它的同時保持任意狀態。
l 定義狀態-狀態能夠是任何的數據類型
l 定義狀態更新函數-怎樣利用更新前的狀態和從輸入流裏面獲取的新值更新狀態
l 注意:須要設置檢查點
u 重寫NetworkWordCount程序,累計每一個單詞出現的頻率(注意:累計)
u 輸出結果:
n 注意:若是在IDEA中,不想輸出log4j的日誌信息,能夠將log4j.properties文件(放在src的目錄下)的第一行改成:
log4j.rootCategory=ERROR, console
Spark Streaming還提供了窗口計算功能,容許您在數據的滑動窗口上應用轉換操做。下圖說明了滑動窗口的工做方式:
如圖所示,每當窗口滑過originalDStream時,落在窗口內的源RDD被組合並被執行操做以產生windowed DStream的RDD。在上面的例子中,操做應用於最近3個時間單位的數據,並以2個時間單位滑動。這代表任何窗口操做都須要指定兩個參數。
l 窗口長度(windowlength) - 窗口的時間長度(上圖的示例中爲:3)。
l 滑動間隔(slidinginterval) - 兩次相鄰的窗口操做的間隔(即每次滑動的時間長度)(上圖示例中爲:2)。
這兩個參數必須是源DStream的批間隔的倍數(上圖示例中爲:1)。
咱們以一個例子來講明窗口操做。 假設您但願對以前的單詞計數的示例進行擴展,每10秒鐘對過去30秒的數據進行wordcount。爲此,咱們必須在最近30秒的pairs DStream數據中對(word, 1)鍵值對應用reduceByKey操做。這是經過使用reduceByKeyAndWindow操做完成的。
須要注意:滑動的距離必須是採樣時間的整數倍。
一些常見的窗口操做以下表所示。全部這些操做都用到了上述兩個參數 - windowLength和slideInterval。
u window(windowLength, slideInterval)
l 基於源DStream產生的窗口化的批數據計算一個新的DStream
u countByWindow(windowLength, slideInterval)
l 返回流中元素的一個滑動窗口數
u reduceByWindow(func, windowLength, slideInterval)
l 返回一個單元素流。利用函數func彙集滑動時間間隔的流的元素建立這個單元素流。函數必須是相關聯的以使計算可以正確的並行計算。
u reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
l 應用到一個(K,V)對組成的DStream上,返回一個由(K,V)對組成的新的DStream。每個key的值均由給定的reduce函數彙集起來。注意:在默認狀況下,這個算子利用了Spark默認的併發任務數去分組。你能夠用numTasks參數設置不一樣的任務數
u reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
l 上述reduceByKeyAndWindow() 的更高效的版本,其中使用前一窗口的reduce計算結果遞增地計算每一個窗口的reduce值。這是經過對進入滑動窗口的新數據進行reduce操做,以及「逆減(inverse reducing)」離開窗口的舊數據來完成的。一個例子是當窗口滑動時對鍵對應的值進行「一加一減」操做。可是,它僅適用於「可逆減函數(invertible reduce functions)」,即具備相應「反減」功能的減函數(做爲參數invFunc)。 像reduceByKeyAndWindow同樣,經過可選參數能夠配置reduce任務的數量。 請注意,使用此操做必須啓用檢查點。
u countByValueAndWindow(windowLength, slideInterval, [numTasks])
l 應用到一個(K,V)對組成的DStream上,返回一個由(K,V)對組成的新的DStream。每一個key的值都是它們在滑動窗口中出現的頻率。
輸入DStreams表示從數據源獲取輸入數據流的DStreams。在NetworkWordCount例子中,lines表示輸入DStream,它表明從netcat服務器獲取的數據流。每個輸入流DStream和一個Receiver對象相關聯,這個Receiver從源中獲取數據,並將數據存入內存中用於處理。
輸入DStreams表示從數據源獲取的原始數據流。Spark Streaming擁有兩類數據源:
l 基本源(Basic sources):這些源在StreamingContext API中直接可用。例如文件系統、套接字鏈接、Akka的actor等
l 高級源(Advanced sources):這些源包括Kafka,Flume,Kinesis,Twitter等等。
下面經過具體的案例,詳細說明:
須要注意的是:
① 這些文件具備相同的格式
② 這些文件經過原子移動或重命名文件的方式在dataDirectory建立
③ 若是在文件中追加內容,這些追加的新數據也不會被讀取。
注意:要演示成功,須要在原文件中編輯,而後拷貝一份。
使用streamingContext.queueStream(queueOfRDD)建立基於RDD隊列的DStream,用於調試Spark Streaming應用程序。
輸出操做容許DStream的操做推到如數據庫、文件系統等外部系統中。由於輸出操做其實是容許外部系統消費轉換後的數據,它們觸發的實際操做是DStream轉換。目前,定義了下面幾種輸出操做:
l foreachRDD的設計模式
DStream.foreachRDD是一個強大的原語,發送數據到外部系統中。
出現如下Exception:
緣由是:Connection對象不是一個可被序列化的對象,不能RDD的每一個Worker上運行;即:Connection不能在RDD分佈式環境中的每一個分區上運行,由於不一樣的分區可能運行在不一樣的Worker上。因此須要在每一個RDD分區上單首創建Connection對象。
咱們能夠很方便地使用DataFrames和SQL操做來處理流數據。您必須使用當前的StreamingContext對應的SparkContext建立一個SparkSession。此外,必須這樣作的另外一個緣由是使得應用能夠在driver程序故障時得以從新啓動,這是經過建立一個能夠延遲實例化的單例SparkSession來實現的。
在下面的示例中,咱們使用DataFrames和SQL來修改以前的wordcount示例並對單詞進行計數。咱們將每一個RDD轉換爲DataFrame,並註冊爲臨時表,而後在這張表上執行SQL查詢。
與RDD相似,DStreams還容許開發人員將流數據保留在內存中。也就是說,在DStream上調用persist() 方法會自動將該DStream的每一個RDD保留在內存中。若是DStream中的數據將被屢次計算(例如,相同數據上執行多個操做),這個操做就會頗有用。對於基於窗口的操做,如reduceByWindow和reduceByKeyAndWindow以及基於狀態的操做,如updateStateByKey,數據會默認進行持久化。 所以,基於窗口的操做生成的DStream會自動保存在內存中,而不須要開發人員調用persist()。
對於經過網絡接收數據(例如Kafka,Flume,sockets等)的輸入流,默認持久化級別被設置爲將數據複製到兩個節點進行容錯。
請注意,與RDD不一樣,DStreams的默認持久化級別將數據序列化保存在內存中。
流數據處理程序一般都是全天候運行,所以必須對應用中邏輯無關的故障(例如,系統故障,JVM崩潰等)具備彈性。爲了實現這一特性,Spark Streaming須要checkpoint足夠的信息到容錯存儲系統,以即可以從故障中恢復。
① 通常會對兩種類型的數據使用檢查點:
1) 元數據檢查點(Metadatacheckpointing) - 將定義流計算的信息保存到容錯存儲中(如HDFS)。這用於從運行streaming程序的driver程序的節點的故障中恢復。元數據包括如下幾種:
l 配置(Configuration) - 用於建立streaming應用程序的配置信息。
l DStream操做(DStream operations) - 定義streaming應用程序的DStream操做集合。
l 不完整的batch(Incomplete batches) - jobs還在隊列中但還沒有完成的batch。
2) 數據檢查點(Datacheckpointing) - 將生成的RDD保存到可靠的存儲層。對於一些須要將多個批次之間的數據進行組合的stateful變換操做,設置數據檢查點是必需的。在這些轉換操做中,當前生成的RDD依賴於先前批次的RDD,這致使依賴鏈的長度隨時間而不斷增長,由此也會致使基於血統機制的恢復時間無限增長。爲了不這種狀況,stateful轉換的中間RDD將按期設置檢查點並保存到到可靠的存儲層(例如HDFS)以切斷依賴關係鏈。
總而言之,元數據檢查點主要用於從driver程序故障中恢復,而數據或RDD檢查點在任何使用stateful轉換時是必需要有的。
② 什麼時候啓用檢查點:
對於具備如下任一要求的應用程序,必須啓用檢查點:
1) 使用狀態轉:若是在應用程序中使用updateStateByKey或reduceByKeyAndWindow(具備逆函數),則必須提供檢查點目錄以容許按期保存RDD檢查點。
2) 從運行應用程序的driver程序的故障中恢復:元數據檢查點用於使用進度信息進行恢復。
③ 如何配置檢查點:
能夠經過在一些可容錯、高可靠的文件系統(例如,HDFS,S3等)中設置保存檢查點信息的目錄來啓用檢查點。這是經過使用streamingContext.checkpoint(checkpointDirectory)完成的。設置檢查點後,您就可使用上述的有狀態轉換操做。此外,若是要使應用程序從驅動程序故障中恢復,您應該重寫streaming應用程序以使程序具備如下行爲:
1) 當程序第一次啓動時,它將建立一個新的StreamingContext,設置好全部流數據源,而後調用start()方法。
2) 當程序在失敗後從新啓動時,它將從checkpoint目錄中的檢查點數據從新建立一個StreamingContext。
使用StreamingContext.getOrCreate能夠簡化此行爲
④ 改寫以前的WordCount程序,使得每次計算的結果和狀態都保存到檢查點目錄下
經過查看HDFS中的信息,能夠看到相關的檢查點信息,以下:
須要注意:若是使用Scala IDE(eclipse)開發程序集成Flume的話,直接加入Flume的jar包,會存在如下兩個問題
(1)問題1:
存在兩個scala-library(scala-library-2.11.8.jar和scala-library-2.10.5.jar),刪除第二個。
(2)問題2:
spark-streaming-flume_2.10-2.1.0.jar of SparkProject build path is cross-compiled with an incompatible version of Scala (2.10.0). In case this report is mistaken, this check can be disabled in the compiler preference page.SparkProject Unknown Scala Version Problem
l 基於Flume的Push模式
Flume被用於在Flume agents之間推送數據.在這種方式下,Spark Streaming能夠很方便的創建一個receiver,起到一個Avro agent的做用.Flume能夠將數據推送到改receiver.
n 啓動Spark Streaming程序
n 啓動Flume
n 拷貝日誌文件到/root/training/logs目錄
n 觀察輸出,採集到數據
l 基於Custom Sink的Pull模式
不一樣於Flume直接將數據推送到Spark Streaming中,第二種模式經過如下條件運行一個正常的Flume sink。Flume將數據推送到sink中,而且數據保持buffered狀態。Spark Streaming使用一個可靠的Flume接收器和轉換器從sink拉取數據。只要當數據被接收而且被Spark Streaming備份後,轉換器才運行成功。
這樣,與第一種模式相比,保證了很好的健壯性和容錯能力。然而,這種模式須要爲Flume配置一個正常的sink。
如下爲配置步驟:
n 將Spark的jar包拷貝到Flume的lib目錄下
n 下面的這個jar包也須要拷貝到Flume的lib目錄下,同時加入IDEA工程的classpath
n 啓動Flume
n 在IDEA中啓動FlumeLogPull
n 將測試數據拷貝到/root/training/logs
n 觀察IDEA中的輸出
Apache Kafka是一種高吞吐量的分佈式發佈訂閱消息系統。
搭建ZooKeeper(Standalone):
(*)配置/root/training/zookeeper-3.4.10/conf/zoo.cfg文件
dataDir=/root/training/zookeeper-3.4.10/tmp
server.1=spark81:2888:3888
(*)在/root/training/zookeeper-3.4.10/tmp目錄下建立一個myid的空文件
echo 1 > /root/training/zookeeper-3.4.6/tmp/myid
搭建Kafka環境(單機單broker):
(*)修改server.properties文件
(*)啓動Kafka
bin/kafka-server-start.sh config/server.properties &
出現如下錯誤:
須要修改bin/kafka-run-class.sh文件,將這個選項註釋掉。
(*)測試Kafka
l 建立Topic
bin/kafka-topics.sh --create --zookeeper spark81:2181 -replication-factor 1 --partitions 3 --topic mydemo1
l 發送消息
bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
l 接收消息
bin/kafka-console-consumer.sh --zookeeper spark81:2181 --topic mydemo1
l 搭建Spark Streaming和Kafka的集成開發環境
因爲Spark Streaming和Kafka集成的時候,依賴的jar包比較多,並且還會產生衝突。強烈建議使用Maven的方式來搭建項目工程。
下面是依賴的pom.xml文件:
l 基於Receiver的方式
這個方法使用了Receivers來接收數據。Receivers的實現使用到Kafka高層次的消費者API。對於全部的Receivers,接收到的數據將會保存在Spark executors中,而後由Spark Streaming啓動的Job來處理這些數據。
n 啓動Kafka消息的生產者
bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
n 在IDEA中啓動任務,接收Kafka消息
l 直接讀取方式
和基於Receiver接收數據不同,這種方式按期地從Kafka的topic+partition中查詢最新的偏移量,再根據定義的偏移量範圍在每一個batch裏面處理數據。看成業須要處理的數據來臨時,spark經過調用Kafka的簡單消費者API讀取必定範圍的數據。
n 啓動Kafka消息的生產者
bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
n 在IDEA中啓動任務,接收Kafka消息
在Spark中有幾個優化能夠減小批處理的時間:
① 數據接收的並行水平
經過網絡(如kafka,flume,socket等)接收數據須要這些數據反序列化並被保存到Spark中。若是數據接收成爲系統的瓶頸,就要考慮並行地接收數據。注意,每一個輸入DStream建立一個receiver(運行在worker機器上)接收單個數據流。建立多個輸入DStream並配置它們能夠從源中接收不一樣分區的數據流,從而實現多數據流接收。例如,接收兩個topic數據的單個輸入DStream能夠被切分爲兩個kafka輸入流,每一個接收一個topic。這將在兩個worker上運行兩個receiver,所以容許數據並行接收,提升總體的吞吐量。多個DStream能夠被合併生成單個DStream,這樣運用在單個輸入DStream的transformation操做能夠運用在合併的DStream上。
② 數據處理的並行水平
若是運行在計算stage上的併發任務數不足夠大,就不會充分利用集羣的資源。默認的併發任務數經過配置屬性來肯定spark.default.parallelism。
③ 數據序列化
能夠經過改變序列化格式來減小數據序列化的開銷。在流式傳輸的狀況下,有兩種類型的數據會被序列化:
l 輸入數據
l 由流操做生成的持久RDD
在上述兩種狀況下,使用Kryo序列化格式能夠減小CPU和內存開銷。
爲了Spark Streaming應用程序可以在集羣中穩定運行,系統應該可以以足夠的速度處理接收的數據(即處理速度應該大於或等於接收數據的速度)。這能夠經過流的網絡UI觀察獲得。批處理時間應該小於批間隔時間。
根據流計算的性質,批間隔時間可能顯著的影響數據處理速率,這個速率能夠經過應用程序維持。能夠考慮WordCountNetwork這個例子,對於一個特定的數據處理速率,系統可能能夠每2秒打印一次單詞計數(批間隔時間爲2秒),但沒法每500毫秒打印一次單詞計數。因此,爲了在生產環境中維持指望的數據處理速率,就應該設置合適的批間隔時間(即批數據的容量)。
找出正確的批容量的一個好的辦法是用一個保守的批間隔時間(5-10,秒)和低數據速率來測試你的應用程序。
在這一節,咱們重點介紹幾個強烈推薦的自定義選項,它們能夠減小Spark Streaming應用程序垃圾回收的相關暫停,得到更穩定的批處理時間。
l Default persistence level of DStreams:和RDDs不一樣的是,默認的持久化級別是序列化數據到內存中(DStream是StorageLevel.MEMORY_ONLY_SER,RDD是StorageLevel.MEMORY_ONLY)。即便保存數據爲序列化形態會增長序列化/反序列化的開銷,可是能夠明顯的減小垃圾回收的暫停。
l Clearing persistent RDDs:默認狀況下,經過Spark內置策略(LUR),Spark Streaming生成的持久化RDD將會從內存中清理掉。若是spark.cleaner.ttl已經設置了,比這個時間存在更老的持久化RDD將會被定時的清理掉。正如前面提到的那樣,這個值須要根據Spark Streaming應用程序的操做當心設置。然而,能夠設置配置選項spark.streaming.unpersist爲true來更智能的去持久化(unpersist)RDD。這個配置使系統找出那些不須要常常保有的RDD,而後去持久化它們。這能夠減小Spark RDD的內存使用,也可能改善垃圾回收的行爲。
l Concurrent garbage collector:使用併發的標記-清除垃圾回收能夠進一步減小垃圾回收的暫停時間。儘管併發的垃圾回收會減小系統的總體吞吐量,可是仍然推薦使用它以得到更穩定的批處理時間。