Machine Learning (1): Notes on Model Training and Online Deployment

1. Introduction

An algorithm engineer does not just build models; the model also has to be optimized and deployed online. This involves quite a few topics: feature processing (one-hot encoding, normalization), custom loss functions, custom evaluation functions, hyperparameter tuning, and building a pipeline for online deployment (with a 100 ms response-time requirement), among others.

 

2. Preparation Before Training

2.1 One-Hot Encoding

For an LR model, one-hot encoding (categorical features) and normalization (numeric features) are very necessary. For example, suppose a categorical feature has label-encoded values from 0 to 9 (categorical features are label encoded before being fed to the model; e.g. three values A, B, C become 0, 1, 2), while a numeric feature ranges from 0 to 1000. Fed into LR as-is, the model treats the two features as the same kind of input, so it will assign the numeric feature a relatively small weight (e.g. 0.001) and the categorical feature a relatively large weight, which is clearly not what we want. Therefore both categorical and numeric features should be transformed into values between 0 and 1.

  • Before one-hot encoding a categorical feature, apply label encoding to the column.
    import pandas as pd
    df_raw = pd.DataFrame(['A', 'B', 'C', 'A'], columns=['col_raw'])

    # label-encode the categorical column
    from sklearn import preprocessing
    lbl = preprocessing.LabelEncoder()
    col = 'col_raw'
    df_raw['col_lab'] = lbl.fit_transform(df_raw[col].astype(str))

    # persist the label-encoding mapping
    import pickle
    save_lbl_path = './onehot_model/' + col + '.pkl'
    output = open(save_lbl_path, 'wb')
    pickle.dump(lbl, output)
    output.close()

    # load the mapping and inverse-transform
    pkl_path = './onehot_model/' + col + '.pkl'
    pkl_file = open(pkl_path, 'rb')
    le_departure = pickle.load(pkl_file)
    df_raw['col_t_raw'] = le_departure.inverse_transform(df_raw['col_lab'])
    df_raw


  • One-hot encode the label-encoded column
    df_cate_tmp = pd.get_dummies(df_raw['col_lab'], prefix='col_lab')
    df_raw = pd.concat([df_raw, df_cate_tmp], axis=1)
    df_raw

2.2 Normalization and Standardization

Normalization maps a numeric feature into the [0, 1] range, while standardization transforms the values to have mean 0 and variance 1 (a standard normal distribution).

  • Min-max normalization
    nor_model = []
    col_num = 'num_first'
    # assign some numeric values
    df_raw[col_num] = [0.2, 4, 22, 8]
    tmp_min = df_raw[col_num].min()
    tmp_max = df_raw[col_num].max()
    nor_model.append((col_num, tmp_min, tmp_max))
    # min-max normalization
    df_raw[col_num+'_nor'] = df_raw[col_num].apply(lambda x: (x-tmp_min)/(tmp_max-tmp_min))
    # persist the column name together with its min and max for online scoring
    with open("./nor_model/col_min_max.txt", "w") as f:
        for i in nor_model:
            result = i[0] + ',' + str(i[1]) + ',' + str(i[2]) + '\n'
            f.write(result)
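  • Standardization (a minimal sketch following the same save-for-online-scoring pattern as above; the output path col_mean_std.txt and the _std suffix are illustrative, not from the original)
    std_model = []
    col_num = 'num_first'
    tmp_mean = df_raw[col_num].mean()
    tmp_std = df_raw[col_num].std()
    std_model.append((col_num, tmp_mean, tmp_std))
    # z-score: subtract the mean, divide by the standard deviation
    df_raw[col_num + '_std'] = df_raw[col_num].apply(lambda x: (x - tmp_mean) / tmp_std)
    # persist the column name together with its mean and std (illustrative file name)
    with open("./nor_model/col_mean_std.txt", "w") as f:
        for i in std_model:
            f.write(i[0] + ',' + str(i[1]) + ',' + str(i[2]) + '\n')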

2.3 Custom Loss Functions

In some business scenarios, the loss functions provided by Python packages do not meet the project's requirements, and a custom loss function is needed. (Reference: https://cloud.tencent.com/developer/article/1357671)

  • Define a custom MSE whose penalty on positive residuals is 10 times that on negative residuals
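Reconstructed from the sklearn-style implementation further below (where the 10× factor applies when the residual r = y_true − ŷ is negative), the loss, its gradient and its hessian are:

$$
L(y,\hat{y}) = \begin{cases} 10\,r^2, & r < 0 \\ r^2, & r \ge 0 \end{cases}
\qquad
\frac{\partial L}{\partial \hat{y}} = \begin{cases} -20\,r, & r < 0 \\ -2\,r, & r \ge 0 \end{cases}
\qquad
\frac{\partial^2 L}{\partial \hat{y}^2} = \begin{cases} 20, & r < 0 \\ 2, & r \ge 0 \end{cases}
\qquad r = y - \hat{y}
$$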


As defined, this asymmetric MSE is convenient because its gradient and hessian are easy to compute, as shown in the figure below. Note that the hessian is constant at two different values, 2 on the left and 20 on the right, although this is hard to see in the plot.

[Figure: gradient and hessian of the asymmetric MSE]

LightGBM provides a direct way to implement custom training and validation losses. Other gradient boosting packages, including XGBoost and CatBoost, also offer this option. The referenced article links to a Jupyter notebook that shows how to implement custom training and validation loss functions. The details are in the notebook, but at a high level the two implementations differ slightly:

1. Training loss: customizing the training loss in LightGBM requires defining a function that takes two arrays, the targets and their predictions. The function must in turn return two arrays, the gradient and the hessian for each observation. As noted above, we need to use calculus to derive the gradient and hessian and then implement them in Python.

2. Validation loss: customizing the validation loss in LightGBM requires defining a function that takes the same two arrays but returns three values: a string naming the metric to print, the loss itself, and a boolean indicating whether higher is better.

import numpy as np

# --- sklearn API ---
def custom_asymmetric_train(y_true, y_pred):
    residual = (y_true - y_pred).astype("float")
    grad = np.where(residual < 0, -2*10.0*residual, -2*residual)
    hess = np.where(residual < 0, 2*10.0, 2.0)
    return grad, hess

def custom_asymmetric_valid(y_true, y_pred):
    residual = (y_true - y_pred).astype("float")
    loss = np.where(residual < 0, (residual**2)*10.0, residual**2)
    return "custom_asymmetric_eval", np.mean(loss), False

# --- lgb API ---
def custom_asymmetric_train(preds, dtrain):
    y_true = np.array(dtrain.get_label())
    y_pred = np.argmax(preds.reshape(len(y_true), -1), axis=0)
    residual = np.array((y_pred - y_true)).astype("float")
    p = 20  # parameter
    tmpGrad = []
    tmpHess = []
    for i in residual:
        if i < 0:
            tmpGrad.append(-i*p)
            tmpHess.append(p)
        elif (i >= 0 and i <= 12):
            tmpGrad.append(i*(p/10))
            tmpHess.append(p/10)
        else:
            tmpGrad.append(i*p)
            tmpHess.append(p)
    grad = np.array(tmpGrad)
    hess = np.array(tmpHess)
    return grad, hess

def custom_asymmetric_valid(preds, dtrain):
    p = 20  # parameter
    y_true = np.array(dtrain.get_label())
    y_pred = np.argmax(preds.reshape(len(y_true), -1), axis=0)
    residual = np.array((y_pred - y_true)).astype("float")
    tmpLoss = []
    for i in residual:
        if i < 0:
            tmpLoss.append(-i*p)
        elif (i >= 0 and i <= 12):
            tmpLoss.append(i*(p/10))
        else:
            tmpLoss.append(i*p)
    loss = np.array(tmpLoss)
    return "custom_asymmetric_eval", np.mean(loss), False
The corresponding calling code (note the distinction between the native lgb API and LightGBM's sklearn API: the two modules expect y_pred and y in different formats):
import lightgbm as lgb

# ********* Sklearn API **********
# default lightgbm model with sklearn api
gbm = lgb.LGBMRegressor()
# update the objective function to the custom one (default is "regression")
# also add metrics to check different scores
gbm.set_params(**{'objective': custom_asymmetric_train}, metrics=["mse", "mae"])
# fit the model
gbm.fit(
    X_train,
    y_train,
    eval_set=[(X_valid, y_valid)],
    eval_metric=custom_asymmetric_valid,
    verbose=False,
)
y_pred = gbm.predict(X_valid)

# ********* Python API **********
# create datasets for lightgbm; to re-use the data, remember to set free_raw_data=False
lgb_train = lgb.Dataset(X_train, y_train, free_raw_data=False)
lgb_eval = lgb.Dataset(X_valid, y_valid, reference=lgb_train, free_raw_data=False)
# specify your configurations as a dict
params = {'objective': 'regression', 'verbose': 0}
gbm = lgb.train(params,
                lgb_train,
                num_boost_round=10,
                init_model=gbm,
                fobj=custom_asymmetric_train,
                feval=custom_asymmetric_valid,
                valid_sets=lgb_eval)
y_pred = gbm.predict(X_valid)

 

2.4 Custom scoring

When evaluating model predictions, if the Python library does not provide the metric you need (e.g. the K-S statistic, Gini, etc.), you need to define a custom scoring function. This applies to sklearn's metrics module.

  • Built-in sklearn scorers:
from sklearn.metrics import *
SCORERS.keys()
# result
dict_keys(['explained_variance', 'r2', 'neg_median_absolute_error', 'neg_mean_absolute_error', 'neg_mean_squared_error', 'neg_mean_squared_log_error', 'accuracy', 'roc_auc', 'balanced_accuracy', 'average_precision', 'neg_log_loss', 'brier_score_loss', 'adjusted_rand_score', 'homogeneity_score', 'completeness_score', 'v_measure_score', 'mutual_info_score', 'adjusted_mutual_info_score', 'normalized_mutual_info_score', 'fowlkes_mallows_score', 'precision', 'precision_macro', 'precision_micro', 'precision_samples', 'precision_weighted', 'recall', 'recall_macro', 'recall_micro', 'recall_samples', 'recall_weighted', 'f1', 'f1_macro', 'f1_micro', 'f1_samples', 'f1_weighted'])
  • Custom scoring
import numpy as np
import pandas as pd
from scipy.stats import ks_2samp
from sklearn import metrics
from sklearn.metrics import make_scorer, roc_auc_score, log_loss, f1_score
from sklearn.model_selection import GridSearchCV

def ginic(actual, pred):
    actual = np.asarray(actual)  # in case someone passes a Series or list
    n = len(actual)
    a_s = actual[np.argsort(pred)]
    a_c = a_s.cumsum()
    giniSum = a_c.sum() / a_s.sum() - (n + 1) / 2.0
    return giniSum / n

def gini_normalizedc(a, p):
    if p.ndim == 2:   # required for the sklearn wrapper
        p = p[:, 1]   # if the proba array contains probas for both classes, pick class 1
    return ginic(a, p) / ginic(a, a)

gini_sklearn = metrics.make_scorer(gini_normalizedc, True, True)

def ks_stat(y, yhat):
    y = np.array(y)
    yhat = np.array(yhat)
    try:
        ks = ks_2samp(yhat[y == 1], yhat[y != 1]).statistic
    except:
        print(yhat.shape, y.shape)
        kmp = yhat[y == 1]
        kmp2 = yhat[y != 1]
        print(kmp.shape, kmp2.shape)
        ks = 0
    return ks

ks_scorer = make_scorer(ks_stat, needs_proba=True, greater_is_better=True)
log_scorer = make_scorer(log_loss, needs_proba=True, greater_is_better=False)
roc_scorer = make_scorer(roc_auc_score, needs_proba=True)
f1_scorer = make_scorer(f1_score, needs_proba=True, greater_is_better=True)

 

3. Running the Model

3.1 Hyperparameter Tuning with GridSearchCV

A model has many hyperparameters, which can be tuned by grid search. Note: if n_jobs > 1 is set, the custom scoring functions must live in an external .py file. The reason is that if the worker pool is created before the scoring functions are declared, attempting to use them in parallel raises an error; declaring them first (by importing them from a module) avoids it.

from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import GridSearchCV
import multiprocessing
# multiprocessing.set_start_method('spawn')

# the custom scoring functions must be imported from an external module
from external import *

tuned_param = {'learning_rate': [0.1, 0.2, 0.5, 0.8, 1],
               'max_depth': [3, 5, 7, 10],
               'min_samples_split': [50, 100, 200],
               'n_estimators': [50, 70, 100, 150, 200],
               'subsample': [0.6, 0.8]}
n = 20
clf_gbdt = GridSearchCV(GradientBoostingClassifier(),
                        tuned_param,
                        cv=2,
                        scoring={'auc': roc_scorer, 'k-s': ks_scorer},
                        refit='k-s',
                        n_jobs=n,
                        verbose=10)
clf_gbdt.fit(x_train[col_gbdt], y_train)

# print the best hyperparameter combination
def print_best_score(gsearch, param_test):
    # best score
    print("Best score: %0.3f" % gsearch.best_score_)
    print("Best parameters set:")
    # parameters used by the best estimator
    best_parameters = gsearch.best_estimator_.get_params()
    for param_name in sorted(param_test.keys()):
        print("\t%s: %r" % (param_name, best_parameters[param_name]))

print_best_score(clf_gbdt, tuned_param)
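The external.py module referenced by `from external import *` above could look roughly like the following (a sketch reusing the scorers from section 2.4; only the module name is fixed by the import, everything else is an assumption):

# external.py -- scorers defined at module level so GridSearchCV workers can pickle them
import numpy as np
from scipy.stats import ks_2samp
from sklearn.metrics import make_scorer, roc_auc_score

def ks_stat(y, yhat):
    y = np.array(y)
    yhat = np.array(yhat)
    return ks_2samp(yhat[y == 1], yhat[y != 1]).statistic

ks_scorer = make_scorer(ks_stat, needs_proba=True, greater_is_better=True)
roc_scorer = make_scorer(roc_auc_score, needs_proba=True)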

 

3.2 k-fold Validation of Model Stability

For models used in industry, stability comes first!

from sklearn.model_selection import cross_val_score
scores = cross_val_score(estimator=lr, X=x_train[col_lr], y=y_train, cv=5,
                         n_jobs=10, scoring=ks_scorer, verbose=10)
print('CV k-s scores: %s' % scores)
print('CV k-s: %.3f +/- %.3f' % (np.mean(scores), np.std(scores)))

 

3.3 Model Training

import pickle
from sklearn.ensemble import GradientBoostingClassifier

gbdt = GradientBoostingClassifier(learning_rate=0.2, max_depth=10, min_samples_split=200,
                                  n_estimators=200, subsample=0.8, min_samples_leaf=50)
gbdt.fit(x_train[col_gbdt], y_train)

# save the model
pickle.dump(gbdt, open('./ml_model/gbdt_model.pkl', 'wb'))

# predict probabilities on the validation set
y_pred_gbdt = gbdt.predict_proba(x_valid[col_gbdt])

 

3.4 Model Validation

  • AUC
import seaborn as sns
sns.set_style('darkgrid')
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc

plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams.update({'font.size': 10})
plt.rcParams['savefig.dpi'] = 300  # resolution of saved figures
plt.rcParams['figure.dpi'] = 300   # on-screen resolution

# compute AUC
fpr_lr, tpr_lr, thresholds = roc_curve(y_valid, y_pred_lr[:, 1], pos_label=1)
roc_auc_lr = auc(fpr_lr, tpr_lr)

# plot the ROC curve
plt.rcParams['figure.figsize'] = (8, 5)
plt.figure()
plt.plot(fpr_lr, tpr_lr, color='darkorange', label='ROC curve (area = %0.2f)' % roc_auc_lr)
plt.plot([0, 1], [0, 1], color='navy', linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC curve - LR')
plt.legend(loc="lower right")
  • K-S
# plot the K-S curve
import numpy as np
import pandas as pd

def PlotKS(preds, labels, n, asc):
    # preds is a score: asc=1
    # preds is a probability: asc=0
    pred = preds   # predicted values
    bad = labels   # 1 = bad, 0 = good
    ksds = pd.DataFrame({'bad': bad, 'pred': pred})
    ksds['good'] = 1 - ksds.bad

    if asc == 1:
        ksds1 = ksds.sort_values(by=['pred', 'bad'], ascending=[True, True])
    elif asc == 0:
        ksds1 = ksds.sort_values(by=['pred', 'bad'], ascending=[False, True])
    ksds1.index = range(len(ksds1.pred))
    ksds1['cumsum_good1'] = 1.0*ksds1.good.cumsum()/sum(ksds1.good)
    ksds1['cumsum_bad1'] = 1.0*ksds1.bad.cumsum()/sum(ksds1.bad)

    if asc == 1:
        ksds2 = ksds.sort_values(by=['pred', 'bad'], ascending=[True, False])
    elif asc == 0:
        ksds2 = ksds.sort_values(by=['pred', 'bad'], ascending=[False, False])
    ksds2.index = range(len(ksds2.pred))
    ksds2['cumsum_good2'] = 1.0*ksds2.good.cumsum()/sum(ksds2.good)
    ksds2['cumsum_bad2'] = 1.0*ksds2.bad.cumsum()/sum(ksds2.bad)

    # ksds1 ksds2 -> average
    ksds = ksds1[['cumsum_good1', 'cumsum_bad1']]
    ksds['cumsum_good2'] = ksds2['cumsum_good2']
    ksds['cumsum_bad2'] = ksds2['cumsum_bad2']
    ksds['cumsum_good'] = (ksds['cumsum_good1'] + ksds['cumsum_good2'])/2
    ksds['cumsum_bad'] = (ksds['cumsum_bad1'] + ksds['cumsum_bad2'])/2

    # ks
    ksds['ks'] = ksds['cumsum_bad'] - ksds['cumsum_good']
    ksds['tile0'] = range(1, len(ksds.ks) + 1)
    ksds['tile'] = 1.0*ksds['tile0']/len(ksds['tile0'])
    qe = list(np.arange(0, 1, 1.0/n))
    qe.append(1)
    qe = qe[1:]
    ks_index = pd.Series(ksds.index)
    ks_index = ks_index.quantile(q=qe)
    ks_index = np.ceil(ks_index).astype(int)
    ks_index = list(ks_index)
    ksds = ksds.loc[ks_index]
    ksds = ksds[['tile', 'cumsum_good', 'cumsum_bad', 'ks']]
    ksds0 = np.array([[0, 0, 0, 0]])
    ksds = np.concatenate([ksds0, ksds], axis=0)
    ksds = pd.DataFrame(ksds, columns=['tile', 'cumsum_good', 'cumsum_bad', 'ks'])
    ks_value = ksds.ks.max()
    ks_pop = ksds.tile[ksds.ks.idxmax()]
    tmp_str = 'ks_value is ' + str(np.round(ks_value, 4)) + ' at pop = ' + str(np.round(ks_pop, 4))

    # chart
    plt.plot(ksds.tile, ksds.cumsum_good, label='cum_good', color='blue', linestyle='-', linewidth=2)
    plt.plot(ksds.tile, ksds.cumsum_bad, label='cum_bad', color='red', linestyle='-', linewidth=2)
    plt.plot(ksds.tile, ksds.ks, label='ks', color='green', linestyle='-', linewidth=2)
    plt.axvline(ks_pop, color='gray', linestyle='--')
    plt.axhline(ks_value, color='green', linestyle='--')
    plt.axhline(ksds.loc[ksds.ks.idxmax(), 'cumsum_good'], color='blue', linestyle='--')
    plt.axhline(ksds.loc[ksds.ks.idxmax(), 'cumsum_bad'], color='red', linestyle='--')
    plt.title('KS=%s ' % np.round(ks_value, 4) + 'at Pop=%s' % np.round(ks_pop, 4), fontsize=15)
    return tmp_str

# call it (the function expects predictions first, then labels)
PlotKS(y_pred_lr[:, 1], y_valid, n=10, asc=0)
  • Other metrics
# From the K-S curve above, the separation is best around 0.2, so treat scores above 0.2 as 1 and scores at or below 0.2 as 0
y_pred_lr_new = []
for i in y_pred_lr[:, 1]:
    if i <= 0.2:
        y_pred_lr_new.append(0)
    else:
        y_pred_lr_new.append(1)

y_pred_gbdt_new = []
for i in y_pred_gbdt[:, 1]:
    if i <= 0.2:
        y_pred_gbdt_new.append(0)
    else:
        y_pred_gbdt_new.append(1)

y_pred_lr_gbdt_new = []
for i in y_pred_gbdt_lr[:, 1]:
    if i <= 0.2:
        y_pred_lr_gbdt_new.append(0)
    else:
        y_pred_lr_gbdt_new.append(1)

# gbdt
acc = accuracy_score(y_valid, y_pred_gbdt_new)
p = precision_score(y_valid, y_pred_gbdt_new, average='binary')
r = recall_score(y_valid, y_pred_gbdt_new, average='binary')
f1score = f1_score(y_valid, y_pred_gbdt_new, average='binary')
print(acc, p, r, f1score)

 

 

 

4. Online Deployment (heads-up: a killer option ahead)

 

4.1 Serving the Model with Flask

The initial approach reads the model from a pickle file: a Python script loads the pickle, a Flask service exposes model prediction, and the Java application calls the Flask service to get predictions. A minimal sketch of such a service follows the list below.

  • Pros: easy to update the model, and usable with complex models beyond LR (tree models, etc.).
  • Cons: Java calling Flask may introduce communication latency, which hurts response time; the Flask service and the Java application are deployed in separate Docker containers, so compute efficiency is low.
  • Reference: https://www.cnblogs.com/demodashi/p/8491170.html
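A minimal sketch of such a Flask service (the endpoint name, request format and port are illustrative, not from the original; the model path reuses the one saved in section 3.3):

# app.py -- minimal Flask prediction service (sketch)
import pickle

import numpy as np
from flask import Flask, jsonify, request

app = Flask(__name__)

# load the pickled model once at startup
with open('./ml_model/gbdt_model.pkl', 'rb') as f:
    model = pickle.load(f)

@app.route('/predict', methods=['POST'])
def predict():
    # the Java side POSTs a JSON body such as {"features": [[...], [...]]}
    payload = request.get_json()
    x = np.array(payload['features'])
    proba = model.predict_proba(x)[:, 1]
    return jsonify({'proba': proba.tolist()})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)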

 

4.2 Pure Java Implementation

Implement the LR model directly in Java: build the scoring method in Java (a model trained in Python can be exported as rules, i.e. coefficients), and the Java application calls this method directly for prediction. A sketch of exporting the coefficients from Python follows the list below.

  • Pros: compared with the first approach, no cross-language communication (with Python) is needed, and scoring is fast (pure Java).
  • Cons: the implementation is complex and only works for LR, not for complex models; reproducing the feature engineering (one-hot encoding, normalization, GBDT feature generation) is cumbersome and development is painful.
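A sketch of exporting the trained LR "rules" (coefficients and intercept) from Python so they can be hard-coded or loaded on the Java side; the file names are illustrative, and lr / col_lr are assumed to come from the training step:

# dump LR coefficients to a plain-text file for the Java implementation (sketch)
import pickle

lr = pickle.load(open('./ml_model/lr_model.pkl', 'rb'))  # illustrative path
with open('./ml_model/lr_rules.txt', 'w') as f:
    f.write('intercept,' + str(lr.intercept_[0]) + '\n')
    for name, coef in zip(col_lr, lr.coef_[0]):  # col_lr: the LR feature column list
        f.write(name + ',' + str(coef) + '\n')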

 

4.3 Calling jpmml from Java

Call jpmml from Java: convert the Python artifacts (one-hot encoding, LR model, etc.) into a PMML file (essentially a local text file), then load and evaluate it through the jpmml library in Java. A minimal export sketch follows the list below.

  • Pros: combines the advantages of approaches 1 and 2; all operations (one-hot encoding, model prediction) are bundled into a single method, and it is fast.
  • Cons: none so far.
  • Reference: https://github.com/jpmml/jpmml-evaluator
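A minimal export sketch (the toy columns col_raw/num_first from section 2 and the target name 'label' are illustrative; the mapper pattern mirrors the GBDT+LR example in section 4.4):

# build a one-hot + LR pipeline and export it to PMML for jpmml-evaluator (sketch)
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import LabelBinarizer
from sklearn_pandas import DataFrameMapper
from sklearn2pmml import sklearn2pmml
from sklearn2pmml.decoration import CategoricalDomain, ContinuousDomain
from sklearn2pmml.pipeline import PMMLPipeline

df = pd.DataFrame({'col_raw': ['A', 'B', 'C', 'A'],
                   'num_first': [0.2, 4, 22, 8],
                   'label': [0, 1, 0, 1]})

mapper = DataFrameMapper([
    (['col_raw'], [CategoricalDomain(), LabelBinarizer()]),  # one-hot encoding inside the pipeline
    (['num_first'], ContinuousDomain()),                     # numeric column passed through
])
pipeline = PMMLPipeline([
    ('mapper', mapper),
    ('classifier', LogisticRegression()),
])
pipeline.fit(df[['col_raw', 'num_first']], df['label'])

# the resulting file is what the Java side loads with jpmml-evaluator
sklearn2pmml(pipeline, 'LR.pmml')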

In summary, considering both compute speed and compute resources, approach 3 is recommended.

 

4.4 GBDT+LR Training and Online Deployment (fast, crude, effective, powerful)

Reference: https://openscoring.io/blog/2019/06/19/sklearn_gbdt_lr_ensemble/

import pandas
from lightgbm import LGBMClassifier
from sklearn_pandas import DataFrameMapper
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import LabelBinarizer, LabelEncoder
from sklearn2pmml import sklearn2pmml
from sklearn2pmml.decoration import CategoricalDomain, ContinuousDomain
from sklearn2pmml.ensemble import GBDTLRClassifier
from sklearn2pmml.pipeline import PMMLPipeline
from xgboost import XGBClassifier

df = pandas.read_csv("audit.csv")
cat_columns = ["Education", "Employment", "Marital", "Occupation"]
cont_columns = ["Age", "Hours", "Income"]
label_column = "Adjusted"

def make_fit_gbdtlr(gbdt, lr):
    mapper = DataFrameMapper(
        [([cat_column], [CategoricalDomain(), LabelBinarizer()]) for cat_column in cat_columns] +
        [(cont_columns, ContinuousDomain())]
    )
    classifier = GBDTLRClassifier(gbdt, lr)
    pipeline = PMMLPipeline([
        ("mapper", mapper),
        ("classifier", classifier)
    ])
    pipeline.fit(df[cat_columns + cont_columns], df[label_column])
    return pipeline

pipeline = make_fit_gbdtlr(GradientBoostingClassifier(n_estimators=499, max_depth=2), LogisticRegression())
sklearn2pmml(pipeline, "GBDT+LR.pmml")

pipeline = make_fit_gbdtlr(RandomForestClassifier(n_estimators=31, max_depth=6), LogisticRegression())
sklearn2pmml(pipeline, "RF+LR.pmml")

pipeline = make_fit_gbdtlr(XGBClassifier(n_estimators=299, max_depth=3), LogisticRegression())
sklearn2pmml(pipeline, "XGB+LR.pmml")

def make_fit_lgbmlr(gbdt, lr):
    mapper = DataFrameMapper(
        [([cat_column], [CategoricalDomain(), LabelEncoder()]) for cat_column in cat_columns] +
        [(cont_columns, ContinuousDomain())]
    )
    classifier = GBDTLRClassifier(gbdt, lr)
    pipeline = PMMLPipeline([
        ("mapper", mapper),
        ("classifier", classifier)
    ])
    pipeline.fit(df[cat_columns + cont_columns], df[label_column],
                 classifier__gbdt__categorical_feature=range(0, len(cat_columns)))
    return pipeline

pipeline = make_fit_lgbmlr(LGBMClassifier(n_estimators=71, max_depth=5), LogisticRegression())
sklearn2pmml(pipeline, "LGBM+LR.pmml")

Note: the JDK must be 1.8 or above, scikit-learn must be 0.21.0 or above, and sklearn2pmml is best installed from its git repository.

import sklearn
import sklearn2pmml
print('The sklearn2pmml version is {}.'.format(sklearn2pmml.__version__))
print('The scikit-learn version is {}.'.format(sklearn.__version__))

# switching between multiple JDK versions (shell commands)
export JAVA_HOME=/software/servers/jdk1.8.0_121/
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar