Spark Streaming是Spark核心API的擴展,能夠實現可伸縮、高吞吐量、具有容錯機制的實時流時數據的處理。支持多種數據源,好比Kafka、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets。java
可使用諸如map、reduce、join和window等高級函數進行復雜算法(好比,機器學習和圖計算)的處理。最後還能夠將處理結果存儲到文件系統,數據庫和儀表盤。算法
Spark Streaming接收實時流的數據,並根據必定的時間間隔拆分紅一批批的數據,而後經過Spark Engine處理這些批數據,最終獲得處理後的一批批結果數據。數據庫
Spark Streaming提供了一個叫作DStream(discretized stream,離散流)的抽象概念,DStream由一系列的RDD組成,表示每一個批次中連續的數據流。DStream能夠從輸入源(好比,Kafka、Flume、Kinesis等)中建立,也能夠從其餘DStream中使用高級算子操做轉換生成。編程
DStream的全部操做其實都是對DStream中全部RDD的操做。好比,在單詞統計案例中,flatMap轉化操做會應用到每一個行RDD上來生成單詞RDD。緩存
Receiver:Spark Streaming內置的數據流接收器或自定義接收器,用於從數據源接收源源不斷的數據流。架構
CurrentBuffer:用於緩存輸入流接收器接收的數據流。less
BlockIntervalTimer:一個定時器,用於將CurrentBuffer中緩存的數據流封裝爲Block後放入blocksForPushing隊列中。機器學習
BlocksForPushing:待處理的Blocksocket
BlockPushingThread:此線程每隔100毫秒從BlocksForPushing隊列中取出一個Block存入存儲系統,並緩存到ReceivedBlockQueue隊列中。ide
Block Batch:Block批次,按照批次時間間隔,從ReceivedBlockQueue隊列中獲取一批Block。
DStream轉化操做分爲無狀態(stateless)和有狀態(stateful)兩種。
無狀態轉化操做中,每一個批次的處理不依賴於以前批次的數據。
無狀態轉化操做就是把簡單的RDD轉化操做應用到每一個批次上,轉化DStream中的每一個RDD。
經常使用的無狀態轉化操做
函數名稱 | 做用 | scala示例 |
---|---|---|
map() | 對DStream中的每一個元素應用指定函數,返回由各元素輸出的元素組成的DStream | ds.map(x => x+1) |
flatMap() | 對DStream中的每一個元素應用指定函數,返回由各元素輸出的迭代器組成的DStream | ds.flatMap(x => x.split(" ")) |
filter | 返回由給定DStream中經過篩選的元素組成的DStream | ds.filter(x => x!=1) |
repartition() | 改變DStream的分區數 | ds.repartition(10) |
reduceByKey | 將每一個批次中鍵相同的記錄聚合 | ds.reduceByKey((x,y) => x+y) |
groupByKey | 將每一個批次中的記錄根據鍵分組 | ds.groupByKey() |
使用map()和reduceByKey()在每一個時間區間中對日誌根據IP地址進行計數。
//假設ApacheAccessingLog是用來從Apache日誌中解析條目的工具類 val accessLogDStream = logData.map(line => ApacheAccessingLog.parseFromLogLine(line)) val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1) val ipCountsDStream = ipDStream.reduceByKey((x,y) => x+y)
//假設ApacheAccessingLog是用來從Apache日誌中解析條目的工具類 static final class IpTuple implements PairFunction<ApacheAccessLog, String, Long> { public Tuple2<String, Long> call(ApacheAccessLog log) { return new Tuple2<>(log.getIpAddress(), 1L); } } JavaDStream<ApacheAccessLog> accessLogDStream = logData.map(new ParseFromLogLine()); JavaPairDStream<String, Long> ipDStream = accessLogDStream.mapToPair(new IpTuple()); JavaPairDStream(String, Long) ipCountsDStream = ipDStream.reduceByKey(new LongSumReducer());
以IP地址爲鍵,將請求計數的數據和傳輸數據量的數據鏈接起來
val ipBytesDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), entry.getContentSize())) val ipBytesSumDStream = ipBytesDStream.reduceByKey((x,y) => x+y) val ipBytesRequestCountDStream = ipCountsDStream.join(ipBytesSumDStream)
JavaPairDStream<String, Long> ipBytesDStream = accessLogsDStream.mapToPair(new IpContentTuple()); JavaPairDStream<String, Long> ipBytesSumDStream = ipBytesDStream.reduceByKey(new LongSumReducer()); JavaPairDStream<String, Tuple2<Long,Long>> ipBytesRequestCountDStream = ipCountsDStream.join(ipBytesSumDStream);
使用transform()操做實現自定義轉化操做,從日誌記錄中提取異常值。
val outlierDStream = accessLogsDStream.transform{ rdd => extractOutliers(rdd) }
JavaPairDStream<String, Long> ipRawDStream = accessLogsDStream.transform( new Function<JavaPairRDD<ApacheAccessLog>, JavaRDD<ApacheAccessLog>>() { public JavaPairRDD<ApacheAccessLog> call(JavaRDD<ApacheAccessLog> rdd) { return extractOutliers(rdd); } } );
DStream的有狀態轉化操做是跨時間區間跟蹤數據的操做,先前批次的數據也被用來在新的批次中計算結果。
有狀態轉化操做主要有兩種類型:滑動窗口和updateStateByKey()。前者以一個時間階段爲滑動窗口進行操做,後者用來跟蹤每一個鍵的狀態變化。
有狀態轉化操做須要在StreamingContext中打開檢查點機制確保容錯性。
ssc.checkpoint("hdfs://...")
基於窗口的操做會在一個比StreamingContext批次間隔更長的時間範圍內,經過整合多個批次的結果,計算出整個窗口的結果。
基於窗口的轉化操做須要兩個參數,分別是窗口時長和滑動時長。二者都是批次間隔的整數倍。
窗口時長:控制每次計算最近的windowDuration/batchInterval個批次的數據。
使用window()對窗口進行計數
val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10)) val windowCounts = accessLogsWindow.count()
JavaDStream<ApacheAccessLog> accessLogsWindow = accessLogsDStream.window(Durations.seconds(30), Duration.seconds(10)); JavaDStream<Integer> windowCounts = accessLogsWindow.count();
使用reduceByKeyAndWindow對每一個IP地址的訪問量計數
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1)) val ipCountDStream = ipDStream.reduceByKeyAndWindow( {(x,y) => x+y}, //加入新進入窗口的批次中的元素 {(x,y) => x-y}, //移除離開窗口的老批次中的元素 Seconds(30), //窗口時長 Seconds(10) //滑動步長 )
class ExtractIp extends PairFunction<ApacheAccessLog, String, Long> { public Tuple2<String, Long> call(ApacheAccessLog entry) { return new Tuple2(entry.getIpAddress(), 1L); } } class AddLongs extends Function2<Long, Long, Long>() { public Long call(Long v1, Long v2) { return v1 + v2; } } class SubtractLongs extends Function2<Long, Long, Long>() { public Long call(Long v1, Long v2) { return v1 - v2; } } JavaPairDStream<String, Long> ipAddressPairDStream = accessLogsDStream.mapToPair(new ExtractIp()); JavaPairDStream<String, Long> ipCountDStream = ipAddressPairDStream.reduceByKeyAndWindow( new AddLongs(), //加上新進入窗口的批次中的元素 new SubtractLongs(), //移除離開窗口的老批次中的元素 Durations.seconds(30), //窗口時長 Durations.seconds(10) //滑動步長 )
使用countByWindow和countByValueAndWindow對窗口計數
scala
val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()} val ipAddre***equestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10)) val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
JavaDStream<String> ip = accessLogsDStream.map(new Function<ApacheAccessLog, String>() { public String call(ApacheAccessLog entry) { return entry.getIpAddress(); } }); JavaDStream<Long> requestCount = accessLogsDStream.countByWindow(Dirations.seconds(30), Durations.seconds(10)); JavaPairDStream<String, Long> ipAddre***equestCount = ip.countByValueAndWindow(Dirations.seconds(30), Durations.seconds(10));
updateStateByKey提供了跨批次維護狀態的功能,用於鍵值對形式的DStream。
updateStateByKey提供了一個update(events, oldState)函數,接收與某鍵相關的事件及該鍵以前對應的狀態,返回該鍵對應的新狀態。
使用updateStateByKey()跟蹤日誌消息中各HTTP響應代碼的計數。
def updateRunningSum(values: Seq[Long], state: Option[Long]) = { Some(state.getOrElse(0L) + values.size) } val responseCodeDStream = accessLogsDStream.map(log => (log.getResponseCode(), 1L)) val responseCodeCountDStream = responseCodeDStream.updateStateByKey(updateRunningSum _)
class UpdateRunningSum implements Function2<List<Long>, Optional<Long>, Optional<Long>> { public Optional<Long> call(List<Long> nums, Optional<Long> current) { long sum = current.or(0L); return Optional.of(sum + nums.size()); } }; JavaPairDStream<Integer, Long> responseCodeCountDStream = accessLogsDStream.mapToPair( new PairFunction<ApacheAccessLog, Integer, Long>() { public Tuple2<Integer, Long> call(ApacheAccessLog log) { return new Tuple2(log.getResponseCode(), 1L); } } ).updateStateByKey(new UpdateRunningSum());
DStream行動操做同RDD的行動操做。好比,將DStream保存爲SequenceFile文件。
val writableIpAddre***equestCount = ipAddre***equestCount.map{ (ip, count) => <new Text(ip), new LongWritable(count)) } writableIpAddre***equestCount.saveAsHadoopFiles[SequenceFileOutputFormat[Text, LongWritable]]("outputDir", "txt") }
JavaPairDStream<Text, LongWritable> writableDStream = ipDStream.mapToPair( new PairFunction<Tuple2<String, Long>, Text, LongWritable>() { public Tuple2<Text, LongWritable> call(Tuple2<String, Long> e) { return new Tuple2(new Text(e._1()), new LongWritable(e._2())); } } ); writableDStream.saveAsHadoopFiles("outputDir", "txt", Text.class, LongWritable.class, SequenceFileOutputFormat.class);
忠於技術,熱愛分享。歡迎關注公衆號:java大數據編程,瞭解更多技術內容。