Alink漫談(十) :線性迴歸實現 之 數據預處理

Alink漫談(十) :線性迴歸實現 之 數據預處理

0x00 摘要

Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平臺,是業界首個同時支持批式算法、流式算法的機器學習平臺。本文和下文將介紹線性迴歸在Alink中是如何實現的,但願能夠做爲你們看線性迴歸代碼的Roadmap。html

由於Alink的公開資料太少,因此如下均爲自行揣測,確定會有疏漏錯誤,但願你們指出,我會隨時更新。java

本系列目前已有十篇,歡迎你們指點git

0x01 概念

1.1 線性迴歸

線性迴歸是利用數理統計中迴歸分析,來肯定兩種或兩種以上變量間相互依賴的定量關係的一種統計分析方法,運用十分普遍。其表達形式爲y = w'x+e,e爲偏差服從均值爲0的正態分佈。算法

在線性迴歸中,目標值與特徵之間存在着線性相關的關係。即假設這個方程是一個線性方程,一個多元一次方程。數組

基本形式:給定由 d 個屬性描述的示例 ,線性模型試圖學得一個經過屬性的線性組合來進行預測的函數,即:session

\[f(x)=w_1x_1 +w_2x_2 ... +w_dx_d+b \]

其中w爲參數,也稱爲權重,能夠理解爲x1,x2...和 xd 對f(x)的影響度。機器學習

通常形式爲:分佈式

\[f(x)=w^Tx+b \]

假如咱們依據這個公式來預測 f(x),公式中的x是咱們已知的,然而w,b的取值殊不知道,只要咱們把w,b的取值求解出來,模型就得以肯定。咱們就能夠依據這個公式來作預測了。ide

那麼如何依據訓練數據求解 w 和 b 的最優取值呢?關鍵是衡量 f 和 y 之間的差異。這就牽扯到另一個概念:損失函數(Loss Function)。函數

1.2 優化模型

假若有一個模型 f(x),如何判斷這個模型是否優秀?這種定性的判斷能夠經過一個成爲經驗偏差風險的數值來進行衡量,也就是模型 f 在全部訓練樣本上所犯錯誤的總和 E(x)。

咱們經過在訓練集上最小化經驗損失來訓練模型。換言之,經過調節 f 的參數 w,使得經驗偏差風險 E(x) 不斷降低,最終達到最小值的時候,咱們就得到了一個 「最優」 的模型。

可是若是按照上面的定義,E(x) 是一組示性函數的和,所以是不連續不可導的函數,不易優化。爲了解決這個問題,人們提出了「損失函數」的概念。損失函數就是和偏差函數有必定關係(好比是偏差函數的上界),可是具備更好的數學性質(好比連續,可導,凸性等),比較容易進行優化。因此咱們就能夠對損失函數來優化。

損失函數若是連續可導,因此咱們能夠用梯度降低法等一階算法,也能夠用牛頓法,擬牛頓法等二階算法。當優化算法收斂後,咱們就獲得一個不錯的模型。若是損失函數是一個凸函數,咱們就能夠獲得最優模型。

典型的優化方法:

一階算法 二階算法
肯定性算法 梯度降低法 投影次梯度降低 近端梯度降低 Frank-Wolfe算法 Nesterov加速算法 座標降低法 對偶座標上升法 牛頓法,擬牛頓法
隨機算法 隨機梯度降低法 隨機座標降低法 隨機對偶座標上升法 隨機方差減少梯度法 隨機擬牛頓法

因此咱們能夠知道,優化LinearRegression模型 f 的手段必定是:肯定損失函數,用 x,y 做爲輸入訓練以求得損失函數最小值,從而肯定 f 的參數 w。過程大體以下:

  1. 處理輸入,把 x, y 轉換成算法須要的格式。

  2. 找一個合適的預測函數,通常表示爲 h 函數,該函數就是咱們須要找的分類函數,它用來預測輸入數據的判斷結果。

  3. 構造一個Cost函數(損失函數),該函數表示預測的輸出(h)與訓練數據類別(y)之間的誤差,能夠是兩者之間的差(h-y)或者是其餘的形式。綜合考慮全部訓練數據的 「損失」,將Cost求和或者求平均,記爲J(θ)函數,表示全部訓練數據預測值與實際類別的誤差。

  4. 顯然,損失函數 J(θ) 函數的值越小表示預測函數越準確(即h函數越準確),因此這一步須要作的是找到 J(θ) 函數的最小值。注意,損失函數是關於 θ 的函數!也就是說,對於損失函數來說,θ再也不是函數的參數,而是損失函數的自變量!

  5. 準備模型元數據,創建模型。

1.3 損失函數&目標函數

先歸納說明:

  • 損失函數:計算的是一個樣本的偏差;
  • 代價函數:是整個訓練集上全部樣本偏差的平均,常常和損失函數混用;
  • 目標函數:代價函數 + 正則化項;

再詳細闡釋:

假設咱們用 f(X) 來擬合真實值Y。這個輸出的f(X)與真實值Y多是相同的,也多是不一樣的,爲了表示咱們擬合的好壞,咱們就用一個函數來度量擬合的程度。這個函數就稱爲損失函數(loss function),或者叫代價函數(cost function)。

損失函數用來衡量算法的運行狀況,估量模型的預測值與真實值的不一致程度,是一個非負實值函數,一般使用 L(Y,f(x)) 來表示。損失函數越小,模型的魯棒性就越好。損失函數是經驗風險函數的核心部分。

目標函數是一個相關但更廣的概念,對於目標函數來講在有約束條件下的最小化就是損失函數(loss function)。

