In Half an Hour: Turn Your Spark SQL Model into an Online Service with FEDB

Applying SparkSQL in machine learning scenarios

4Paradigm has already put tens of thousands of AI applications into production across many industries, for example anti-fraud in finance, news recommendation in media, and pipeline inspection in energy. In these AI applications, SparkSQL plays an important role in implementing feature transformations quickly.
In these scenarios, SparkSQL feature transformations mainly fall into the following categories; a minimal PySpark sketch combining all three follows the list:

1. Multi-table scenarios: joining tables together, for example joining a transaction table with an account table
2. Simple per-row transformations with UDFs, for example applying an hour function to a timestamp
3. Time-series features using time windows and UDAFs, for example computing the total amount a person spent in the last day
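
A minimal PySpark sketch tying the three patterns together. The table names (transactions, accounts), columns (user_id, amount, ts, account_level), and the toy rows are assumptions made up purely for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("feature-transform-sketch").getOrCreate()

# toy stand-ins for a transaction table and an account table
spark.createDataFrame(
    [("u1", 12.5, "2016-01-22 16:01:00"), ("u1", 3.0, "2016-01-22 18:30:00")],
    ["user_id", "amount", "ts"]).createOrReplaceTempView("transactions")
spark.createDataFrame(
    [("u1", "gold")], ["user_id", "account_level"]).createOrReplaceTempView("accounts")

features = spark.sql("""
SELECT t.user_id,
       a.account_level,                      -- 1. multi-table join
       hour(t.ts) AS trade_hour,             -- 2. simple per-row function
       SUM(t.amount) OVER (                  -- 3. 1-day time-window aggregation
           PARTITION BY t.user_id
           ORDER BY CAST(CAST(t.ts AS TIMESTAMP) AS LONG)
           RANGE BETWEEN 86400 PRECEDING AND CURRENT ROW
       ) AS amount_1d_sum
FROM transactions t
JOIN accounts a ON t.user_id = a.user_id
""")
features.show()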

So far SparkSQL has solved the feature-transformation problem for offline model training very well. But as AI applications evolve, expectations for a model are no longer just good offline evaluation results; the model has to deliver value in the real business scenario. The real business scenario is the model-serving scenario, which requires high performance and real-time inference, and that is where we run into the following problems:

1. How do multiple offline tables map to the online environment? Batch training takes many tables as input; in what form should those tables exist online? This also shapes the overall system architecture: done well it improves efficiency, done poorly it greatly increases the cost of getting business value out of the model.
2. Converting SQL into real-time execution is costly. Online inference demands high performance, while data scientists may produce thousands of features; hand-translating every one of them greatly increases engineering cost.
3. Keeping offline and online features consistent is hard. Manual translation introduces consistency problems, and in practice the two often fail to match.
4. The model looks great offline but cannot meet the business requirements online.

In a concrete anti-fraud scenario, the model service is required to judge whether a transaction is fraudulent within a tp99 of 20 ms, so the performance requirements on model serving are extremely high.

How the 4Paradigm Feature Engineering Database (FEDB) solves these problems

The Feature Engineering Database complements SparkSQL's capabilities:

1. As a database, it solves the problem of mapping offline tables to the online environment: our answer to the question above is that tables are distributed online exactly the way they are distributed offline.
2. The same code executes both offline and online feature transformation, so the online model effect is guaranteed.
3. Data scientists and business development teams collaborate with SQL as the hand-off medium rather than manually translated code, which greatly speeds up model iteration.
4. LLVM-accelerated SQL execution runs 2~3x faster than the Scala-based Spark 2.x and 3.x on complex time-series feature workloads, and online the in-memory storage ensures the SQL returns results at very low latency.

Demo: quickly turning a Spark SQL model into a real-time service

The demo's training task is to predict how long a taxi trip will take. We will use FEDB, PySpark, LightGBM, and related tools to build an HTTP model-inference service end to end; it also serves as a hands-on example of Spark in a machine-learning scenario.
The whole demo is a bit over 200 lines of code and takes no more than half an hour to put together:

1. train_sql.py: feature computation and training, 80 lines of code
2. predict_server.py: the model-inference HTTP service, 129 lines of code

Scenario data and features
The full training data looks like this:

Sample data:

id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
id3097625,1,2016-01-22 16:01:00,2016-01-22 16:15:16,2,-73.97746276855469,40.7613525390625,-73.95573425292969,40.772396087646484,N,856
id3196697,1,2016-01-28 07:20:18,2016-01-28 07:40:16,1,-73.98524475097656,40.75959777832031,-73.99615478515625,40.72945785522461,N,1198
id0224515,2,2016-01-31 00:48:27,2016-01-31 00:53:30,1,-73.98342895507812,40.7500114440918,-73.97383880615234,40.74980163574219,N,303
id3370903,1,2016-01-14 11:46:43,2016-01-14 12:25:33,2,-74.00027465820312,40.74786376953125,-73.86485290527344,40.77039337158203,N,2330
id2763851,2,2016-02-20 13:21:00,2016-02-20 13:45:56,1,-73.95218658447266,40.772220611572266,-73.9920425415039,40.74932098388672,N,1496
id0904926,1,2016-02-20 19:17:44,2016-02-20 19:33:19,4,-73.97344207763672,40.75189971923828,-73.98480224609375,40.76243209838867,N,935
id2026293,1,2016-02-25 01:16:23,2016-02-25 01:31:27,1,-73.9871597290039,40.68777847290039,-73.9115219116211,40.68180847167969,N,904
id1349988,1,2016-01-28 20:16:05,2016-01-28 20:21:36,1,-74.0028076171875,40.7338752746582,-73.9968032836914,40.743770599365234,N,331
id3218692,2,2016-02-17 16:43:27,2016-02-17 16:54:41,5,-73.98147583007812,40.77408218383789,-73.97216796875,40.76400375366211,N,674

Feature-transformation SQL script for the scenario
Feature transformation:

select trip_duration, passenger_count,
sum(pickup_latitude) over w as vendor_sum_pl,
max(pickup_latitude) over w as vendor_max_pl,
min(pickup_latitude) over w as vendor_min_pl,
avg(pickup_latitude) over w as vendor_avg_pl,
sum(pickup_latitude) over w2 as pc_sum_pl,
max(pickup_latitude) over w2 as pc_max_pl,
min(pickup_latitude) over w2 as pc_min_pl,
avg(pickup_latitude) over w2 as pc_avg_pl,
count(vendor_id) over w2 as pc_cnt,
count(vendor_id) over w as vendor_cnt
from {}
window w as (partition by vendor_id order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW),
w2 as (partition by passenger_count order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW)

We chose vendor_id and passenger_count as the two dimensions for the time-series features.
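
Before the training snippet below can run, this SQL needs a table to point at and the LightGBM datasets need to exist. Here is a minimal sketch of that glue, under a few assumptions: the sample data sits at data/taxi_tour_table_train_simple.csv, the SQL above is held in a variable named sql_template, the view is registered as t1, and the 80/20 split is arbitrary. Note also that ROWS_RANGE ... 1d is FEDB-style window syntax, so the demo presumably runs on 4Paradigm's FEDB-compatible Spark distribution rather than a vanilla SparkSession; the real train_sql.py may differ:

from pyspark.sql import SparkSession
import lightgbm as lgb

spark = SparkSession.builder.appName("taxi-trip-duration-features").getOrCreate()

# register the raw trip data as table t1 so the `from {}` placeholder can point at it
raw = spark.read.csv("data/taxi_tour_table_train_simple.csv",
                     header=True, inferSchema=True)
raw.createOrReplaceTempView("t1")
train_sql = sql_template.format("t1")

# one plausible way to build the lgb_train / lgb_eval datasets used below:
# compute the features, pull them into pandas, and split off an eval set
features = spark.sql(train_sql).toPandas()
X = features.drop(columns=["trip_duration"])
y = features["trip_duration"]
split = int(len(features) * 0.8)
lgb_train = lgb.Dataset(X[:split], y[:split])
lgb_eval = lgb.Dataset(X[split:], y[split:], reference=lgb_train)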

import lightgbm as lgb

train_df = spark.sql(train_sql)
# lgb_train / lgb_eval are LightGBM Datasets built from the computed features
# (see the preparation sketch above)
# specify your configurations as a dict
params = {
    'boosting_type': 'gbdt',
    'objective': 'regression',
    'metric': {'l2', 'l1'},
    'num_leaves': 31,
    'learning_rate': 0.05,
    'feature_fraction': 0.9,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': 0
}
print('Starting training...')
gbm = lgb.train(params,
                lgb_train,
                num_boost_round=20,
                valid_sets=lgb_eval,
                early_stopping_rounds=5)
gbm.save_model('model.txt')

Run the training step; it produces model.txt.

Model inference process

Data import code
import datetime

# `driver` is the FEDB connection created elsewhere in the script
def insert_row(line):
    row = line.split(',')
    # convert the pickup/dropoff datetime strings to millisecond timestamps
    # (the trailing 'l' marks a long literal in the insert statement)
    row[2] = '%dl' % int(datetime.datetime.strptime(row[2], '%Y-%m-%d %H:%M:%S').timestamp() * 1000)
    row[3] = '%dl' % int(datetime.datetime.strptime(row[3], '%Y-%m-%d %H:%M:%S').timestamp() * 1000)
    insert = "insert into t1 values('%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s', %s);" % tuple(row)
    driver.executeInsert('db_test', insert)

with open('data/taxi_tour_table_train_simple.csv', 'r') as fd:
    idx = 0
    for line in fd:
        if idx == 0:          # skip the CSV header row
            idx = idx + 1
            continue
        insert_row(line.replace('\n', ''))
        idx = idx + 1

Note: train.csv is the CSV-format version of the training data.

Model inference logic
predict.py

def post(self):
    # parse the request body: a JSON object keyed by the t1 column names
    row = json.loads(self.request.body)
    # build a request row for the same feature SQL that was used in training
    ok, req = fedb_driver.getRequestBuilder('db_test', sql)
    if not ok or not req:
        self.write("fail to get req")
        return
    input_schema = req.GetSchema()
    if not input_schema:
        self.write("no schema found")
        return
    # reserve space for the string columns before filling the request row
    str_length = 0
    for i in range(input_schema.GetColumnCnt()):
        if sql_router_sdk.DataTypeName(input_schema.GetColumnType(i)) == 'string':
            str_length = str_length + len(row.get(input_schema.GetColumnName(i), ''))
    req.Init(str_length)
    # fill the request row column by column, following the schema types
    for i in range(input_schema.GetColumnCnt()):
        tname = sql_router_sdk.DataTypeName(input_schema.GetColumnType(i))
        if tname == 'string':
            req.AppendString(row.get(input_schema.GetColumnName(i), ''))
        elif tname == 'int32':
            req.AppendInt32(int(row.get(input_schema.GetColumnName(i), 0)))
        elif tname == 'double':
            req.AppendDouble(float(row.get(input_schema.GetColumnName(i), 0)))
        elif tname == 'timestamp':
            req.AppendTimestamp(int(row.get(input_schema.GetColumnName(i), 0)))
        else:
            req.AppendNULL()
    if not req.Build():
        self.write("fail to build request")
        return

    # run the feature SQL in request mode and turn the result into model input
    ok, rs = fedb_driver.executeQuery('db_test', sql, req)
    if not ok:
        self.write("fail to execute sql")
        return
    rs.Next()
    ins = build_feature(rs)
    self.write("----------------ins---------------\n")
    self.write(str(ins) + "\n")
    duration = bst.predict(ins)
    self.write("---------------predict trip_duration -------------\n")
    self.write("%s s" % str(duration[0]))

Final execution result

# Send an inference request; you will see output like the following
python3 predict.py
----------------ins---------------
[[ 2.       40.774097 40.774097 40.774097 40.774097 40.774097 40.774097
  40.774097 40.774097  1.        1.      ]]
---------------predict trip_duration -------------
859.3298781277192 s

Run the demo

https://github.com/4paradigm/SparkSQLWithFeDB