Spark Streaming是核心Spark API的擴展,可實現可擴展、高吞吐量、可容錯的實時數據流處理。數據能夠從諸如Kafka,Flume,Kinesis或TCP套接字等衆多來源獲取,而且可使用由高級函數(如map,reduce,join和window)開發的複雜算法進行流數據處理。最後,處理後的數據能夠被推送到文件系統,數據庫和實時儀表板。並且,您還能夠在數據流上應用Spark提供的機器學習和圖處理算法。html
便於使用算法
經過高級操做員構建應用程序。sql
Spark Streaming將Apache Spark的 語言集成API 引入流處理,使您能夠像編寫批處理做業同樣編寫流式做業。它支持Java,Scala和Python。數據庫
容錯apache
開箱即用的有狀態的一次性語義。設計模式
Spark Streaming能夠開箱即用,恢復丟失的工做和操做員狀態(例如滑動窗口),而無需任何額外的代碼。
緩存
Spark集成性能優化
將流式傳輸與批量和交互式查詢相結合。服務器
經過在Spark上運行,Spark Streaming容許您重複使用相同的代碼進行批處理,將流加入歷史數據,或者在流狀態下運行即席查詢。構建強大的交互式應用程序,而不僅是分析。網絡
在內部,它的工做原理以下。Spark Streaming接收實時輸入數據流,並將數據切分紅批,而後由Spark引擎對其進行處理,最後生成「批」形式的結果流。
Spark Streaming將連續的數據流抽象爲discretizedstream或DStream。在內部,DStream 由一個RDD序列表示。
(1)因爲在本案例中須要使用netcat網絡工具,因此須要先安裝。
rpm -iUv ~/netcat-0.6.1-1.i386.rpm
(2)啓動netcat數據流服務器,並監聽端口:1234
命令:nc -l -p 9999
服務器端:
(3)啓動客戶端
bin/run-example streaming.NetworkWordCount localhost 1234
客戶端:
(必定注意):若是要執行本例,必須確保機器cpu核數大於 2
(必定注意):
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
官方的解釋:
初始化StreamingContext
方式一:從SparkConf對象中建立
從一個現有的SparkContext實例中建立
程序中的幾點說明:
appName參數是應用程序在集羣UI上顯示的名稱。
master是Spark,Mesos或YARN集羣的URL,或者一個特殊的「local [*]」字符串來讓程序以本地模式運行。
當在集羣上運行程序時,不須要在程序中硬編碼master參數,而是使用spark-submit提交應用程序並將master的URL以腳本參數的形式傳入。可是,對於本地測試和單元測試,您能夠經過「local[*]」來運行Spark Streaming程序(請確保本地系統中的cpu核心數夠用)。
StreamingContext會內在的建立一個SparkContext的實例(全部Spark功能的起始點),你能夠經過ssc.sparkContext訪問到這個實例。
批處理的時間窗口長度必須根據應用程序的延遲要求和可用的集羣資源進行設置。
請務必記住如下幾點:
一旦一個StreamingContextt開始運做,就不能設置或添加新的流計算。
一旦一個上下文被中止,它將沒法從新啓動。
同一時刻,一個JVM中只能有一個StreamingContext處於活動狀態。
StreamingContext上的stop()方法也會中止SparkContext。 要僅中止StreamingContext(保持SparkContext活躍),請將stop() 方法的可選參數stopSparkContext設置爲false。
只要前一個StreamingContext在下一個StreamingContext被建立以前中止(不中止SparkContext),SparkContext就能夠被重用來建立多個StreamingContext。
l DiscretizedStream或DStream 是Spark Streaming對流式數據的基本抽象。它表示連續的數據流,這些連續的數據流能夠是從數據源接收的輸入數據流,也能夠是經過對輸入數據流執行轉換操做而生成的經處理的數據流。在內部,DStream由一系列連續的RDD表示,以下圖:
l 舉例分析:在以前的NetworkWordCount的例子中,咱們將一行行文本組成的流轉換爲單詞流,具體作法爲:將flatMap操做應用於名爲lines的 DStream中的每一個RDD上,以生成words DStream的RDD。以下圖所示:
可是DStream和RDD也有區別,下面畫圖說明:
最後兩個transformation算子須要重點介紹一下:
1.transform(func)
經過RDD-to-RDD函數做用於源DStream中的各個RDD,能夠是任意的RDD操做,從而返回一個新的RDD
舉例:在NetworkWordCount中,也可使用transform來生成元組對
2.updateStateByKey(func)
操做容許不斷用新信息更新它的同時保持任意狀態。
定義狀態-狀態能夠是任何的數據類型
定義狀態更新函數-怎樣利用更新前的狀態和從輸入流裏面獲取的新值更新狀態
重寫NetworkWordCount程序,累計每一個單詞出現的頻率(注意:累計)
package demo import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} object MyTotalNetworkWordCount { def main(args: Array[String]): Unit = { //建立一個Context對象: StreamingContext (SparkContext, SQLContext) //指定批處理的時間間隔 val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]") val ssc = new StreamingContext(conf,Seconds(5)) //設置檢查點 ssc.checkpoint("hdfs://192.168.157.11:9000/spark/checkpoint") //建立一個DStream,處理數據 val lines = ssc.socketTextStream("192.168.157.81",7788,StorageLevel.MEMORY_AND_DISK_SER) //執行wordcount val words = lines.flatMap(_.split(" ")) //定義函數用於累計每一個單詞的總頻率 val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => { //經過Spark內部的reduceByKey按key規約,而後這裏傳入某key當前批次的Seq/List,再計算當前批次的總和 val currentCount = currValues.sum // 已累加的值 val previousCount = prevValueState.getOrElse(0) // 返回累加後的結果,是一個Option[Int]類型 Some(currentCount + previousCount) } val pairs = words.map(word => (word, 1)) val totalWordCounts = pairs.updateStateByKey[Int](addFunc) totalWordCounts.print() ssc.start() ssc.awaitTermination() } }
輸出結果:
n 注意:若是在IDEA中,不想輸出log4j的日誌信息,能夠將log4j.properties文件(放在src的目錄下)的第一行改成:
log4j.rootCategory=ERROR, console
Spark Streaming還提供了窗口計算功能,容許您在數據的滑動窗口上應用轉換操做。下圖說明了滑動窗口的工做方式:
如圖所示,每當窗口滑過originalDStream時,落在窗口內的源RDD被組合並被執行操做以產生windowed DStream的RDD。在上面的例子中,操做應用於最近3個時間單位的數據,並以2個時間單位滑動。這代表任何窗口操做都須要指定兩個參數。
窗口長度(windowlength) - 窗口的時間長度(上圖的示例中爲:3)。
滑動間隔(slidinginterval) - 兩次相鄰的窗口操做的間隔(即每次滑動的時間長度)(上圖示例中爲:2)。
這兩個參數必須是源DStream的批間隔的倍數(上圖示例中爲:1)。
咱們以一個例子來講明窗口操做。 假設您但願對以前的單詞計數的示例進行擴展,每10秒鐘對過去30秒的數據進行wordcount。爲此,咱們必須在最近30秒的pairs DStream數據中對(word, 1)鍵值對應用reduceByKey操做。這是經過使用reduceByKeyAndWindow操做完成的。
一些常見的窗口操做以下表所示。全部這些操做都用到了上述兩個參數 - windowLength和slideInterval。
window(windowLength, slideInterval)
基於源DStream產生的窗口化的批數據計算一個新的DStream
countByWindow(windowLength, slideInterval)
返回流中元素的一個滑動窗口數
reduceByWindow(func, windowLength, slideInterval)
返回一個單元素流。利用函數func彙集滑動時間間隔的流的元素建立這個單元素流。函數必須是相關聯的以使計算可以正確的並行計算。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
應用到一個(K,V)對組成的DStream上,返回一個由(K,V)對組成的新的DStream。每個key的值均由給定的reduce函數彙集起來。注意:在默認狀況下,這個算子利用了Spark默認的併發任務數去分組。你能夠用numTasks參數設置不一樣的任務數
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
上述reduceByKeyAndWindow() 的更高效的版本,其中使用前一窗口的reduce計算結果遞增地計算每一個窗口的reduce值。這是經過對進入滑動窗口的新數據進行reduce操做,以及「逆減(inverse reducing)」離開窗口的舊數據來完成的。一個例子是當窗口滑動時對鍵對應的值進行「一加一減」操做。可是,它僅適用於「可逆減函數(invertible reduce functions)」,即具備相應「反減」功能的減函數(做爲參數invFunc)。 像reduceByKeyAndWindow同樣,經過可選參數能夠配置reduce任務的數量。 請注意,使用此操做必須啓用檢查點。
countByValueAndWindow(windowLength, slideInterval, [numTasks])
應用到一個(K,V)對組成的DStream上,返回一個由(K,V)對組成的新的DStream。每一個key的值都是它們在滑動窗口中出現的頻率。
輸入DStreams表示從數據源獲取輸入數據流的DStreams。在NetworkWordCount例子中,lines表示輸入DStream,它表明從netcat服務器獲取的數據流。每個輸入流DStream和一個Receiver對象相關聯,這個Receiver從源中獲取數據,並將數據存入內存中用於處理。
輸入DStreams表示從數據源獲取的原始數據流。Spark Streaming擁有兩類數據源:
基本源(Basic sources):這些源在StreamingContext API中直接可用。例如文件系統、套接字鏈接、Akka的actor等
高級源(Advanced sources):這些源包括Kafka,Flume,Kinesis,Twitter等等。
下面經過具體的案例,詳細說明:
須要注意的是:
① 這些文件具備相同的格式
② 這些文件經過原子移動或重命名文件的方式在dataDirectory建立
③ 若是在文件中追加內容,這些追加的新數據也不會被讀取。
注意:要演示成功,須要在原文件中編輯,而後拷貝一份。
RDD隊列流
使用streamingContext.queueStream(queueOfRDD)建立基於RDD隊列的DStream,用於調試Spark Streaming應用程序。
package demo import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable object RDDQueueStream { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local").setAppName("queueStream") //每1秒對數據進行處理 val ssc = new StreamingContext(conf,Seconds(1)) //建立一個可以push到QueueInputDStream的RDDs隊列 val rddQueue = new mutable.Queue[RDD[Int]]() //基於一個RDD隊列建立一個輸入源 val inputStream = ssc.queueStream(rddQueue) //將接收到的數據乘以10 val mappedStream = inputStream.map(x => (x,x*10)) mappedStream.print() ssc.start() for(i <- 1 to 3){ rddQueue += ssc.sparkContext.makeRDD(1 to 10) //建立RDD,並分配兩個核數 Thread.sleep(1000) } ssc.stop() } }
套接字流:經過監聽Socket端口來接收數據
package demo import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} object ScoketStreaming { def main(args: Array[String]) { //建立一個本地的StreamingContext,含2個工做線程 val conf = new SparkConf().setMaster("local[4]").setAppName("ScoketStreaming") val sc = new StreamingContext(conf,Seconds(5)) //每隔10秒統計一次字符總數 //建立珍一個DStream,鏈接127.0.0.1 :7788 val lines = sc.socketTextStream("127.0.0.1",7788) //打印數據 lines.print() sc.start() //開始計算 sc.awaitTermination() //經過手動終止計算,不然一直運行下去 } }
輸出操做容許DStream的操做推到如數據庫、文件系統等外部系統中。由於輸出操做其實是容許外部系統消費轉換後的數據,它們觸發的實際操做是DStream轉換。目前,定義了下面幾種輸出操做:
foreachRDD的設計模式
DStream.foreachRDD是一個強大的原語,發送數據到外部系統中。
第一步:建立鏈接,將數據寫入外部數據庫(使用以前的NetworkWordCount,改寫以前輸出結果的部分,以下)
出現如下Exception:
緣由是:Connection對象不是一個可被序列化的對象,不能RDD的每一個Worker上運行;即:Connection不能在RDD分佈式環境中的每一個分區上運行,由於不一樣的分區可能運行在不一樣的Worker上。因此須要在每一個RDD分區上單首創建Connection對象。
第二步:在每一個RDD分區上單首創建Connection對象,以下:
咱們能夠很方便地使用DataFrames和SQL操做來處理流數據。您必須使用當前的StreamingContext對應的SparkContext建立一個SparkSession。此外,必須這樣作的另外一個緣由是使得應用能夠在driver程序故障時得以從新啓動,這是經過建立一個能夠延遲實例化的單例SparkSession來實現的。
在下面的示例中,咱們使用DataFrames和SQL來修改以前的wordcount示例並對單詞進行計數。咱們將每一個RDD轉換爲DataFrame,並註冊爲臨時表,而後在這張表上執行SQL查詢。
package demo import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} object MyNetworkWordCountDataFrame { def main(args: Array[String]): Unit = { //建立一個Context對象: StreamingContext (SparkContext, SQLContext) //指定批處理的時間間隔 val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(5)) //建立一個DStream,處理數據 val lines = ssc.socketTextStream("192.168.157.81", 7788, StorageLevel.MEMORY_AND_DISK_SER) //執行wordcount val words = lines.flatMap(_.split(" ")) //使用Spark SQL來查詢Spark Streaming處理的數據 words.foreachRDD { rdd => //使用單列模式,建立SparkSession對象 val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate() import spark.implicits._ // 將RDD[String]轉換爲DataFrame val wordsDataFrame = rdd.toDF("word") // 建立臨時視圖 wordsDataFrame.createOrReplaceTempView("words") // 執行SQL val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() } //啓動StreamingContext ssc.start() //等待計算完成 ssc.awaitTermination() } }
與RDD相似,DStreams還容許開發人員將流數據保留在內存中。也就是說,在DStream上調用persist() 方法會自動將該DStream的每一個RDD保留在內存中。若是DStream中的數據將被屢次計算(例如,相同數據上執行多個操做),這個操做就會頗有用。對於基於窗口的操做,如reduceByWindow和reduceByKeyAndWindow以及基於狀態的操做,如updateStateByKey,數據會默認進行持久化。 所以,基於窗口的操做生成的DStream會自動保存在內存中,而不須要開發人員調用persist()。
對於經過網絡接收數據(例如Kafka,Flume,sockets等)的輸入流,默認持久化級別被設置爲將數據複製到兩個節點進行容錯。
請注意,與RDD不一樣,DStreams的默認持久化級別將數據序列化保存在內存中。
流數據處理程序一般都是全天候運行,所以必須對應用中邏輯無關的故障(例如,系統故障,JVM崩潰等)具備彈性。爲了實現這一特性,Spark Streaming須要checkpoint足夠的信息到容錯存儲系統,以即可以從故障中恢復。
① 通常會對兩種類型的數據使用檢查點:
1) 元數據檢查點(Metadatacheckpointing) - 將定義流計算的信息保存到容錯存儲中(如HDFS)。這用於從運行streaming程序的driver程序的節點的故障中恢復。元數據包括如下幾種:
l 配置(Configuration) - 用於建立streaming應用程序的配置信息。
l DStream操做(DStream operations) - 定義streaming應用程序的DStream操做集合。
l 不完整的batch(Incomplete batches) - jobs還在隊列中但還沒有完成的batch。
2) 數據檢查點(Datacheckpointing) - 將生成的RDD保存到可靠的存儲層。對於一些須要將多個批次之間的數據進行組合的stateful變換操做,設置數據檢查點是必需的。在這些轉換操做中,當前生成的RDD依賴於先前批次的RDD,這致使依賴鏈的長度隨時間而不斷增長,由此也會致使基於血統機制的恢復時間無限增長。爲了不這種狀況,stateful轉換的中間RDD將按期設置檢查點並保存到到可靠的存儲層(例如HDFS)以切斷依賴關係鏈。
總而言之,元數據檢查點主要用於從driver程序故障中恢復,而數據或RDD檢查點在任何使用stateful轉換時是必需要有的。
② 什麼時候啓用檢查點:
對於具備如下任一要求的應用程序,必須啓用檢查點:
1) 使用狀態轉:若是在應用程序中使用updateStateByKey或reduceByKeyAndWindow(具備逆函數),則必須提供檢查點目錄以容許按期保存RDD檢查點。
2) 從運行應用程序的driver程序的故障中恢復:元數據檢查點用於使用進度信息進行恢復。
③ 如何配置檢查點:
能夠經過在一些可容錯、高可靠的文件系統(例如,HDFS,S3等)中設置保存檢查點信息的目錄來啓用檢查點。這是經過使用streamingContext.checkpoint(checkpointDirectory)完成的。設置檢查點後,您就可使用上述的有狀態轉換操做。此外,若是要使應用程序從驅動程序故障中恢復,您應該重寫streaming應用程序以使程序具備如下行爲:
1) 當程序第一次啓動時,它將建立一個新的StreamingContext,設置好全部流數據源,而後調用start()方法。
2) 當程序在失敗後從新啓動時,它將從checkpoint目錄中的檢查點數據從新建立一個StreamingContext。
使用StreamingContext.getOrCreate能夠簡化此行爲
④ 改寫以前的WordCount程序,使得每次計算的結果和狀態都保存到檢查點目錄下
package demo import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} object MyCheckpointNetworkWordCount { def main(args: Array[String]): Unit = { //在主程序中,建立一個Streaming Context對象 //一、讀取一個檢查點的目錄 //二、若是該目錄下已經存有以前的檢查點信息,從已有的信息上建立這個Streaming Context對象 //三、若是該目錄下沒有信息,建立一個新的Streaming Context val context = StreamingContext.getOrCreate("hdfs://192.168.157.111:9000/spark_checkpoint",createStreamingContext) //啓動任務 context.start() context.awaitTermination() } //建立一個StreamingContext對象,而且設置檢查點目錄,執行WordCount程序(記錄以前的狀態信息) def createStreamingContext():StreamingContext = { val conf = new SparkConf().setAppName("MyCheckpointNetworkWordCount").setMaster("local[2]") //建立這個StreamingContext對象 val ssc = new StreamingContext(conf,Seconds(3)) //設置檢查點目錄 ssc.checkpoint("hdfs://192.168.157.111:9000/spark_checkpoint") //建立一個DStream,執行WordCount val lines = ssc.socketTextStream("192.168.157.81",7788,StorageLevel.MEMORY_AND_DISK_SER) //分詞操做 val words = lines.flatMap(_.split(" ")) //每一個單詞記一次數 val wordPair = words.map(x=> (x,1)) //執行單詞計數 //定義一個新的函數:把當前的值跟以前的結果進行一個累加 val addFunc = (currValues:Seq[Int],preValueState:Option[Int]) => { //當前當前批次的值 val currentCount = currValues.sum //獲得已經累加的值。若是是第一次求和,以前沒有數值,從0開始計數 val preValueCount = preValueState.getOrElse(0) //進行累加,而後累加後結果,是Option[Int] Some(currentCount + preValueCount) } //要把新的單詞個數跟以前的結果進行疊加(累計) val totalCount = wordPair.updateStateByKey[Int](addFunc) //輸出結果 totalCount.print() //返回這個對象 ssc } }
經過查看HDFS中的信息,能夠看到相關的檢查點信息,以下:
基於Flume的Push模式
Flume被用於在Flume agents之間推送數據.在這種方式下,Spark Streaming能夠很方便的創建一個receiver,起到一個Avro agent的做用.Flume能夠將數據推送到改receiver.
#bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console #定義agent名, source、channel、sink的名稱 a4.sources = r1 a4.channels = c1 a4.sinks = k1 #具體定義source a4.sources.r1.type = spooldir a4.sources.r1.spoolDir = /root/training/logs #具體定義channel a4.channels.c1.type = memory a4.channels.c1.capacity = 10000 a4.channels.c1.transactionCapacity = 100 #具體定義sink a4.sinks = k1 a4.sinks.k1.type = avro a4.sinks.k1.channel = c1 a4.sinks.k1.hostname = 192.168.157.1 a4.sinks.k1.port = 1234 #組裝source、channel、sink a4.sources.r1.channels = c1 a4.sinks.k1.channel = c1
package demo import org.apache.spark.SparkConf import org.apache.spark.streaming.flume.FlumeUtils import org.apache.spark.streaming.{Seconds, StreamingContext} object MyFlumeStream { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(5)) //建立FlumeEvent的DStream val flumeEvent = FlumeUtils.createStream(ssc,"192.168.157.1",1234) //將FlumeEvent中的事件轉成字符串 val lineDStream = flumeEvent.map( e => { new String(e.event.getBody.array) }) //輸出結果 lineDStream.print() ssc.start() ssc.awaitTermination(); } }
啓動Spark Streaming程序
啓動Flume
拷貝日誌文件到/root/training/logs目錄
觀察輸出,採集到數據
基於Custom Sink的Pull模式
不一樣於Flume直接將數據推送到Spark Streaming中,第二種模式經過如下條件運行一個正常的Flume sink。Flume將數據推送到sink中,而且數據保持buffered狀態。Spark Streaming使用一個可靠的Flume接收器和轉換器從sink拉取數據。只要當數據被接收而且被Spark Streaming備份後,轉換器才運行成功。
這樣,與第一種模式相比,保證了很好的健壯性和容錯能力。然而,這種模式須要爲Flume配置一個正常的sink。
如下爲配置步驟:
#bin/flume-ng agent -n a1 -f myagent/a1.conf -c conf -Dflume.root.logger=INFO,console a1.channels = c1 a1.sinks = k1 a1.sources = r1 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /root/training/logs a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 100000 a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = 192.168.157.81 a1.sinks.k1.port = 1234 #組裝source、channel、sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
package demo import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.flume.FlumeUtils import org.apache.spark.streaming.{Seconds, StreamingContext} object FlumeLogPull { def main(args: Array[String]) { val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(10)) //建立FlumeEvent的DStream val flumeEvent = FlumeUtils.createPollingStream(ssc,"192.168.157.81",1234,StorageLevel.MEMORY_ONLY_SER_2) //將FlumeEvent中的事件轉成字符串 val lineDStream = flumeEvent.map( e => { new String(e.event.getBody.array) }) //輸出結果 lineDStream.print() ssc.start() ssc.awaitTermination(); } }
spark-streaming-flume-sink_2.10-2.1.0.jar
將Spark的jar包拷貝到Flume的lib目錄下
下面的這個jar包也須要拷貝到Flume的lib目錄下,同時加入IDEA工程的classpath
啓動Flume
在IDEA中啓動FlumeLogPull
將測試數據拷貝到/root/training/logs
觀察IDEA中的輸出
Apache Kafka是一種高吞吐量的分佈式發佈訂閱消息系統。
搭建ZooKeeper(Standalone):
(*)配置/root/training/zookeeper-3.4.10/conf/zoo.cfg文件
dataDir=/root/training/zookeeper-3.4.10/tmp
server.1=spark81:2888:3888
(*)在/root/training/zookeeper-3.4.10/tmp目錄下建立一個myid的空文件
echo 1 > /root/training/zookeeper-3.4.6/tmp/myid
搭建Kafka環境(單機單broker):
(*)修改server.properties文件
(*)啓動Kafka
bin/kafka-server-start.sh config/server.properties &
出現如下錯誤:
須要修改bin/kafka-run-class.sh文件,將這個選項註釋掉。
(*)測試Kafka
建立Topic
bin/kafka-topics.sh --create --zookeeper spark81:2181 -replication-factor 1 --partitions 3 --topic mydemo1
發送消息
bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
接收消息
bin/kafka-console-consumer.sh --zookeeper spark81:2181 --topic mydemo1
搭建Spark Streaming和Kafka的集成開發環境
因爲Spark Streaming和Kafka集成的時候,依賴的jar包比較多,並且還會產生衝突。強烈建議使用Maven的方式來搭建項目工程。
下面是依賴的pom.xml文件:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>ZDemo5</groupId> <artifactId>ZDemo5</artifactId> <version>1.0-SNAPSHOT</version> <properties> <spark.version>2.1.0</spark.version> <scala.version>2.11</scala.version> </properties> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>2.1.1</version> </dependency> </dependencies> </project>
基於Receiver的方式
這個方法使用了Receivers來接收數據。Receivers的實現使用到Kafka高層次的消費者API。對於全部的Receivers,接收到的數據將會保存在Spark executors中,而後由Spark Streaming啓動的Job來處理這些數據。
package demo import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} object KafkaWordCount { def main(args: Array[String]) { val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(10)) //建立topic名稱,1表示一次從這個topic中獲取一條記錄 val topics = Map("mydemo1" ->1) //建立Kafka的輸入流,指定ZooKeeper的地址 val kafkaStream = KafkaUtils.createStream(ssc,"192.168.157.81:2181","mygroup",topics) //處理每次接收到的數據 val lineDStream = kafkaStream.map(e => { new String(e.toString()) }) //輸出結果 lineDStream.print() ssc.start() ssc.awaitTermination(); } }
啓動Kafka消息的生產者
bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
在IDEA中啓動任務,接收Kafka消息
l 直接讀取方式
和基於Receiver接收數據不同,這種方式按期地從Kafka的topic+partition中查詢最新的偏移量,再根據定義的偏移量範圍在每一個batch裏面處理數據。看成業須要處理的數據來臨時,spark經過調用Kafka的簡單消費者API讀取必定範圍的數據。
package demo import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka.KafkaUtils object DirectKafkaWordCount { def main(args: Array[String]) { val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(10)) //建立topic名稱,1表示一次從這個topic中獲取一條記錄 val topics = Set("mydemo1") //指定Kafka的broker地址 val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.157.81:9092") //建立DStream,接收Kafka的數據 val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) //處理每次接收到的數據 val lineDStream = kafkaStream.map(e => { new String(e.toString()) }) //輸出結果 lineDStream.print() ssc.start() ssc.awaitTermination(); } }
啓動Kafka消息的生產者
bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
在IDEA中啓動任務,接收Kafka消息
在Spark中有幾個優化能夠減小批處理的時間:
① 數據接收的並行水平
經過網絡(如kafka,flume,socket等)接收數據須要這些數據反序列化並被保存到Spark中。若是數據接收成爲系統的瓶頸,就要考慮並行地接收數據。注意,每一個輸入DStream建立一個receiver(運行在worker機器上)接收單個數據流。建立多個輸入DStream並配置它們能夠從源中接收不一樣分區的數據流,從而實現多數據流接收。例如,接收兩個topic數據的單個輸入DStream能夠被切分爲兩個kafka輸入流,每一個接收一個topic。這將在兩個worker上運行兩個receiver,所以容許數據並行接收,提升總體的吞吐量。多個DStream能夠被合併生成單個DStream,這樣運用在單個輸入DStream的transformation操做能夠運用在合併的DStream上。
② 數據處理的並行水平
若是運行在計算stage上的併發任務數不足夠大,就不會充分利用集羣的資源。默認的併發任務數經過配置屬性來肯定spark.default.parallelism。
③ 數據序列化
能夠經過改變序列化格式來減小數據序列化的開銷。在流式傳輸的狀況下,有兩種類型的數據會被序列化:
輸入數據
由流操做生成的持久RDD
在上述兩種狀況下,使用Kryo序列化格式能夠減小CPU和內存開銷。
爲了Spark Streaming應用程序可以在集羣中穩定運行,系統應該可以以足夠的速度處理接收的數據(即處理速度應該大於或等於接收數據的速度)。這能夠經過流的網絡UI觀察獲得。批處理時間應該小於批間隔時間。
根據流計算的性質,批間隔時間可能顯著的影響數據處理速率,這個速率能夠經過應用程序維持。能夠考慮WordCountNetwork這個例子,對於一個特定的數據處理速率,系統可能能夠每2秒打印一次單詞計數(批間隔時間爲2秒),但沒法每500毫秒打印一次單詞計數。因此,爲了在生產環境中維持指望的數據處理速率,就應該設置合適的批間隔時間(即批數據的容量)。
找出正確的批容量的一個好的辦法是用一個保守的批間隔時間(5-10,秒)和低數據速率來測試你的應用程序。
在這一節,咱們重點介紹幾個強烈推薦的自定義選項,它們能夠減小Spark Streaming應用程序垃圾回收的相關暫停,得到更穩定的批處理時間。
Default persistence level of DStreams:和RDDs不一樣的是,默認的持久化級別是序列化數據到內存中(DStream是StorageLevel.MEMORY_ONLY_SER,RDD是StorageLevel.MEMORY_ONLY)。即便保存數據爲序列化形態會增長序列化/反序列化的開銷,可是能夠明顯的減小垃圾回收的暫停。
Clearing persistent RDDs:默認狀況下,經過Spark內置策略(LUR),Spark Streaming生成的持久化RDD將會從內存中清理掉。若是spark.cleaner.ttl已經設置了,比這個時間存在更老的持久化RDD將會被定時的清理掉。正如前面提到的那樣,這個值須要根據Spark Streaming應用程序的操做當心設置。然而,能夠設置配置選項spark.streaming.unpersist爲true來更智能的去持久化(unpersist)RDD。這個配置使系統找出那些不須要常常保有的RDD,而後去持久化它們。這能夠減小Spark RDD的內存使用,也可能改善垃圾回收的行爲。
Concurrent garbage collector:使用併發的標記-清除垃圾回收能夠進一步減小垃圾回收的暫停時間。儘管併發的垃圾回收會減小系統的總體吞吐量,可是仍然推薦使用它以得到更穩定的批處理時間。