由於f(x)可能會過分學習歷史數據,致使它在真正預測時效果會很很差,這種狀況稱爲過擬合(over-fitting)。這樣獲得的函數會過於複雜。因此咱們不只要讓經驗風險最小化,還要讓結構風險最小化。這個時候就定義了一個函數 J(x),這個函數專門用來度量模型的複雜度,在機器學習中也叫正則化(regularization)。經常使用的有 L1, L2範數。

L1 正則的本質是爲模型增長了「模型參數服從零均值拉普拉斯分佈」這一先驗知識。

L2 正則的本質是爲模型增長了「模型參數服從零均值正態分佈」這一先驗知識。

L1 正則化增長了全部權重 w 參數的絕對值之和逼迫更多 w 爲零,也就是變稀疏( L2 由於其導數也趨 0, 奔向零的速度不如 L1 給力了)。L1 正則化的引入就是爲了完成特徵自動選擇的光榮使命,它會學習地去掉無用的特徵,也就是把這些特徵對應的權重置爲 0。

L2 正則化中增長全部權重 w 參數的平方之和,逼迫全部 w 儘量趨向零但不爲零(L2 的導數趨於零)。由於在未加入 L2 正則化發生過擬合時,擬合函數須要顧忌每個點,最終造成的擬合函數波動很大,在某些很小的區間裏,函數值的變化很劇烈,也就是某些 w 值很是大。爲此,L2 正則化的加入就懲罰了權重變大的趨勢。

