Spark Streaming筆記整理(三):DS的transformation與output操做

DStream的各類transformation

Transformation Meaning
map(func)         對DStream中的各個元素進行func函數操做,而後返回一個新的DStream. flatMap(func)       與map方法相似,只不過各個輸入項能夠被輸出爲零個或多個輸出項 filter(func)        過濾出全部函數func返回值爲true的DStream元素並返回一個新的DStream repartition(numPartitions) 增長或減小DStream中的分區數,從而改變DStream的並行度 union(otherStream)     將源DStream和輸入參數爲otherDStream的元素合併,並返回一個新的DStream. count()          經過對DStreaim中的各個RDD中的元素進行計數,而後返回只有一個元素的RDD構成的DStream reduce(func)       對源DStream中的各個RDD中的元素利用func進行聚合操做,而後返回只有一個元素的RDD構成的新的DStream. countByValue()       對於元素類型爲K的DStream,返回一個元素爲(K,Long)鍵值對形式的新的DStream,Long對應的值爲源DStream中各個RDD的key出現的次數 reduceByKey(func, [numTasks])利用func函數對源DStream中的key進行聚合操做,而後返回新的(K,V)對構成的DStream join(otherStream, [numTasks])輸入爲(K,V)、(K,W)類型的DStream,返回一個新的(K,(V,W)類型的DStream cogroup(otherStream, [numTasks]) 輸入爲(K,V)、(K,W)類型的DStream,返回一個新的 (K, Seq[V], Seq[W]) 元組類型的DStream transform(func)     經過RDD-to-RDD函數做用於源碼DStream中的各個RDD,能夠是任意的RDD操做,從而返回一個新的RDD updateStateByKey(func)   根據於key的前置狀態和key的新值,對key進行更新,返回一個新狀態的Dstream Window 函數: 

能夠看到不少都是在RDD中已經有的transformation算子操做,因此這裏只關注transform、updateStateByKey和window函數php

transformation之transform操做

DStream transform

一、transform操做,應用在DStream上時,能夠用於執行任意的RDD到RDD的轉換操做。它能夠用於實現,DStream API中所沒有提供的操做。好比說,DStream API中,並無提供將一個DStream中的每一個batch,與一個特定的RDD進行join的操做。可是咱們本身就可使用transform操做來實現該功能。java

二、DStream.join(),只能join其餘DStream。在DStream每一個batch的RDD計算出來以後,會去跟其餘DStream的RDD進行join。python

案例

測試代碼以下:shell

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

/** * 使用Transformation之transform來完成在線黑名單過濾 * 需求: * 將日誌數據中來自於ip["27.19.74.143", "110.52.250.126"]實時過濾掉 * 數據格式 * 27.19.74.143##2016-05-30 17:38:20##GET /static/image/common/faq.gif HTTP/1.1##200##1127 */ object _06SparkStreamingTransformOps { def main(args: Array[String]): Unit = { if (args == null || args.length < 2) { System.err.println( """Parameter Errors! Usage: <hostname> <port> |hostname: 監聽的網絡socket的主機名或ip地址 |port: 監聽的網絡socket的端口 """.stripMargin) System.exit(-1) } Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_01SparkStreamingNetWorkOps.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(2)) val hostname = args(0).trim val port = args(1).trim.toInt //黑名單數據 val blacklist = List(("27.19.74.143", true), ("110.52.250.126", true)) // val blacklist = List("27.19.74.143", "110.52.250.126") val blacklistRDD:RDD[(String, Boolean)] = ssc.sparkContext.parallelize(blacklist) val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port) // 若是用到一個DStream和rdd進行操做,沒法使用dstream直接操做,只能使用transform來進行操做 val filteredDStream:DStream[String] = linesDStream.transform(rdd => { val ip2InfoRDD:RDD[(String, String)] = rdd.map{line => { (line.split("##")(0), line) }} /** A(M) B(N)兩張表: * across join * 交叉鏈接,沒有on條件的鏈接,會產生笛卡爾積(M*N條記錄) 不能用 * inner join * 等值鏈接,取A表和B表的交集,也就是獲取在A和B中都有的數據,沒有的剔除掉 不能用 * left outer join * 外連接:最經常使用就是左外鏈接(將左表中全部的數據保留,右表中可以對應上的數據正常顯示,在右表中對應不上,顯示爲null) * 能夠經過非空判斷是左外鏈接達到inner join的結果 */ val joinedInfoRDD:RDD[(String, (String, Option[Boolean]))] = ip2InfoRDD.leftOuterJoin(blacklistRDD) joinedInfoRDD.filter{case (ip, (line, joined)) => { joined == None }}//執行過濾操做 .map{case (ip, (line, joined)) => line} }) filteredDStream.print() ssc.start() ssc.awaitTermination() ssc.stop() // stop中的boolean參數,設置爲true,關閉該ssc對應的SparkContext,默認爲false,只關閉自身 } }

nc中產生數據:數據庫

[uplooking@uplooking01 ~]$ nc -lk 4893
27.19.74.143##2016-05-30 17:38:20##GET /data/attachment/common/c8/common_2_verify_icon.png HTTP/1.1##200##582
110.52.250.126##2016-05-30 17:38:20##GET /static/js/logging.js?y7a HTTP/1.1##200##603
8.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-

輸出結果以下:apache

------------------------------------------- Time: 1526006084000 ms ------------------------------------------- 8.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-

transformation之updateStateByKey操做

概述

一、Spark Streaming的updateStateByKey能夠DStream中的數據進行按key作reduce操做,而後對各個批次的數據進行累加。api

二、 updateStateByKey 解釋緩存

以DStream中的數據進行按key作reduce操做,而後對各個批次的數據進行累加在有新的數據信息進入或更新時,可讓用戶保持想要的任何狀。使用這個功能須要完成兩步:ruby

1) 定義狀態:能夠是任意數據類型bash

2) 定義狀態更新函數:用一個函數指定如何使用先前的狀態,從輸入流中的新值更新狀態。對於有狀態操做,要不斷的把當前和歷史的時間切片的RDD累加計算,隨着時間的流失,計算的數據規模會變得愈來愈大

