基於Spark SQL實現對HDFS操做的實時監控報警

做者:木艮,阿里雲E-MapReduce開發工程師javascript

1.前言

E-MapReduce計劃從EMR-3.18.1版本開始提供Spark Streaming SQL的預覽版功能。Spark Streaming SQL是在Spark Structured Streaming的基礎上作了進一步封裝,方便用戶使用SQL語言進行Spark流式分析開發。Spark Streaming SQL直接地透明地受惠於Spark SQL的優化帶來的性能提高,同時也遵循Spark Structured Streaming的語法約束,例如Spark Structured Streaming不支持多流聚合查詢,Spark Streaming SQL也就一樣不支持。關於EMR Spark Streaming SQL的使用入門,請參考:html

  • Spark Streaming SQL Baseline Testing,驗證和測試工具,能夠做爲開發入門使用。原文地址:https://github.com/aliyun/aliyun-emapreduce-sdk/tree/master-2.x/emr-baseline-testingjava




2.案例介紹

本文將使用Spark Streaming SQL實現對HDFS audilog的實時統計分析。數據流圖以下所示:python

一共分爲4步:nginx

  • 採集HDFS的audit-log到LogService中。git

  • Spark Streaming SQL從LogService消費數據並進行分析。github

  • 分析結果寫到到存儲系統中。sql

  • 下游系統從存儲系統中查詢和使用分析結果,典型的下游系統有報表或者監控報警。apache

本文專一於第二步,即如何使用Spark Streaming SQL分析LogService數據。json

3.案例實現

本次演示中,咱們將結果數據寫到Kafka中。

3.1 環境準備

在E-MapReduce中建立兩個集羣:Hadoop集羣和Kafka集羣。這裏就不詳細演示如何建立EMR集羣了,不熟悉的能夠參考幫助文檔https://help.aliyun.com/document_detail/28088.html。

3.2 建立數據表

  • 建立loghub數據源表

CREATE DATABASE IF NOT EXISTS default;USE default;DROP TABLE IF EXISTS loghub_hdfs_auditlog;
CREATE TABLE loghub_hdfs_auditlogUSING loghubOPTIONS (sls.project = "${SLS_PROJECT_NAME}",sls.store = "${SLS_STORE_NAME}",access.key.id = "${ACCESS_KEY_ID}",access.key.secret = "${ACCESS_KEY_SECRET}",endpoint = "${SLS_ENDPOINT}",zookeeper.connect.address = "${ZOOKEEPER_ADDRESS}");
  • 建立Kafka輸出表

CREATE DATABASE IF NOT EXISTS default;USE default;DROP TABLE IF EXISTS kafka_hdfs_auditlog_analysis;
CREATE TABLE kafka_hdfs_auditlog_analysisUSING kafkaOPTIONS (kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",subscribe = 'temp_hdfs_auditlog_analysis',output.mode = 'complete',kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");

注意:

  1. 具體作查詢分析時,須要單首創建一個Kafka結果表。這是由於每一個查詢結果不同,輸出表的schema也就不同。

  2. 根據具體的查詢結果schema,咱們須要預先在Kafka Schema Registry中註冊好Kafka Topic Schema信息。

  • Schema定義示例:

{"namespace": "org.apache.spark.emr.baseline.testing", "type": "record", "name": "TempResult", "fields": [ {"name": "avg_ss_quantity", "type": ["double", "null"]}, {"name": "avg_ss_ext_sales_price", "type": [{"type": "bytes", "logicalType": "decimal", "precision": 11, "scale": 6}, "null"]}, {"name": "avg_ss_ext_wholesale_cost", "type": [{"type": "bytes", "logicalType": "decimal", "precision": 11, "scale": 6}, "null"]}, {"name": "sum_ss_ext_wholesale_cost", "type": [{"type": "bytes", "logicalType": "decimal", "precision": 17, "scale": 2}, "null"]} ]}

將schema定義保存到文件中,並使用腳本工具註冊schema到Kafka Schema Registry中。


python ./schema_register.py ${SCHEMA_REGISTRY_URL} ${TOPIC_NAME} ${SCHEMA_FILE}

3.3 統計5分鐘內各個操做次數

  • 註冊Kafka Topic Schema:

{"type":"record", "name":"TempResult", "namespace":"org.apache.spark.sql.streaming.test", "fields":[ {"name":"op","type":["string","null"]}, {"name":"count","type":"long"}, {"name":"window_time","type": {"type":"record", "name":"window_time", "namespace":"org.apache.spark.sql.streaming.test.window_time", "fields":[ {"name":"start","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]}, {"name":"end","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]} ] } } ]}


  • 建立一個結果表,命名爲:「kafka_hdfs_auditlog_op_count_in_5_mins」。

DROP TABLE IF EXISTS kafka_hdfs_auditlog_op_count_in_5_mins;
CREATE TABLE kafka_hdfs_auditlog_op_count_in_5_minsUSING kafkaOPTIONS (kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",subscribe = "kafka_hdfs_auditlog_op_count_in_5_mins",output.mode = "append",kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",kafka.schema.record.name = "TempResult",kafka.schema.record.namespace = "org.apache.spark.sql.streaming.test");
  • 查詢分析