到這一步咱們就能夠說咱們最終的優化函數是:min(L(Y, f(x) + J(x)) ,即最優化經驗風險和結構風險,而這個函數就被稱爲目標函數

在迴歸問題中,經過目標函數來求解最優解,經常使用的是平方偏差(最小二乘線性迴歸)代價函數。損失函數則是平方損失函數

1.4 最小二乘法

均方偏差是迴歸任務中最經常使用的性能度量,所以可使均方偏差最小。基於均方偏差最小化來進行模型求解的方法稱爲「最小二乘法」。在線性迴歸中,最小二乘法就是找到一條直線,使全部樣本到直線的 "歐式距離和" 最小。因而線性迴歸中損失函數就是平方損失函數

有了這些基礎概念,下面咱們就開始動手分析Alink的代碼。

0x02 示例代碼

首先,咱們給出線性迴歸的示例。

public class LinearRegressionExample {
    static Row[] vecrows = new Row[] {
            Row.of("$3$0:1.0 1:7.0 2:9.0", "1.0 7.0 9.0", 1.0, 7.0, 9.0, 16.8),
            Row.of("$3$0:1.0 1:3.0 2:3.0", "1.0 3.0 3.0", 1.0, 3.0, 3.0, 6.7),
            Row.of("$3$0:1.0 1:2.0 2:4.0", "1.0 2.0 4.0", 1.0, 2.0, 4.0, 6.9),
            Row.of("$3$0:1.0 1:3.0 2:4.0", "1.0 3.0 4.0", 1.0, 3.0, 4.0, 8.0)
    };
    static String[] veccolNames = new String[] {"svec", "vec", "f0", "f1", "f2", "label"};
    static BatchOperator vecdata = new MemSourceBatchOp(Arrays.asList(vecrows), veccolNames);
    static StreamOperator svecdata = new MemSourceStreamOp(Arrays.asList(vecrows), veccolNames);

    public static void main(String[] args) throws Exception {
        String[] xVars = new String[] {"f0", "f1", "f2"};
        String yVar = "label";
        String vec = "vec";
        String svec = "svec";
        LinearRegression linear = new LinearRegression()
                .setLabelCol(yVar)  // 這裏把變量都設置好了,後續會用到
                .setFeatureCols(xVars)
                .setPredictionCol("linpred");

        Pipeline pl = new Pipeline().add(linear);
        PipelineModel model = pl.fit(vecdata);

        BatchOperator result = model.transform(vecdata).select(
                new String[] {"label", "linpred"});

        List<Row> data = result.collect();
    }
}

輸出是

svec|vec|f0|f1|f2|label|linpred
----|---|--|--|--|-----|-------
$3$0:1.0 1:7.0 2:9.0|1.0 7.0 9.0|1.0000|7.0000|9.0000|16.8000|16.8148
$3$0:1.0 1:3.0 2:4.0|1.0 3.0 4.0|1.0000|3.0000|4.0000|8.0000|7.8521
$3$0:1.0 1:3.0 2:3.0|1.0 3.0 3.0|1.0000|3.0000|3.0000|6.7000|6.7739
$3$0:1.0 1:2.0 2:4.0|1.0 2.0 4.0|1.0000|2.0000|4.0000|6.9000|6.959

根據前文咱們能夠知道,在迴歸問題中,經過優化目標函數來求解最優解,經常使用的是平方偏差(最小二乘線性迴歸)代價函數。損失函數則是平方損失函數。

對應到Alink,優化函數或者優化器是擬牛頓法的L-BFGS算法,目標函數是UnaryLossObjFunc,損失函數是SquareLossFunc。線性迴歸訓練整體邏輯是LinearRegTrainBatchOp。因此咱們下面一一論述。

0x03 總體概述

LinearRegression 訓練 用到LinearRegTrainBatchOp,而LinearRegTrainBatchOp的基類是BaseLinearModelTrainBatchOp。因此咱們來看BaseLinearModelTrainBatchOp。

public class LinearRegression extends Trainer <LinearRegression, LinearRegressionModel> implements LinearRegTrainParams <LinearRegression>, LinearRegPredictParams <LinearRegression> {
   @Override
   protected BatchOperator train(BatchOperator in) {
      return new LinearRegTrainBatchOp(this.getParams()).linkFrom(in);
   }
}

BaseLinearModelTrainBatchOp.linkFrom 代碼以下,註釋中給出了清晰的邏輯 :

大致是:

  • 獲取算法參數,label信息;
  • 準備,轉換數據到 Tuple3 format <weight, label, feature vector>;
  • 得到統計信息,好比向量大小,均值和方差;
  • 對訓練數據作標準化和插值;
  • 使用L-BFGS算法,經過對損失函數求最小值從而對模型優化;
  • 準備模型元數據;
  • 創建模型;
public T linkFrom(BatchOperator<?>... inputs) {
    BatchOperator<?> in = checkAndGetFirst(inputs);
    // Get parameters of this algorithm.
    Params params = getParams();
    // Get type of processing: regression or not
    boolean isRegProc = getIsRegProc(params, linearModelType, modelName);
    // Get label info : including label values and label type.
    Tuple2<DataSet<Object>, TypeInformation> labelInfo = getLabelInfo(in, params, isRegProc);
    // Transform data to Tuple3 format.//weight, label, feature vector.
    DataSet<Tuple3<Double, Double, Vector>> initData = transform(in, params, labelInfo.f0, isRegProc);
    // Get statistics variables : including vector size, mean and variance of train data.
    Tuple2<DataSet<Integer>, DataSet<DenseVector[]>>
        statInfo = getStatInfo(initData, params.get(LinearTrainParams.STANDARDIZATION));
    // Do standardization and interception to train data.
    DataSet<Tuple3<Double, Double, Vector>> trainData = preProcess(initData, params, statInfo.f1);
    // Solve the optimization problem.
    DataSet<Tuple2<DenseVector, double[]>> coefVectorSet = optimize(params, statInfo.f0,
        trainData, linearModelType, MLEnvironmentFactory.get(getMLEnvironmentId()));
    // Prepare the meta info of linear model.
    DataSet<Params> meta = labelInfo.f0
        .mapPartition(new CreateMeta(modelName, linearModelType, isRegProc, params))
        .setParallelism(1);
    // Build linear model rows, the format to be output.
    DataSet<Row> modelRows;
    String[] featureColTypes = getFeatureTypes(in, params.get(LinearTrainParams.FEATURE_COLS));
    modelRows = coefVectorSet
        .mapPartition(new BuildModelFromCoefs(labelInfo.f1,
            params.get(LinearTrainParams.FEATURE_COLS),
            params.get(LinearTrainParams.STANDARDIZATION),
            params.get(LinearTrainParams.WITH_INTERCEPT), featureColTypes))
        .withBroadcastSet(meta, META)
        .withBroadcastSet(statInfo.f1, MEAN_VAR)
        .setParallelism(1);
    // Convert the model rows to table.
    this.setOutput(modelRows, new LinearModelDataConverter(labelInfo.f1).getModelSchema());
    return (T)this;
}

咱們後續還會對此邏輯進行細化。

0x04 基礎功能

咱們首先介紹下相關基礎功能和相關概念,好比損失函數,目標函數,梯度等。

4.1 損失函數

損失函數涉及到若干概念。

4.1.1 導數和偏導數

導數也是函數,是函數的變化率與位置的關係。導數表明了在自變量變化趨於無窮小的時候,函數值的變化與自變量的變化的比值。幾何意義是這個點的切線。物理意義是該時刻的(瞬時)變化率。

導數反映的是函數y=f(x)在某一點處沿x軸正方向的變化率。直觀地看,也就是在x軸上某一點處,若是f’(x)>0,說明f(x)的函數值在x點沿x軸正方向是趨於增長的;若是f’(x)<0,說明f(x)的函數值在x點沿x軸正方向是趨於減小的。

一元導數表徵的是:一元函數 f(x)與自變量 x 在某點附近變化的比率(變化率,斜率)。

若是是多元函數呢?則爲偏導數。偏導數是多元函數「退化」成一元函數時的導數,這裏「退化」的意思是固定其餘變量的值,只保留一個變量,依次保留每一個變量,則N元函數有N個偏導數。偏導數爲函數在每一個位置處沿着自變量座標軸方向上的導數(切線斜率)。二元函數的偏導數表徵的是:函數 F(x,y) 與自變量 x(或y) 在某點附近變化的比率(變化率)。

4.1.2 方向導數

導數和偏導數的定義中,均是沿座標軸正方向討論函數的變化率。那麼當咱們討論函數沿任意方向的變化率時,也就引出了方向導數的定義,即:某一點在某一趨近方向上的導數值。

方向導數就是偏導數合成向量與方向向量的內積。方向導數的本質是一個數值,簡單來講其定義爲:一個函數沿指定方向的變化率。

4.1.3 Hessian矩陣

在一元函數求解的問題中,咱們能夠很愉快的使用牛頓法求駐點。但在機器學習的優化問題中,咱們要優化的都是多元函數,x每每不是一個實數,而是一個向量,因此將牛頓求根法利用到機器學習中時,x 是一個向量, y 也是一個向量,對 x 求導之後獲得的是一個矩陣,就是Hessian矩陣。

在數學中,海森矩陣(Hessian matrix 或 Hessian)是一個自變量爲向量的實值函數的二階偏導數組成的方塊矩陣多元函數的二階導數就是一個海森矩陣

前面提到,線性迴歸中損失函數就是平方損失函數。咱們來看看實現。後續實現將調用此類的 loss 和 derivative,具體遇到時候再講。

UnaryLossFunc是接口,表明一元損失函數。它定義的每一個函數都有兩個輸入 (eta and y),Alink把這兩個輸入的差做爲損失函數的一元變量。基本API是求損失,求導數,求二階導數。

public interface UnaryLossFunc extends Serializable {
	// Loss function.
	double loss(double eta, double y);
	// The derivative of loss function.
	double derivative(double eta, double y);
	// The second derivative of the loss function.
	double secondDerivative(double eta, double y);
}

平方損失函數具體實現以下:

public class SquareLossFunc implements UnaryLossFunc {

   @Override
   public double loss(double eta, double y) {
      return 0.5 * (eta - y) * (eta - y);
   }

   @Override
   public double derivative(double eta, double y) {
      return eta - y;
   }

   @Override
   public double secondDerivative(double eta, double y) {
      return 1;
   }
}

4.2 目標函數

這裏涉及的概念是梯度,梯度降低法。

4.2.1 梯度

對於模型優化,咱們要選擇最優的 θ,使得 f(x) 最接近真實值。這個問題就轉化爲求解最優的 θ,使損失函數 J(θ) 取最小值。那麼如何解決這個轉化後的問題呢?這又牽扯到一個概念:梯度降低(Radient Descent)

因此咱們首先要溫習下梯度。

  • 向量的定義是有方向(direction)有大小(magnitude)的量。
  • 梯度實際上是一個向量,即有方向有大小;其定義爲:一個多元函數對於其自變量分別求偏導數,這些偏導數所組成的向量就是函數的梯度。
  • 梯度即函數在某一點最大的方向導數,函數沿梯度方向函數有最大的變化率。
  • 梯度的第一層含義就是「方向導數的最大值」
  • 當前位置的梯度方向,爲函數在該位置處方向導數最大的方向,也是函數值上升最快的方向,反方向爲降低最快的方向;
  • 梯度的幾何含義就是:沿向量所在直線的方向變化率最大。

4.2.2 梯度降低法

梯度降低法是一個一階最優化算法,它的核心思想是:要想最快找到一個函數的局部極小值,必須沿函數當前點對應「梯度」(或者近似梯度)的反方向(降低)進行規定步長「迭代」搜索。沿梯度(斜率)的反方向移動,這就是「梯度降低法」

既然在變量空間的某一點處,函數沿梯度方向具備最大的變化率,那麼在優化目標函數的時候,天然是沿着負梯度方向去減少函數值,以此達到咱們的優化目標。

梯度降低中的降低,意思是讓函數的未知數隨着梯度的方向運動。什麼是梯度的方向呢?把這一點帶入到梯度函數中,結果爲正,那咱們就把這一點的值變小一些,同時就是讓梯度變小些;當這一點帶入梯度函數中的結果爲負的時候,就給這一點的值增大一些。

如何沿着負梯度方向減少函數值呢?既然梯度是偏導數的集合,同時梯度和偏導數都是向量,那麼參考向量運算法則,咱們在每一個變量軸上減少對應變量值便可

梯度降低就是讓梯度中全部偏導函數都降低到最低點的過程.(劃重點:降低)。都降低到最低點了,那每一個未知數(或者叫維度)的最優解就獲得了,因此他是解決函數最優化問題的算法。

「最小二乘法」和「梯度降低法」,前者用於「搜索最小偏差」,後者用於「用最快的速度搜索」,兩者經常配合使用。對最小二乘法的參數調優就轉變爲了求這個二元函數的極值問題,也就是說能夠應用「梯度降低法」了。

在最小二乘函數中,已擁有的條件是一些樣本點和樣本點的結果,就是矩陣X和每一條X樣本的lable值y。X是矩陣,y是向量。因此咱們要知道,梯度降低中求偏導數的未知數不是x和y,而是x的參數w

目標函數的基類是OptimObjFunc,其提供API 好比計算梯度,損失,hessian矩陣,以及依據採樣點更新梯度和hessian矩陣。 其幾個派生類以下,從註釋中能夠看到使用範圍。

咱們能夠看到正則化(regularization) L1, L2範數,這是相比損失函數增長的模塊。

public abstract class OptimObjFunc implements Serializable {
    protected final double l1;
    protected final double l2; // 正則化(regularization) L1, L2範數。
    protected Params params;   
  .....
}

// Unary loss object function.
public class UnaryLossObjFunc extends OptimObjFunc

// The OptimObjFunc for multilayer perceptron.
public class AnnObjFunc extends OptimObjFunc
  
// Accelerated failure time Regression object function.
public class AftRegObjFunc extends OptimObjFunc

// Softmax object function.
public class SoftmaxObjFunc extends OptimObjFunc

對於線性模型,BaseLinearModelTrainBatchOp 中會根據模型類型來生成目標函數,能夠看到在生成目標函數同時,也相應設置了不一樣的損失函數,其中 SquareLossFunc 就是咱們以前提到的。

public static OptimObjFunc getObjFunction(LinearModelType modelType, Params params) {
    OptimObjFunc objFunc;
    // For different model type, we must set corresponding loss object function.
    switch (modelType) {
        case LinearReg:
            // 咱們這裏!
            objFunc = new UnaryLossObjFunc(new SquareLossFunc(), params);
            break;
        case SVR:
            double svrTau = params.get(LinearSvrTrainParams.TAU);
            objFunc = new UnaryLossObjFunc(new SvrLossFunc(svrTau), params);
            break;
        case LR:
            objFunc = new UnaryLossObjFunc(new LogLossFunc(), params);
            break;
        case SVM:
            objFunc = new UnaryLossObjFunc(new SmoothHingeLossFunc(), params);
            break;
        case Perceptron:
            objFunc = new UnaryLossObjFunc(new PerceptronLossFunc(), params);
            break;
        case AFT:
            objFunc = new AftRegObjFunc(params);
            break;
        default:
            throw new RuntimeException("Not implemented yet!");
    }
    return objFunc;
}

一元目標函數就是咱們線性迴歸用到的目標函數,其只有一個新增變量 :unaryLossFunc。就是一元損失函數。

/**
 * Unary loss object function.
 */
public class UnaryLossObjFunc extends OptimObjFunc {
    private UnaryLossFunc unaryLossFunc;
}

一元目標函數提供了不少功能,咱們這裏用到主要是:

  • calcGradient :根據一組採樣點計算梯度,這是從基類OptimObjFunc集成的。
  • updateGradient :根據一個採樣點更新梯度;
  • calcSearchValues :爲線性搜索計算損失;

4.2.4.1 依據一組採樣點計算梯度

對於本文,這裏更新的是損失函數的梯度。

再次囉嗦下,損失函數用來度量擬合的程度,從而評估模型擬合的好壞,記爲 J(θ)。注意,損失函數是關於 θ 的函數!也就是說,對於損失函數來說,θ再也不是函數的參數,而是損失函數的自變量!

當咱們計算損失時,是將每一個樣本中的特徵 xi 和對應的目標變量真實值 yi 帶入損失函數,此時,損失函數中就只剩下 θ 是未知的。

損失函數的梯度即對 θi 求偏導,因爲損失函數是關於 θ 的函數,所以,θ 的取值不一樣,得出來的的梯度向量也是不一樣的。借用「下山」的比喻來解釋,θ 的不一樣取值,至關於處於山上的不一樣位置,每個位置都會計算出一個梯度向量▽J(θ)。

這裏的 l1, l2 就是以前提到的正則化(regularization) L1, L2範數。

/**
 * Calculate gradient by a set of samples.
 *
 * @param labelVectors train data.
 * @param coefVector   coefficient of current time.
 * @param grad         gradient.
 * @return weight sum
 */
public double calcGradient(Iterable<Tuple3<Double, Double, Vector>> labelVectors,
                           DenseVector coefVector, DenseVector grad) {
    double weightSum = 0.0;
    for (int i = 0; i < grad.size(); i++) {
        grad.set(i, 0.0);
    }
  
// 對輸入的樣本集合labelVectors逐個計算梯度  
    for (Tuple3<Double, Double, Vector> labelVector : labelVectors) {
        if (labelVector.f2 instanceof SparseVector) {
           ((SparseVector)(labelVector.f2)).setSize(coefVector.size());
        }
      
// 以這個樣本爲例 
labelVector = {Tuple3@9895} "(1.0,16.8,1.0 1.0 1.4657097546055162 1.4770978917519928)"
 f0 = {Double@9903} 1.0
 f1 = {Double@9904} 16.8
 f2 = {DenseVector@9905} "1.0 1.0 1.4657097546055162 1.4770978917519928"
  
        weightSum += labelVector.f0; // labelVector.f0是權重
        updateGradient(labelVector, coefVector, grad);
    }
    if (weightSum > 0.0) {
        grad.scaleEqual(1.0 / weightSum);
    }
// l2正則化    
    if (0.0 != this.l2) {
        grad.plusScaleEqual(coefVector, this.l2 * 2);
    }
// l1正則化   
    if (0.0 != this.l1) {
        double[] coefArray = coefVector.getData();
        for (int i = 0; i < coefVector.size(); i++) {
            grad.add(i, Math.signum(coefArray[i]) * this.l1);
        }
    }
    return weightSum;
}

4.2.4.2 根據一個採樣點更新梯度

這裏 labelVector.f0是權重,labelVector.f1是 y,labelVector.f2是 x-vec 四維向量,coefVector是w係數向量。

  • getEta是點積,即 x向量 與 當前w係數的點積,就是當前計算的 y。
  • labelVector.f0 * unaryLossFunc.derivative(eta, labelVector.f1); 就是調用SquareLossFunc.derivative 函數來計算一階導數。
  • updateGrad.plusScaleEqual(labelVector.f2, div); 就是在原有梯度基礎上更新梯度
public class UnaryLossObjFunc extends OptimObjFunc {
    /**
     * Update gradient by one sample.
     *
     * @param labelVector a sample of train data.
     * @param coefVector  coefficient of current time.
     * @param updateGrad  gradient need to update.
     */
    @Override
    protected void updateGradient(Tuple3<Double, Double, Vector> labelVector, DenseVector coefVector, DenseVector updateGrad) {
        // 點積,就是當前計算出來的y
        double eta = getEta(labelVector, coefVector); 
        // 一階導數。labelVector.f0是權重
        double div = labelVector.f0 * unaryLossFunc.derivative(eta, labelVector.f1); 
        // 點乘以後還須要相加。labelVector.f2 就是x—vec,好比 1.0 1.0 1.4657097546055162 1.4770978917519928
        updateGrad.plusScaleEqual(labelVector.f2, div);
    }
  
    private double getEta(Tuple3<Double, Double, Vector> labelVector, DenseVector coefVector) {
        // 點積,表示第 i 次迭代中節點上的第 k 個特徵向量與特徵權重份量的點乘。coefVector中第 c 項表示爲第 i 次迭代中特徵權重向量在第 c 列節點上的份量
        return MatVecOp.dot(labelVector.f2, coefVector);
    }
}

/**
* Plus with another vector scaled by "alpha".
*/
public void plusScaleEqual(Vector other, double alpha) {
	if (other instanceof DenseVector) {
		BLAS.axpy(alpha, (DenseVector) other, this);
	} else {
		BLAS.axpy(alpha, (SparseVector) other, this);
	}
}

4.3 優化函數

Alink中提供給了一系列並行優化函數,好比GD, SGD, LBFGS, OWLQN, NEWTON method。

其基類是Optimizer。

public abstract class Optimizer {
    protected final DataSet<?> objFuncSet; // 具體目標函數,計算梯度和損失
    protected final DataSet<Tuple3<Double, Double, Vector>> trainData; //訓練數據
    protected final Params params; //參數
    protected DataSet<Integer> coefDim; //dimension of features.
    protected DataSet<DenseVector> coefVec = null; //最終係數w
    .......
}

線性迴歸主要用到了LBFGS算法。

public class Lbfgs extends Optimizer

具體調用以下

public static DataSet<Tuple2<DenseVector, double[]>> optimize(.....) {
    // Loss object function
    DataSet<OptimObjFunc> objFunc = session.getExecutionEnvironment()
        .fromElements(getObjFunction(modelType, params));

    if (params.contains(LinearTrainParams.OPTIM_METHOD)) {
        LinearTrainParams.OptimMethod method = params.get(LinearTrainParams.OPTIM_METHOD);
        return OptimizerFactory.create(objFunc, trainData, coefficientDim, params, method).optimize();
    } else if (params.get(HasL1.L_1) > 0) {
        return new Owlqn(objFunc, trainData, coefficientDim, params).optimize();
    } else {
        // 咱們的程序將運行到這裏
        return new Lbfgs(objFunc, trainData, coefficientDim, params).optimize();
    }
}

機器學習基本優化套路是:

準備數據 ----> 優化函數 ----> 目標函數 ----> 損失函數

對應咱們這裏是

BaseLinearModelTrainBatchOp.linkFrom(總體邏輯) -----> Lbfgs(繼承Optimizer) ----> UnaryLossObjFunc(繼承OptimObjFunc) ----> SquareLossFunc(繼承UnaryLossFunc)

0x05 數據準備

看完完底層功能,咱們再次回到線性迴歸整體流程。

總結 BaseLinearModelTrainBatchOp.linkFrom 的基本流程以下:(發現某些媒體對於列表排版支持很差,因此加上序號)。

首先再給出輸入一個例子:Row.of("$3$0:1.0 1:7.0 2:9.0", "1.0 7.0 9.0", 1.0, 7.0, 9.0, 16.8),這裏後面 4 項對應列名是 "f0", "f1", "f2", "label"

  • 1)獲取到label的信息,包括label數值和種類。 labelInfo = getLabelInfo() 這裏有一個 distinct 操做,因此會去重。最後獲得label的可能取值範圍 :0,1,類型是 Double。
  • 2)用transform函數把輸入轉換成三元組Tuple3<weight, label, feature vector>。具體說,會把輸入中的三個特徵"f0", "f1", "f2" 轉換爲一個向量 vec, 咱們之後稱之爲x-vec。重點就在於特徵變成了一個向量。因此這個三元組能夠認爲是 <權重, y-value, x-vec>
  • 3)用statInfo = getStatInfo() 獲取統計變量,包括vector size, mean和variance。這裏流程比較複雜。
    • 3.1)用trainData.map{return value.f2;}來獲取訓練數據中的 x-vec。
    • 3.2)調用StatisticsHelper.summary來對 x-vec 作處理
      • 3.2.1)調用 summarizer
        • 3.2.1.1)調用 mapPartition(new VectorSummarizerPartition(bCov))
          • 3.2.1.1.1)調用VectorSummarizerPartition.mapPartition,其遍歷列表,列表中的每個變量 sv 是 x-vec。srt = srt.visit(sv),會根據每個新輸入從新計算count,sum,squareSum,normL1..,這樣就獲得了本partiton中輸入每列的這些統計數值。
        • 3.2.1.2)調用 reduce(VectorSummarizerUtil.merge(value1, value2)) 來歸併每個partition的結果。
      • 3.2.2)調用map(BaseVectorSummarizer summarizer),其實調用到DenseVectorSummarizer,就是生成一個DenseVectorSummary向量,裏面是count,sum,squareSum,normL1,min,max,numNonZero。
    • 3.3)調用 coefficientDim = summary.map
    • 3.4)調用 meanVar = coefficientDim.map,最後獲得 Tuple2.of(coefficientDim, meanVar)
  • 4)preProcess(initData, params, statInfo.f1) 用3) 計算的結果 對輸入數據作標準化和插值 standardization and interception。上面獲得的 meanVar 將會做爲參數傳入。這裏是對 x-vec 作標準化。好比原始輸入Row是"(1.0,16.8,1.0 7.0 9.0)",其中 x-vec 是"1.0 7.0 9.0",進行標準化以後,x-vec 變成了 4 項 :{ 第1項是固定值 "1.0 ", 因此4 項 是 "1.0 1.0 1.4657097546055162 1.4770978917519928" },因此轉換後的Row是"(1.0,16.8,1.0 1.0 1.4657097546055162 1.4770978917519928)"。即weight 是1.0,y-value是16.8,後續4個是x-vec。
  • 以上完成了對數據的處理。
  • 5)調用 optimize(params, statInfo.f0, trainData, linearModelType) 經過對損失函數求最小值從而對模型優化。(使用L-BFGS算法,會單獨拿出來說解)
  • 6)調用 mapPartition(new CreateMeta()) 來準備模型元數據。
  • 7)調用 mapPartition(new BuildModelFromCoefs) 來創建模型。

