用實例講解Spark Sreaming--轉

本篇文章用Spark Streaming +Hbase爲列,Spark Streaming專爲流式數據處理,對Spark核心API進行了相應的擴展。html

什麼是Spark Streaming?

首先,什麼是流式處理呢?數據流是一個數據持續不斷到達的無邊界序列集。流式處理是把接二連三的數據輸入分割成單元數據塊來處理。流式處理是一個低延遲的處理和流式數據分析。Spark Streaming對Spark核心API進行了相應的擴展,支持高吞吐、低延遲、可擴展的流式數據處理。實時數據處理應用的場景有下面幾個:java

  • 網站監控和網絡監控;
  • 異常監測;
  • 網頁點擊;
  • 廣告數據;

物聯網(IOT)

圖1數據庫

Spark Streaming支持的數據源包括HDFS文件,TCP socket,Kafka,Flume,Twitter等,數據流能夠經過Spark核心API、DataFrame SQL或者機器學習API處理,並能夠持久化到本地文件、HDFS、數據庫或者其它任意支持Hadoop輸出格式的形式。apache

Spark Streaming如何工做?

Spark Streaming以X秒(batch size)爲時間間隔把數據流分割成Dstream,組成一個RDD序列。你的Spark應用處理RDD,並把處理的結果批量返回。

圖2api

Spark Streaming例子的架構圖


圖3網絡

Spark Streaming例子代碼分下面幾部分:
- 讀取流式數據;
- 處理流式數據;
- 寫處理結果倒Hbase表。架構

Spark處理部分的代碼涉及到以下內容:app

  • 讀取Hbase表的數據;
  • 按天計算數據統計;
  • 寫統計結果到Hbase表,列簇:stats。

數據集

數據集來自油泵信號數據,以CSV格式存儲在指定目錄下。Spark Streaming監控此目錄,CSV文件的格式如圖3。

圖4機器學習

採用Scala的case class來定義數據表結構,parseSensor函數解析逗號分隔的數據。socket

Hbase表結構

流式處理的Hbase表結構以下:

  • 油泵名字 + 日期 + 時間戳 組合成row key;
  • 列簇是由輸入數據列、報警數據列等組成,並設置過時時間。
  • 天天等統計數據表結構以下:
  • 油泵名和日期組成row key;

列簇爲stats,包含列有最大值、最小值和平均值;

圖5

配置寫入Hbase表

Spark直接用TableOutputFormat類寫數據到Hbase裏,跟在MapReduce中寫數據到Hbase表同樣,下面就直接用TableOutputFormat類了。

Spark Streaming代碼

Spark Streaming的基本步驟:

  • 初始化Spark StreamingContext對象;
  • 在DStream上進行transformation操做和輸出操做;
  • 開始接收數據並用streamingContext.start();
  • 等待處理中止,streamingContext.awaitTermination()。

初始化Spark StreamingContext對象

建立 StreamingContext對象,StreamingContext是Spark Streaming處理的入口,這裏設置2秒的時間間隔。

val sparkConf = new SparkConf().setAppName("HBaseStream")
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sparkConf, Seconds(2))

接下來用StreamingContext的textFileStream(directory)建立輸入流跟蹤Hadoop文件系統的新文件,並處理此目錄下的全部文件,這裏directory指文件目錄。

// create a DStream that represents streaming data from a directory source
val linesDStream = ssc.textFileStream("/user/user01/stream")

linesDStream是數據流,每條記錄是按行記錄的text格式。

圖6

對DStream進行transformation操做和輸出操做

接下來進行解析,對linesDStream進行map操做,map操做是對RDD應用Sensor.parseSensor函數,返回Sensor的RDD。

// parse each line of data in linesDStream into sensor objects
val sensorDStream = linesDStream.map(Sensor.parseSensor)


圖7

對DStream的每一個RDD執行foreachRDD 方法,使用filter過濾Sensor中低psi值來建立報警,使用Hbase的Put對象轉換sensor和alter數據以便能寫入到Hbase。而後使用PairRDDFunctions的saveAsHadoopDataset方法將最終結果寫入到任何Hadoop兼容到存儲系統。

// for each RDD. performs function on each RDD in DStream
sensorRDD.foreachRDD { rdd =>
// filter sensor data for low psi
val alertRDD = rdd.filter(sensor => sensor.psi < 5.0)
// convert sensor data to put object and write to HBase Table CF data
rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig)
// convert alert to put object write to HBase Table CF alerts
rdd.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig)
}

sensorRDD通過Put對象轉換,而後寫入到Hbase。

圖8

開始接收數據

經過streamingContext.start()顯式的啓動數據接收,而後調用streamingContext.awaitTermination()來等待計算完成。

// Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()

Spark讀寫Hbase

如今開始讀取Hbase的sensor表,計算每條的統計指標並把對應的數據寫入stats列簇。

圖9

下面的代碼讀取Hbase的sensor表psi列數據,用StatCounter計算統計數據,而後寫入stats列簇。

// configure HBase for reading 
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName)
    // scan data column family psi column
    conf.set(TableInputFormat.SCAN_COLUMNS, "data:psi") 
// Load an RDD of (row key, row Result) tuples from the table
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    // transform (row key, row Result) tuples into an RDD of Results
    val resultRDD = hBaseRDD.map(tuple => tuple._2)
    // transform into an RDD of (RowKey, ColumnValue)s , with Time removed from row key
    val keyValueRDD = resultRDD.
              map(result => (Bytes.toString(result.getRow()).
              split(" ")(0), Bytes.toDouble(result.value)))
    // group by rowkey , get statistics for column value
    val keyStatsRDD = keyValueRDD.
             groupByKey().
             mapValues(list => StatCounter(list))
    // convert rowkey, stats to put and write to hbase table stats column family
    keyStatsRDD.map { case (k, v) => convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)

下面的流程圖顯示newAPIHadoopRDD輸出,(row key,result)的鍵值對。PairRDDFunctions 的saveAsHadoopDataset方法把Put對象存入到Hbase。

圖10

運行Spark Streaming應用

運行Spark Streaming應用跟運行Spark應用相似,比較簡單,此處不贅述,參見Spark Streaming官方文檔

相關文章
相關標籤/搜索