使用Spark SQL進行流式機器學習計算(上)

做者:餘根茂,阿里巴巴計算平臺事業部EMR團隊的技術專家,參與了Hadoop,Spark,Kafka等開源項目的研發工做。目前主要專一於EMR流式計算產品的研發工做。前端

今天來和你們聊一下如何使用Spark SQL進行流式數據的機器學習處理。本文主要分爲如下幾個章節:mysql

  • 什麼是流式機器學習git

  • 機器學習模型獲取途徑github

  • 系統演示算法

1. 什麼是流式機器學習

一般,當咱們聽到有人提到實時數據機器學習時,其實他們是討論:sql

  • 他們但願有一個模型,這個模型利用最近歷史信息來進行預測分析。舉一個天氣的例子,若是最近幾天都是晴天,那麼將來幾天極小機率會出現雨雪和低溫天氣apache

  • 這個模型還須要是可更新的。當數據流經系統時,模型是能夠隨之進化升級。舉個例子,隨着業務規模的擴大,咱們但願零售銷售模型仍然保持準確。微信

第一個例子咱們能夠將它歸爲時序預測。第二個例子中,模型須要更新或者從新訓練,這是一個non-stationarity問題。時序預測和non-stationarity數據分佈是兩類不一樣的問題。本文主要關注第二類問題,對於這類問題,通常的解決方案主要有:網絡

  • 增量式算法:有一些算法支持經過數據逐步學習。也就是說,每次進來一些新的數據時,模型會被更新。SVM,神經網絡等算法都有增量式版本,此外貝葉斯網絡也能夠用做增量學習。架構

  • 週期從新學習:一個更加直接的方法就是用一批最新數據從新訓練咱們的模型。這種方法能夠用到的絕大多數的算法上。

2. 機器學習模型獲取途徑

實時機器學習應用分紅兩塊,一部分是模型實時訓練,另外一部分是數據實時預測分析。現實中,咱們可能無法實現模型的實時訓練,只能退而求其次地使用已經訓練好模型。這些模型可能會週期性地使用歷史數據訓練更新一次。因此,咱們能夠根據實際的算法和模型時效性要求,來選擇實時訓練模型仍是使用預訓練好的模型。

  • 模型算法支持增量訓練:能夠選擇用流式數據實時訓練更新

  • 模型算法不自持增量訓練:能夠選擇用離線數據預先訓練好模式

回到主題上,咱們要實現使用Spark SQL進行流式機器學習。前面幾篇文章已經簡單介紹了EMR如何使用Spark SQL進行流式ETL處理。既然要進行機器學習,咱們很天然地想到Spark MLlib。DataBricks有篇文檔介紹了在Spark Structured Streaming進行機器學習,你們有興趣的能夠看下。若是想將Spark MLlib應用到Spark SQL上,咱們能夠簡單地將MLlib算法包裝成UDF使用。另一個模型獲取途徑是利用阿里雲上的一些在線機器學習服務,咱們能夠將在線機器學習服務使用UDF封裝後使用。

  • 使用UDF封裝現有的Spark MLlib算法

  • 使用UDF封裝阿里雲在線機器學習服務

限於篇幅,我會分兩篇文章分別介紹這兩個方式,本文將簡單介紹如何利用Spark MLlib進行流式機器學習。

3. 系統演示

本節,咱們將演示一下如何利用邏輯迴歸算法進行演示。

3.1 系統架構

下面這張圖展現了整個實時監測系統的架構,前端接LogService數據,實時監測分析結果寫入到RDS,最後經過DataV展現出來。

3.2 測試數據集

測試數據集使用Spark自帶的sample_libsvm_data.txt,咱們要作的是寫一個數據生成器,將數據集的數據不斷地向SLS中發送,模擬流式數據。

算法模型準備