三、要思考的是若是數據量很大的時候,或者對性能的要求極爲苛刻的狀況下,能夠考慮將數據放在Redis或者tachyon或者ignite上

四、注意,updateStateByKey操做,要求必須開啓Checkpoint機制。

案例

Scala版

測試代碼以下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 狀態函數updateStateByKey * 更新key的狀態(就是key對應的value) * * 一般的做用,計算某個key截止到當前位置的狀態 * 統計截止到目前爲止的word對應count * 要想完成截止到目前爲止的操做,必須將歷史的數據和當前最新的數據累計起來,因此須要一個地方來存放歷史數據 * 這個地方就是checkpoint目錄 * */ object _07SparkStreamingUpdateStateByKeyOps { def main(args: Array[String]): Unit = { if (args == null || args.length < 2) { System.err.println( """Parameter Errors! Usage: <hostname> <port> |hostname: 監聽的網絡socket的主機名或ip地址 |port: 監聽的網絡socket的端口 """.stripMargin) System.exit(-1) } val hostname = args(0).trim val port = args(1).trim.toInt Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_07SparkStreamingUpdateStateByKeyOps.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(2)) ssc.checkpoint("hdfs://ns1/checkpoint/streaming/usb") // 接收到的當前批次的數據 val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port) // 這是記錄下來的當前批次的數據 val rbkDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) val usbDStream:DStream[(String, Int)] = rbkDStream.updateStateByKey(updateFunc) usbDStream.print() ssc.start() ssc.awaitTermination() ssc.stop() // stop中的boolean參數,設置爲true,關閉該ssc對應的SparkContext,默認爲false,只關閉自身 } /** * @param seq 當前批次的key對應的數據 * @param history 歷史key對應的數據,可能有可能沒有 * @return */ def updateFunc(seq: Seq[Int], history: Option[Int]): Option[Int] = { var sum = seq.sum if(history.isDefined) { sum += history.get } Option[Int](sum) } }

nc產生數據:

[uplooking@uplooking01 ~]$ nc -lk 4893 hello hello hello you hello he hello me

輸出結果以下:

-------------------------------------------
Time: 1526009358000 ms
-------------------------------------------
(hello,2)

18/05/11 11:29:18 INFO WriteAheadLogManager  for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009338000: ------------------------------------------- Time: 1526009360000 ms ------------------------------------------- (hello,5) (me,1) (you,1) (he,1) 18/05/11 11:29:20 INFO WriteAheadLogManager for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009340000: ------------------------------------------- Time: 1526009362000 ms ------------------------------------------- (hello,5) (me,1) (you,1) (he,1)

Java版

用法略有不一樣,主要是 狀態更新函數的寫法上有區別,以下:

package cn.xpleaf.bigdata.spark.java.streaming.p1; import com.google.common.base.Optional; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.util.Arrays; import java.util.List; public class _02SparkStreamingUpdateStateByKeyOps { public static void main(String[] args) { if(args == null || args.length < 2) { System.err.println("Parameter Errors! Usage: <hostname> <port>"); System.exit(-1); } Logger.getLogger("org.apache.spark").setLevel(Level.OFF); SparkConf conf = new SparkConf() .setAppName(_02SparkStreamingUpdateStateByKeyOps.class.getSimpleName()) .setMaster("local[2]"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(2)); jsc.checkpoint("hdfs://ns1/checkpoint/streaming/usb"); String hostname = args[0].trim(); int port = Integer.valueOf(args[1].trim()); JavaReceiverInputDStream<String> lineDStream = jsc.socketTextStream(hostname, port);//默認的持久化級別:MEMORY_AND_DISK_SER_2 JavaDStream<String> wordsDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); JavaPairDStream<String, Integer> pairsDStream = wordsDStream.mapToPair(word -> { return new Tuple2<String, Integer>(word, 1); }); JavaPairDStream<String, Integer> rbkDStream = pairsDStream.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); // 作歷史的累計操做 JavaPairDStream<String, Integer> usbDStream = rbkDStream.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { @Override public Optional<Integer> call(List<Integer> current, Optional<Integer> history) throws Exception { int sum = 0; for (int i : current) { sum += i; } if (history.isPresent()) { sum += history.get(); } return Optional.of(sum); } }); usbDStream.print(); jsc.start();//啓動流式計算 jsc.awaitTermination();//等待執行結束 jsc.close(); } }

transformation之window操做

DStream window 滑動窗口

Spark Streaming提供了滑動窗口操做的支持,從而讓咱們能夠對一個滑動窗口內的數據執行計算操做。每次掉落在窗口內的RDD的數據,會被聚合起來執行計算操做,而後生成的RDD,會做爲window DStream的一個RDD。好比下圖中,就是對每三秒鐘的數據執行一次滑動窗口計算,這3秒內的3個RDD會被聚合起來進行處理,而後過了兩秒鐘,又會對最近三秒內的數據執行滑動窗口計算。因此每一個滑動窗口操做,都必須指定兩個參數,窗口長度以及滑動間隔,並且這兩個參數值都必須是batch間隔的整數倍。

Spark Streaming筆記整理(三):DS的transformation與output操做

1.紅色的矩形就是一個窗口,窗口hold的是一段時間內的數據流。

2.這裏面每個time都是時間單元,在官方的例子中,每隔window size是3 time unit, 並且每隔2個單位時間,窗口會slide一次。

因此基於窗口的操做,須要指定2個參數:

window length - The duration of the window (3 in the figure) slide interval - The interval at which the window-based operation is performed (2 in the figure). 1.窗口大小,我的感受是一段時間內數據的容器。 2.滑動間隔,就是咱們能夠理解的cron表達式吧。 舉個例子吧: 仍是以最著名的wordcount舉例,每隔10秒,統計一下過去30秒過來的數據。 // Reduce last 30 seconds of data, every 10 seconds val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10)) 

DSstream window滑動容器功能

window 對每一個滑動窗口的數據執行自定義的計算 countByWindow 對每一個滑動窗口的數據執行count操做 reduceByWindow 對每一個滑動窗口的數據執行reduce操做 reduceByKeyAndWindow 對每一個滑動窗口的數據執行reduceByKey操做 countByValueAndWindow 對每一個滑動窗口的數據執行countByValue操做

案例

測試代碼以下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} /** *窗口函數window * 每隔多長時間(滑動頻率slideDuration)統計過去多長時間(窗口長度windowDuration)中的數據 * 須要注意的就是窗口長度和滑動頻率 * windowDuration = M*batchInterval, slideDuration = N*batchInterval */ object _08SparkStreamingWindowOps { def main(args: Array[String]): Unit = { if (args == null || args.length < 2) { System.err.println( """Parameter Errors! Usage: <hostname> <port> |hostname: 監聽的網絡socket的主機名或ip地址 |port: 監聽的網絡socket的端口 """.stripMargin) System.exit(-1) } val hostname = args(0).trim val port = args(1).trim.toInt Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_08SparkStreamingWindowOps.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(2)) // 接收到的當前批次的數據 val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port) val pairsDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1)) // 每隔4s,統計過去6s中產生的數據 val retDStream:DStream[(String, Int)] = pairsDStream.reduceByKeyAndWindow(_+_, windowDuration = Seconds(6), slideDuration = Seconds(4)) retDStream.print() ssc.start() ssc.awaitTermination() ssc.stop() // stop中的boolean參數,設置爲true,關閉該ssc對應的SparkContext,默認爲false,只關閉自身 } }

nc產生數據:

[uplooking@uplooking01 ~]$ nc -lk 4893 hello you hello he hello me hello you hello he

輸出結果以下:

------------------------------------------- Time: 1526016316000 ms ------------------------------------------- (hello,4) (me,1) (you,2) (he,1) ------------------------------------------- Time: 1526016320000 ms ------------------------------------------- (hello,5) (me,1) (you,2) (he,2) ------------------------------------------- Time: 1526016324000 ms -------------------------------------------

DStream的output操做以及foreachRDD

DStream output操做

一、print 打印每一個batch中的前10個元素,主要用於測試,或者是不須要執行什麼output操做時,用於簡單觸發一下job。

二、saveAsTextFile(prefix, [suffix]) 將每一個batch的數據保存到文件中。每一個batch的文件的命名格式爲:prefix-TIME_IN_MS[.suffix]

三、saveAsObjectFile 同上,可是將每一個batch的數據以序列化對象的方式,保存到SequenceFile中。

四、saveAsHadoopFile 同上,將數據保存到Hadoop文件中

五、foreachRDD 最經常使用的output操做,遍歷DStream中的每一個產生的RDD,進行處理。能夠將每一個RDD中的數據寫入外部存儲,好比文件、數據庫、緩存等。一般在其中,是針對RDD執行action操做的,好比foreach。

DStream foreachRDD詳解

相關內容其實在Spark開發調優中已經有相關的說明。

一般在foreachRDD中,都會建立一個Connection,好比JDBC Connection,而後經過Connection將數據寫入外部存儲。

誤區一:在RDD的foreach操做外部,建立Connection

這種方式是錯誤的,由於它會致使Connection對象被序列化後傳輸到每一個Task中。而這種Connection對象,實際上通常是不支持序列化的,也就沒法被傳輸。

dstream.foreachRDD { rdd =>
  val connection = createNewConnection() 
  rdd.foreach { record => connection.send(record) } }

誤區二:在RDD的foreach操做內部,建立Connection

這種方式是能夠的,可是效率低下。由於它會致使對於RDD中的每一條數據,都建立一個Connection對象。而一般來講,Connection的建立,是很消耗性能的。

dstream.foreachRDD { rdd =>
  rdd.foreach { record => val connection = createNewConnection() connection.send(record) connection.close() } }

DStream foreachRDD合理使用

合理方式一:使用RDD的foreachPartition操做,而且在該操做內部,建立Connection對象,這樣就至關因而,爲RDD的每一個partition建立一個Connection對象,節省資源的多了。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record)) connection.close() } }