SET streaming.query.name=hdfs_auditlog_op_count_in_5_mins;SET spark.sql.streaming.checkpointLocation.hdfs_auditlog_op_count_in_5_mins=/tmp/spark/sql/streaming/hdfs_auditlog_op_count_in_5_mins;
INSERT INTO kafka_hdfs_auditlog_op_count_in_5_minsSELECT cmd op, count(*) count, window window_timeFROM loghub_hdfs_auditlogGROUP BY TUMBLING(__time__, interval 5 minute), opHAVING delay(__time__) < '5 minutes';

注:

  1. 「streaming.query.name」能夠爲任意名字。必填配置。

  2. 「spark.sql.streaming.checkpointLocation.${streaming.query.name}」對「streaming.query.name」的job配置checkpoint路徑,能夠爲任意hdfs路徑。必填配置。

  • 查看結果

經過Kafka命令行,查看kafka_hdfs_auditlog_op_count_in_5_mins topic數據:
窗口: 1550493600000 ~ 1550493900000{"op":{"string":"create"},"count":47438,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}{"op":{"string":"delete"},"count":181197,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}{"op":{"string":"getfileinfo"},"count":265451,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}{"op":{"string":"listStatus"},"count":641205,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}{"op":{"string":"mkdirs"},"count":12171,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}{"op":{"string":"append"},"count":30981,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}{"op":{"string":"rename"},"count":169709,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}{"op":{"string":"setPermission"},"count":52164,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}{"op":{"string":"open"},"count":436877,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}

3.4 統計1分鐘內各個來源IP的open操做次數

  • 註冊Kafka Topic Schema:

{"type":"record", "name":"TempResult", "namespace":"org.apache.spark.sql.streaming.test", "fields":[ {"name":"ip","type":["string","null"]}, {"name":"count","type":"long"}, {"name":"window_time","type": {"type":"record", "name":"window_time", "namespace":"org.apache.spark.sql.streaming.test.window_time", "fields":[ {"name":"start","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]}, {"name":"end","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]} ] } } ]}
  • 建立一個結果表,命名爲

kafka_hdfs_auditlog_read_count_per_ip_in_1_mins

DROP TABLE IF EXISTS kafka_hdfs_auditlog_read_count_per_ip_in_1_mins;
CREATE TABLE kafka_hdfs_auditlog_read_count_per_ip_in_1_minsUSING kafkaOPTIONS (kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",subscribe = "kafka_hdfs_auditlog_read_count_per_ip_in_1_mins",output.mode = "append",kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",kafka.schema.record.name = "TempResult",kafka.schema.record.namespace = "org.apache.spark.sql.streaming.test");
  • 查詢分析

SET streaming.query.name=hdfs_auditlog_read_count_per_ip_in_1_mins;SET spark.sql.streaming.checkpointLocation.hdfs_auditlog_read_count_per_ip_in_1_mins=/tmp/spark/sql/streaming/hdfs_auditlog_read_count_per_ip_in_1_mins;
INSERT INTO kafka_hdfs_auditlog_read_count_per_ip_in_1_minsSELECT ip, count(*) count, window window_timeFROM loghub_hdfs_auditlogwhere cmd='open'GROUP BY TUMBLING(__time__, interval 1 minute), ipHAVING delay(__time__) < '1 minutes';
  • 查看結果

窗口: 1550540940000 ~ 1550541000000{"ip":{"string":"172.*.*.130"},"count":69090,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}{"ip":{"string":"172.*.*.1"},"count":69266,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}{"ip":{"string":"10.*.*.1"},"count":5129,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}{"ip":{"string":"172.*.*.52"},"count":70469,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}{"ip":{"string":"10.*.*.24"},"count":7206,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}{"ip":{"string":"172.*.*.48"},"count":136101,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}{"ip":{"string":"172.*.*.19"},"count":67226,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}{"ip":{"string":"172.*.*.82"},"count":141886,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}{"ip":{"string":"172.*.*.50"},"count":69165,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}{"ip":{"string":"172.*.*.54"},"count":151539,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
窗口: 1550541000000 ~ 1550541060000{"ip":{"string":"192.*.*.22"},"count":77776,"window_time":{"start":{"long":1550541000000},"end":{"long":1550541060000}}}{"ip":{"string":"10.*.*.111"},"count":9373,"window_time":{"start":{"long":1550541000000},"end":{"long":1550541060000}}}{"ip":{"string":"10.*.*.2"},"count":9329,"window_time":{"start":{"long":1550541000000},"end":{"long":1550541060000}}}
窗口: 1550541060000 ~ 1550541120000{"ip":{"string":"172.*.*.207"},"count":10481,"window_time":{"start":{"long":1550541060000},"end":{"long":1550541120000}}}{"ip":{"string":"192.*.*.138"},"count":28965,"window_time":{"start":{"long":1550541060000},"end":{"long":1550541120000}}}{"ip":{"string":"10.*.*.160"},"count":22015,"window_time":{"start":{"long":1550541060000},"end":{"long":1550541120000}}}{"ip":{"string":"172.*.*.234"},"count":32892,"window_time":{"start":{"long":1550541060000},"end":{"long":1550541120000}}}

3.5 統計1分鐘內QPS超過100的來源IP

  • 註冊Kafka Topic Schema:

{"type":"record", "name":"TempResult", "namespace":"org.apache.spark.sql.streaming.test", "fields":[ {"name":"ip","type":["string","null"]}, {"name":"qps","type":["double","null"]}, {"name":"window_time","type": {"type":"record", "name":"window_time", "namespace":"org.apache.spark.sql.streaming.test.window_time", "fields":[ {"name":"start","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]}, {"name":"end","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]} ] } } ]}
  • 建立一個結果表,命名爲:kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins

DROP TABLE IF EXISTS kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins;
CREATE TABLE kafka_hdfs_auditlog_qps_gt_100_ip_in_1_minsUSING kafkaOPTIONS (kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",subscribe = "kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins",output.mode = "append",kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",kafka.schema.record.name = "TempResult",kafka.schema.record.namespace = "org.apache.spark.sql.streaming.test");
  • 查詢分析

SET streaming.query.name=hdfs_auditlog_qps_gt_100_ip_in_1_mins;SET spark.sql.streaming.checkpointLocation.hdfs_auditlog_qps_gt_100_ip_in_1_mins=/tmp/spark/sql/streaming/hdfs_auditlog_qps_gt_100_ip_in_1_mins;
INSERT INTO kafka_hdfs_auditlog_qps_gt_100_ip_in_1_minsSELECT ip, qps, window window_timeFROM (SELECT ip, count(cmd)/60 qps, window FROM loghub_hdfs_auditlog GROUP BY TUMBLING(__time__, interval 1 minute), ip HAVING delay(__time__) < '1 minutes') tWHERE qps > 100;
  • 查看結果

窗口:1550543160000 ~ 1550543220000{"ip":{"string":"10.*.*.140"},"qps":{"double":328.6333333333333},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}{"ip":{"string":"172.*.*.13"},"qps":{"double":276.43333333333334},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}{"ip":{"string":"172.*.*.27"},"qps":{"double":228.23333333333332},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}{"ip":{"string":"192.*.*.170"},"qps":{"double":407.4},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}{"ip":{"string":"172.*.*.95"},"qps":{"double":233.6},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}{"ip":{"string":"172.*.*.8"},"qps":{"double":341.3833333333333},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}{"ip":{"string":"192.*.*.167"},"qps":{"double":357.73333333333335},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}{"ip":{"string":"10.*.*.142"},"qps":{"double":283.05},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}{"ip":{"string":"192.*.*.98"},"qps":{"double":371.46666666666664},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}{"ip":{"string":"192.*.*.209"},"qps":{"double":501.35},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}{"ip":{"string":"10.*.*.12"},"qps":{"double":276.8333333333333},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}{"ip":{"string":"10.*.*.98"},"qps":{"double":276.3833333333333},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}{"ip":{"string":"172.*.*.169"},"qps":{"double":231.86666666666667},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}{"ip":{"string":"10.*.*.166"},"qps":{"double":358.9},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
窗口:1550543220000 ~ 1550543280000{"ip":{"string":"172.*.*.41"},"qps":{"double":675.3},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}{"ip":{"string":"192.*.*.97"},"qps":{"double":364.45},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}{"ip":{"string":"172.*.*.17"},"qps":{"double":392.98333333333335},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}{"ip":{"string":"192.*.*.234"},"qps":{"double":361.3833333333333},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}{"ip":{"string":"192.*.*.72"},"qps":{"double":354.98333333333335},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}{"ip":{"string":"172.*.*.3"},"qps":{"double":513.2833333333333},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}{"ip":{"string":"10.*.*.1"},"qps":{"double":435.18333333333334},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}{"ip":{"string":"10.*.*.240"},"qps":{"double":458.45},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}{"ip":{"string":"10.*.*.85"},"qps":{"double":500.46666666666664},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}{"ip":{"string":"172.*.*.234"},"qps":{"double":635.1333333333333},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}{"ip":{"string":"172.*.*.107"},"qps":{"double":371.76666666666665},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}{"ip":{"string":"10.*.*.195"},"qps":{"double":357.46666666666664},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}{"ip":{"string":"10.*.*.31"},"qps":{"double":178.95},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}

4. 小結

本文簡要示例瞭如何在EMR上使用Spark Streaming SQL進行流式分析。須要注意的是,EMR Spark Streaming SQL處於預覽版階段,語法和功能還在不斷豐富完善中。

  • JIRA:https://issues.apache.org/jira/browse/SPARK-24630

       

  • Design doc:https://docs.google.com/document/d/19degwnIIcuMSELv6BQ_1VQI5AIVcvGeqOm5xE2-aRA0



本文分享自微信公衆號 - Apache Spark技術交流社區(E-MapReduce_Spark)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索