能夠看到,數據準備佔據了很大部分,下面咱們看看數據準備的幾個步驟。

5.1 獲取label信息

此處代碼對應上面基本流程的 1)

由於以前有一個distinct操做,因此會去重。最後獲得label的可能取值範圍 :0,1,類型是 Double。

private Tuple2<DataSet<Object>, TypeInformation> getLabelInfo(BatchOperator in,
                                                                  Params params,
                                                                  boolean isRegProc) {
        String labelName = params.get(LinearTrainParams.LABEL_COL);
        // Prepare label values
        DataSet<Object> labelValues;
        TypeInformation<?> labelType = null;
        if (isRegProc) {
            // 由於是迴歸,因此是這裏
            labelType = Types.DOUBLE;
            labelValues = MLEnvironmentFactory.get(in.getMLEnvironmentId())
                .getExecutionEnvironment().fromElements(new Object());
        } else {
          .....
        }
        return Tuple2.of(labelValues, labelType);
}

5.2 把輸入轉換成三元組

此處代碼對應上面基本流程的 2) 。

用transform函數把輸入轉換成三元組Tuple3<weight, label, feature vector>。具體說,會把輸入中的三個特徵"f0", "f1", "f2" 轉換爲一個向量 vec, 咱們之後稱之爲x-vec。重點就在於特徵變成了一個向量。因此這個三元組能夠認爲是 <權重, y-value, x-vec>

