Study Notes: Getting Started with Spark Streaming

 An Introduction to Spark Streaming

   1. Overview

     Spark Streaming is an extension of the Spark Core API that enables scalable, high-throughput, fault-tolerant stream processing of live data.

     As the architecture diagram above shows, data can come from many sources, such as Kafka, Flume, Twitter, HDFS/S3, and Kinesis (used less often). The collected data can be processed with complex algorithms expressed through high-level functions such as map and reduce, and the results produced by the Spark Streaming framework can be pushed out to file systems, databases, or live dashboards. Beyond that, Spark's machine learning and graph processing algorithms can also be applied to the data streams.

    A simple personal definition of Spark Streaming: take data from different data sources, process it with the Spark Streaming framework, and output the results to an external file system.
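    A minimal sketch of that definition follows (the host, port, and output path are placeholder assumptions, not values from these notes): read from one socket source, process it with Spark Streaming, and write each batch's result to an external file system.

      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      object SourceToFileSystemSketch {
        def main(args: Array[String]): Unit = {
          val conf = new SparkConf().setMaster("local[2]").setAppName("SourceToFileSystemSketch")
          val ssc  = new StreamingContext(conf, Seconds(10))            // 10-second batches

          val source    = ssc.socketTextStream("localhost", 9999)       // one possible data source
          val processed = source.filter(_.nonEmpty).map(_.toUpperCase)  // the "processing" step

          // Every batch is written out as a new directory under this prefix
          // (an hdfs:// path would work the same way).
          processed.saveAsTextFiles("file:///tmp/spark-streaming-out/batch")

          ssc.start()
          ssc.awaitTermination()
        }
      }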

    Features:

      Low latency

      Recovers efficiently from failures (fault-tolerant)

      Scales to run on hundreds or even thousands of nodes

      Lets batch processing, machine learning, graph computation, and other sub-frameworks be combined with Spark Streaming (a minimal sketch follows this list)
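    As a rough illustration of that last point (the local master and socket source are assumptions for the sketch): inside foreachRDD each batch is an ordinary RDD, so it can be handed to another sub-framework such as Spark SQL.

      import org.apache.spark.SparkConf
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      object StreamingWithSqlSketch {
        def main(args: Array[String]): Unit = {
          val conf  = new SparkConf().setMaster("local[2]").setAppName("StreamingWithSqlSketch")
          val ssc   = new StreamingContext(conf, Seconds(5))
          val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

          words.foreachRDD { rdd =>
            // Reuse (or lazily create) a SparkSession so the batch can be queried with SQL.
            val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
            import spark.implicits._

            val df = rdd.toDF("word")              // RDD[String] -> DataFrame
            df.createOrReplaceTempView("words")
            spark.sql("SELECT word, COUNT(*) AS total FROM words GROUP BY word").show()
          }

          ssc.start()
          ssc.awaitTermination()
        }
      }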

   2. Use Cases

     Real-time monitoring of electronic devices

     Real-time detection of financial fraud during transactions

     Product recommendations in the e-commerce industry

   3. Using Spark Streaming with the Spark Ecosystem

     

     Spark SQL, Spark Streaming, MLlib, and GraphX are all extensions built on top of Spark Core. So how do they interact with one another? (To be filled in later.)

   4. A Brief History of Spark

    

      

   5. Getting Started with Spark Streaming Through Word Count

    • Running with spark-submit (development)
      package org.apache.spark.examples.streaming

      import org.apache.spark.SparkConf
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      /**
       * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
       *
       * Usage: NetworkWordCount <hostname> <port>
       * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
       *
       * To run this on your local machine, you need to first run a Netcat server
       *    `$ nc -lk 9999`
       * and then run the example
       *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
       */
      object NetworkWordCount {
        def main(args: Array[String]) {
          if (args.length < 2) {
            System.err.println("Usage: NetworkWordCount <hostname> <port>")
            System.exit(1)
          }

          StreamingExamples.setStreamingLogLevels()

          // Create the context with a 1 second batch size
          val sparkConf = new SparkConf().setAppName("NetworkWordCount")
          val ssc = new StreamingContext(sparkConf, Seconds(1))

          // Create a socket stream on target ip:port and count the
          // words in input stream of \n delimited text (eg. generated by 'nc')
          // Note that no duplication in storage level only for running locally.
          // Replication necessary in distributed scenario for fault tolerance.
          val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
          val words = lines.flatMap(_.split(" "))
          val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
          wordCounts.print()

          ssc.start()
          ssc.awaitTermination()
        }
      }

      The spark-submit command for this job is shown below (if anything is unclear, see the usage notes in the comment at the top of the code):

      ./spark-submit --master local[2] \
        --class org.apache.spark.examples.streaming.NetworkWordCount \
        --name NetworkWordCount \
        /home/hadoop/app/spark/eaxmple/jars/spark-example_2.11-2.2.20.jar hadoop0000 9999

       

    • Running in spark-shell (testing)
      // In spark-shell a SparkContext (sc) already exists, so build the
      // StreamingContext on top of it instead of creating a new SparkConf.
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      val ssc = new StreamingContext(sc, Seconds(1))
      val lines = ssc.socketTextStream("hadoop000", 9999)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
      wordCounts.print()
      ssc.start()
      ssc.awaitTermination()

      Just run ./spark-shell --master local[2], then paste the code above into the shell and run it.

   6. How It Works

     Coarse-grained view: Spark Streaming receives the live data stream and cuts it into small blocks of data by a specified time interval (each small block is treated as an RDD). These blocks are handed to the Spark engine for processing, and the results are likewise returned batch by batch.
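     A small sketch of that coarse-grained view (hostname, port, and interval are placeholder values): foreachRDD exposes the RDD behind each batch, which makes the "one small block of data per interval = one RDD" behaviour visible.

      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      object BatchAsRddSketch {
        def main(args: Array[String]): Unit = {
          val conf = new SparkConf().setMaster("local[2]").setAppName("BatchAsRddSketch")
          val ssc  = new StreamingContext(conf, Seconds(5))   // cut the stream into 5-second blocks

          val lines = ssc.socketTextStream("localhost", 9999)

          // Each batch interval produces one RDD; the normal RDD API applies to it.
          lines.foreachRDD { (rdd, time) =>
            println(s"Batch at $time contains ${rdd.count()} records")
          }

          ssc.start()
          ssc.awaitTermination()
        }
      }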

        

       Fine-grained view: the application holds two contexts, a SparkContext and a StreamingContext, and uses receivers to take in data. Receivers run as tasks on executors and pull in data; when an executor receives data, it splits it into blocks by time interval and keeps them in memory, and if multiple replicas are configured the blocks are copied to other executors as backups (replicate blocks). The receiver on the executor reports the block metadata to the StreamingContext, and at every batch interval the StreamingContext tells the SparkContext to launch jobs and distribute them to the executors for execution.
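       A minimal sketch of the pieces in that fine-grained view (hostname and port are assumptions): one SparkContext, a StreamingContext built on top of it, and a receiver whose blocks use a replicated storage level (the _2 suffix) so that a copy lands on another executor.

      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      object TwoContextsSketch {
        def main(args: Array[String]): Unit = {
          val sc  = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("TwoContextsSketch"))
          val ssc = new StreamingContext(sc, Seconds(1))   // StreamingContext wraps the SparkContext

          // The receiver runs as a long-lived task on an executor; MEMORY_AND_DISK_SER_2
          // stores each block with one replica on another executor for fault tolerance.
          val lines = ssc.socketTextStream("hadoop000", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

          // At every batch interval the StreamingContext asks the SparkContext to run jobs such as this count.
          lines.count().print()

          ssc.start()
          ssc.awaitTermination()
        }
      }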
