Spark學習之Spark Streaming

Spark Streaming實時數據流處理

1、Spark Streaming基礎

1Spark Streaming簡介
http://spark.apache.org/docs/latest/streaming-programming-guide.html

Spark Streaming是核心Spark API的擴展,可實現可擴展、高吞吐量、可容錯的實時數據流處理。數據能夠從諸如KafkaFlumeKinesisTCP套接字等衆多來源獲取,而且可使用由高級函數(如mapreducejoinwindow)開發的複雜算法進行流數據處理。最後,處理後的數據能夠被推送到文件系統,數據庫和實時儀表板。並且,您還能夠在數據流上應用Spark提供的機器學習和圖處理算法。
html

2Spark Streaming的特色

便於使用算法

 

經過高級操做員構建應用程序。sql

 

Spark Streaming將Apache Spark的 語言集成API 引入流處理,使您能夠像編寫批處理做業同樣編寫流式做業。它支持Java,Scala和Python。數據庫

容錯apache

開箱即用的有狀態的一次性語義。設計模式

Spark Streaming能夠開箱即用,恢復丟失的工做和操做員狀態(例如滑動窗口),而無需任何額外的代碼。

緩存

Spark集成性能優化

將流式傳輸與批量和交互式查詢相結合。服務器

經過在Spark上運行,Spark Streaming容許您重複使用相同的代碼進行批處理,將流加入歷史數據,或者在流狀態下運行即席查詢。構建強大的交互式應用程序,而不僅是分析。網絡


3Spark Streaming的內部結構

在內部,它的工做原理以下。Spark Streaming接收實時輸入數據流,並將數據切分紅批,而後由Spark引擎對其進行處理,最後生成「批」形式的結果流。

Spark Streaming將連續的數據流抽象爲discretizedstreamDStream。在內部,DStream 由一個RDD序列表示。

4、第一個小案例:NetworkWordCount

1)因爲在本案例中須要使用netcat網絡工具,因此須要先安裝。 

  rpm -iUv ~/netcat-0.6.1-1.i386.rpm

 

2)啓動netcat數據流服務器,並監聽端口:1234

  命令:nc -l -p 9999

  服務器端:

3)啓動客戶端

bin/run-example streaming.NetworkWordCount localhost 1234

客戶端:
 

(必定注意):若是要執行本例,必須確保機器cpu核數大於 2

5、開發本身的NetworkWordCount
 

(必定注意):

val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")

 

官方的解釋:

2、Spark Streaming進階

1StreamingContext對象詳解

  初始化StreamingContext

  方式一:從SparkConf對象中建立
 

  從一個現有的SparkContext實例中建立
 

 程序中的幾點說明:

  appName參數是應用程序在集羣UI上顯示的名稱。

  masterSparkMesosYARN集羣的URL,或者一個特殊的「local [*]」字符串來讓程序以本地模式運行。

  當在集羣上運行程序時,不須要在程序中硬編碼master參數,而是使用spark-submit提交應用程序並將masterURL以腳本參數的形式傳入。可是,對於本地測試和單元測試,您能夠經過「local[*]」來運行Spark Streaming程序(請確保本地系統中的cpu核心數夠用)。

  StreamingContext會內在的建立一個SparkContext的實例(全部Spark功能的起始點),你能夠經過ssc.sparkContext訪問到這個實例。

  批處理的時間窗口長度必須根據應用程序的延遲要求和可用的集羣資源進行設置。

 

 請務必記住如下幾點:

  一旦一個StreamingContextt開始運做,就不能設置或添加新的流計算。

  一旦一個上下文被中止,它將沒法從新啓動。

  同一時刻,一個JVM中只能有一個StreamingContext處於活動狀態。

  StreamingContext上的stop()方法也會中止SparkContext。 要僅中止StreamingContext(保持SparkContext活躍),請將stop() 方法的可選參數stopSparkContext設置爲false

  只要前一個StreamingContext在下一個StreamingContext被建立以前中止(不中止SparkContext),SparkContext就能夠被重用來建立多個StreamingContext

2、離散流(DStreams):Discretized Streams