private DataSet<Tuple3<Double, Double, Vector>> transform(BatchOperator in,
                                                          Params params,
                                                          DataSet<Object> labelValues,
                                                          boolean isRegProc) {
    ......
    // 獲取Schema
    TableSchema dataSchema = in.getSchema();
    // 獲取各類index 
    int labelIdx = TableUtil.findColIndexWithAssertAndHint(dataSchema.getFieldNames(), labelName);
    ......
    int weightIdx = weightColName != null ? TableUtil.findColIndexWithAssertAndHint(in.getColNames(), weightColName) : -1;
    int vecIdx = vectorColName != null ? TableUtil.findColIndexWithAssertAndHint(in.getColNames(), vectorColName) : -1;
    // 用transform函數把輸入轉換成三元組Tuple3<weight, label, feature vector>
    return in.getDataSet().map(new Transform(isRegProc, weightIdx, vecIdx, featureIndices, labelIdx)).withBroadcastSet(labelValues, LABEL_VALUES);
}

這裏對應的變量打印出來爲

params = {Params@2745} "Params {featureCols=["f0","f1","f2"], labelCol="label", predictionCol="linpred"}"
labelValues = {DataSource@2845} 
isRegProc = true
featureColNames = {String[3]@2864} 
 0 = "f0"
 1 = "f1"
 2 = "f2"
