4Paradigm has already deployed tens of thousands of AI applications across many industries, such as anti-fraud in finance, news recommendation in media, and pipeline inspection in energy. In these applications, SparkSQL plays an important role in implementing feature transformations quickly.
SparkSQL feature transformations mainly fall into the following categories (a short PySpark sketch follows the list):
1. Multi-table scenarios: joining tables together, e.g., joining a transaction table with an account table.
2. Simple per-row feature transformations with UDFs, e.g., extracting the hour from a timestamp.
3. Time-series features built with time windows and UDAFs, e.g., the total amount a person spent over the last day.
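A minimal PySpark sketch of the three patterns. The table and column names (transactions, accounts, account_id, account_level, amount, ts) are hypothetical placeholders, not from the demo:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("feature_patterns").getOrCreate()

# 1. multi-table join: attach account attributes to each transaction
joined = spark.sql("""
    select t.*, a.account_level
    from transactions t join accounts a on t.account_id = a.account_id
""")

# 2. simple UDF-style transformation: extract the hour from a timestamp
hourly = spark.sql("select account_id, hour(ts) as trade_hour from transactions")

# 3. time window + UDAF: total spent per account over the preceding day
#    (cast(ts as long) yields epoch seconds, so 86400 = 1 day)
daily_sum = spark.sql("""
    select account_id,
           sum(amount) over (partition by account_id order by cast(ts as long)
                             range between 86400 preceding and current row) as amount_1d
    from transactions
""")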
So far, SparkSQL has solved the feature-transformation problem for offline model training very well. But as AI applications mature, a model is expected not just to produce good offline evaluation numbers but to deliver value in real business scenarios. A real business scenario is a model-serving scenario: it requires high performance and real-time inference, and that is where we run into the following problems:
1. How multi-table data maps from offline to online: batch training takes many tables as input, so what form should those tables take in the online environment? This shapes the whole system architecture; done well it improves efficiency, done badly it greatly increases the cost of turning a model into business value.
2. Translating the SQL into a real-time execution path is expensive: online inference needs high performance, and data scientists may produce thousands of features; translating each one by hand drives up engineering cost dramatically.
3. Keeping offline and online features consistent is hard: manual translation introduces inconsistencies, and the two are often difficult to reconcile.
4. Offline results look great, but online performance cannot meet business requirements.
In a concrete anti-fraud scenario, the model is required to decide whether a transaction is fraudulent within a TP99 of 20 ms, so the performance requirements on model serving are very high.
A feature engineering database (FEDB) complements SparkSQL's capabilities here.
Its LLVM-accelerated SQL engine runs complex time-series feature workloads 2-3x faster than the Scala-based Spark 2.x/3.x, and online its in-memory storage lets the same SQL return results at very low latency.
The demo's training task is to predict how long a taxi trip will take from start to finish. We will use FEDB, PySpark, LightGBM, and a few other tools to build an HTTP model-inference service end to end, a hands-on application of Spark in machine learning.
The whole demo is a bit over 200 lines of code and takes less than half an hour to put together.
Scenario data and features
The training data looks like this:
Sample data
id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
id3097625,1,2016-01-22 16:01:00,2016-01-22 16:15:16,2,-73.97746276855469,40.7613525390625,-73.95573425292969,40.772396087646484,N,856
id3196697,1,2016-01-28 07:20:18,2016-01-28 07:40:16,1,-73.98524475097656,40.75959777832031,-73.99615478515625,40.72945785522461,N,1198
id0224515,2,2016-01-31 00:48:27,2016-01-31 00:53:30,1,-73.98342895507812,40.7500114440918,-73.97383880615234,40.74980163574219,N,303
id3370903,1,2016-01-14 11:46:43,2016-01-14 12:25:33,2,-74.00027465820312,40.74786376953125,-73.86485290527344,40.77039337158203,N,2330
id2763851,2,2016-02-20 13:21:00,2016-02-20 13:45:56,1,-73.95218658447266,40.772220611572266,-73.9920425415039,40.74932098388672,N,1496
id0904926,1,2016-02-20 19:17:44,2016-02-20 19:33:19,4,-73.97344207763672,40.75189971923828,-73.98480224609375,40.76243209838867,N,935
id2026293,1,2016-02-25 01:16:23,2016-02-25 01:31:27,1,-73.9871597290039,40.68777847290039,-73.9115219116211,40.68180847167969,N,904
id1349988,1,2016-01-28 20:16:05,2016-01-28 20:21:36,1,-74.0028076171875,40.7338752746582,-73.9968032836914,40.743770599365234,N,331
id3218692,2,2016-02-17 16:43:27,2016-02-17 16:54:41,5,-73.98147583007812,40.77408218383789,-73.97216796875,40.76400375366211,N,674
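Before the feature SQL below can run in Spark, the CSV has to be loaded and registered as a table. A minimal sketch, reusing the file path from the import script later in this post; registering the view as t1 keeps the name consistent with the online table:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("taxi_demo").getOrCreate()

# load the training CSV with a header row and inferred column types
raw = spark.read.csv('data/taxi_tour_table_train_simple.csv',
                     header=True, inferSchema=True)

# register it so the feature-transformation SQL can reference it
raw.createOrReplaceTempView('t1')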
Feature-transformation SQL for this scenario
Feature transformation
select
    trip_duration,
    passenger_count,
    sum(pickup_latitude) over w as vendor_sum_pl,
    max(pickup_latitude) over w as vendor_max_pl,
    min(pickup_latitude) over w as vendor_min_pl,
    avg(pickup_latitude) over w as vendor_avg_pl,
    sum(pickup_latitude) over w2 as pc_sum_pl,
    max(pickup_latitude) over w2 as pc_max_pl,
    min(pickup_latitude) over w2 as pc_min_pl,
    avg(pickup_latitude) over w2 as pc_avg_pl,
    count(vendor_id) over w2 as pc_cnt,
    count(vendor_id) over w as vendor_cnt
from {}
window w as (partition by vendor_id order by pickup_datetime
             ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW),
       w2 as (partition by passenger_count order by pickup_datetime
              ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW)
We chose two dimensions, vendor_id and passenger_count, for the time-series features: for each row, window w aggregates the rows with the same vendor_id whose pickup_datetime falls within the preceding day, and w2 does the same keyed on passenger_count (ROWS_RANGE is FEDB's time-range window syntax). The {} placeholder is filled with the table name before the query runs, as sketched below.
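A one-line sketch of filling the placeholder, assuming the template above is stored in a variable named feature_sql (hypothetical name). Note that this query runs on the FEDB-enhanced Spark build, since ROWS_RANGE is not stock Spark SQL syntax:

# feature_sql holds the SQL template shown above as a Python string;
# fill in the table name registered earlier
train_sql = feature_sql.format('t1')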
import lightgbm as lgb

train_df = spark.sql(train_sql)

# lgb_train / lgb_eval were not defined in the original snippet;
# a simple 80/20 holdout split over the feature table (assumption)
data = train_df.toPandas()
y = data.pop('trip_duration')
cut = int(len(data) * 0.8)
lgb_train = lgb.Dataset(data[:cut], y[:cut])
lgb_eval = lgb.Dataset(data[cut:], y[cut:], reference=lgb_train)

# specify your configurations as a dict
params = {
    'boosting_type': 'gbdt',
    'objective': 'regression',
    'metric': {'l2', 'l1'},
    'num_leaves': 31,
    'learning_rate': 0.05,
    'feature_fraction': 0.9,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': 0
}

print('Starting training...')
gbm = lgb.train(params,
                lgb_train,
                num_boost_round=20,
                valid_sets=lgb_eval,
                early_stopping_rounds=5)
gbm.save_model('model.txt')
Run the training step; it produces model.txt.
Data import code
import datetime

# `driver` is the FEDB SDK connection created elsewhere in the script
def insert_row(line):
    row = line.split(',')
    # convert the pickup/dropoff datetime strings to millisecond timestamps
    # (the trailing 'l' marks a long literal in the FEDB insert statement)
    row[2] = '%dl' % int(datetime.datetime.strptime(row[2], '%Y-%m-%d %H:%M:%S').timestamp() * 1000)
    row[3] = '%dl' % int(datetime.datetime.strptime(row[3], '%Y-%m-%d %H:%M:%S').timestamp() * 1000)
    insert = "insert into t1 values('%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s', %s);" % tuple(row)
    driver.executeInsert('db_test', insert)

with open('data/taxi_tour_table_train_simple.csv', 'r') as fd:
    idx = 0
    for line in fd:
        if idx == 0:
            # skip the CSV header row
            idx = idx + 1
            continue
        insert_row(line.replace('\n', ''))
        idx = idx + 1
Note: train.csv is the CSV version of the training data.
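The import assumes the database db_test and table t1 already exist on the FEDB side. A plausible schema sketch, inferred from the CSV header and the column types the inference code checks for; the index clauses mirror the two window keys and are an assumption, not taken from the article:

create table t1(
    id string,
    vendor_id int,
    pickup_datetime timestamp,
    dropoff_datetime timestamp,
    passenger_count int,
    pickup_longitude double,
    pickup_latitude double,
    dropoff_longitude double,
    dropoff_latitude double,
    store_and_fwd_flag string,
    trip_duration int,
    index(key=vendor_id, ts=pickup_datetime),
    index(key=passenger_count, ts=pickup_datetime)
);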
Model inference logic
predict.py
def post(self):
    row = json.loads(self.request.body)
    # build a request row for the deployed feature SQL
    ok, req = fedb_driver.getRequestBuilder('db_test', sql)
    if not ok or not req:
        self.write("fail to get req")
        return
    input_schema = req.GetSchema()
    if not input_schema:
        self.write("no schema found")
        return
    # pre-compute the total byte length of all string fields
    str_length = 0
    for i in range(input_schema.GetColumnCnt()):
        if sql_router_sdk.DataTypeName(input_schema.GetColumnType(i)) == 'string':
            str_length = str_length + len(row.get(input_schema.GetColumnName(i), ''))
    req.Init(str_length)
    # append each input column in schema order, converted to its declared type
    for i in range(input_schema.GetColumnCnt()):
        tname = sql_router_sdk.DataTypeName(input_schema.GetColumnType(i))
        if tname == 'string':
            req.AppendString(row.get(input_schema.GetColumnName(i), ''))
        elif tname == 'int32':
            req.AppendInt32(int(row.get(input_schema.GetColumnName(i), 0)))
        elif tname == 'double':
            req.AppendDouble(float(row.get(input_schema.GetColumnName(i), 0)))
        elif tname == 'timestamp':
            req.AppendTimestamp(int(row.get(input_schema.GetColumnName(i), 0)))
        else:
            req.AppendNULL()
    if not req.Build():
        self.write("fail to build request")
        return
    # run the feature SQL against FEDB with this row as the request
    ok, rs = fedb_driver.executeQuery('db_test', sql, req)
    if not ok:
        self.write("fail to execute sql")
        return
    rs.Next()
    ins = build_feature(rs)
    self.write("----------------ins---------------\n")
    self.write(str(ins) + "\n")
    duration = bst.predict(ins)
    self.write("---------------predict trip_duration -------------\n")
    self.write("%s s" % str(duration[0]))
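The handler above relies on module-level objects the excerpt omits: sql (the feature query with the table name filled in), fedb_driver (the FEDB SDK connection), bst (the trained booster), and build_feature (which reads the single result row into a 2-D feature array). A minimal sketch of the model loading and server setup; the handler API matches Tornado, but the route, port, and class name are assumptions:

import json
import lightgbm as lgb
import tornado.ioloop
import tornado.web
# sql_router_sdk / fedb_driver: FEDB SDK connection setup omitted here

# load the model produced by the training step
bst = lgb.Booster(model_file='model.txt')

class PredictHandler(tornado.web.RequestHandler):
    # the post() method shown above goes here
    pass

if __name__ == '__main__':
    app = tornado.web.Application([(r"/predict", PredictHandler)])
    app.listen(8887)  # port is an assumption
    tornado.ioloop.IOLoop.current().start()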
# Send an inference request; you should see output like the following
python3 predict.py
----------------ins---------------
[[ 2.        40.774097  40.774097  40.774097  40.774097  40.774097
   40.774097  40.774097  40.774097   1.         1.      ]]
---------------predict trip_duration -------------
859.3298781277192 s
Run the demo
https://github.com/4paradigm/SparkSQLWithFeDB