Spark MLlib提供了大量的機器學習算法實現,能夠方便的再RDD或者DataFrame API上使用,可是沒法直接用在SQL API上,因此咱們須要使用UDF來封裝一下。這裏,咱們選用邏輯迴歸算法,具體的實現就不細說了,能夠參考這裏的代碼:LogisticRegressionUDF.scala,地址:https://github.com/aliyun/aliyun-emapreduce-sdk/blob/master-2.x/emr-sql/src/main/scala/org/apache/spark/sql/aliyun/udfs/ml/LogisticRegressionUDF.scala


3.4 部署測試

  • CLI

## emr datasources包尚未發佈,須要手動編譯出來git clone git@github.com:aliyun/aliyun-emapreduce-sdk.gitcd aliyun-emapreduce-sdkgit checkout -b master-2.x origin/master-2.xmvn clean package -DskipTests
## 編譯完後, assembly/target目錄下會生成emr-datasources_shaded_2.11-1.7.0-SNAPSHOT.jar
spark-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-1.7.0-SNAPSHOT.jar --driver-class-path emr-datasources_shaded_2.11-1.7.0-SNAPSHOT.jar
  • 建表

spark-sql> CREATE DATABASE IF NOT EXISTS default;spark-sql> USE default;
-- 測試數據源spark-sql> CREATE TABLE IF NOT EXISTS sls_datasetUSING loghubOPTIONS (sls.project = "${logProjectName}",sls.store = "${logStoreName}",access.key.id = "${accessKeyId}",access.key.secret = "${accessKeySecret}",endpoint = "${endpoint}");
spark-sql> DESC sls_dataset__logProject__ string NULL__logStore__ string NULL__shard__ int NULL__time__ timestamp NULL__topic__ string NULL__source__ string NULLlabel string NULLfeatures string NULL__tag__hostname__ string NULL__tag__path__ string NULL__tag__receive_time__ string NULLTime taken: 0.058 seconds, Fetched 11 row(s)
-- 結果數據源spark-sql> CREATE TABLE IF NOT EXISTS rds_resultUSING jdbc2OPTIONS (url="${rdsUrl}",driver="com.mysql.jdbc.Driver",dbtable="${rdsTableName}",user="${user}",password="${password}",batchsize="100",isolationLevel="NONE");
spark-sql> DESC rds_result;acc double NULLlabel double NULLtime string NULLTime taken: 0.457 seconds, Fetched 3 row(s)
  • 註冊UDF

-- udf_jar_path: 編譯完後, emr-sql/target目錄下會生成emr-sql_2.11-1.7.0-SNAPSHOT.jar,使用之。
CREATE FUNCTION Logistic_Regression AS 'org.apache.spark.sql.aliyun.udfs.ml.LogisticRegressionUDF' USING JAR '${udf_jar_path}';
  • 提交執行

SET streaming.query.name=lr_prediction;SET spark.sql.streaming.checkpointLocation.lr_prediction=hdfs:///tmp/spark/lr_prediction;SET spark.sql.streaming.query.outputMode.lr_prediction=update;-- 因爲DataSource是基於JDBC實現的,因此咱們須要設置向RDS表插入數據的SQL-- 這裏個人RDS表名是`result`SET streaming.query.lr_prediction.sql=insert into `result`(`time`, `label`, `acc`) values(?, ?, ?);
INSERT INTO rds_result SELECT window.start, label, sum(if(tb.predict = tb.label, 1, 0)) / count(tb.label) as acc FROM(SELECT default.Logistic_Regression("${LR_model_path}", concat_ws(" ", label, features)) as predict, label, __time__ as time FROM sls_dataset) tb GROUP BY TUMBLING(tb.time, interval 10 second), tb.label;

3.5 效果展現

在DataV中配置上面的RDS結果表,使用折線圖查看label=1的預測準確率,以下:

4. 小結

本文簡要介紹了流式機器學習面臨的幾個問題,以及相應的解決方法。並使用Spark SQL結合Spark MLlib演示了一個流式機器學習的案例。下一篇,我會簡要介紹Spark SQL如何結合阿里雲的在線機器學習服務來進行流式機器學習應用開發。


本文分享自微信公衆號 - Apache Spark技術交流社區(E-MapReduce_Spark)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索