l DiscretizedStreamDStream Spark Streaming對流式數據的基本抽象。它表示連續的數據流,這些連續的數據流能夠是從數據源接收的輸入數據流,也能夠是經過對輸入數據流執行轉換操做而生成的經處理的數據流。在內部,DStream由一系列連續的RDD表示,以下圖:

舉例分析:在以前的NetworkWordCount的例子中,咱們將一行行文本組成的流轉換爲單詞流,具體作法爲:將flatMap操做應用於名爲linesDStream中的每一個RDD上,以生成words DStreamRDD。以下圖所示:

可是DStream和RDD也有區別,下面畫圖說明:

 

3DStream中的轉換操做(transformation
 

最後兩個transformation算子須要重點介紹一下: 

1.transform(func)

  經過RDD-to-RDD函數做用於源DStream中的各個RDD,能夠是任意的RDD操做,從而返回一個新的RDD

  舉例:在NetworkWordCount中,也可使用transform來生成元組對

2.updateStateByKey(func)

  操做容許不斷用新信息更新它的同時保持任意狀態。

    定義狀態-狀態能夠是任何的數據類型

    定義狀態更新函數-怎樣利用更新前的狀態和從輸入流裏面獲取的新值更新狀態

  重寫NetworkWordCount程序,累計每一個單詞出現的頻率(注意:累計)

package demo

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyTotalNetworkWordCount {
  def main(args: Array[String]): Unit = {
    //建立一個Context對象: StreamingContext  (SparkContext, SQLContext)
    //指定批處理的時間間隔
    val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Seconds(5))

    //設置檢查點
    ssc.checkpoint("hdfs://192.168.157.11:9000/spark/checkpoint")

    //建立一個DStream,處理數據
    val lines  = ssc.socketTextStream("192.168.157.81",7788,StorageLevel.MEMORY_AND_DISK_SER)

    //執行wordcount
    val words = lines.flatMap(_.split(" "))

    //定義函數用於累計每一個單詞的總頻率
    val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
      //經過Spark內部的reduceByKey按key規約,而後這裏傳入某key當前批次的Seq/List,再計算當前批次的總和
      val currentCount = currValues.sum
      // 已累加的值
      val previousCount = prevValueState.getOrElse(0)
      // 返回累加後的結果,是一個Option[Int]類型
      Some(currentCount + previousCount)
    }

    val pairs = words.map(word => (word, 1))

    val totalWordCounts = pairs.updateStateByKey[Int](addFunc)
    totalWordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
View Code 

  輸出結果:

注意:若是在IDEA中,不想輸出log4j的日誌信息,能夠將log4j.properties文件(放在src的目錄下)的第一行改成:

log4j.rootCategory=ERROR, console

4、窗口操做

Spark Streaming還提供了窗口計算功能,容許您在數據的滑動窗口上應用轉換操做。下圖說明了滑動窗口的工做方式:

如圖所示,每當窗口滑過originalDStream時,落在窗口內的源RDD被組合並被執行操做以產生windowed DStreamRDD。在上面的例子中,操做應用於最近3個時間單位的數據,並以2個時間單位滑動。這代表任何窗口操做都須要指定兩個參數。

  窗口長度(windowlength- 窗口的時間長度(上圖的示例中爲:3)。

  滑動間隔(slidinginterval- 兩次相鄰的窗口操做的間隔(即每次滑動的時間長度)(上圖示例中爲:2)。

這兩個參數必須是源DStream的批間隔的倍數(上圖示例中爲:1)。

 

咱們以一個例子來講明窗口操做。 假設您但願對以前的單詞計數的示例進行擴展,每10秒鐘對過去30秒的數據進行wordcount。爲此,咱們必須在最近30秒的pairs DStream數據中對(word, 1)鍵值對應用reduceByKey操做。這是經過使用reduceByKeyAndWindow操做完成的。

 

 

 

 

一些常見的窗口操做以下表所示。全部這些操做都用到了上述兩個參數 - windowLengthslideInterval

  window(windowLength, slideInterval)

    基於源DStream產生的窗口化的批數據計算一個新的DStream

 

  countByWindow(windowLength, slideInterval)

    返回流中元素的一個滑動窗口數

 

  reduceByWindow(func, windowLength, slideInterval)

    返回一個單元素流。利用函數func彙集滑動時間間隔的流的元素建立這個單元素流。函數必須是相關聯的以使計算可以正確的並行計算。

 

 

  reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

    應用到一個(K,V)對組成的DStream上,返回一個由(K,V)對組成的新的DStream。每個key的值均由給定的reduce函數彙集起來。注意:在默認狀況下,這個算子利用了Spark默認的併發任務數去分組。你能夠用numTasks參數設置不一樣的任務數

 

  reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

    上述reduceByKeyAndWindow() 的更高效的版本,其中使用前一窗口的reduce計算結果遞增地計算每一個窗口的reduce值。這是經過對進入滑動窗口的新數據進行reduce操做,以及「逆減(inverse reducing)」離開窗口的舊數據來完成的。一個例子是當窗口滑動時對鍵對應的值進行「一加一減」操做。可是,它僅適用於「可逆減函數(invertible reduce functions)」,即具備相應「反減」功能的減函數(做爲參數invFunc)。 像reduceByKeyAndWindow同樣,經過可選參數能夠配置reduce任務的數量。 請注意,使用此操做必須啓用檢查點。

 

  countByValueAndWindow(windowLength, slideInterval, [numTasks])

    應用到一個(K,V)對組成的DStream上,返回一個由(K,V)對組成的新的DStream。每一個key的值都是它們在滑動窗口中出現的頻率。

5、輸入DStreams和接收器

輸入DStreams表示從數據源獲取輸入數據流的DStreams。在NetworkWordCount例子中,lines表示輸入DStream,它表明從netcat服務器獲取的數據流。每個輸入流DStream和一個Receiver對象相關聯,這個Receiver從源中獲取數據,並將數據存入內存中用於處理。

輸入DStreams表示從數據源獲取的原始數據流。Spark Streaming擁有兩類數據源:

  基本源(Basic sources):這些源在StreamingContext API中直接可用。例如文件系統、套接字鏈接、Akkaactor

  高級源(Advanced sources):這些源包括Kafka,Flume,Kinesis,Twitter等等。

 

下面經過具體的案例,詳細說明:

 

  • 文件流:經過監控文件系統的變化,如有新文件添加,則將它讀入並做爲數據流

須要注意的是:

① 這些文件具備相同的格式

② 這些文件經過原子移動或重命名文件的方式在dataDirectory建立

③ 若是在文件中追加內容,這些追加的新數據也不會被讀取。

注意:要演示成功,須要在原文件中編輯,而後拷貝一份。

  RDD隊列流

使用streamingContext.queueStream(queueOfRDD)建立基於RDD隊列的DStream,用於調試Spark Streaming應用程序。

package demo

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object RDDQueueStream {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("queueStream")
    //每1秒對數據進行處理
    val ssc = new StreamingContext(conf,Seconds(1))

    //建立一個可以push到QueueInputDStream的RDDs隊列
    val rddQueue = new mutable.Queue[RDD[Int]]()

    //基於一個RDD隊列建立一個輸入源
    val inputStream = ssc.queueStream(rddQueue)

    //將接收到的數據乘以10
    val mappedStream = inputStream.map(x => (x,x*10))
    mappedStream.print()

    ssc.start()

    for(i <- 1 to 3){
      rddQueue += ssc.sparkContext.makeRDD(1 to 10)   //建立RDD,並分配兩個核數
      Thread.sleep(1000)
    }
    ssc.stop()
  }
}
View Code

 

  套接字流:經過監聽Socket端口來接收數據

package demo
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ScoketStreaming {

  def main(args: Array[String]) {
    //建立一個本地的StreamingContext,含2個工做線程
    val conf = new SparkConf().setMaster("local[4]").setAppName("ScoketStreaming")
    val sc = new StreamingContext(conf,Seconds(5))   //每隔10秒統計一次字符總數

    //建立珍一個DStream,鏈接127.0.0.1 :7788
    val lines = sc.socketTextStream("127.0.0.1",7788)
    //打印數據
    lines.print()

    sc.start()         //開始計算
    sc.awaitTermination()   //經過手動終止計算,不然一直運行下去
  }
}
View Code  

6DStreams的輸出操做

輸出操做容許DStream的操做推到如數據庫、文件系統等外部系統中。由於輸出操做其實是容許外部系統消費轉換後的數據,它們觸發的實際操做是DStream轉換。目前,定義了下面幾種輸出操做:
  

foreachRDD的設計模式

DStream.foreachRDD是一個強大的原語,發送數據到外部系統中。

  第一步:建立鏈接,將數據寫入外部數據庫(使用以前的NetworkWordCount,改寫以前輸出結果的部分,以下)

  •  

    出現如下Exception

    緣由是:Connection對象不是一個可被序列化的對象,不能RDD的每一個Worker上運行;即:Connection不能在RDD分佈式環境中的每一個分區上運行,由於不一樣的分區可能運行在不一樣的Worker上。因此須要在每一個RDD分區上單首創建Connection對象。

   第二步:在每一個RDD分區上單首創建Connection對象,以下:

View Code

 

7DataFrameSQL操做

咱們能夠很方便地使用DataFramesSQL操做來處理流數據。您必須使用當前的StreamingContext對應的SparkContext建立一個SparkSession。此外,必須這樣作的另外一個緣由是使得應用能夠在driver程序故障時得以從新啓動,這是經過建立一個能夠延遲實例化的單例SparkSession來實現的。

在下面的示例中,咱們使用DataFramesSQL來修改以前的wordcount示例並對單詞進行計數。咱們將每一個RDD轉換爲DataFrame,並註冊爲臨時表,而後在這張表上執行SQL查詢。

package demo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyNetworkWordCountDataFrame {
  def main(args: Array[String]): Unit = {
    //建立一個Context對象: StreamingContext  (SparkContext, SQLContext)
    //指定批處理的時間間隔
    val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    //建立一個DStream,處理數據
    val lines = ssc.socketTextStream("192.168.157.81", 7788, StorageLevel.MEMORY_AND_DISK_SER)

    //執行wordcount
    val words = lines.flatMap(_.split(" "))

    //使用Spark SQL來查詢Spark Streaming處理的數據
    words.foreachRDD { rdd =>
      //使用單列模式,建立SparkSession對象
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()

      import spark.implicits._
      // 將RDD[String]轉換爲DataFrame
      val wordsDataFrame = rdd.toDF("word")

      // 建立臨時視圖
      wordsDataFrame.createOrReplaceTempView("words")

      // 執行SQL
      val wordCountsDataFrame =   spark.sql("select word, count(*) as total from words group by word")
      wordCountsDataFrame.show()
    }

    //啓動StreamingContext
    ssc.start()

    //等待計算完成
    ssc.awaitTermination()
  }
}
View Code 

8、緩存/持久化

RDD相似,DStreams還容許開發人員將流數據保留在內存中。也就是說,在DStream上調用persist() 方法會自動將該DStream的每一個RDD保留在內存中。若是DStream中的數據將被屢次計算(例如,相同數據上執行多個操做),這個操做就會頗有用。對於基於窗口的操做,如reduceByWindowreduceByKeyAndWindow以及基於狀態的操做,如updateStateByKey,數據會默認進行持久化。 所以,基於窗口的操做生成的DStream會自動保存在內存中,而不須要開發人員調用persist()

 

對於經過網絡接收數據(例如KafkaFlumesockets等)的輸入流,默認持久化級別被設置爲將數據複製到兩個節點進行容錯。

請注意,與RDD不一樣,DStreams的默認持久化級別將數據序列化保存在內存中。

9、檢查點支持

流數據處理程序一般都是全天候運行,所以必須對應用中邏輯無關的故障(例如,系統故障,JVM崩潰等)具備彈性。爲了實現這一特性,Spark Streaming須要checkpoint足夠的信息到容錯存儲系統,以即可以從故障中恢復。

 

① 通常會對兩種類型的數據使用檢查點:

1) 元數據檢查點(Metadatacheckpointing - 將定義流計算的信息保存到容錯存儲中(如HDFS)。這用於從運行streaming程序的driver程序的節點的故障中恢復。元數據包括如下幾種:

配置(Configuration- 用於建立streaming應用程序的配置信息。

l DStream操做(DStream operations- 定義streaming應用程序的DStream操做集合。

不完整的batchIncomplete batches- jobs還在隊列中但還沒有完成的batch

 

2) 數據檢查點(Datacheckpointing- 將生成的RDD保存到可靠的存儲層。對於一些須要將多個批次之間的數據進行組合的stateful變換操做,設置數據檢查點是必需的。在這些轉換操做中,當前生成的RDD依賴於先前批次的RDD,這致使依賴鏈的長度隨時間而不斷增長,由此也會致使基於血統機制的恢復時間無限增長。爲了不這種狀況,stateful轉換的中間RDD將按期設置檢查點並保存到到可靠的存儲層(例如HDFS)以切斷依賴關係鏈。

 

總而言之,元數據檢查點主要用於從driver程序故障中恢復,而數據或RDD檢查點在任何使用stateful轉換時是必需要有的。

 

② 什麼時候啓用檢查點:

對於具備如下任一要求的應用程序,必須啓用檢查點:

1) 使用狀態轉:若是在應用程序中使用updateStateByKeyreduceByKeyAndWindow(具備逆函數),則必須提供檢查點目錄以容許按期保存RDD檢查點。

2) 從運行應用程序的driver程序的故障中恢復:元數據檢查點用於使用進度信息進行恢復。

 

③ 如何配置檢查點:

能夠經過在一些可容錯、高可靠的文件系統(例如,HDFSS3等)中設置保存檢查點信息的目錄來啓用檢查點。這是經過使用streamingContext.checkpoint(checkpointDirectory)完成的。設置檢查點後,您就可使用上述的有狀態轉換操做。此外,若是要使應用程序從驅動程序故障中恢復,您應該重寫streaming應用程序以使程序具備如下行爲:

1) 當程序第一次啓動時,它將建立一個新的StreamingContext,設置好全部流數據源,而後調用start()方法。

2) 當程序在失敗後從新啓動時,它將從checkpoint目錄中的檢查點數據從新建立一個StreamingContext