合理方式二:本身手動封裝一個靜態鏈接池,使用RDD的foreachPartition操做,而且在該操做內部,從靜態鏈接池中,經過靜態方法,獲取到一個鏈接,使用以後再還回去。這樣的話,甚至在多個RDD的partition之間,也能夠複用鏈接了。並且可讓鏈接池採起懶建立的策略,而且空閒一段時間後,將其釋放掉。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) } }

foreachRDD 與foreachPartition實現實戰

須要注意的是:

(1)、你最好使用forEachPartition函數來遍歷RDD,而且在每臺Work上面建立數據庫的connection。

(2)、若是你的數據庫併發受限,能夠經過控制數據的分區來減小併發。

(3)、在插入MySQL的時候最好使用批量插入。

(4),確保你寫入的數據庫過程可以處理失敗,由於你插入數據庫的過程可能會通過網絡,這可能致使數據插入數據庫失敗。

(5)、不建議將你的RDD數據寫入到MySQL等關係型數據庫中。

這部份內容其實能夠參考開發調優部分的案例,只是那裏並無foreachRDD,由於其並無使用DStream,可是原理是同樣的,由於最終都是針對RDD來進行操做的。

 

 

原文連接:http://blog.51cto.com/xpleaf/2115343

相關文章
相關標籤/搜索