1、transform以及實時黑名單過濾案例實戰java
一、概述apache
transform操做,應用在DStream上時,能夠用於執行任意的RDD到RDD的轉換操做。它能夠用於實現,DStream API中所沒有提供的操做。好比說,DStream API中,
並無提供將一個DStream中的每一個batch,與一個特定的RDD進行join的操做。可是咱們本身就可使用transform操做來實現該功能。
DStream.join(),只能join其餘DStream。在DStream每一個batch的RDD計算出來以後,會去跟其餘DStream的RDD進行join。
案例:廣告計費日誌實時黑名單過濾
二、java案例api
package cn.spark.study.streaming; import java.util.ArrayList; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import com.google.common.base.Optional; import scala.Tuple2; /** * 基於transform的實時廣告計費日誌黑名單過濾 * * @author bcqf * */ public class TransformBlacklist { @SuppressWarnings("deprecation") public static void main(String[] args) { SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("TransformBlacklist"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); // 用戶對咱們的網站上的廣告能夠進行點擊 // 點擊以後,是否是要進行實時計費,點一下,算一次錢 // 可是,對於那些幫助某些無良商家刷廣告的人,那麼咱們有一個黑名單 // 只要是黑名單中的用戶點擊的廣告,咱們就給過濾掉 // 先作一份模擬的黑名單RDD List<Tuple2<String, Boolean>> blacklist = new ArrayList<Tuple2<String, Boolean>>(); blacklist.add(new Tuple2<String, Boolean>("tom", true)); final JavaPairRDD<String, Boolean> blacklistRDD = jssc.sc().parallelizePairs(blacklist); // 這裏的日誌格式,就簡化一下,就是date username的方式 JavaReceiverInputDStream<String> adsClickLogDStream = jssc.socketTextStream("spark1", 9999); // 因此,要先對輸入的數據,進行一下轉換操做,變成,(username, date username) // 以便於,後面對每一個batch RDD,與定義好的黑名單RDD進行join操做 JavaPairDStream<String, String> userAdsClickLogDStream = adsClickLogDStream.mapToPair( new PairFunction<String, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call(String adsClickLog) throws Exception { return new Tuple2<String, String>( adsClickLog.split(" ")[1], adsClickLog); } }); // 而後,就能夠執行transform操做了,將每一個batch的RDD,與黑名單RDD進行join、filter、map等操做 // 實時進行黑名單過濾 JavaDStream<String> validAdsClickLogDStream = userAdsClickLogDStream.transform( new Function<JavaPairRDD<String,String>, JavaRDD<String>>() { private static final long serialVersionUID = 1L; @Override public JavaRDD<String> call(JavaPairRDD<String, String> userAdsClickLogRDD) throws Exception { // 這裏爲何用左外鏈接? // 由於,並非每一個用戶都存在於黑名單中的 // 因此,若是直接用join,那麼沒有存在於黑名單中的數據,會沒法join到 // 就給丟棄掉了 // 因此,這裏用leftOuterJoin,就是說,哪怕一個user不在黑名單RDD中,沒有join到 // 也仍是會被保存下來的 JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> joinedRDD = userAdsClickLogRDD.leftOuterJoin(blacklistRDD); // 鏈接以後,執行filter算子 JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filteredRDD = joinedRDD.filter( new Function<Tuple2<String, Tuple2<String,Optional<Boolean>>>, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call( Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple) throws Exception { // 這裏的tuple,就是每一個用戶,對應的訪問日誌,和在黑名單中 // 的狀態 if(tuple._2._2().isPresent() && tuple._2._2.get()) { return false; } return true; } }); // 此時,filteredRDD中,就只剩下沒有被黑名單過濾的用戶點擊了 // 進行map操做,轉換成咱們想要的格式 JavaRDD<String> validAdsClickLogRDD = filteredRDD.map( new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, String>() { private static final long serialVersionUID = 1L; @Override public String call( Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple) throws Exception { return tuple._2._1; } }); // return validAdsClickLogRDD; } }); // 打印有效的廣告點擊日誌 // 其實在真實企業場景中,這裏後面就能夠走寫入kafka、ActiveMQ等這種中間件消息隊列 // 而後再開發一個專門的後臺服務,做爲廣告計費服務,執行實時的廣告計費,這裏就是隻拿到了有效的廣告點擊 validAdsClickLogDStream.print(); jssc.start(); jssc.awaitTermination(); jssc.close(); } } ##在eclipse中運行程序 ##服務器端運行nc [root@spark1 kafka]# nc -lk 9999 20150814 marry 20150814 tom ##結果,tom已經被過濾掉了 20150814 marry
二、scala案例服務器
package cn.spark.study.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds /** * @author bcqf */ object TransformBlacklist { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local[2]") .setAppName("TransformBlacklist") val ssc = new StreamingContext(conf, Seconds(5)) val blacklist = Array(("tom", true)) val blacklistRDD = ssc.sparkContext.parallelize(blacklist, 5) val adsClickLogDStream = ssc.socketTextStream("spark1", 9999) val userAdsClickLogDStream = adsClickLogDStream .map { adsClickLog => (adsClickLog.split(" ")(1), adsClickLog) } val validAdsClickLogDStream = userAdsClickLogDStream.transform(userAdsClickLogRDD => { val joinedRDD = userAdsClickLogRDD.leftOuterJoin(blacklistRDD) val filteredRDD = joinedRDD.filter(tuple => { if(tuple._2._2.getOrElse(false)) { false } else { true } }) val validAdsClickLogRDD = filteredRDD.map(tuple => tuple._2._1) validAdsClickLogRDD }) validAdsClickLogDStream.print() ssc.start() ssc.awaitTermination() } }