使用StreamingContext.getOrCreate能夠簡化此行爲

④ 改寫以前的WordCount程序,使得每次計算的結果和狀態都保存到檢查點目錄下

package demo

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyCheckpointNetworkWordCount {
  def main(args: Array[String]): Unit = {
    //在主程序中,建立一個Streaming Context對象
    //一、讀取一個檢查點的目錄
    //二、若是該目錄下已經存有以前的檢查點信息,從已有的信息上建立這個Streaming Context對象
    //三、若是該目錄下沒有信息,建立一個新的Streaming Context
    val context = StreamingContext.getOrCreate("hdfs://192.168.157.111:9000/spark_checkpoint",createStreamingContext)

    //啓動任務
    context.start()
    context.awaitTermination()
  }

  //建立一個StreamingContext對象,而且設置檢查點目錄,執行WordCount程序(記錄以前的狀態信息)
  def createStreamingContext():StreamingContext = {
    val conf = new SparkConf().setAppName("MyCheckpointNetworkWordCount").setMaster("local[2]")
    //建立這個StreamingContext對象
    val ssc = new StreamingContext(conf,Seconds(3))

    //設置檢查點目錄
    ssc.checkpoint("hdfs://192.168.157.111:9000/spark_checkpoint")

    //建立一個DStream,執行WordCount
    val lines = ssc.socketTextStream("192.168.157.81",7788,StorageLevel.MEMORY_AND_DISK_SER)

    //分詞操做
    val words = lines.flatMap(_.split(" "))
    //每一個單詞記一次數
    val wordPair = words.map(x=> (x,1))

    //執行單詞計數
    //定義一個新的函數:把當前的值跟以前的結果進行一個累加
    val addFunc = (currValues:Seq[Int],preValueState:Option[Int]) => {
      //當前當前批次的值
      val currentCount = currValues.sum

      //獲得已經累加的值。若是是第一次求和,以前沒有數值,從0開始計數
      val preValueCount = preValueState.getOrElse(0)

      //進行累加,而後累加後結果,是Option[Int]
      Some(currentCount + preValueCount)
    }

    //要把新的單詞個數跟以前的結果進行疊加(累計)
    val totalCount = wordPair.updateStateByKey[Int](addFunc)

    //輸出結果
    totalCount.print()

    //返回這個對象
    ssc
  }
}
View Code  

