本篇文章用Spark Streaming +Hbase爲列,Spark Streaming專爲流式數據處理,對Spark核心API進行了相應的擴展。html
首先,什麼是流式處理呢?數據流是一個數據持續不斷到達的無邊界序列集。流式處理是把接二連三的數據輸入分割成單元數據塊來處理。流式處理是一個低延遲的處理和流式數據分析。Spark Streaming對Spark核心API進行了相應的擴展,支持高吞吐、低延遲、可擴展的流式數據處理。實時數據處理應用的場景有下面幾個:java
物聯網(IOT)
圖1數據庫
Spark Streaming支持的數據源包括HDFS文件,TCP socket,Kafka,Flume,Twitter等,數據流能夠經過Spark核心API、DataFrame SQL或者機器學習API處理,並能夠持久化到本地文件、HDFS、數據庫或者其它任意支持Hadoop輸出格式的形式。apache
Spark Streaming以X秒(batch size)爲時間間隔把數據流分割成Dstream,組成一個RDD序列。你的Spark應用處理RDD,並把處理的結果批量返回。
圖2api
圖3網絡
Spark Streaming例子代碼分下面幾部分:
- 讀取流式數據;
- 處理流式數據;
- 寫處理結果倒Hbase表。架構
Spark處理部分的代碼涉及到以下內容:app
數據集來自油泵信號數據,以CSV格式存儲在指定目錄下。Spark Streaming監控此目錄,CSV文件的格式如圖3。
圖4機器學習
採用Scala的case class來定義數據表結構,parseSensor函數解析逗號分隔的數據。socket
流式處理的Hbase表結構以下:
列簇爲stats,包含列有最大值、最小值和平均值;
圖5
配置寫入Hbase表
Spark直接用TableOutputFormat類寫數據到Hbase裏,跟在MapReduce中寫數據到Hbase表同樣,下面就直接用TableOutputFormat類了。
Spark Streaming代碼
Spark Streaming的基本步驟:
建立 StreamingContext對象,StreamingContext是Spark Streaming處理的入口,這裏設置2秒的時間間隔。
val sparkConf = new SparkConf().setAppName("HBaseStream") // create a StreamingContext, the main entry point for all streaming functionality val ssc = new StreamingContext(sparkConf, Seconds(2))
接下來用StreamingContext的textFileStream(directory)建立輸入流跟蹤Hadoop文件系統的新文件,並處理此目錄下的全部文件,這裏directory指文件目錄。
// create a DStream that represents streaming data from a directory source val linesDStream = ssc.textFileStream("/user/user01/stream")
linesDStream是數據流,每條記錄是按行記錄的text格式。
圖6
接下來進行解析,對linesDStream進行map操做,map操做是對RDD應用Sensor.parseSensor函數,返回Sensor的RDD。
// parse each line of data in linesDStream into sensor objects val sensorDStream = linesDStream.map(Sensor.parseSensor)
圖7
對DStream的每一個RDD執行foreachRDD 方法,使用filter過濾Sensor中低psi值來建立報警,使用Hbase的Put對象轉換sensor和alter數據以便能寫入到Hbase。而後使用PairRDDFunctions的saveAsHadoopDataset方法將最終結果寫入到任何Hadoop兼容到存儲系統。
// for each RDD. performs function on each RDD in DStream sensorRDD.foreachRDD { rdd => // filter sensor data for low psi val alertRDD = rdd.filter(sensor => sensor.psi < 5.0) // convert sensor data to put object and write to HBase Table CF data rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig) // convert alert to put object write to HBase Table CF alerts rdd.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig) }
sensorRDD通過Put對象轉換,而後寫入到Hbase。
圖8
經過streamingContext.start()顯式的啓動數據接收,而後調用streamingContext.awaitTermination()來等待計算完成。
// Start the computation ssc.start() // Wait for the computation to terminate ssc.awaitTermination()
如今開始讀取Hbase的sensor表,計算每條的統計指標並把對應的數據寫入stats列簇。
圖9
下面的代碼讀取Hbase的sensor表psi列數據,用StatCounter計算統計數據,而後寫入stats列簇。
// configure HBase for reading val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName) // scan data column family psi column conf.set(TableInputFormat.SCAN_COLUMNS, "data:psi") // Load an RDD of (row key, row Result) tuples from the table val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) // transform (row key, row Result) tuples into an RDD of Results val resultRDD = hBaseRDD.map(tuple => tuple._2) // transform into an RDD of (RowKey, ColumnValue)s , with Time removed from row key val keyValueRDD = resultRDD. map(result => (Bytes.toString(result.getRow()). split(" ")(0), Bytes.toDouble(result.value))) // group by rowkey , get statistics for column value val keyStatsRDD = keyValueRDD. groupByKey(). mapValues(list => StatCounter(list)) // convert rowkey, stats to put and write to hbase table stats column family keyStatsRDD.map { case (k, v) => convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)
下面的流程圖顯示newAPIHadoopRDD輸出,(row key,result)的鍵值對。PairRDDFunctions 的saveAsHadoopDataset方法把Put對象存入到Hbase。
圖10
運行Spark Streaming應用跟運行Spark應用相似,比較簡單,此處不贅述,參見Spark Streaming官方文檔。