使用Spark Streaming SQL基於時間窗口進行數據統計

1.背景介紹

流式計算一個很常見的場景是基於事件時間進行處理,經常使用於檢測、監控、根據時間進行統計等系統中。好比埋點日誌中每條日誌記錄了埋點處操做的時間,或者業務系統中記錄了用戶操做時間,用於統計各類操做處理的頻率等,或者根據規則匹配,進行異常行爲檢測或監控系統告警。這樣的時間數據都會包含在事件數據中,須要提取時間字段並根據必定的時間範圍進行統計或者規則匹配等。
使用Spark Streaming SQL能夠很方便的對事件數據中的時間字段進行處理,同時Spark Streaming SQL提供的時間窗口函數能夠將事件時間按照必定的時間區間對數據進行統計操做。
本文經過講解一個統計用戶在過去5秒鐘內點擊網頁次數的案例,介紹如何使用Spark Streaming SQL對事件時間進行操做。html

2.時間窗語法說明

Spark Streaming SQL支持兩類窗口操做:滾動窗口(TUMBLING)和滑動窗口(HOPPING)。git

2.1滾動窗口github

滾動窗口(TUMBLING)根據每條數據的時間字段將數據分配到一個指定大小的窗口中進行操做,窗口以窗口大小爲步長進行滑動,窗口之間不會出現重疊。例如:若是指定了一個5分鐘大小的滾動窗口,數據會根據時間劃分到 [0:00 - 0:05)、 [0:05, 0:10)[0:10, 0:15)等窗口。sql

  • 語法
GROUP BY TUMBLING ( colName, windowDuration )
  • 示例

對inventory表的inv_data_time時間列進行窗口操做,統計inv_quantity_on_hand的均值;窗口大小爲1分鐘。apache

SELECT avg(inv_quantity_on_hand) qoh
FROM inventory
GROUP BY TUMBLING (inv_data_time, interval 1 minute)

2.2滑動窗口json

滑動窗口(HOPPING),也被稱做Sliding Window。不一樣於滾動窗口,滑動窗口能夠設置窗口滑動的步長,因此窗口能夠重疊。滑動窗口有兩個參數:windowDuration和slideDuration。slideDuration爲每次滑動的步長,windowDuration爲窗口的大小。當slideDuration < windowDuration時窗口會重疊,每一個元素會被分配到多個窗口中。
因此,滾動窗口實際上是滑動窗口的一種特殊狀況,即slideDuration = windowDuration則等同於滾動窗口。架構

  • 語法
GROUP BY HOPPING ( colName, windowDuration, slideDuration )
  • 示例

對inventory表的inv_data_time時間列進行窗口操做,統計inv_quantity_on_hand的均值;窗口爲1分鐘,滑動步長爲30秒。ide

SELECT avg(inv_quantity_on_hand) qoh
FROM inventory
GROUP BY HOPPING (inv_data_time, interval 1 minute, interval 30 second)

3.系統架構

業務日誌收集到Aliyun SLS後,Spark對接SLS,經過Streaming SQL對數據進行處理並將統計後的結果寫入HDFS中。後續的操做流程主要集中在Spark Streaming SQL接收SLS數據並寫入HDFS的部分,有關日誌的採集請參考日誌服務函數

4.操做流程

4.1環境準備oop

  • 建立E-MapReduce 3.21.0以上版本的Hadoop集羣。
  • 下載並編譯E-MapReduce-SDK包
git clone git@github.com:aliyun/aliyun-emapreduce-sdk.git
cd aliyun-emapreduce-sdk
git checkout -b master-2.x origin/master-2.x
mvn clean package -DskipTests

編譯完後, assembly/target目錄下會生成emr-datasources_shaded_${version}.jar,其中${version}爲sdk的版本。

4.2建立表

命令行啓動spark-sql客戶端

spark-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar

建立SLS和HDFS表

spark-sql> CREATE DATABASE IF NOT EXISTS default;
spark-sql> USE default;

-- 數據源表
spark-sql> CREATE TABLE IF NOT EXISTS sls_user_log
USING loghub
OPTIONS (
sls.project = "${logProjectName}",
sls.store = "${logStoreName}",
access.key.id = "${accessKeyId}",
access.key.secret = "${accessKeySecret}",
endpoint = "${endpoint}");

--結果表
spark-sql> CREATE TABLE hdfs_user_click_count
USING org.apache.spark.sql.json
OPTIONS (path '${hdfsPath}');

4.3統計用戶點擊數

spark-sql>SET streaming.query.name=user_click_count; 
spark-sql>SET spark.sql.streaming.checkpointLocation.user_click_count=hdfs:///tmp/spark/sql/streaming/test/user_click_count; 
spark-sql>insert into hdfs_user_click_count 
select sum(cast(action_click as int)) as click, userId, window from sls_user_log 
where delay(__time__)<"1 minute" 
group by TUMBLING(__time__, interval 5 second), userId;

其中,內建函數delay()用來設置Streaming SQL中的watermark,後續會有專門的文章介紹Streaming SQL watermark的相關內容。

4.4查看結果

能夠看到,產生的結果會自動生成一個window列,包含窗口的起止時間信息。

5.結語

本文簡要介紹了流式處理中基於事件時間進行處理的場景,以及Spark Streaming SQL時間窗口的相關內容,並經過一個簡單案例介紹了時間窗口的使用。後續文章,我將介紹Spark Streaming SQL的更多內容。



本文做者:ligh-rain

閱讀原文

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索