經過查看HDFS中的信息,能夠看到相關的檢查點信息,以下:

 

3、高級數據源

1Spark Streaming接收Flume數據

  基於FlumePush模式

  Flume被用於在Flume agents之間推送數據.在這種方式下,Spark Streaming能夠很方便的創建一個receiver,起到一個Avro agent的做用.Flume能夠將數據推送到改receiver.

  • 第一步:Flume的配置文件
    #bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console
    #定義agent名, source、channel、sink的名稱
    a4.sources = r1
    a4.channels = c1
    a4.sinks = k1
    
    #具體定義source
    a4.sources.r1.type = spooldir
    a4.sources.r1.spoolDir = /root/training/logs
    
    #具體定義channel
    a4.channels.c1.type = memory
    a4.channels.c1.capacity = 10000
    a4.channels.c1.transactionCapacity = 100
    
    #具體定義sink
    a4.sinks = k1
    a4.sinks.k1.type = avro
    a4.sinks.k1.channel = c1
    a4.sinks.k1.hostname = 192.168.157.1
    a4.sinks.k1.port = 1234
    
    #組裝source、channel、sink
    a4.sources.r1.channels = c1
    a4.sinks.k1.channel = c1
    View Code 
  • 第二步:Spark Streaming程序
    package demo
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.flume.FlumeUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object MyFlumeStream {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))
    
        //建立FlumeEvent的DStream
        val flumeEvent = FlumeUtils.createStream(ssc,"192.168.157.1",1234)
    
        //將FlumeEvent中的事件轉成字符串
        val lineDStream = flumeEvent.map( e => {
           new String(e.event.getBody.array)
        })
    
        //輸出結果
        lineDStream.print()
    
        ssc.start()
        ssc.awaitTermination();
      }
    }
    View Code 
  • 第三步:注意除了須要使用Flumelibjar包之外,還須要如下jar包:
    spark-streaming-flume_2.10-2.1.0.jar
     
  • 第四步:測試

    啓動Spark Streaming程序

    啓動Flume

    拷貝日誌文件到/root/training/logs目錄

    觀察輸出,採集到數據
    