labelName = "label"
weightColName = null
vectorColName = null
dataSchema = {TableSchema@2866} "root\n |-- svec: STRING\n |-- vec: STRING\n |-- f0: DOUBLE\n |-- f1: DOUBLE\n |-- f2: DOUBLE\n |-- label: DOUBLE\n"
featureIndices = {int[3]@2878} 
 0 = 2
 1 = 3
 2 = 4
labelIdx = 5
weightIdx = -1
vecIdx = -1

具體在runtime時候,會進入到Transform.map函數。咱們能夠看到,會把輸入中的三個特徵"f0", "f1", "f2",轉換爲一個向量 vec, 咱們之後稱之爲x-vec。

private static class Transform extends RichMapFunction<Row, Tuple3<Double, Double, Vector>> {
    @Override
    public Tuple3<Double, Double, Vector> map(Row row) throws Exception {
        // 獲取權重
        Double weight = weightIdx != -1 ? ((Number)row.getField(weightIdx)).doubleValue() : 1.0;
        // 獲取label
        Double val = FeatureLabelUtil.getLabelValue(row, this.isRegProc,
            labelIdx, this.positiveLableValueString);
        if (featureIndices != null) {
            // 獲取x-vec
            DenseVector vec = new DenseVector(featureIndices.length);
            for (int i = 0; i < featureIndices.length; ++i) {
                vec.set(i, ((Number)row.getField(featureIndices[i])).doubleValue());
            }
            // 構建三元組
            return Tuple3.of(weight, val, vec);
        } else {
            Vector vec = VectorUtil.getVector(row.getField(vecIdx));
            return Tuple3.of(weight, val, vec);
        }
    }
}

