編程模型:數據處理層

Basic相關API算法

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
  * WordCount程序,Spark Streaming消費TCP Server發過來的實時數據的例子:
  *
  * 一、在master服務器上啓動一個Netcat server
  * `$ nc -lk 9998` (若是nc命令無效的話,咱們能夠用yum install -y nc來安裝nc)
  *
  * 二、用下面的命令在在集羣中將Spark Streaming應用跑起來
   spark-submit --class com.dev.streaming.NetworkWordCount \
   --master spark://master:7077 \
   --deploy-mode client \
   --driver-memory 512m \
   --executor-memory 512m \
   --total-executor-cores 4 \
   --executor-cores 2 \
   /home/hadoop-dev/spark-course/streaming/spark-streaming-basic-1.0-SNAPSHOT.jar

  spark-shell --master spark://master:7077 --total-executor-cores 4 --executor-cores 2
  */
object BasicAPITest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)

    // StreamingContext 編程入口
    val ssc = new StreamingContext(sc, Seconds(1))

    //數據接收器(Receiver)
    //建立一個接收器(ReceiverInputDStream),這個接收器接收一臺機器上的某個端口經過socket發送過來的數據並處理
    val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)

    //數據處理(Process)
    //處理的邏輯,就是簡單的進行word count
    val words = lines.flatMap(_.split(" ")).filter(_.contains("exception"))
    val wordPairs = words.map(x => (x, 1))
    //  reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10)   指定suffer後分區數量和分區算法(默認是HashPartitioner)
    val wordCounts = wordPairs.repartition(100).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))

    //結果輸出(Output)
    //將結果輸出到控制檯
    wordCounts.print()

    //啓動Streaming處理流
    ssc.start()

    //等待Streaming程序終止
    ssc.awaitTermination()
  }
}

  Join相關APIshell

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by tangweiqun on 2018/1/6.
  */
object JoinAPITest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)

    // Create the context with a 5 second batch size
    val ssc = new StreamingContext(sc, Seconds(5))

    val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)
    val kvs1 = lines1.map { line =>
      val arr = line.split(" ")
      (arr(0), arr(1))
    }


    val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER)
    val kvs2 = lines2.map { line =>
      val arr = line.split(" ")
      (arr(0), arr(1))
    }

    kvs1.join(kvs2).print()
    kvs1.fullOuterJoin(kvs2).print()
    kvs1.leftOuterJoin(kvs2).print()
    kvs1.rightOuterJoin(kvs2).print()

    //啓動Streaming處理流
    ssc.start()

    ssc.stop(false)

    //等待Streaming程序終止
    ssc.awaitTermination()
  }
}

  TransformAPIapache

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by tangweiqun on 2018/1/6.
  */
object TransformAPITest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sc, Seconds(5))

    val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)
    val kvs1 = lines1.map { line =>
      val arr = line.split(" ")
      (arr(0), arr(1))
    }
    ///  實時數據
    val path = "hdfs://master:9999/user/hadoop-twq/spark-course/streaming/keyvalue.txt"
    val keyvalueRDD =
      sc.textFile(path).map { line =>
        val arr = line.split(" ")
        (arr(0), arr(1))
      }
    ///  靜態數據
    kvs1.transform { rdd =>
      rdd.join(keyvalueRDD)
    } print()

    //啓動Streaming處理流
    ssc.start()

    ssc.stop(false)


    val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER)
    val kvs2 = lines2.map { line =>
      val arr = line.split(" ")
      (arr(0), arr(1))
    }
    //(將實時數據與靜態數據相關聯)
    kvs1.transformWith(kvs2, (rdd1: RDD[(String, String)], rdd2: RDD[(String, String)]) => rdd1.join(rdd2))

    //等待Streaming程序終止
    ssc.awaitTermination()
  }
}

  WindowAPI編程

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by tangweiqun on 2018/1/6.
  */
object WindowAPITest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sc, Seconds(1))  ////  用來控制RDD的分區

    val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)

    //每過2秒鐘,而後顯示前20秒的數據
    val windowDStream = lines.window(Seconds(20), Seconds(2))

    windowDStream.print()

    //啓動Streaming處理流
    ssc.start()

    //等待Streaming程序終止
    ssc.awaitTermination()

    ssc.stop(false)
  }
}

  

 

 

 

batch interval - DStream產生的間隔,由StreamingContext指定 (這裏設置爲1s),控制RDD分區
window length - 窗口的長度,即一個窗口包含的RDD的個數 (這裏設置爲20s,必須是batch interval的倍數)
sliding interval - 窗口滑動間隔,執行窗口操做的時間段(這裏設置爲2s,必須是batch interval的倍數)

ReduceByKeyAndWindowAPI

 

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by tangweiqun on 2018/1/6.
  */
object ReduceByKeyAndWindowAPITest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sc, Seconds(1))

    ssc.checkpoint("hdfs://master:9999/user/hadoop-twq/spark-course/streaming/checkpoint")

    val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)

    val words = lines.flatMap(_.split(" "))

    //每5秒中,統計前20秒內每一個單詞出現的次數
    val wordPair = words.map(x => (x, 1))

    val wordCounts =
      wordPair.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(20), Seconds(5))

    wordCounts.print()

    //啓動Streaming處理流
    ssc.start()

    ssc.stop(false)






    //接受一個ReduceFunc和一個invReduceFunc
    //滑動時間比較短,窗口長度很長的場景
    //  須要用checkpoint機制
    val wordCountsOther =
      wordPair.reduceByKeyAndWindow((a: Int, b: Int) => a + b,     
        (a: Int, b: Int) => a - b, Seconds(60), Seconds(2))

    wordCountsOther.checkpoint(Seconds(12)) //窗口滑動間隔的5到10倍

    wordCountsOther.print()

    ssc.start()



    //過濾掉value = 0的值
    words.map(x => (x, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b,
      (a: Int, b: Int) => a - b,
      Seconds(30), Seconds(10), 4,
      (record: (String, Int)) => record._2 != 0)

    //等待Streaming程序終止
    ssc.awaitTermination()
  }
}

  

 

 

 

一、分別對rdd2和rdd3進行reduceByKey
二、取在window內的rdd進行union,生成unionRDD
三、對unionRDD再次進行reduceByKey
(不須要 checkpoint機制,不須要依賴)

 

一、將兩個window的全部rdd進行cogroup
(須要依賴前面的RDD,所以須要checkpoint機制)
二、對old rdds對應的value應用invReduceF
三、對new rdds對應的value應用reduceF

 

 

localCheckpoint() 存儲在內存和磁盤中,但數據不可靠
checkpoint() 存儲在HDFS中去,數據可靠,提升容錯性能,須要設置文件目錄
 

 

UpdateStateByKeyAPI
一、updateStateByKey,這個API根據一個key的以前的狀態和新的接收到的數據來計算而且更新新狀態。使用這個API須要作兩步:第一就是爲每個key定義一個初始狀態,這個狀態的類型能夠實任意類型;第二就是定義一個更新狀態的函數,這個函數根據每個key以前的狀態和新接收到的數據計算新的狀態。
 
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer



object UpdateStateByKeyAPITest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sc, Seconds(1))

    ssc.checkpoint("hdfs://master:9999/user/hadoop-twq/spark-course/streaming/checkpoint")

    val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)

    val words = lines.flatMap(_.split(" "))

    val wordsDStream = words.map(x => (x, 1))
    ///values: Seq[Int]   在必定的時間段內收到的  當前key在這個時間段內收集到的value,
    /// currentState: Option[Int]  當前key的狀態
    wordsDStream.updateStateByKey(
      (values: Seq[Int], currentState: Option[Int]) => Some(currentState.getOrElse(0) + values.sum)).print()

    //啓動Streaming處理流
    ssc.start()

    ssc.stop(false)



    //updateStateByKey的另外一個API
    ///  接收的函數是Iterator  三元組    String Key   Seq[Int]  接收到的數據   Option[Int]) Key當前的狀態
    wordsDStream.updateStateByKey[Int]((iter: Iterator[(String, Seq[Int], Option[Int])]) => {
      val list = ListBuffer[(String, Int)]()
      while (iter.hasNext) {
        val (key, newCounts, currentState) = iter.next
        val state = Some(currentState.getOrElse(0) + newCounts.sum)

        val value = state.getOrElse(0)
        if (key.contains("error")) {
          list += ((key, value)) // Add only keys with contains error
        }
      }
      list.toIterator
    }, new HashPartitioner(4), true).print()

    ssc.start()


    //等待Streaming程序終止
    ssc.awaitTermination()
  }
}

  

MapWithStateAPI

mapWithState,這個API的功能和updateStateByKey是同樣的,只不過在性能方面作了很大的優化,這個函數對於沒有接收到新數據的key是不會計算新狀態的,而updateStateByKey是會從新計算任何的key的新狀態的,因爲這個緣由因此致使mapWithState能夠處理的key的數量比updateStateByKey多10倍多,性能也比updateStateByKey快不少。 支持促使狀態mapWithState還支持timeout API
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}


object MapWithStateAPITest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sc, Seconds(5))

    ssc.checkpoint("hdfs://master:9999/user/hadoop-twq/spark-course/streaming/checkpoint")

    val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)

    val words = lines.flatMap(_.split(" "))

    val wordsDStream = words.map(x => (x, 1))

    val initialRDD = sc.parallelize(List(("dummy", 100L), ("source", 32L)))
    // currentBatchTime : 表示當前的Batch的時間
    // key: 表示須要更新狀態的key
    // value: 表示當前batch的對應的key的對應的值
    // currentState: 對應key的當前的狀態
    val stateSpec = StateSpec.function((currentBatchTime: Time, key: String, value: Option[Int], currentState: State[Long]) => {
      val sum = value.getOrElse(0).toLong + currentState.getOption.getOrElse(0L)
      val output = (key, sum) 
      if (!currentState.isTimingOut()) {
        currentState.update(sum)
      }
      Some(output)
    }).initialState(initialRDD).numPartitions(2).timeout(Seconds(30)) //timeout: 當一個key超過這個時間沒有接收到數據的時候,這個key以及對應的狀態會被移除掉

    val result = wordsDStream.mapWithState(stateSpec)

    result.print()
    //  從一開始顯示全部數據,包含初始值
    result.stateSnapshots().print()

    //啓動Streaming處理流
    ssc.start()

    ssc.stop(false)


    //等待Streaming程序終止
    ssc.awaitTermination()
  }
}
相關文章
相關標籤/搜索