基於Custom SinkPull模式

  不一樣於Flume直接將數據推送到Spark Streaming中,第二種模式經過如下條件運行一個正常的Flume sinkFlume將數據推送到sink中,而且數據保持buffered狀態。Spark Streaming使用一個可靠的Flume接收器和轉換器從sink拉取數據。只要當數據被接收而且被Spark Streaming備份後,轉換器才運行成功。

  這樣,與第一種模式相比,保證了很好的健壯性和容錯能力。然而,這種模式須要爲Flume配置一個正常的sink

  如下爲配置步驟:

  • 第一步:Flume的配置文件
    #bin/flume-ng agent -n a1 -f myagent/a1.conf -c conf -Dflume.root.logger=INFO,console
    a1.channels = c1
    a1.sinks = k1
    a1.sources = r1
    
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /root/training/logs
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 100000
    a1.channels.c1.transactionCapacity = 100000
    
    a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hostname = 192.168.157.81
    a1.sinks.k1.port = 1234
    
    #組裝source、channel、sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    View Code 
  • 第二步:Spark Streaming程序
    package demo
    
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.flume.FlumeUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object FlumeLogPull {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))
    
        //建立FlumeEvent的DStream
        val flumeEvent = FlumeUtils.createPollingStream(ssc,"192.168.157.81",1234,StorageLevel.MEMORY_ONLY_SER_2)
    
        //將FlumeEvent中的事件轉成字符串
        val lineDStream = flumeEvent.map( e => {
          new String(e.event.getBody.array)
        })
    
        //輸出結果
        lineDStream.print()
    
        ssc.start()
        ssc.awaitTermination();
      }
    }
    View Code 
  • 第三步:須要的jar

    spark-streaming-flume-sink_2.10-2.1.0.jar   

    將Sparkjar包拷貝到Flumelib目錄下

    下面的這個jar包也須要拷貝到Flumelib目錄下,同時加入IDEA工程的classpath 

  • 第四步:測試

    啓動Flume

    在IDEA中啓動FlumeLogPull

    將測試數據拷貝到/root/training/logs

    觀察IDEA中的輸出

   

