歡迎你們前往騰訊雲+社區,獲取更多騰訊海量技術實踐乾貨哦~git
本文將幫助您使用基於HBase的Apache Spark Streaming。Spark Streaming是Spark API核心的一個擴展,支持連續的數據流處理。github
首先,什麼是流(streaming)?數據流是連續到達的無窮序列。流處理將不斷流動的輸入數據分紅獨立的單元進行處理。流處理是對流數據的低延遲處理和分析。Spark Streaming是Spark API核心的擴展,可實現實時數據的快速擴展,高吞吐量,高容錯處理。Spark Streaming適用於大量數據的快速處理。實時處理用例包括:apache
Spark Streaming支持如HDFS目錄,TCP套接字,Kafka,Flume,Twitter等數據源。數據流能夠用Spark 的核心API,DataFrames SQL,或機器學習的API進行處理,而且能夠被保存到HDFS,databases或Hadoop OutputFormat提供的任何文件系統中去。編程
Spark Streaming將數據流每X秒分做一個集合,稱爲Dstreams,它在內部是一系列RDD。您的Spark應用程序使用Spark API處理RDD,而且批量返回RDD操做的結果。網絡
Spark Streaming示例代碼執行如下操做:app
其餘Spark示例代碼執行如下操做:機器學習
油泵傳感器數據文件放入目錄中(文件是以逗號爲分隔符的CSV)。Spark Streaming將監視目錄並處理在該目錄中建立的全部文件。(如前所述,Spark Streaming支持不一樣的流式數據源;爲簡單起見,此示例將使用CSV。)maven
如下是帶有一些示例數據的csv文件示例:函數
咱們使用Scala案例類來定義與傳感器數據csv文件相對應的傳感器模式,並使用parseSensor函數將逗號分隔值解析到傳感器案例類中。oop
流數據的HBase表格模式以下:
平常統計彙總的模式以下所示:
下面的函數將Sensor對象轉換爲HBase Put對象,該對象用於將數據行插入到HBase中。
您可使用Spark 的TableOutputFormat類寫入HBase表,這與您從MapReduce寫入HBase表的方式相似。下面咱們使用TableOutputFormat類設置HBase的配置。
這些是Spark Streaming代碼的基本步驟:
咱們將經過示例應用程序代碼完成這些步驟。
首先,咱們建立一個StreamingContext,這是流式傳輸的主要入口點(2秒間隔時間)。
val sparkConf = new SparkConf ( ) . setAppName ( "HBaseStream" ) // 建立 StreamingContext, 流式函數的主要入口 val ssc = new StreamingContext ( sparkConf , Seconds ( 2 ) )
接下來,咱們使用StreamingContext textFileStream(directory)方法建立一個輸入流,該輸入流監視Hadoop兼容的文件系統以獲取新文件,並處理在該目錄中建立的全部文件。
// 建立表明數據 DStream對象 val linesDStream = ssc . textFileStream ( "/user/user01/stream" )
linesDStream表明數據流,每一個記錄都是一行文本。內部DStream是一系列RDD,每一個批處理間隔一個RDD。
接下來,咱們將數據行解析爲Sensor對象,並使用DStream行上的map操做。
// 把lineDSream的每一行解析爲Sensor對象 val sensorDStream = linesDStream . map ( Sensor . parseSensor )
map操做在linesDStream中的RDD上使用Sensor.parseSensor函數,從而生成Sensor對象(RDD)。
接下來,咱們使用DStream foreachRDD方法將處理應用於此DStream中的每一個RDD。咱們過濾低psi傳感器對象以建立警報,而後咱們經過將傳感器和警報數據轉換爲Put對象並使用PairRDDFunctions saveAsHadoopDataset方法將傳感器和警報數據寫入HBase ,該方法使用Hadoop將RDD輸出到任何支持Hadoop的存儲系統,該存儲系統的配置對象(請參閱上面的HBase的Hadoop配置)。
// 對每個RDD. sensorRDD . foreachRDD { rdd => // 低psi的傳感器過濾器 val alertRDD = rdd . filter ( sensor => sensor . psi < 5.0 ) // 把傳感器數據轉爲對象並寫入HD rdd . map ( Sensor . convertToPut ) . saveAsHadoopDataset (jobConfig ) // 把警報轉爲對象並寫入HD rdd . map ( Sensor . convertToPutAlert ) . saveAsHadoopDataset (jobConfig ) }
sensorRDD對象被轉換並寫入HBase。
要開始接收數據,咱們必須在StreamingContext上顯式調用start(),而後調用awaitTermination來等待計算完成。
// 開始計算 ssc . start ( ) // 等待計算完成 ssc . awaitTermination ( )
如今咱們要讀取HBase傳感器表數據,計算每日摘要統計信息並將這些統計信息寫入。
如下代碼讀取HBase表,傳感器表,psi列數據,使用StatCounter計算此數據的統計數據,而後將統計數據寫入傳感器統計數據列。
// HBase的讀取設置 val conf = HBaseConfiguration . create ( ) conf . set ( TableInputFormat . INPUT_TABLE , HBaseSensorStream . tableName ) // 掃描數據 conf . set ( TableInputFormat . SCAN_COLUMNS , "data:psi" ) // 加載RDD (row key, row Result)元組 val hBaseRDD = sc . newAPIHadoopRDD ( conf , classOf [TableInputFormat ] , classOf [ org . apache . hadoop . hbase . io . ImmutableBytesWritable ] , classOf [ org . apache . hadoop . hbase . client . Result ] ) // 把(row key, row Result) 元組爲RDD val resultRDD = hBaseRDD.map(tuple => tuple._2) // 轉爲 RDD (RowKey, ColumnValue), 移除Time val keyValueRDD = resultRDD. map(result => (Bytes.toString(result.getRow()). split(" ")(0), Bytes.toDouble(result.value))) // 分組,獲得統計數據 val keyStatsRDD = keyValueRDD. groupByKey(). mapValues(list => StatCounter(list)) // 轉碼rowkey,統計信息放入並寫入hbase keyStatsRDD.map { case (k, v) => convertToPut(k, v)}.saveAsHadoopDataset(jobConfig)
下圖顯示newAPIHadoopRDD的輸出。PairRDDFunctions saveAsHadoopDataset將Put對象保存到HBase。
本教程將在MapR Sandbox上運行 ,其中包括Spark。
您能夠從這裏下載代碼和數據以運行這些例子:
代碼:https://github.com/caroljmcdo...
您能夠將代碼做爲獨立應用程序運行,如「MapR Sandbox上的Spark入門教程」中所述。
如下是總的步驟:
這就結束了關於使用HBase進行Spark Streaming的教程。您能夠在相關閱讀部分找到更多信息。
問答
如何使用MySQL和ApacheSPark?
相關閱讀
Spark Streaming編程指南
Spark Streaming應用與實戰全攻略
簡談Spark Streaming的實時計算整合
此文已由做者受權騰訊雲+社區發佈,原文連接:https://cloud.tencent.com/dev...