經過Spark SQL實時歸檔SLS數據

做者:木艮,阿里雲E-MapReduce開發工程師nginx

我在前一篇文章介紹過基於Spark SQL實現對HDFS操做的實時監控報警。今天,我再舉例說明一下如何使用Spark SQL進行流式應用的開發。本文主要分紅三部分:sql

  • 流式計算和SQLbootstrap

  • 簡要介紹Spark SQL流式開發語法服務器

  • 實時歸檔SLS數據到HDFS微信

1. 流式計算和SQL

數據的價值隨着時間逐漸下降。及時儘早的對數據進行處理提高了數據的價值,因此流式計算系統的應用也愈來愈普遍。目前經常使用的流式計算框架有Storm,Spark Streaming及Flink等,也有Kafka Streams這類基於Kafka的流式處理類庫。各類流式處理框架都有其各自的API,開發者不可避免的須要學習如何使用這些API。如何提供簡單而有效的開發工具,從而把更多的精力投放在業務處理中。因此,各個流式處理系統都逐漸支持SQL API做爲開發語言,讓使用者能夠像處理Table同樣處理Stream。例如KSQL支持使用SQL進行流式處理Kafka數據。Spark一樣提出來Structured Streaming做爲最新一代的流式處理系統,底層的處理引擎也是Spark SQL。不過在上層SQL API,缺乏Structured Streaming必要的功能,例如window,watermark等。EMR在Spark開源版本上進行了功能擴展,支持使用SQL API在Spark上進行完整的流式查詢開發。app

2. Spark SQL流式開發入門

這節將簡要介紹Spark SQL中關於流式開的概念和語法。框架

2.1 建表

當咱們須要對流式數據源進行讀寫操做時,須要首先建立一張表來表示這個數據源。定義表的語法以下:機器學習

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]USING providerNameOPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*);

以上語法中,針對特殊source,不要求必定指定表的列定義。當不指定列定義時,會自動識別數據源的schema信息。舉一個例子:ide

CREATE TABLE driver_behavior USING kafka OPTIONS (kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",subscribe = "${TOPIC_NAME}",output.mode = "${OUTPUT_MODE}",kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");

當數據源是Kafka時,會根據Kafka Topic名去到Kafka Schema Registry中查找schema信息。固然,咱們也能夠指定列定義,例如:工具

CREATE TABLE driverbehavior(deviceId string, velocity double)USING kafka OPTIONS (kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",subscribe = "${TOPIC_NAME}",output.mode = "${OUTPUT_MODE}",kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");

當指定列定義時,要求必須和Source中的字段定義是一致的。當執行完CREATE TABLE操做,表的定義會保存到Hive MetaStore中。

2.2 CTAS

咱們能夠將建立表和將查詢結果寫入到表的語句合併到一塊兒,那麼就是CREATE TABLE ... AS SELECT ...語法:

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]USING providerNameOPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*)ASqueryStatement;

舉一個例子(引用自這裏: q103):

CREATE TABLE kafka_temp_tableUSING kafkaOPTIONS (kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",subscribe = "${TOPIC_NAME}",output.mode = "${OUTPUT_MODE}",kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}") ASSELECT i_brand_id brand_id, i_brand brand, sum(ss_ext_sales_price) ext_priceFROM date_dim, kafka_store_sales, itemWHERE d_date_sk = ss_sold_date_sk AND ss_item_sk = i_item_sk AND i_manager_id = 28 AND d_moy = 11 AND d_year = 1999 AND delay(ss_data_time) < '2 minutes'GROUP BY TUMBLING(ss_data_time, interval 1 minute), i_brand, i_brand_id

當執行完操做,將建立出表並實際生成一個StreamQuery實例,將查詢結果寫入到結果表中。

2.3 DML

流式查詢SQL和離線SQL標準語法大部分是同樣,這邊主要介紹insert操做。流式查詢是不容許單獨進行SELECT操做,必須將SELECT的查詢結果寫入到表中。因此,須要在SELECT操做以前執行INSERT操做。

INSERT INTO tbName[(columnName[,columnName]*)]queryStatement;

以上語法爲一次流式查詢:這個語句將實際生成一個StreamQuery實例,將查詢結果寫入到結果表中。舉一個例子:

INSERT INTO kafka_temp_tableSELECT i_brand_id brand_id, i_brand brand, sum(ss_ext_sales_price) ext_priceFROM date_dim, kafka_store_sales, itemWHERE d_date_sk = ss_sold_date_sk AND ss_item_sk = i_item_sk AND i_manager_id = 28 AND d_moy = 11 AND d_year = 1999 AND delay(ss_data_time) < '2 minutes'GROUP BY TUMBLING(ss_data_time, interval 1 minute), i_brand, i_brand_id

2.4 window及watermark

限於篇幅,本文暫且不介紹Spark SQL中如何使用window和watermak,有興趣的能夠先看看資料,後續會專門撰文介紹。

2.5 流式做業配置

使用SQL進行流式做業開發時,有些必要的配置沒法在Query表達出來,須要單獨進行設置。這裏咱們使用SET操做進行流式做業必要參數配置,當前有兩個參數須要設置:

name config
流式查詢實例名稱 streaming.query.name
流式做業Checkpoint地址 spark.sql.streaming.checkpointLocation.${streaming.query.name}

每個流式查詢實例前都須要進行配置,也就是說,當使用CTAS或者Insert操做時,必須前置這兩個配置。一個SQL文件支持多個流式查詢,例如:

-- test.sql
SET streaming.query.name=query1;SET spark.sql.streaming.checkpointLocation.query1=/tmp/spark/query1INSERT INTO tbName1 [(columnName[,columnName]*)]queryStatement1;
SET streaming.query.name=query2;SET spark.sql.streaming.checkpointLocation.query2=/tmp/spark/query2INSERT INTO tbName2 [(columnName[,columnName]*)]queryStatement2;

3. SLS數據實時歸檔實戰

假定一個場景,如今經過SLS收集了業務服務器上的日誌,須要歸檔到HDFS中,便於後續進行離線分析。這裏涉及到兩個數據源:SLS和HDFS。HDFS是Spark官方支持的數據源,支持流和批的讀寫。SLS是阿里雲的服務,EMR已經支持了流式讀寫。

  • 環境準備
    須要E-MapReduce 3.21.0以上版本集羣環境,當前正在發佈準備中,很快和你們見面,敬請期待。

  • 命令行

spark-sql --master yarn-client --conf spark.sql.streaming.datasource.provider=loghub --jars emr-logservice_shaded_2.11-1.7.0-SNAPSHOT.jar

注:emr-logservice_shaded_2.11-1.7.0-SNAPSHOT.jar將會在EMR SDK 1.7.0版本發佈出來。

  • 分別建立兩張表:sls_service_log和hdfs_service_log


CREATE DATABASE IF NOT EXISTS default;USE default;
DROP TABLE IF EXISTS hdfs_service_log;CREATE TABLE hdfs_service_log (instance_name string, ip string, content string)USING PARQUETLOCATION '/tmp/hdfs_service_log';
DROP TABLE IF EXISTS sls_service_log;CREATE TABLE sls_service_logUSING loghubOPTIONS (sls.project = "${logProjectName}",sls.store = "${logStoreName}",access.key.id = "${accessKeyId}",access.key.secret = "${accessKeySecret}",endpoint = "${endpoint}");
  • 經過Spark SQL啓動一個Stream Query將SLS數據實時同步到HDFS中

set streaming.query.name=sync_sls_to_hdfs;set spark.sql.streaming.checkpointLocation.sync_sls_to_hdfs=hdfs:///tmp/spark/sync_sls_to_hdfs;
INSERT INTO hdfs_service_logselect__tag__hostname__ as instance_name,ip,contentfrom sls_service_log;
  • 查看HDFS數據歸檔狀況


  • 使用Spark SQL對歸檔的數據進行離線分析:例如統計一共有多少個IP

select distinct(ip) from hdfs_service_log;

4. 結語

以上,咱們介紹了Spark SQL在流式處理中的一個很是簡單的例子。其實,咱們還可使用Spark SQL進行更加複雜的流式處理任務。後續文章,我將介紹窗口操做,watermark等概念,以及如何在流式數據上進行簡單的機器學習運算。


本文分享自微信公衆號 - Apache Spark技術交流社區(E-MapReduce_Spark)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索