2Spark Streaming接收Kafka數據

Apache Kafka是一種高吞吐量的分佈式發佈訂閱消息系統。

搭建ZooKeeperStandalone):

*)配置/root/training/zookeeper-3.4.10/conf/zoo.cfg文件

dataDir=/root/training/zookeeper-3.4.10/tmp

server.1=spark81:2888:3888

*)在/root/training/zookeeper-3.4.10/tmp目錄下建立一個myid的空文件

echo 1 > /root/training/zookeeper-3.4.6/tmp/myid

搭建Kafka環境(單機單broker):

*)修改server.properties文件
 

*)啓動Kafka

bin/kafka-server-start.sh config/server.properties &

 出現如下錯誤:

     須要修改bin/kafka-run-class.sh文件,將這個選項註釋掉。

*)測試Kafka

  建立Topic

bin/kafka-topics.sh --create --zookeeper spark81:2181 -replication-factor 1 --partitions 3 --topic mydemo1

 

  發送消息

bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1

 

  接收消息

bin/kafka-console-consumer.sh --zookeeper spark81:2181 --topic mydemo1

 

搭建Spark StreamingKafka的集成開發環境

因爲Spark StreamingKafka集成的時候,依賴的jar包比較多,並且還會產生衝突。強烈建議使用Maven的方式來搭建項目工程

