SparkStreaming流處理

1、Spark Streaming的介紹

1.       流處理

流式處理(Stream Processing)。流式處理就是指源源不斷的數據流過系統時,系統可以不停地連續計算。因此流式處理沒有什麼嚴格的時間限制,數據從進入系統到出來結果多是須要一段時間。然而流式處理惟一的限制是系統長期來看的輸出速率應當快於或至少等於輸入速率。不然的話,數據豈不是會在系統中越積越多(否則數據哪去了)?如此,無論處理時是在內存、閃存仍是硬盤,遲早都會空間耗盡的。就像雪崩效應,系統愈來愈慢,數據越積越多。算法

二、spark架構

三、Spark Streaming特色

Spark Streaming是一種構建在Spark上的實時計算框架,它擴展了Spark處理大規模流式數據的能力。apache

Spark Streaming的優點在於:架構

能運行在100+的結點上,並達到秒級延遲。app

使用基於內存的Spark做爲執行引擎,具備高效和容錯的特性。框架

能集成Spark的批處理和交互查詢。socket

爲實現複雜的算法提供和批處理相似的簡單接口。函數

Spark Streaming在內部的處理機制是,接收實時流的數據,並根據必定的時間間隔拆分紅一批批的數據,而後經過Spark Engine處理這些批數據,最終獲得處理後的一批批結果數據。spa

對應的批數據,在Spark內核對應一個RDD實例,所以,對應流數據的DStream能夠當作是一組RDDs,即RDD的一個序列。通俗點理解的話,在流數據分紅一批一批後,經過一個先進先出的隊列,而後 Spark Engine從該隊列中依次取出一個個批數據,把批數據封裝成一個RDD,而後進行處理,這是一個典型的生產者消費者模型,對應的就有生產者消費者模型的問題,即如何協調生產速率和消費速率。對象

四、程序流程blog

引入頭文件

import org.apache.spark._
import org.apache.spark.streaming._

 

1.  建立StreamingContext對象 同Spark初始化須要建立SparkContext對象同樣,使用Spark Streaming就須要建立StreamingContext對象。建立StreamingContext對象所需的參數與SparkContext基本一致,包括指明Master,設定名稱(如NetworkWordCount)。須要注意的是參數Seconds(1),Spark Streaming須要指定處理數據的時間間隔,如上例所示的1s,那麼Spark Streaming會以1s爲時間窗口進行數據處理。此參數須要根據用戶的需求和集羣的處理能力進行適當的設置;

 


val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val  ssc = new StreamingContext(conf, Seconds(1))

 

 

  1. 建立InputDStream Spark Streaming須要指明數據源。如上例所示的socketTextStream,Spark Streaming以socket鏈接做爲數據源讀取數據。固然Spark Streaming支持多種不一樣的數據源,包括Kafka、 Flume、HDFS/S三、Kinesis和Twitter等數據源;

val lines = ssc.socketTextStream("10.2.5.3", 9999

 

  1. 操做DStream對於從數據源獲得的DStream,用戶能夠在其基礎上進行各類操做,如上例所示的操做就是一個典型的WordCount執行流程:對於當前時間窗口內從數據源獲得的數據首先進行分割,而後利用Map和ReduceByKey方法進行計算,固然最後還有使用print()方法輸出結果;

val words = lines.flatMap(_.split(" "))

val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

 

  1. 啓動Spark Streaming以前所做的全部步驟只是建立了執行流程,程序沒有真正鏈接上數據源,也沒有對數據進行任何操做,只是設定好了全部的執行計劃,當ssc.start()啓動後程序才真正進行全部預期的操做。

ssc.start()

ssc.awaitTermination()

 

五、單詞統計例子

object Count
{
  def main(args: Array[String]): Unit =
  {  
 val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val  ssc = new StreamingContext(conf, Seconds(1))
    val lines = ssc.socketTextStream("10.2.5.3", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
     wordCounts.print()
    ssc.start()            

ssc.awaitTermination() 

  }
}

2、SparkStreaming累加器及廣播變量的使用

Spark Streaming的累加器和廣播變量沒法從checkpoint恢復。若是在應用中既使用到checkpoint又使用了累加器和廣播變量的話,最好對累加器和廣播變量作懶實例化操做,這樣纔可使累加器和廣播變量在driver失敗重啓時可以從新實例化。

定義累加器

Object  DroppedWordsCounter {
  @volatile private var instance: LongAccumulator = null
  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

 

定義廣播變量

object WordBlacklist {
  @volatile private var instance: Broadcast[Seq[String]] = null
  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized{
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

 

刪除重複的單詞

object RecoverableNetworkWordCount {
  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String)
  : StreamingContext = {
    println("Creating new context")
    val outputFile = new File(outputPath)
    if (outputFile.exists()) outputFile.delete()
    val sparkConf = new

SparkConf().setAppName("RecoverableNetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext( sparkConf, Seconds(120) )
    ssc.checkpoint( checkpointDirectory )
    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
    val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
    val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
    val counts = rdd.filter { case (word, count) =>
        if (blacklist.value.contains(word)) {
          droppedWordsCounter.add(count)
          false
        } else {
          true
        }
      }.collect().mkString("[", ", ", "]")
      val output = "Counts at time " + time + " " + counts
      println(output)
      println("Dropped " + droppedWordsCounter.value + " word(s) totally")
      println("Appending to " + outputFile.getAbsolutePath)
      Files.append(droppedWordsCounter.value + "\n", outputFile, Charset.defaultCharset())
    }
    ssc
  }

 

 

主函數

def main(args: Array[String]) {   val Array(ip, port, checkpointDirectory, outputPath) = Array("10.2.5.3","9999","E:\\point","E:\\out\\test.txt")   val ssc = StreamingContext.getOrCreate(checkpointDirectory,     () => createContext(ip, port.toInt, outputPath, checkpointDirectory))   ssc.start()   ssc.awaitTermination() }

相關文章
相關標籤/搜索