[Feature] Build pipeline

Preparing the dataset

1. The dataset

Ref: 6. Dataset loading utilities [the various dataset options]

Part one: load the data of the raw iris dataset.

Part two: first append a row, then append a column.

#%% part one.
from sklearn.datasets import load_iris
iris = load_iris()

iris.data    # feature matrix: ndarray of shape (150, 4)
iris.target  # target vector: ndarray of shape (150,)

print(type(iris.data))
print(type(iris.target))

print()
preview_line = 5
print("data X is: \n{}".format(iris.data[:preview_line]))
print("data Y is: \n{}".format(iris.target[:preview_line]))

#%% part two
from numpy import hstack, vstack, array, median, nan
from numpy.random import choice

##########################################################################################
# 1. Feature-matrix processing
# 1.1.1 Use vstack to append one sample full of missing values (nan, nan, nan, nan);
#       reshape(1, -1) lifts the 1-D array to a 1 x 4 row
nan_tmp = array([nan, nan, nan, nan]).reshape(1, -1)
print(nan_tmp)
# 1.1.2 Merge the two arrays
iris.data = vstack((iris.data, array([nan, nan, nan, nan]).reshape(1, -1)))

# 1.2.1 Use hstack to append a column for flower color (0-white, 1-yellow, 2-red).
#       The color is random, so it should carry no information about the class.
random_feature = choice([0, 1, 2], size=iris.data.shape[0]).reshape(-1, 1)
# 1.2.2 Merge the two arrays
iris.data = hstack((random_feature, iris.data))
preview_line = 5
print("data X is: \n{}".format(iris.data[:preview_line]))

##########################################################################################
# 2. Target-vector processing
# 2.1 Append one target value for the new sample, set to the median of the targets
iris.target = hstack((iris.target, array([median(iris.target)])))

An alternative version: editor-friendly, but not reader-friendly.

# Edit data in one pass
import numpy as np

iris.data   = np.hstack((np.random.choice([0, 1, 2], size=iris.data.shape[0]+1).reshape(-1,1),
                         np.vstack((iris.data, np.full(4, np.nan).reshape(1,-1)))))
iris.target = np.hstack((iris.target, np.array([np.median(iris.target)])))


Serial and parallel flows

1. Serial combined with parallel flows

Several concepts are involved: FeatureUnion, fit, transform, Pipeline.

The rough idea:

(1) First build a FeatureUnion to find suitable features.

(2) Then use these features for classification, fitting, or clustering.

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest

iris = load_iris()
X, y = iris.data, iris.target

#############################################################################
# This dataset is way too high-dimensional. Better do PCA:
pca = PCA(n_components=2)
# SelectKBest scores each feature individually ("relevance") and keeps the k highest-scoring ones:
selection = SelectKBest(k=1)
# Build estimator from PCA and Univariate selection:
combined_features = FeatureUnion([("pca", pca), ("univ_select", selection)])

#############################################################################
# Use combined features to transform dataset:
X_features = combined_features.fit(X, y).transform(X)
print("Combined space has", X_features.shape[1], "features")

svm = SVC(kernel="linear")

#############################################################################
# Do grid search over k, n_components and C:
pipeline = Pipeline([("features", combined_features), ("svm", svm)])

#############################################################################
# pca and univ_select are already configured above; here we supply a grid of
# candidate values for each, and fit finds the best combination.
param_grid = dict(features__pca__n_components=[1, 2, 3],
                  features__univ_select__k=[1, 2],
                  svm__C=[0.1, 1, 10])
grid_search = GridSearchCV(pipeline, param_grid=param_grid, cv=5, verbose=10)
grid_search.fit(X, y)
print(grid_search.best_estimator_)


2. Serial: Pipeline

(a) All estimators in a Pipeline except the last must be transformers; the last estimator can be of any type (transformer, classifier, or regressor).

transformers --> transformers --> transformers --> ... --> transformer or classifier or regressor


(b) A pipeline inherits all the methods of its last estimator, which brings two benefits:

1. You can call fit and predict directly to train and predict with all the models in the pipeline.
2. You can combine it with grid search for parameter selection.

Which methods are inherited? The following (a short sketch follows the list):

1. transform: runs each step's transform in sequence;
2. inverse_transform: runs each step's inverse_transform in sequence;
3. fit: runs fit and transform on the first n-1 steps, then fit on the n-th (last) step;
4. predict: runs the last step's predict;
5. score: runs the last step's score;
6. set_params: sets a step's parameters via the <step name>__<parameter name> prefix;
7. get_params: retrieves the steps' parameters;
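
A minimal sketch of these pass-through methods on a two-step pipeline (plain iris data; the step names are just illustrative aliases):

from sklearn.datasets import load_iris
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

X, y = load_iris(return_X_y=True)
pipe = Pipeline([('sc', StandardScaler()), ('svc', SVC())])

pipe.fit(X, y)              # fit + transform on 'sc', then fit on 'svc'
print(pipe.predict(X[:5]))  # transform through 'sc', then predict with 'svc'
print(pipe.score(X, y))     # score of the last step
pipe.set_params(svc__C=10)  # set a step's parameter via <step>__<param>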


(c) The serial workflow during training and prediction.

Ref: sklearn :Pipeline 與 FeatureUnion入門指南


(d) Changing pipeline parameters with set_params

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.decomposition import PCA
from sklearn.datasets import load_iris
iris = load_iris()
pipe = Pipeline([('sc', StandardScaler()), ('pca', PCA()), ('svc', SVC())])

# 'sc' is the alias (short name) given to the StandardScaler() step
pipe.set_params(sc__copy=False)
# The format for changing a parameter is: <step alias>__<parameter name>=<value>
pipe.fit(iris.data, iris.target)

After setting the parameter, you can see that the corresponding parameter of the sub-estimator inside the pipeline has changed as well.

# copy in sc has indeed changed from True to False
Pipeline(memory=None,
     steps=[('sc', StandardScaler(copy=False, with_mean=True, with_std=True)), ('pca', PCA(copy=True, iterated_power='auto', n_components=None, random_state=None,
  svd_solver='auto', tol=0.0, whiten=False)), ('svc', SVC(C=1.0, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape='ovr', degree=3, gamma='auto_deprecated',
  kernel='rbf', max_iter=-1, probability=False, random_state=None,
  shrinking=True, tol=0.001, verbose=False))])


3. Parallel: FeatureUnion

What is FeatureUnion, and how does it differ from Pipeline?

    • A Pipeline processes features serially: each transformer works on the feature output of the previous one;
    • A FeatureUnion processes features in parallel: the outputs of all transformers are concatenated into one large feature vector.

FeatureUnion provides two services:

    • Convenience: you only need to call fit and transform once to train a whole set of estimators on the dataset.
    • Joint parameter selection: grid search can run over the parameter combinations of all estimators inside the FeatureUnion.

As the example below shows, PCA and KernelPCA are estimators at the same level, so the natural strategy is the "parallel" one.


Example 1:

from sklearn.pipeline import FeatureUnion
from sklearn.decomposition import PCA        # transformer
from sklearn.decomposition import KernelPCA  # transformer

estimators = [('linear_pca', PCA()), ('kernel_pca', KernelPCA())]
combined = FeatureUnion(estimators)
print(combined)
# FeatureUnion(n_jobs=1,
#              transformer_list=[('linear_pca', PCA(copy=True, iterated_power='auto',
#                                                   n_components=None, random_state=None,
#                                                   svd_solver='auto', tol=0.0, whiten=False)),
#                                ('kernel_pca', KernelPCA(alpha=1.0, coef0=1, copy_X=True, degree=3,
#                                                         eigen_solver='auto', fit_inverse_transform=False,
#                                                         gamma=None, kernel='linear', kernel_params=None,
#                                                         max_iter=None, n_components=None, n_jobs=1,
#                                                         random_state=None, remove_zero_eig=False, tol=0))],
#              transformer_weights=None)
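
To see the "concatenation" behavior concretely, here is a small sketch (plain iris data; the n_components values are arbitrary) that checks the width of the combined output:

from sklearn.datasets import load_iris
from sklearn.decomposition import PCA, KernelPCA
from sklearn.pipeline import FeatureUnion

X = load_iris().data                       # shape (150, 4)
combined = FeatureUnion([('linear_pca', PCA(n_components=2)),
                         ('kernel_pca', KernelPCA(n_components=3))])
X_union = combined.fit_transform(X)
print(X_union.shape)                       # (150, 5): 2 PCA columns + 3 KernelPCA columns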

Example 2:

from numpy import log1p
from sklearn.preprocessing import FunctionTransformer  # transformer
from sklearn.preprocessing import Binarizer            # transformer
from sklearn.pipeline import FeatureUnion

# Transformer that applies a log transform to the whole feature matrix.
step2_1 = ('ToLog', FunctionTransformer(log1p))
# Transformer that binarizes the whole feature matrix.
step2_2 = ('ToBinary', Binarizer())
# Whole-matrix parallel processing object.
# It too has fit and transform methods; each calls the fit/transform of the
# wrapped transformers in parallel.
# transformer_list is a list of (name, transformer) tuples.
step2 = ('FeatureUnion', FeatureUnion(transformer_list=[step2_1, step2_2]))


Key point: FunctionTransformer(log1p), a logarithm-based transform

Take the logarithm of the raw data before further processing. The rationale: log is monotonically increasing on its domain, so taking logs does not change the relative order of the data.

1. It shrinks the absolute magnitude of the data, which simplifies computation.

2. After taking logs, multiplications become additions.

3. In some situations, differences within different ranges of the data's domain carry different weight.

4. Taking logs changes neither the nature of the data nor its correlations, but it compresses the scale of the variables: 800/200 = 4, yet log(800)/log(200) ≈ 1.2616 (see the sketch after this list). The data become more stable, and collinearity and heteroscedasticity in the model are weakened.

5. The resulting data make heteroscedasticity problems easier to eliminate.

6. In economics, one often takes natural logs before running a regression; the regression equation is ln Y = a ln X + b. Differentiating both sides with respect to X gives (1/Y)(dY/dX) = a/X, so a = (dY/dX)(X/Y) = (dY/Y)/(dX/X), which is exactly the definition of elasticity.
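
A tiny sketch of the compression effect with FunctionTransformer(log1p), using the numbers from the ratio example above (note that log1p(x) = log(1 + x)):

import numpy as np
from sklearn.preprocessing import FunctionTransformer

X = np.array([[200.0], [800.0]])
to_log = FunctionTransformer(np.log1p)
print(to_log.fit_transform(X))
# [[5.303...], [6.685...]] -- the 4x ratio shrinks to roughly 1.26x,
# while the ordering of the values is preserved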


Key point: Binarizer(), binarizing a feature

The example uses song-play data: binarize the listen count, setting it to 1 when a song has been listened to at least once, and to 0 otherwise.

import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
import pandas as pd
plt.style.reload_library()
plt.style.use('classic')
# Set the figure background color
mpl.rcParams['figure.facecolor'] = (1, 1, 1, 0)
# Set the figure size
mpl.rcParams['figure.figsize'] = (6.0, 4.0)
# Set the figure resolution
mpl.rcParams['figure.dpi'] = 100

popsong_df = pd.read_csv('datasets/song_views.csv', encoding='utf-8')
# Binarize listen_count: 1 if the song was listened to at least once, else 0

# Method 1
# listened = popsong_df['listen_count'].copy()
# listened[listened >= 1] = 1
# popsong_df['listened'] = listened
# print(popsong_df[['listen_count', 'listened']])

# Method 2: use Binarizer
from sklearn.preprocessing import Binarizer

binarizer = Binarizer(threshold=0.9)  # values above 0.9 map to 1, the rest to 0
popsong_df['listened'] = binarizer.transform(
    popsong_df['listen_count'].values.reshape(-1, 1)
)
print(popsong_df[['listen_count', 'listened']].iloc[:10])

Here, to satisfy the requirements of .transform(), the input is reshaped so that each element becomes its own single-item row (a 2-D column array).

In [12]: df['score'].values                                                     
Out[12]: array([1, 4])

In [13]: df['score'].values.reshape(-1,1)   # the form binarizer.transform expects
Out[13]: 
array([[1],
       [4]])


4. "Partially parallel" processing

Motivation

Whole-matrix parallel processing has a drawback: in some scenarios we only need to transform certain columns of the feature matrix, not all of them.

The pipeline module provides no class for this (only OneHotEncoder implements such column-wise behavior), so we have to build one ourselves on top of FeatureUnion.
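
For reference, newer sklearn releases (0.20+) ship a built-in class for exactly this column-wise case. A minimal sketch with ColumnTransformer, mirroring the column split used below:

from numpy import log1p
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, FunctionTransformer, Binarizer

# column 0 -> one-hot encode, columns 1-3 -> log transform, column 4 -> binarize
preprocess = ColumnTransformer([
    ('OneHotEncoder', OneHotEncoder(), [0]),
    ('ToLog', FunctionTransformer(log1p), [1, 2, 3]),
    ('ToBinary', Binarizer(), [4]),
])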


Usage

(1) Here a custom FeatureUnionExt(...) interface is defined and used; it is fairly convenient.

from numpy import log1p
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import FunctionTransformer
from sklearn.preprocessing import Binarizer

# Transformer that one-hot encodes the categorical part of the feature matrix
step2_1 = ('OneHotEncoder', OneHotEncoder(sparse=False))
# Transformer that log-transforms part of the feature matrix
step2_2 = ('ToLog', FunctionTransformer(log1p))
# Transformer that binarizes part of the feature matrix
step2_3 = ('ToBinary', Binarizer())

# Partially parallel processing object
# transformer_list is a list of (name, transformer) tuples, one per parallel job
# idx_list gives the feature-matrix columns each job should read
step2 = ('FeatureUnionExt', FeatureUnionExt(transformer_list=[step2_1, step2_2, step2_3], idx_list=[[0], [1, 2, 3], [4]]))


(2) An implementation of the FeatureUnionExt(...) interface; the key is the Parallel helper. [The implementation targets an old sklearn version; for debugging, refer to the newer version on GitHub. Note also that under Python 3, map() returns a lazy iterator, so the mapped transformer_list stored below would need to be wrapped in list() to survive repeated iteration.]

from sklearn.pipeline import FeatureUnion, _fit_one_transformer, _fit_transform_one, _transform_one 
from sklearn.externals.joblib import Parallel, delayed
from scipy import sparse
import numpy as np

# Partially parallel processing: inherit from FeatureUnion
class FeatureUnionExt(FeatureUnion):
    # Compared with FeatureUnion there is one extra parameter, idx_list, giving
    # the feature-matrix columns each parallel job needs to read.
    def __init__(self, transformer_list, idx_list, n_jobs=1, transformer_weights=None):
        self.idx_list = idx_list
        FeatureUnion.__init__(self, transformer_list=map(lambda trans: (trans[0], trans[1]), transformer_list),
                              n_jobs=n_jobs, transformer_weights=transformer_weights)

    # Since only part of the feature matrix is read, fit_transform must be overridden
    def fit_transform(self, X, y=None, **fit_params):
        transformer_idx_list = map(lambda trans, idx: (trans[0], trans[1], idx),
                                   self.transformer_list, self.idx_list)
        result = Parallel(n_jobs=self.n_jobs)(
            # Slice the selected columns out of the matrix and feed them to fit_transform
            delayed(_fit_transform_one)(trans, name, X[:, idx], y, self.transformer_weights, **fit_params)
            for name, trans, idx in transformer_idx_list)
        Xs, transformers = zip(*result)
        self._update_transformer_list(transformers)
        if any(sparse.issparse(f) for f in Xs):
            Xs = sparse.hstack(Xs).tocsr()
        else:
            Xs = np.hstack(Xs)
        return Xs

... remaining code omitted


5. Full pipeline processing

from numpy import log1p
from sklearn.preprocessing import Imputer
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import FunctionTransformer
from sklearn.preprocessing import Binarizer
from sklearn.preprocessing import MinMaxScaler
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import chi2
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline

# Step 1: impute missing values
step1 = ('Imputer', Imputer())
# Step 2.1: one-hot encode the categorical part of the feature matrix
step2_1 = ('OneHotEncoder', OneHotEncoder(sparse=False))
# Step 2.2: log-transform part of the feature matrix
step2_2 = ('ToLog', FunctionTransformer(log1p))
# Step 2.3: binarize part of the feature matrix
step2_3 = ('ToBinary', Binarizer())
# Step 2: partially parallel processing; returns the merged outputs of the parallel jobs
step2 = ('FeatureUnionExt', FeatureUnionExt(transformer_list=[step2_1, step2_2, step2_3], idx_list=[[0], [1, 2, 3], [4]]))
# Step 3: scaling
step3 = ('MinMaxScaler', MinMaxScaler())
# Step 4: chi-squared feature selection
step4 = ('SelectKBest', SelectKBest(chi2, k=3))
# Step 5: PCA dimensionality reduction
step5 = ('PCA', PCA(n_components=2))
# Step 6: logistic regression, the model to be trained, as the last pipeline step
step6 = ('LogisticRegression', LogisticRegression(penalty='l2'))

# Build the pipeline object
# steps is a list of (name, estimator) tuples, processed in order
pipeline = Pipeline(steps=[step1, step2, step3, step4, step5, step6])


Automated parameter tuning

1. Automated tuning with GridSearchCV

Use the GridSearchCV interface to compare different results and find the best parameter settings. [A RuntimeError may occur; see below.]

from sklearn.grid_search import GridSearchCV  # in modern sklearn: from sklearn.model_selection import GridSearchCV

# Build the grid-search object
# The first argument is the model to be trained
# param_grid is the grid of parameters to tune: a dict whose keys are parameter
# names (format: "object name__sub-object name__parameter name") and whose
# values are lists of candidate values
grid_search = GridSearchCV(pipeline, param_grid={'FeatureUnionExt__ToBinary__threshold':[1.0, 2.0, 3.0, 4.0], 'LogisticRegression__C':[0.1, 0.2, 0.4, 0.8]})
# Train and tune
grid_search.fit(iris.data, iris.target)

Ref: sklearn :Pipeline 與 FeatureUnion入門指南 [the code is runnable]

# Author: Andreas Mueller <amueller@ais.uni-bonn.de>
#
# License: BSD 3 clause

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.grid_search import GridSearchCV
from sklearn.svm import SVC
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest

iris = load_iris()

X, y = iris.data, iris.target

# This dataset is way too high-dimensional. Better do PCA:
pca = PCA(n_components=2)

# Maybe some original features were good, too?
selection = SelectKBest(k=1)

# Build estimator from PCA and Univariate selection:

combined_features = FeatureUnion([("pca", pca), ("univ_select", selection)])

# Use combined features to transform dataset:
X_features = combined_features.fit(X, y).transform(X)

svm = SVC(kernel="linear")

# Do grid search over k, n_components and C:
pipeline = Pipeline([("features", combined_features), ("svm", svm)])
param_grid = dict(features__pca__n_components=[1, 2, 3],
                  features__univ_select__k=[1, 2],
                  svm__C=[0.1, 1, 10])

grid_search = GridSearchCV(pipeline, param_grid=param_grid, verbose=10)
grid_search.fit(X, y)
print(grid_search.best_estimator_)


Runtime error: RuntimeError

$ python dm.py 
/usr/local/anaconda3/lib/python3.7/site-packages/sklearn/model_selection/_split.py:1978: FutureWarning: The default value of cv will change from 3 to 5 in version 0.22. Specify it explicitly to silence this warning.
  warnings.warn(CV_WARNING, FutureWarning)
Traceback (most recent call last):
  File "dm.py", line 77, in <module>
    main()
  File "dm.py", line 70, in main
    datamining(iris, featureList)
  File "dm.py", line 36, in datamining
    grid_search.fit(iris.data, iris.target)
  File "/usr/local/anaconda3/lib/python3.7/site-packages/sklearn/model_selection/_search.py", line 632, in fit
    base_estimator = clone(self.estimator)
  File "/usr/local/anaconda3/lib/python3.7/site-packages/sklearn/base.py", line 64, in clone
    new_object_params[name] = clone(param, safe=False)
  File "/usr/local/anaconda3/lib/python3.7/site-packages/sklearn/base.py", line 52, in clone
    return estimator_type([clone(e, safe=safe) for e in estimator])
  File "/usr/local/anaconda3/lib/python3.7/site-packages/sklearn/base.py", line 52, in <listcomp>
    return estimator_type([clone(e, safe=safe) for e in estimator])
  File "/usr/local/anaconda3/lib/python3.7/site-packages/sklearn/base.py", line 52, in clone
    return estimator_type([clone(e, safe=safe) for e in estimator])
  File "/usr/local/anaconda3/lib/python3.7/site-packages/sklearn/base.py", line 52, in <listcomp>
    return estimator_type([clone(e, safe=safe) for e in estimator])
  File "/usr/local/anaconda3/lib/python3.7/site-packages/sklearn/base.py", line 75, in clone
    (estimator, name))
RuntimeError: Cannot clone object FeatureUnionExt(idx_list=[[0], [1, 2, 3], [4]], n_jobs=1,
                transformer_list=[('OneHotEncoder',
                                   OneHotEncoder(categorical_features=None,
                                                 categories=None, drop=None,
                                                 dtype=<class 'numpy.float64'>,
                                                 handle_unknown='error',
                                                 n_values=None, sparse=False)),
                                  ('ToLog',
                                   FunctionTransformer(accept_sparse=False,
                                                       check_inverse=True,
                                                       func=<ufunc 'log1p'>,
                                                       inv_kw_args=None,
                                                       inverse_func=None,
                                                       kw_args=None,
                                                       pass_y='deprecated',
                                                       validate=None)),
                                  ('ToBinary',
                                   Binarizer(copy=True, threshold=0.0))],
                transformer_weights=None), as the constructor either does not set or modifies parameter transformer_list
The error is raised when calling fit during tuning.

Source of the error log: https://gplearn.readthedocs.io/en/stable/_modules/sklearn/base.html
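
The message points at the cause: clone() rebuilds an estimator from its get_params(), so every constructor argument must be stored unchanged, yet FeatureUnionExt.__init__ wraps transformer_list in map(...) before handing it over. A minimal sketch of a fix (same class as above, the rest unchanged):

from sklearn.pipeline import FeatureUnion

class FeatureUnionExt(FeatureUnion):
    def __init__(self, transformer_list, idx_list, n_jobs=1, transformer_weights=None):
        self.idx_list = idx_list
        # Store transformer_list exactly as passed in, so that get_params()/clone()
        # can reconstruct the object faithfully
        FeatureUnion.__init__(self, transformer_list=transformer_list,
                              n_jobs=n_jobs, transformer_weights=transformer_weights)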


Saving the model

1. Persistence

Ref: [Python] 05 - Load data from Files

from sklearn.externals.joblib import dump, load  # in modern versions: from joblib import dump, load

# Persist the object
#   first argument:  the in-memory object
#   second argument: the file name in the file system
#   third argument:  compression level; 0 = no compression, 3 = a reasonable level
dump(grid_search, 'grid_search.dmp', compress=3)

# Load the object from the file system back into memory
grid_search = load('grid_search.dmp')
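
As a quick round-trip check (assuming grid_search was fitted earlier), the reloaded object can be queried and used like the original:

restored = load('grid_search.dmp')
print(restored.best_params_)  # best parameter combination found by the search
print(restored.best_score_)   # its cross-validated score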


End.

