1. Overview
Today I'll share two Spark Streaming operators that come up all the time, transform and updateStateByKey, plus the window operations.
transform: inside the transform operator, any code that sits outside the RDD operators runs on the Driver, once per batchInterval, so it can be used to change a broadcast variable dynamically (see the sketch below).
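To make that Driver-side behaviour concrete, here is a minimal Scala sketch (not part of the original code; loadBlacklist() and the node5 host are illustrative placeholders) in which transform rebuilds a broadcast blacklist on every batch:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object DynamicBroadcastSketch {
  // Hypothetical helper: in a real job this would read the latest blacklist
  // from a database or file on every call.
  def loadBlacklist(): Set[String] = Set("zhangsan")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("dynamicBroadcast")
    val ssc = new StreamingContext(conf, Durations.seconds(5))
    val lines = ssc.socketTextStream("node5", 9999)

    val filtered = lines.transform { rdd =>
      // This block runs on the Driver once per batchInterval, so the broadcast
      // variable can be rebuilt with fresh data before each batch is processed.
      val blacklist = rdd.sparkContext.broadcast(loadBlacklist())
      rdd.filter(line => !blacklist.value.contains(line.split(" ")(1)))
    }

    filtered.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Because the closure passed to transform runs on the Driver before each batch, rebuilding the broadcast there is what makes the blacklist "dynamic"; in a real job you would also unpersist the previous broadcast so stale copies don't accumulate.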
updateStateByKey: maintains a piece of state for every key in Spark Streaming and keeps updating it through an update function.
2. Details
(1) transform is a transformation-type operator
package com.spark.sparkstreaming;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.base.Optional;

import scala.Tuple2;

/**
 * Blacklist filtering with the transform operation.
 * Through transform, a DStream can perform arbitrary RDD-to-RDD operations.
 * @author root
 */
public class TransformOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local[2]").setAppName("transform");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Simulate a blacklist
        List<Tuple2<String, Boolean>> blackList = new ArrayList<Tuple2<String, Boolean>>();
        blackList.add(new Tuple2<String, Boolean>("zhangsan", true));
        // Turn the blacklist into an RDD
        final JavaPairRDD<String, Boolean> blackNameRDD = jsc.sparkContext().parallelizePairs(blackList);

        // Receive data from a socket source
        JavaReceiverInputDStream<String> nameList = jsc.socketTextStream("node5", 9999);

        JavaPairDStream<String, String> pairNameList =
                nameList.mapToPair(new PairFunction<String, String, String>() {
            /**
             * The transformation is declared on the Driver; the call() body runs on the executors.
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String> call(String s) throws Exception {
                return new Tuple2<String, String>(s.split(" ")[1], s);
            }
        });

        JavaDStream<String> transFormResult =
                pairNameList.transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public JavaRDD<String> call(JavaPairRDD<String, String> nameRDD) throws Exception {
                /**
                 * nameRDD:
                 *   ("zhangsan","1 zhangsan")
                 *   ("lisi","2 lisi")
                 *   ("wangwu","3 wangwu")
                 * blackNameRDD:
                 *   ("zhangsan",true)
                 *
                 * After the left outer join:
                 *   ("zhangsan",("1 zhangsan",[true]))
                 */
                JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> leftOuterJoin =
                        nameRDD.leftOuterJoin(blackNameRDD);

                // Print the join result for debugging
                /*leftOuterJoin.foreach(new VoidFunction<Tuple2<String, Tuple2<String, Optional<Boolean>>>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Tuple2<String, Tuple2<String, Optional<Boolean>>> t) throws Exception {
                        System.out.println(t);
                    }
                });*/

                // Filter: drop records whose blacklist flag is true, keep the rest (flag false or absent)
                // ("zhangsan",("1 zhangsan",[true]))
                JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filter =
                        leftOuterJoin.filter(new Function<Tuple2<String, Tuple2<String, Optional<Boolean>>>, Boolean>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple) throws Exception {
                        if (tuple._2._2.isPresent()) {
                            return !tuple._2._2.get();
                        }
                        return true;
                    }
                });

                JavaRDD<String> resultJavaRDD =
                        filter.map(new Function<Tuple2<String, Tuple2<String, Optional<Boolean>>>, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public String call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple) throws Exception {
                        return tuple._2._1;
                    }
                });

                // Return the filtered result
                return resultJavaRDD;
            }
        });

        transFormResult.print();

        jsc.start();
        jsc.awaitTermination();
        jsc.stop();
    }
}
(2) The updateStateByKey operator (cumulative updates across batches)
What updateStateByKey does:
* 1. It maintains a piece of state for every key in Spark Streaming. The state can be of any type, including a custom object, and the update function can be custom as well.
* 2. The state of each key is continually updated through the update function; for every new batch, Spark Streaming updates the state of every key that already exists when updateStateByKey is applied.
* Using updateStateByKey requires enabling the checkpoint mechanism.
* How often is the in-memory state written to disk?
If the batchInterval is less than 10 seconds, the state is written to disk every 10 seconds; if the batchInterval is greater than 10 seconds, it is written once per batchInterval. The sketch below also shows how to override this interval explicitly.
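Before the full examples, here is a minimal Scala sketch (not part of the original post; the WordState case class, the node5 host, and the 30-second interval are illustrative assumptions). It shows the two points above: a custom state type with a custom update function, and an explicit checkpoint interval set on the state DStream.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

// Hypothetical custom state: total count plus how many batches contained the word
case class WordState(total: Int, batchesSeen: Int)

object CustomStateSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("customState")
    val ssc = new StreamingContext(conf, Durations.seconds(5))
    ssc.checkpoint("./checkpoint") // required by updateStateByKey

    val pairs = ssc.socketTextStream("node5", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))

    val state = pairs.updateStateByKey[WordState]((values: Seq[Int], old: Option[WordState]) => {
      val prev = old.getOrElse(WordState(0, 0))
      // Fold this batch's counts into the custom state object
      Some(WordState(prev.total + values.sum, prev.batchesSeen + 1))
    })

    // Checkpoint the state DStream every 30s instead of the default max(batchInterval, 10s)
    state.checkpoint(Durations.seconds(30))

    state.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Note that the checkpoint interval passed to DStream.checkpoint must be a multiple of the stream's slide duration, otherwise Spark rejects it at startup.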
Java code:
package com.spark.sparkstreaming;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.base.Optional;

import scala.Tuple2;

/**
 * What updateStateByKey does:
 * 1. Maintains a piece of state for every key in Spark Streaming. The state can be of any type,
 *    including a custom object, and the update function can be custom as well.
 * 2. The state of each key is continually updated through the update function; for every new batch,
 *    Spark Streaming updates the state of keys that already exist when updateStateByKey is applied.
 *
 * hello,3
 * spark,2
 *
 * Continually updating per-key state necessarily involves saving that state and recovering it
 * after failures, so the checkpoint mechanism must be enabled.
 *
 * Comprehensive ad-click analysis.
 * What is it good for? Counting ad-click traffic, daily vehicle traffic, total click counts.
 * @author root
 */
public class UpdateStateByKeyOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyDemo");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

        /**
         * Set the checkpoint directory.
         *
         * How often is the in-memory data (the state of every key) written to disk?
         * If the batch interval is less than 10s, state is written to disk every 10s.
         * If the batch interval is greater than 10s, it is written once per batch interval.
         *
         * This avoids writing to HDFS too frequently.
         */
        JavaSparkContext sparkContext = jsc.sparkContext();
        sparkContext.setCheckpointDir("./checkpoint");
//        jsc.checkpoint("hdfs://node1:9000/spark/checkpoint");
//        jsc.checkpoint("./checkpoint");

        JavaReceiverInputDStream<String> lines = jsc.socketTextStream("node5", 9999);

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(" "));
            }
        });

        JavaPairDStream<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairDStream<String, Integer> counts =
                ones.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
                /**
                 * values: the values grouped under this key in the current batch, e.g. [1,1,1,1,1]
                 * state:  the state accumulated for this key before this batch
                 */
                Integer updateValue = 0;
                if (state.isPresent()) {
                    updateValue = state.get();
                }
                for (Integer value : values) {
                    updateValue += value;
                }
                return Optional.of(updateValue);
            }
        });

        // output operator
        counts.print();

        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
}
Scala code:
package com.bjsxt.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.StreamingContext

object Operator_UpdateStateByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("updateStateByKey")
    val jsc = new StreamingContext(conf, Durations.seconds(5))
    // Set the log level
    jsc.sparkContext.setLogLevel("WARN")
    // Set the checkpoint path
    jsc.checkpoint("hdfs://node1:9000/spark/checkpoint")

    val lineDStream = jsc.socketTextStream("node5", 9999)
    val wordDStream = lineDStream.flatMap { _.split(" ") }
    val pairDStream = wordDStream.map { (_, 1) }

    val result = pairDStream.updateStateByKey((seq: Seq[Int], option: Option[Int]) => {
      var value = 0
      value += option.getOrElse(0)
      for (elem <- seq) {
        value += elem
      }
      Option(value)
    })
    result.print()

    jsc.start()
    jsc.awaitTermination()
    jsc.stop()
  }
}
Result:
As you can see, the cumulative state has been maintained ever since the application started!
3. Window functions (accumulate over a window of time, not since the application started)
Assume one batch every 5s, a window length of 15s, and a slide interval of 10s: each window then covers 3 batches and fires every 2 batches.
The window length and slide interval must both be integer multiples of the batchInterval; otherwise Spark detects it and throws an error. For example, with a 5s batchInterval, a 15s window with a 10s slide is valid, but a 12s window would fail the check.
The optimized window operation (the reduceByKeyAndWindow variant that also takes an inverse reduce function) has to save state, so it requires a checkpoint path; the non-optimized window operation can run without one.
package com.spark.sparkstreaming;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

/**
 * Real-time hot-search-word statistics over a sliding window.
 * @author root
 */
public class WindowOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("WindowHotWord");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        /**
         * Set the log level to WARN.
         */
        jssc.sparkContext().setLogLevel("WARN");

        /**
         * Note:
         * The non-optimized window function can run without a checkpoint directory.
         * The optimized window function must set a checkpoint directory.
         */
//        jssc.checkpoint("hdfs://node1:9000/spark/checkpoint");
        jssc.checkpoint("./checkpoint");

        JavaReceiverInputDStream<String> searchLogsDStream = jssc.socketTextStream("node04", 9999);

        // word 1
        JavaDStream<String> searchWordsDStream = searchLogsDStream.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String t) throws Exception {
                return Arrays.asList(t.split(" "));
            }
        });

        // Map each search word to a (searchWord, 1) tuple
        JavaPairDStream<String, Integer> searchWordPairDStream = searchWordsDStream.mapToPair(
                new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String searchWord) throws Exception {
                return new Tuple2<String, Integer>(searchWord, 1);
            }
        });

        /**
         * For example, computing the last 60 seconds of data every 10 seconds means the window is 60s
         * and holds 12 RDDs. These RDDs are not computed until the window fires; at that point the
         * 12 RDDs are aggregated together and reduceByKeyAndWindow is executed over them.
         * reduceByKeyAndWindow operates on the window, not on a single DStream batch.
         */
        JavaPairDStream<String, Integer> searchWordCountsDStream =
                searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, Durations.seconds(15), Durations.seconds(5)); // window length, slide interval

        /**
         * Optimized window operation (with an inverse reduce function): requires the checkpoint directory set above.
         */
//        JavaPairDStream<String, Integer> searchWordCountsDStream =
//                searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
//
//            private static final long serialVersionUID = 1L;
//
//            @Override
//            public Integer call(Integer v1, Integer v2) throws Exception {
//                return v1 + v2;
//            }
//
//        }, new Function2<Integer, Integer, Integer>() {
//
//            private static final long serialVersionUID = 1L;
//
//            @Override
//            public Integer call(Integer v1, Integer v2) throws Exception {
//                return v1 - v2;
//            }
//
//        }, Durations.seconds(15), Durations.seconds(5));

        searchWordCountsDStream.print();

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}
Scala code:
package com.bjsxt.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.StreamingContext

object Operator_Window {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("window")
    val jsc = new StreamingContext(conf, Durations.seconds(5))
    // Set the log level
    jsc.sparkContext.setLogLevel("WARN")
    // Set the checkpoint path
    jsc.checkpoint("hdfs://node1:9000/spark/checkpoint")

    val lineDStream = jsc.socketTextStream("node04", 9999)
    val wordDStream = lineDStream.flatMap { _.split(" ") }
    val mapDStream = wordDStream.map { (_, 1) }

    // Non-optimized window
    val result = mapDStream.reduceByKeyAndWindow((v1: Int, v2: Int) => {
      v1 + v2
    }, Durations.seconds(60), Durations.seconds(10))

    // Optimized window (with an inverse reduce function)
//    val result = mapDStream.reduceByKeyAndWindow((v1: Int, v2: Int) => {
//      v1 + v2
//    }, (v1: Int, v2: Int) => {
//      v1 - v2
//    }, Durations.seconds(60), Durations.seconds(10))

    result.print()

    jsc.start()
    jsc.awaitTermination()
    jsc.stop()
  }
}
Result: on each slide interval the job prints the word counts aggregated over the most recent window only, rather than counts accumulated since startup.