下面是依賴的pom.xml文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ZDemo5</groupId>
    <artifactId>ZDemo5</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>

</project>
View Code

基於Receiver的方式

這個方法使用了Receivers來接收數據。Receivers的實現使用到Kafka高層次的消費者API。對於全部的Receivers,接收到的數據將會保存在Spark executors中,而後由Spark Streaming啓動的Job來處理這些數據。
 

  • 開發Spark StreamingKafka Receivers
    package demo
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object KafkaWordCount {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))
    
        //建立topic名稱,1表示一次從這個topic中獲取一條記錄
        val topics = Map("mydemo1" ->1)
    
        //建立Kafka的輸入流,指定ZooKeeper的地址
        val kafkaStream = KafkaUtils.createStream(ssc,"192.168.157.81:2181","mygroup",topics)
    
        //處理每次接收到的數據
        val lineDStream = kafkaStream.map(e => {
          new String(e.toString())
        })
        //輸出結果
        lineDStream.print()
    
        ssc.start()
        ssc.awaitTermination();
      }
    }
    View Code
  • 測試

  啓動Kafka消息的生產者

    bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1 

  在IDEA中啓動任務,接收Kafka消息

直接讀取方式

和基於Receiver接收數據不同,這種方式按期地從Kafkatopic+partition中查詢最新的偏移量,再根據定義的偏移量範圍在每一個batch裏面處理數據。看成業須要處理的數據來臨時,spark經過調用Kafka的簡單消費者API讀取必定範圍的數據。

  • 開發Spark Streaming的程序
    package demo
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils
    
    object DirectKafkaWordCount {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))
    
        //建立topic名稱,1表示一次從這個topic中獲取一條記錄
        val topics = Set("mydemo1")
        //指定Kafka的broker地址
        val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.157.81:9092")
    
        //建立DStream,接收Kafka的數據
        val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    
        //處理每次接收到的數據
        val lineDStream = kafkaStream.map(e => {
          new String(e.toString())
        })
        //輸出結果
        lineDStream.print()
    
        ssc.start()
        ssc.awaitTermination();
      }
    }
    View Code  
  • 測試

  啓動Kafka消息的生產者

  bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1

 

  在IDEA中啓動任務,接收Kafka消息