若是對應原始輸入 Row.of("$3$0:1.0 1:7.0 2:9.0", "1.0 7.0 9.0", 1.0, 7.0, 9.0, 16.8), ,則程序中各類變量爲:

row = {Row@9723} "$3$0:1.0 1:7.0 2:9.0,1.0 7.0 9.0,1.0,7.0,9.0,16.8"
weight = {Double@9724} 1.0
val = {Double@9725} 16.8
vec = {DenseVector@9729} "1.0 7.0 9.0"
vecIdx = -1
featureIndices = {int[3]@9726} 
 0 = 2
 1 = 3
 2 = 4

5.3 獲取統計變量

用getStatInfo() 對輸入數據作標準化和插值 standardization and interception。

此處代碼對應上面基本流程的 3)

  1. 用statInfo = getStatInfo() 獲取統計變量,包括vector size, mean和variance。這裏流程比較複雜。
  • 3.1)用trainData.map{return value.f2;}來獲取訓練數據中的 x-vec。
  • 3.2)調用StatisticsHelper.summary來對 x-vec 作處理
    • 3.2.1)調用 summarizer
      • 3.2.1.1)調用 mapPartition(new VectorSummarizerPartition(bCov))
        • 3.2.1.1.1)調用VectorSummarizerPartition.mapPartition,其遍歷列表,列表中的每個變量 sv 是 x-vec。srt = srt.visit(sv),會根據每個新輸入從新計算count,sum,squareSum,normL1..,這樣就獲得了本partiton中輸入每列的這些統計數值。
      • 3.2.1.2)調用 reduce(VectorSummarizerUtil.merge(value1, value2)) 來歸併每個partition的結果。
    • 3.2.2)調用map(BaseVectorSummarizer summarizer),其實調用到DenseVectorSummarizer,就是生成一個DenseVectorSummary向量,裏面是count,sum,squareSum,normL1,min,max,numNonZero。
  • 3.3)調用 coefficientDim = summary.map
  • 3.4)調用 meanVar = coefficientDim.map,最後獲得 Tuple2.of(coefficientDim, meanVar)
