Basic APIs
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
 * A WordCount program: an example of Spark Streaming consuming real-time data sent by a TCP server.
 *
 * 1. Start a Netcat server on the master node:
 *    `$ nc -lk 9998` (if the nc command is not available, install it with `yum install -y nc`)
 *
 * 2. Submit the Spark Streaming application to the cluster with:
 *    spark-submit --class com.dev.streaming.NetworkWordCount \
 *      --master spark://master:7077 \
 *      --deploy-mode client \
 *      --driver-memory 512m \
 *      --executor-memory 512m \
 *      --total-executor-cores 4 \
 *      --executor-cores 2 \
 *      /home/hadoop-dev/spark-course/streaming/spark-streaming-basic-1.0-SNAPSHOT.jar
 *
 *    Or run it interactively:
 *    spark-shell --master spark://master:7077 --total-executor-cores 4 --executor-cores 2
 */
object BasicAPITest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)

    // StreamingContext is the programming entry point for Spark Streaming
    val ssc = new StreamingContext(sc, Seconds(1))

    // Receiver: create a ReceiverInputDStream that receives and processes data
    // sent over a socket from the given host and port
    val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)

    // Process: a simple word count over the words that contain "exception"
    val words = lines.flatMap(_.split(" ")).filter(_.contains("exception"))
    val wordPairs = words.map(x => (x, 1))

    // The second argument of reduceByKey sets the number of post-shuffle partitions
    // and the partitioning algorithm (HashPartitioner by default)
    val wordCounts = wordPairs.repartition(100).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))

    // Output: print the results to the console
    wordCounts.print()

    // Start the streaming computation
    ssc.start()

    // Wait for the streaming application to terminate
    ssc.awaitTermination()
  }
}
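If you only want to try the word count without a cluster, a local-mode variant of the same program works as well. This is a minimal sketch under assumed settings (host "localhost", port 9998); note that local mode needs at least two threads, one for the socket receiver and one for processing, hence local[2].

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal local-mode sketch: local[2] reserves one thread for the receiver and one
// for processing. Host "localhost" and port 9998 are assumptions for a local test.
object LocalWordCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("LocalWordCountSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = ssc.socketTextStream("localhost", 9998, StorageLevel.MEMORY_AND_DISK_SER)
    lines.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}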
Join APIs
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by tangweiqun on 2018/1/6.
 */
object JoinAPITest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)

    // Create the context with a 5 second batch size
    val ssc = new StreamingContext(sc, Seconds(5))

    val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)
    val kvs1 = lines1.map { line =>
      val arr = line.split(" ")
      (arr(0), arr(1))
    }

    val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER)
    val kvs2 = lines2.map { line =>
      val arr = line.split(" ")
      (arr(0), arr(1))
    }

    // Join the two key-value DStreams, batch by batch
    kvs1.join(kvs2).print()
    kvs1.fullOuterJoin(kvs2).print()
    kvs1.leftOuterJoin(kvs2).print()
    kvs1.rightOuterJoin(kvs2).print()

    // Start the streaming computation
    ssc.start()
    // Wait for the streaming application to terminate
    ssc.awaitTermination()
    // ssc.stop(false) would stop the StreamingContext while keeping the SparkContext alive
  }
}
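Each of these DStream joins simply applies the corresponding RDD join to the two batches that line up in time, so the semantics can be checked with plain RDDs. Below is a minimal sketch, assuming local mode; the object name and sample data are illustrative only.

import org.apache.spark.{SparkConf, SparkContext}

// Standalone illustration of the join semantics used above, run on plain RDDs
// so it can be tried without a socket source (illustrative names and data).
object JoinSemanticsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("JoinSemanticsSketch").setMaster("local[2]"))

    val left = sc.parallelize(Seq(("k1", "a"), ("k2", "b")))
    val right = sc.parallelize(Seq(("k1", "x"), ("k3", "y")))

    // join: only keys present on both sides            -> (k1,(a,x))
    left.join(right).collect().foreach(println)
    // leftOuterJoin: all left keys, right side Option  -> (k1,(a,Some(x))), (k2,(b,None))
    left.leftOuterJoin(right).collect().foreach(println)
    // rightOuterJoin: all right keys, left side Option -> (k1,(Some(a),x)), (k3,(None,y))
    left.rightOuterJoin(right).collect().foreach(println)
    // fullOuterJoin: union of keys, both sides Option
    left.fullOuterJoin(right).collect().foreach(println)

    sc.stop()
  }
}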
Transform API
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by tangweiqun on 2018/1/6.
 */
object TransformAPITest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)

    // Create the context with a 5 second batch size
    val ssc = new StreamingContext(sc, Seconds(5))

    // Real-time data from a socket
    val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)
    val kvs1 = lines1.map { line =>
      val arr = line.split(" ")
      (arr(0), arr(1))
    }

    // Static data from HDFS
    val path = "hdfs://master:9999/user/hadoop-twq/spark-course/streaming/keyvalue.txt"
    val keyvalueRDD = sc.textFile(path).map { line =>
      val arr = line.split(" ")
      (arr(0), arr(1))
    }

    // transform: join the real-time stream with the static RDD, batch by batch
    kvs1.transform { rdd =>
      rdd.join(keyvalueRDD)
    }.print()

    // A second real-time stream
    val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER)
    val kvs2 = lines2.map { line =>
      val arr = line.split(" ")
      (arr(0), arr(1))
    }

    // transformWith: join two real-time streams, RDD by RDD
    kvs1.transformWith(kvs2,
      (rdd1: RDD[(String, String)], rdd2: RDD[(String, String)]) => rdd1.join(rdd2)).print()

    // Start the streaming computation
    ssc.start()
    // Wait for the streaming application to terminate
    ssc.awaitTermination()
    // ssc.stop(false) would stop the StreamingContext while keeping the SparkContext alive
  }
}
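One practical note: the transform function runs once per batch, so the static keyvalueRDD is joined against every five seconds. Caching it keeps the lookup data in memory instead of re-reading the text file from HDFS for each batch. A sketch of that variation, reusing sc, path and kvs1 from TransformAPITest:

// Cache the static lookup data so every batch joins against the in-memory copy
// (sketch; assumes the same sc, path and kvs1 as in TransformAPITest).
val cachedKeyValueRDD = sc.textFile(path).map { line =>
  val arr = line.split(" ")
  (arr(0), arr(1))
}.cache()

kvs1.transform { rdd =>
  rdd.join(cachedKeyValueRDD)
}.print()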
Window API
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by tangweiqun on 2018/1/6.
 */
object WindowAPITest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)

    // Create the context with a 1 second batch size
    // (the batch interval also affects how each batch's RDD is partitioned)
    val ssc = new StreamingContext(sc, Seconds(1))

    val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)

    // Every 2 seconds, show the data received during the previous 20 seconds
    val windowDStream = lines.window(Seconds(20), Seconds(2))

    windowDStream.print()

    // Start the streaming computation
    ssc.start()
    // Wait for the streaming application to terminate
    ssc.awaitTermination()
    // ssc.stop(false) would stop the StreamingContext while keeping the SparkContext alive
  }
}
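Both the window length and the slide interval must be multiples of the batch interval (here 1 second), otherwise Spark Streaming rejects the window when the DStream is set up. The windowed DStream can then be processed like any other DStream. A small sketch that counts words over each 20-second window, reusing the lines DStream from WindowAPITest:

// Word count over each 20-second window, evaluated every 2 seconds
// (sketch; reuses the `lines` DStream defined in WindowAPITest).
val windowedCounts = lines
  .window(Seconds(20), Seconds(2))
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)

windowedCounts.print()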
ReduceByKeyAndWindow API

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by tangweiqun on 2018/1/6.
 */
object ReduceByKeyAndWindowAPITest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sc, Seconds(1))
    ssc.checkpoint("hdfs://master:9999/user/hadoop-twq/spark-course/streaming/checkpoint")

    val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordPair = words.map(x => (x, 1))

    // Every 5 seconds, count how many times each word occurred during the previous 20 seconds
    val wordCounts = wordPair.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(20), Seconds(5))
    wordCounts.print()

    // The variant that takes a reduce function and an inverse reduce function.
    // It suits scenarios with a long window and a short slide interval,
    // and it requires the checkpoint mechanism.
    val wordCountsOther = wordPair.reduceByKeyAndWindow((a: Int, b: Int) => a + b,
      (a: Int, b: Int) => a - b, Seconds(60), Seconds(2))
    wordCountsOther.checkpoint(Seconds(12)) // checkpoint interval: 5 to 10 times the slide interval
    wordCountsOther.print()

    // A further variant that also takes the number of partitions and a filter function;
    // here it filters out records whose value is 0
    words.map(x => (x, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b,
      Seconds(30), Seconds(10), 4, (record: (String, Int)) => record._2 != 0)

    // Start the streaming computation
    ssc.start()
    // Wait for the streaming application to terminate
    ssc.awaitTermination()
  }
}
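The two-function form works incrementally: rather than re-reducing all 60 seconds of data every 2 seconds, Spark (roughly) takes the previous window's result, "subtracts" the batches that slid out of the window using the inverse function, and adds the newly arrived batches, which is why the previous results must be retained via checkpointing. A small sketch of the idea with made-up per-batch counts for a single key; this illustrates the arithmetic only, not Spark's actual implementation.

// Illustration of the incremental update behind the (reduceFunc, invReduceFunc)
// variant, using per-batch counts for one key. All numbers are made up.
object IncrementalWindowSketch {
  def main(args: Array[String]): Unit = {
    val reduceFunc = (a: Int, b: Int) => a + b
    val invReduceFunc = (a: Int, b: Int) => a - b

    // Per-batch counts: window length = 4 batches, slide = 1 batch
    val batches = Seq(3, 1, 4, 1, 5, 9)

    // Old window = batches(0..3)
    val oldWindowSum = batches.slice(0, 4).reduce(reduceFunc)
    // New window = batches(1..4): subtract the batch that left, add the batch that entered
    val newWindowSum = reduceFunc(invReduceFunc(oldWindowSum, batches(0)), batches(4))

    println(s"old window sum = $oldWindowSum")       // 9
    println(s"new window sum = $newWindowSum")       // 9 - 3 + 5 = 11
    println(batches.slice(1, 5).sum == newWindowSum) // true: same result as recomputing
  }
}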
UpdateStateByKey API

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object UpdateStateByKeyAPITest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sc, Seconds(1))
    ssc.checkpoint("hdfs://master:9999/user/hadoop-twq/spark-course/streaming/checkpoint")

    val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordsDStream = words.map(x => (x, 1))

    // values: Seq[Int] - the values collected for the current key during this batch
    // currentState: Option[Int] - the current state of the key
    wordsDStream.updateStateByKey(
      (values: Seq[Int], currentState: Option[Int]) => Some(currentState.getOrElse(0) + values.sum)).print()

    // Another updateStateByKey variant: the update function receives an Iterator of triples
    // (key, values received in this batch, current state of the key)
    wordsDStream.updateStateByKey[Int]((iter: Iterator[(String, Seq[Int], Option[Int])]) => {
      val list = ListBuffer[(String, Int)]()
      while (iter.hasNext) {
        val (key, newCounts, currentState) = iter.next
        val state = Some(currentState.getOrElse(0) + newCounts.sum)
        val value = state.getOrElse(0)
        if (key.contains("error")) {
          list += ((key, value)) // keep only keys that contain "error"
        }
      }
      list.toIterator
    }, new HashPartitioner(4), true).print()

    // Start the streaming computation
    ssc.start()
    // Wait for the streaming application to terminate
    ssc.awaitTermination()
  }
}
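To see what the first (simple) update function does to the state over time, here is a tiny pure-Scala simulation of its semantics (a sketch with made-up batches, not Spark code): the function is called for every key that has new values or existing state, and since it always returns Some, the per-word totals keep growing batch after batch.

// Pure-Scala simulation of the simple updateStateByKey function used above.
object UpdateStateSimulation {
  def main(args: Array[String]): Unit = {
    val update: (Seq[Int], Option[Int]) => Option[Int] =
      (values, currentState) => Some(currentState.getOrElse(0) + values.sum)

    // Two made-up batches of (word, 1) pairs
    val batch1 = Seq(("error", 1), ("error", 1), ("info", 1))
    val batch2 = Seq(("error", 1))

    def step(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
      val grouped: Map[String, Seq[Int]] = batch.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }
      val keys = state.keySet ++ grouped.keySet
      keys.map(k => k -> update(grouped.getOrElse(k, Seq.empty), state.get(k)).get).toMap
    }

    val afterBatch1 = step(Map.empty, batch1)
    val afterBatch2 = step(afterBatch1, batch2)
    println(afterBatch1) // Map(error -> 2, info -> 1)
    println(afterBatch2) // Map(error -> 3, info -> 1)
  }
}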
MapWithState API

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}

object MapWithStateAPITest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)

    // Create the context with a 5 second batch size
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("hdfs://master:9999/user/hadoop-twq/spark-course/streaming/checkpoint")

    val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordsDStream = words.map(x => (x, 1))

    // Initial state for two keys
    val initialRDD = sc.parallelize(List(("dummy", 100L), ("source", 32L)))

    // currentBatchTime: the time of the current batch
    // key: the key whose state needs to be updated
    // value: the value for this key in the current batch
    // currentState: the current state of this key
    val stateSpec = StateSpec.function((currentBatchTime: Time, key: String, value: Option[Int], currentState: State[Long]) => {
      val sum = value.getOrElse(0).toLong + currentState.getOption.getOrElse(0L)
      val output = (key, sum)
      if (!currentState.isTimingOut()) {
        currentState.update(sum)
      }
      Some(output)
    }).initialState(initialRDD).numPartitions(2).timeout(Seconds(30))
    // timeout: if a key receives no data for this long, the key and its state are removed

    val result = wordsDStream.mapWithState(stateSpec)
    result.print()

    // stateSnapshots: shows the full state built up so far, including the initial values
    result.stateSnapshots().print()

    // Start the streaming computation
    ssc.start()
    // Wait for the streaming application to terminate
    ssc.awaitTermination()
    // ssc.stop(false) would stop the StreamingContext while keeping the SparkContext alive
  }
}
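Note that result only contains entries for keys that were updated in the current batch, while result.stateSnapshots() emits the full key-to-state table every batch. When a key times out, the mapping function is invoked one last time with value = None and currentState.isTimingOut() returning true, and the state may not be updated in that call, which is why the example guards update() with !currentState.isTimingOut(). Below is a sketch of a mapping function that makes the timeout case explicit; the name verboseStateSpec and the -1 marker are illustrative only, not part of the original course code.

// Sketch (assumed variant): emit a -1 marker for keys whose state is timing out,
// and the running sum otherwise. Reuses the imports and wordsDStream defined above.
val verboseStateSpec = StateSpec.function(
  (batchTime: Time, key: String, value: Option[Int], state: State[Long]) => {
    if (state.isTimingOut()) {
      // Final call for this key: it received no data for the timeout period and
      // its state is about to be removed; state.update() is not allowed here.
      Some((key, -1L))
    } else {
      val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
      state.update(sum)
      Some((key, sum))
    }
  }).timeout(Seconds(30))

// wordsDStream.mapWithState(verboseStateSpec).print()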