4、性能優化

1、減小批數據的執行時間

Spark中有幾個優化能夠減小批處理的時間:

① 數據接收的並行水平

經過網絡(kafkaflumesocket)接收數據須要這些數據反序列化並被保存到Spark中。若是數據接收成爲系統的瓶頸,就要考慮並行地接收數據。注意,每一個輸入DStream建立一個receiver(運行在worker機器上)接收單個數據流。建立多個輸入DStream並配置它們能夠從源中接收不一樣分區的數據流,從而實現多數據流接收。例如,接收兩個topic數據的單個輸入DStream能夠被切分爲兩個kafka輸入流,每一個接收一個topic。這將在兩個worker上運行兩個receiver,所以容許數據並行接收,提升總體的吞吐量。多個DStream能夠被合併生成單個DStream,這樣運用在單個輸入DStreamtransformation操做能夠運用在合併的DStream上。

② 數據處理的並行水平

若是運行在計算stage上的併發任務數不足夠大,就不會充分利用集羣的資源。默認的併發任務數經過配置屬性來肯定spark.default.parallelism

③ 數據序列化

能夠經過改變序列化格式來減小數據序列化的開銷。在流式傳輸的狀況下,有兩種類型的數據會被序列化:

  輸入數據

  由流操做生成的持久RDD

在上述兩種狀況下,使用Kryo序列化格式能夠減小CPU和內存開銷。

2、設置正確的批容量

爲了Spark Streaming應用程序可以在集羣中穩定運行,系統應該可以以足夠的速度處理接收的數據(即處理速度應該大於或等於接收數據的速度)。這能夠經過流的網絡UI觀察獲得。批處理時間應該小於批間隔時間。

 

根據流計算的性質,批間隔時間可能顯著的影響數據處理速率,這個速率能夠經過應用程序維持。能夠考慮WordCountNetwork這個例子,對於一個特定的數據處理速率,系統可能能夠每2秒打印一次單詞計數(批間隔時間爲2秒),但沒法每500毫秒打印一次單詞計數。因此,爲了在生產環境中維持指望的數據處理速率,就應該設置合適的批間隔時間(即批數據的容量)

 

找出正確的批容量的一個好的辦法是用一個保守的批間隔時間(5-10,秒)和低數據速率來測試你的應用程序。

3、內存調優

在這一節,咱們重點介紹幾個強烈推薦的自定義選項,它們能夠減小Spark Streaming應用程序垃圾回收的相關暫停,得到更穩定的批處理時間。

  Default persistence level of DStreamsRDDs不一樣的是,默認的持久化級別是序列化數據到內存中(DStreamStorageLevel.MEMORY_ONLY_SERRDDStorageLevel.MEMORY_ONLY)。即便保存數據爲序列化形態會增長序列化/反序列化的開銷,可是能夠明顯的減小垃圾回收的暫停。

 

  Clearing persistent RDDs默認狀況下,經過Spark內置策略(LUR),Spark Streaming生成的持久化RDD將會從內存中清理掉。若是spark.cleaner.ttl已經設置了,比這個時間存在更老的持久化RDD將會被定時的清理掉。正如前面提到的那樣,這個值須要根據Spark Streaming應用程序的操做當心設置。然而,能夠設置配置選項spark.streaming.unpersisttrue來更智能的去持久化(unpersistRDD。這個配置使系統找出那些不須要常常保有的RDD,而後去持久化它們。這能夠減小Spark RDD的內存使用,也可能改善垃圾回收的行爲。

 

  Concurrent garbage collector使用併發的標記-清除垃圾回收能夠進一步減小垃圾回收的暫停時間。儘管併發的垃圾回收會減小系統的總體吞吐量,可是仍然推薦使用它以得到更穩定的批處理時間。

相關文章
相關標籤/搜索