private Tuple2<DataSet<Integer>, DataSet<DenseVector[]>> getStatInfo(
    DataSet<Tuple3<Double, Double, Vector>> trainData, final boolean standardization) {
    if (standardization) {
        DataSet<BaseVectorSummary> summary = StatisticsHelper.summary(trainData.map(
            new MapFunction<Tuple3<Double, Double, Vector>, Vector>() {
                @Override
                public Vector map(Tuple3<Double, Double, Vector> value) throws Exception {
                    return value.f2; //獲取訓練數據中的 x-vec
                }
            }).withForwardedFields());

        DataSet<Integer> coefficientDim = summary.map(new MapFunction<BaseVectorSummary, Integer>() {
            public Integer map(BaseVectorSummary value) throws Exception {
                return value.vectorSize(); // 獲取dimension
            }
        });

        DataSet<DenseVector[]> meanVar = summary.map(new MapFunction<BaseVectorSummary, DenseVector[]>() {
            public DenseVector[] map(BaseVectorSummary value) {
                if (value instanceof SparseVectorSummary) {
                    // 計算min, max
                    DenseVector max = ((SparseVector)value.max()).toDenseVector();
                    DenseVector min = ((SparseVector)value.min()).toDenseVector();
                    for (int i = 0; i < max.size(); ++i) {
                        max.set(i, Math.max(Math.abs(max.get(i)), Math.abs(min.get(i))));
                        min.set(i, 0.0);
                    }
                    return new DenseVector[] {min, max};
                } else {
                    // 計算standardDeviation
                    return new DenseVector[] {(DenseVector)value.mean(),
                        (DenseVector)value.standardDeviation()};
                }
            }
        });
        return Tuple2.of(coefficientDim, meanVar);
    } 
}

5.4 對輸入數據作標準化和插值

這裏對應基本流程的 4) 。

對輸入數據作標準化和插值 standardization and interception。上面獲得的 meanVar 做爲參數傳入。這裏是對 x-vec 作標準化。

好比原始輸入Row是"(1.0,16.8,1.0 7.0 9.0)",其中 x-vec 是"1.0 7.0 9.0",進行標準化以後,x-vec 變成了 4 項,第一項是固定值 "1.0 ", 4 項 是 "1.0 1.0 1.4657097546055162 1.4770978917519928",因此轉換後的Row是"(1.0,16.8,1.0 1.0 1.4657097546055162 1.4770978917519928)"

爲何第一項是固定值 "1.0 " ?由於按照線性模型 f(x)=w^Tx+b,咱們應該得出一個常數 b,這裏設定 "1.0 ",就是 b 的初始值。

private DataSet<Tuple3<Double, Double, Vector>> preProcess(
    return initData.map(
        new RichMapFunction<Tuple3<Double, Double, Vector>, Tuple3<Double, Double, Vector>>() {
            private DenseVector[] meanVar;

            @Override
            public Tuple3<Double, Double, Vector> map(Tuple3<Double, Double, Vector> value){
// value = {Tuple3@9791} "(1.0,16.8,1.0 7.0 9.0)"
                Vector aVector = value.f2;
// aVector = {DenseVector@9792} "1.0 7.0 9.0"
                if (aVector instanceof DenseVector) {
                    DenseVector bVector;
                    if (standardization) {
                        if (hasInterceptItem) {
                            bVector = new DenseVector(aVector.size() + 1);
                            bVector.set(0, 1.0); // 設定了固定值
                            for (int i = 0; i < aVector.size(); ++i) {
                                // 對輸入數據作標準化和插值
                                bVector.set(i + 1, (aVector.get(i) - meanVar[0].get(i)) / meanVar[1].get(i));
                            }
                        } 
                    } 
// bVector = {DenseVector@9814} "1.0 1.0 1.4657097546055162 1.4770978917519928"
                    return Tuple3.of(value.f0, value.f1, bVector);
                } 
            }
        }).withBroadcastSet(meanVar, MEAN_VAR);
}

// 這裏是對 x-vec 作標準化。好比原始輸入Row是"(1.0,16.8,1.0 7.0 9.0)",其中 x-vec 是"1.0 7.0 9.0",進行標準化以後,x-vec 變成了 4 項,第一項是 "1.0 ",是 "1.0 1.0 1.4657097546055162 1.4770978917519928",因此轉換後的Row是"(1.0,16.8,1.0 1.0 1.4657097546055162 1.4770978917519928)"

至此,輸入處理完畢。

好比原始輸入Row是"(1.0,16.8,1.0 7.0 9.0)",其中 x-vec 是"1.0 7.0 9.0"。

進行標準化以後,x-vec 變成了 4 項 :{ 第1項是固定值 "1.0 ", 因此4 項 是 "1.0 1.0 1.4657097546055162 1.4770978917519928" },

轉換後的Row是"(1.0,16.8,1.0 1.0 1.4657097546055162 1.4770978917519928)"。即weight 是1.0,y-value是16.8,後續4個是x-vec。

下面咱們能夠開始進行優化模型了,敬請期待下文。

0xFF 參考

終於理解了方向導數與梯度

導數,方向導數,梯度(Gradient)與梯度降低法(Gradient Descent)的介紹(非原創)

梯度向量與梯度降低法

直觀理解梯度,以及偏導數、方向導數和法向量等

梯度(Gradient)與梯度降低法(Gradient Descent)

梯度與梯度降低法

梯度降低算法過程詳細解讀

https://www.zhihu.com/question/25627482/answer/321719657)

Hessian矩陣以及在圖像中的應用

https://blog.csdn.net/weixin_39445556/article/details/84502260)

《分佈式機器學習算法、理論與實踐_劉鐵巖》

https://zhuanlan.zhihu.com/p/29672873)

https://www.zhihu.com/question/36425542

https://zhuanlan.zhihu.com/p/32821110)

https://blog.csdn.net/hei653779919/article/details/106409818)

CRF L-BFGS Line Search原理及代碼分析

步長與學習率

https://blog.csdn.net/IMWTJ123/article/details/88709023)

線性迴歸、梯度降低(Linear Regression、Gradient Descent)

機器學習系列(三)——目標函數和損失函數

相關文章
相關標籤/搜索