Alink漫談(十一) :線性迴歸 之 L-BFGS優化

Alink漫談(十一) :線性迴歸 之 L-BFGS優化

0x00 摘要

Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平臺,是業界首個同時支持批式算法、流式算法的機器學習平臺。本文介紹了線性迴歸的L-BFGS優化在Alink是如何實現的,但願能夠做爲你們看線性迴歸代碼的Roadmap。html

由於Alink的公開資料太少,因此如下均爲自行揣測,確定會有疏漏錯誤,但願你們指出,我會隨時更新。java

本系列目前已有十一篇,歡迎你們指點python

Alink漫談(一) : 從KMeans算法實現不一樣看Alink設計思想web

Alink漫談(二) : 從源碼看機器學習平臺Alink設計和架構算法

[Alink漫談之三] AllReduce通訊模型api

Alink漫談(四) : 模型的前因後果網絡

Alink漫談(五) : 迭代計算和Superstep架構

Alink漫談(六) : TF-IDF算法的實現app

Alink漫談(七) : 如何劃分訓練數據集和測試數據集機器學習

Alink漫談(八) : 二分類評估 AUC、K-S、PRC、Precision、Recall、LiftChart 如何實現

Alink漫談(九) :特徵工程 之 特徵哈希/標準化縮放

Alink漫談(十) :線性迴歸實現 之 數據預處理

0x01 回顧

到目前爲止,已經處理完畢輸入,接下來就是優化。優化的主要目標是找到一個方向,參數朝這個方向移動以後使得損失函數的值可以減少,這個方向每每由一階偏導或者二階偏導各類組合求得。 因此咱們再次複習下基本思路。

1.1 優化基本思路

對於線性迴歸模型 f(x) = w'x+e,咱們構造一個Cost函數(損失函數)J(θ),而且經過找到 J(θ) 函數的最小值,就能夠肯定線性模型的係數 w 了。

最終的優化函數是:min(L(Y, f(x)) + J(x)) ,即最優化經驗風險和結構風險,而這個函數就被稱爲目標函數

咱們要作的是依據咱們的訓練集,選取最優的θ,在咱們的訓練集中讓f(x)儘量接近真實的值。咱們定義了一個函數來描述 「f(x)和真實的值y之間的差距「,這個函數稱爲目標函數,表達式以下:

\[J(θ)≈\frac{1}{2} \sum_{i≈1}^m(f_θ(x^{(i)} — y^{(i)})^2\\ \]

這裏的目標函數就是著名的最小二乘函數

咱們要選擇最優的θ,使得f(x)最近進真實值。這個問題就轉化爲求解最優的θ,使目標函數 J(θ) 取最小值。

1.2 各種優化方法

尋找合適的W令目標函數f(W) 最小,是一個無約束最優化問題,解決這個問題的通用作法是隨機給定一個初始的W0,經過迭代,在每次迭代中計算目標函數的降低方向並更新W,直到目標函數穩定在最小的點。

不一樣的優化算法的區別就在於目標函數降低方向Dt的計算。降低方向是經過對目標函數在當前的W下求一階倒數(梯度,Gradient)和求二階導數(海森矩陣,Hessian Matrix)獲得。常見的算法有梯度降低法、牛頓法、擬牛頓法。

  • 梯度降低法直接採用目標函數在當前W的梯度的反方向做爲降低方向。
  • 牛頓法是在當前W下,利用二次泰勒展開近似目標函數,而後利用該近似函數來求解目標函數的降低方向。其中Bt爲目標函數f(W)Wt處的海森矩陣。這個搜索方向也稱做牛頓方向。
  • 擬牛頓法只要求每一步迭代中計算目標函數的梯度,經過擬合的方式找到一個近似的海森矩陣用於計算牛頓方向。
  • L-BFGS(Limited-memory BFGS)則是解決了BFGS中每次迭代後都須要保存N*N階海森逆矩陣的問題,只須要保存每次迭代的兩組向量和一組標量便可。

Alink中,UnaryLossObjFunc是目標函數,SquareLossFunc 是損失函數,使用L-BFGS算法對模型進行優化

即 優化方向由擬牛頓法L-BFGS搞定(具體如何弄就是看f(x)的泰勒二階展開),損失函數最小值是平方損失函數來計算。

0x02 基本概念

由於L-BFGS算法是擬牛頓法的一種,因此咱們先從牛頓法的本質泰勒展開開始介紹。

2.1 泰勒展開

泰勒展開是但願基於某區間一點x_0展開,用一組簡單的冪函數來近似一個複雜的函數f(x)在該區間的局部。泰勒展開的應用場景例如:咱們很難直接求得sin(1)的值,但咱們能夠經過sin的泰勒級數求得sin(1)的近似值,且展開項越多,精度越高。計算機通常都是把 sin(x)進行泰勒展開進行計算的。

泰勒當年爲何要發明這條公式?

由於當時數學界對簡單函數的研究和應用已經趨於成熟,而複雜函數,好比:f(x) = sin(x)ln(1+x) 這種一看就頭疼的函數,還有那種根本就找不到表達式的曲線。除了代入一個x能夠獲得它的y,就啥事都很難幹了。因此泰勒同窗就迎難而上!決定讓這些式子通通現出原形,通通變簡單。

要讓一個複雜函數變簡單,能不能把它轉換成別的表達式?泰勒公式一句話描述:就是用多項式函數去逼近光滑函數。即,根據「以直代曲、化整爲零」的數學思想,產生了泰勒公式。

泰勒公式經過把【任意函數表達式】轉換(重寫)爲【多項式】形式,是一種極其強大的函數近似工具。爲何說它強大呢?

  • 多項式很是【友好】,三易,易計算,易求導,易積分
  • 幾何感受和計算感受都很直觀,如拋物線和幾回方就是底數本身乘本身乘幾回

如何通俗推理?

泰勒公式乾的事情就是:使用多項式表達式估計(近似) 在 附近的值

當咱們想要仿造一個東西的時候,無形之中都會按照以下思路,即先保證大致上類似,再保證局部類似,再保證細節類似,再保證更細微的地方類似……不斷地細化下去,無窮次細化之後,仿造的東西將無限接近真品。真假難辨。

物理學家得出結論:把生活中關於「仿造」的經驗運用到運動學問題中,若是想仿造一段曲線,那麼首先應該保證曲線的起始點同樣,其次保證起始點處位移隨時間的變化率同樣(速度相同),再次應該保證前二者相等的同時關於時間的二階變化率同樣(加速度相同)……若是隨時間每一階變化率(每一階導數)都同樣,那這倆曲線確定是徹底等價的。

因此若是泰勒想一個辦法讓本身避免接觸 sin(x)這類函數,即把這類函數替換掉。 就能夠根據這類函數的圖像,仿造一個圖像,與原來的圖像相相似,這種行爲在數學上叫近似。不扯這個名詞。講講如何仿造圖像。

仿造的第一步,就是讓仿造的曲線也過這個點。

完成了仿造的第一步,很粗糙,甚至徹底看不出來這倆有什麼類似的地方,那就繼續細節化。開始考慮曲線的變化趨勢,即導數,保證在此處的導數相等。

經歷了第二步,如今起始點相同了,總體變化趨勢相近了,可能看起來有那麼點意思了。想進一步精確化,應該考慮凹凸性。高中學過:表徵圖像的凹凸性的參數爲「導數的導數」。因此,下一步就讓兩者的導數的導數相等。

起始點相同,增減性相同,凹凸性相同後,仿造的函數更像了。若是再繼續細化下去,應該會無限接近。因此泰勒認爲「仿造一段曲線,要先保證起點相同,再保證在此處導數相同,繼續保證在此處的導數的導數相同……

泰勒展開式就是把一個三角函數或者指數函數或者其餘比較難纏的函數用多項式替換掉。

也就是說,有一個原函數f(x) ,我再造一個圖像與原函數圖像類似的多項式函數 g(x) ,爲了保證類似,我只須要保證這倆函數在某一點的初始值相等,1階導數相等,2階導數相等,……n階導數相等

2.2 牛頓法

牛頓法的基本思路是,在現有極小點估計值的附近對 f(x) 作二階泰勒展開,進而找到極小點的下一個估計值。其核心思想是利用迭代點 x_k 處的一階導數(梯度)和二階導數(Hessen矩陣)對目標函數進行二次函數近似,而後把二次模型的極小點做爲新的迭代點,並不斷重複這一過程,直至求得知足精度的近似極小值。

梯度降低算法是將函數在 xn 位置進行一次函數近似,也就是一條直線。計算梯度,從而決定下一步優化的方向是梯度的反方向。而牛頓法是將函數在 xn 位置進行二階函數近似,也就是二次曲線。計算梯度和二階導數,從而決定下一步的優化方向。

咱們要優化的都是多元函數,x每每不是一個實數,而是一個向量。因此f(x) 的一階導數也是一個向量,再求導的二階導數是一個矩陣,就是Hessian矩陣。

2.2.1 泰勒一階展開

牛頓法求根能夠按照泰勒一階展開。例如對於方程 f(x) = 0,咱們在任意一點 x0 處,進行一階泰勒展開:

\[f(x) = f(x_0) + (x - x_0)f^{ '}(x_0) \]

令 f(x) = 0,帶入上式,便可獲得:

\[x = x_0 - \frac{f(x_0)}{f^{'}(x_0)} \]

注意,這裏使用了近似,獲得的 x 並非方程的根,只是近似解。可是,x 比 x0 更接近於方程的根。

而後,利用迭代方法求解,以 x 爲 x0,求解下一個距離方程的根更近的位置。迭代公式能夠寫成:

\[x_{n+1} = x_n - \frac{f(x_n)}{f^{'}(x_n)} \]

2.2.2 泰勒二階展開

機器學習、深度學習中,損失函數的優化問題通常是基於一階導數梯度降低的。如今,從另外一個角度來看,想要讓損失函數最小化,這實際上是一個最值問題,對應函數的一階導數 f'(x) = 0。也就是說,若是咱們找到了能讓 f'(x) = 0 的點 x,損失函數取得最小值,也就實現了模型優化目標

如今的目標是計算 f’(x) = 0 對應的 x,即 f’(x) = 0 的根。轉化爲求根問題,就能夠利用上一節的牛頓法了。

與上一節有所不一樣,首先,對 f(x) 在 x0 處進行二階泰勒展開:

\[f(x) = f(x_0) + (x - x_0)f^{'}(x_0) + \frac{1}{2}(x-x_0)^2f^{''}(x_0) \]

上式成立的條件是 f(x) 近似等於 f(x0)。令 f(x) = f(x0),並對 (x - x0) 求導,可得:

\[x = x_0 - \frac{f^{'}(x_0)}{f^{''}(x_0)} \]

一樣,雖然 x 並非最優解點,可是 x 比 x0 更接近 f'(x) = 0 的根。這樣,就能獲得最優化的迭代公式:

\[x_{n+1} = x_n - \frac{f^{'}(x_n)}{f^{''}(x_n)} \]

經過迭代公式,就能不斷地找到 f'(x) = 0 的近似根,從而也就實現了損失函數最小化的優化目標。

2.2.3 高維空間

在機器學習的優化問題中,咱們要優化的都是多元函數,因此x每每不是一個實數,而是一個向量。因此將牛頓求根法利用到機器學習中時,x是一個向量,f(x) 的一階導數也是一個向量,再求導的二階導數是一個矩陣,就是Hessian矩陣。

在高維空間,咱們用梯度替代一階導數,用Hessian矩陣替代二階導數,牛頓法的迭代公式不變:

\[x_{k+1} = x_k - [Hf(x_k)^{-1}].J_f(x_k) \]

其中 J 定義爲 雅克比矩陣,對應一階偏導數。

推導以下 :

咱們假設f(x)是二階可微實函數,把f(x)在xk處Taylor展開並取二階近似爲

\[\begin{aligned}&f(x)≈f(x^k)+∇f(x^k)T(x−x^k)+\frac{1}{2}(x−x^k)^T∇^2f(x^k)(x−x^k)\\&x^k爲當前的極小值估計值\\&∇f(x^k)是f(x)在x^k處的一階導數\\&∇^2f(x) 是f(x)在x^k處的Hessen矩陣。\\\end{aligned} \]

咱們的目標是求f(x)的最小值,而導數爲0的點極有可能爲極值點,故在此對f(x)求導,並令其導數爲0,即∇f(x)=0,可得

\[∇f(x)=∇f(x^k)+∇^2f(x^k)(x−x^k)=0 \]

設∇2f(x)可逆,由(2)能夠獲得牛頓法的迭代公式

\[\begin{aligned}&x^{k+1}=x^k−∇^2f(x^k)^{−1}∇f(x^k) \\&d= −∇^2f(x^k)^{−1}∇f(x^k)被稱爲牛頓方向 \\\end{aligned} \]

當原函數存在一階二階連續可導時,能夠採用牛頓法進行一維搜索,收斂速度快,具備局部二階收斂速度。

2.2.4 牛頓法基本流程

總結(模仿)一下使用牛頓法求根的步驟:

​ a.已知函數的狀況下,隨機產生點.

​ b.由已知的點按照的公式進行k次迭代.

​ c.若是與上一次的迭代結果相同或者相差結果小於一個閾值時,本次結果就是函數的根.

僞代碼以下

def newton(feature, label, iterMax, sigma, delta):
    '''牛頓法
    input:  feature(mat):特徵
            label(mat):標籤
            iterMax(int):最大迭代次數
            sigma(float), delta(float):牛頓法中的參數
    output: w(mat):迴歸係數
    '''
    n = np.shape(feature)[1]
    w = np.mat(np.zeros((n, 1)))
    it = 0
    while it <= iterMax:
        g = first_derivativ(feature, label, w)  # 一階導數
        G = second_derivative(feature)  # 二階導數
        d = -G.I * g
        m = get_min_m(feature, label, sigma, delta, d, w, g)  # 獲得步長中最小的值m
        w = w + pow(sigma, m) * d
        it += 1       
    return w

2.2.5 問題點及解決

牛頓法不只須要計算 Hessian 矩陣,並且須要計算 Hessian 矩陣的逆。當數據量比較少的時候,運算速度不會受到大的影響。可是,當數據量很大,特別在深度神經網絡中,計算 Hessian 矩陣和它的逆矩陣是很是耗時的。從總體效果來看,牛頓法優化速度沒有梯度降低算法那麼快。因此,目前神經網絡損失函數的優化策略大多都是基於梯度降低。

另外一個問題是,若是某個點的Hessian矩陣接近奇異(條件數過大),逆矩陣會致使數值不穩定,甚至迭代可能不會收斂。

當x的維度特別多的時候,咱們想求得f(x) 的二階導數很困難,而牛頓法求駐點又是一個迭代算法,因此這個困難咱們還要面臨無限屢次,致使了牛頓法求駐點在機器學習中沒法使用。有沒有什麼解決辦法呢?

實際應用中,咱們一般不去求解逆矩陣,而是考慮求解Hessian矩陣的近似矩陣,最好是隻利用一階導數近似地獲得二階導數的信息,從而在較少的計算量下獲得接近二階的收斂速率。這就是擬牛頓法。

擬牛頓法的想法其實很簡單,就像是函數值的兩點之差能夠逼近導數同樣,一階導數的兩點之差也能夠逼近二階導數。幾何意義是求一階導數的「割線」,取極限時,割線會逼近切線,矩陣B就會逼近Hessian矩陣。

\[擬牛頓法,一樣能夠顧名思義,就是模擬牛頓法,用一個近似於∇^2f(x)^{−1}的矩陣H_{k+1}來替代∇^2f(x)^{−1} \]

2.3 擬牛頓法

擬牛頓法就是模擬出Hessen矩陣的構造過程,經過迭代來逼近。主要包括DFP擬牛頓法,BFGS擬牛頓法。擬牛頓法只要求每一步迭代時知道目標函數的梯度。

各類擬牛頓法使用迭代法分別近似海森矩陣的逆和它自身;

在各類擬牛頓法中,通常的構造Hk+1的策略是,H1一般選擇任意的一個n階對稱正定矩陣(通常爲I),而後經過不斷的修正Hk給出Hk+1,即

\[H_{k+1}=H_k+ΔH_k \\ ΔH_k稱爲校訂矩陣 \]

好比:BFGS法每次更新矩陣H(Hessian矩陣的逆矩陣)須要的是第k步的迭代點差s和梯度差y,第k+1步的H至關於須要從開始到第k步的所用s和y的值。

咱們要經過牛頓求駐點法和BFGS算法來求得一個函數的根,兩個算法都須要迭代,因此咱們乾脆讓他倆一塊兒迭代就行了。兩個算法都是慢慢逼近函數根,因此通過k次迭代之後,所獲得的解就是機器學習中目標函數導函數的根。這種兩個算法共同迭代的計算方式,咱們稱之爲On The Fly.

在BFGS算法迭代的第一步,單位矩陣與梯度 g 相乘,就等於梯度 g,形式上同梯度降低的表達式是相同的。因此BFGS算法能夠理解爲從梯度降低逐步轉換爲牛頓法求函數解的一個算法。

雖然咱們使用了BFGS算法來利用單位矩陣逐步逼近H矩陣,可是每次計算的時候都要存儲D矩陣,D矩陣有多大。呢。假設咱們的數據集有十萬個維度(不算特別大),那麼每次迭代所要存儲D矩陣的結果是74.5GB。咱們沒法保存如此巨大的矩陣內容,如何解決呢?使用L-BFGS算法.

2.4 L-BFGS算法

L-BFGS算法的基本思想是:算法只保存並利用最近m次迭代的曲率信息來構造海森矩陣的近似矩陣

咱們要經過牛頓求駐點法和BFGS算法來求得一個函數的根,兩個算法都須要迭代,因此咱們乾脆讓他倆一塊兒迭代就行了。兩個算法都是慢慢逼近函數根,因此通過k次迭代之後,所獲得的解就是機器學習中目標函數導函數的根。這種兩個算法共同迭代的計算方式,咱們稱之爲On The Fly.

在BFGS算法迭代的第一步,單位矩陣與梯度g相乘,就等於梯度g,形式上同梯度降低的表達式是相同的。因此BFGS算法能夠理解爲從梯度降低逐步轉換爲牛頓法求函數解的一個算法。

雖然咱們使用了BFGS算法來利用單位矩陣逐步逼近H矩陣,可是每次計算的時候都要存儲D矩陣,D矩陣有多大。呢。假設咱們的數據集有十萬個維度(不算特別大),那麼每次迭代所要存儲D矩陣的結果是74.5GB。咱們沒法保存如此巨大的矩陣內容,如何解決呢?使用L-BFGS算法.

咱們每一次對D矩陣的迭代,都是經過迭代計算sk和yk獲得的。咱們的內存存不下時候只能丟掉一些存不下的數據。假設咱們設置的存儲向量數爲100,當s和y迭代超過100時,就會扔掉第一個s和y。每多一次迭代就對應的扔掉最前邊的s和y。這樣雖然損失了精度,但確能夠保證使用有限的內存將函數的解經過BFGS算法求獲得。 因此L-BFGS算法能夠理解爲對BFGS算法的又一次近似或者逼近。

這裏不介紹數學論證,由於網上優秀文章有不少,這裏只是介紹工程實現。總結L-BFGS算法的大體步驟以下:

Step1:  選初始點x_0,存儲最近迭代次數m;
Step2:  k=0, H_0=I, r=f(x_0);
Step3:  根據更新的參數計算梯度和損失值,若是達到閾值,則返回最優解x_{k+1},不然轉Step4;
Step4:  計算本次迭代的可行梯度降低方向 p_k=-r_k;
Step5:  計算步長alpha_k,進行一維搜索;
Step6:  更新權重x;
Step7:  只保留最近m次的向量對;
Step8:  計算並保存 s_k, y_k
Step9:  用two-loop recursion算法求r_k;
k=k+1,轉Step3。

0x03 優化模型 -- L-BFGS算法

回到代碼,BaseLinearModelTrainBatchOp.optimize函數調用的是

return new Lbfgs(objFunc, trainData, coefficientDim, params).optimize();

優化後將返回線性模型的係數。

/**
 * optimizer api.
 *
 * @return the coefficient of linear problem.
 */
@Override
public DataSet <Tuple2 <DenseVector, double[]>> optimize() {

   /**
    * solving problem using iteration.
    * trainData is the distributed samples.
    * initCoef is the initial model coefficient, which will be broadcast to every worker.
    * objFuncSet is the object function in dataSet format
    * .add(new PreallocateCoefficient(OptimName.currentCoef)) allocate memory for current coefficient
    * .add(new PreallocateCoefficient(OptimName.minCoef))     allocate memory for min loss coefficient
    * .add(new PreallocateLossCurve(OptimVariable.lossCurve)) allocate memory for loss values
    * .add(new PreallocateVector(OptimName.dir ...))          allocate memory for dir
    * .add(new PreallocateVector(OptimName.grad))             allocate memory for grad
    * .add(new PreallocateSkyk())                             allocate memory for sK yK
    * .add(new CalcGradient(objFunc))                         calculate local sub gradient
    * .add(new AllReduce(OptimName.gradAllReduce))            sum all sub gradient with allReduce
    * .add(new CalDirection())                                get summed gradient and use it to calc descend dir
    * .add(new CalcLosses(objFunc, OptimMethod.GD))           calculate local losses for line search
    * .add(new AllReduce(OptimName.lossAllReduce))            sum all losses with allReduce
    * .add(new UpdateModel(maxIter, epsilon ...))             update coefficient
    * .setCompareCriterionOfNode0(new IterTermination())             judge stop of iteration
    */
   DataSet <Row> model = new IterativeComQueue()
      .initWithPartitionedData(OptimVariable.trainData, trainData)
      .initWithBroadcastData(OptimVariable.model, coefVec)
      .initWithBroadcastData(OptimVariable.objFunc, objFuncSet)
      .add(new PreallocateCoefficient(OptimVariable.currentCoef))
      .add(new PreallocateCoefficient(OptimVariable.minCoef))
      .add(new PreallocateLossCurve(OptimVariable.lossCurve, maxIter))
      .add(new PreallocateVector(OptimVariable.dir, new double[] {0.0, OptimVariable.learningRate}))
      .add(new PreallocateVector(OptimVariable.grad))
      .add(new PreallocateSkyk(OptimVariable.numCorrections))
      .add(new CalcGradient())
      .add(new AllReduce(OptimVariable.gradAllReduce))
      .add(new CalDirection(OptimVariable.numCorrections))
      .add(new CalcLosses(OptimMethod.LBFGS, numSearchStep))
      .add(new AllReduce(OptimVariable.lossAllReduce))
      .add(new UpdateModel(params, OptimVariable.grad, OptimMethod.LBFGS, numSearchStep))
      .setCompareCriterionOfNode0(new IterTermination())
      .closeWith(new OutputModel())
      .setMaxIter(maxIter)
      .exec();

   return model.mapPartition(new ParseRowModel());
}

因此咱們接下來的就是看Lbfgs,其重點就是參數更新的降低方向和搜索步長。

3.1 如何分佈式實施

若是因爲全部輸入的數據都是相同維度的;算法過程當中不會對輸入修改,就能夠將這些輸入數據進行切分。這樣的話,應該能夠經過一次map-reduce來計算。

咱們理一下L-BFGS中能夠分佈式並行計算的步驟:

  • 計算梯度 能夠並行化 ,好比在機器1計算梯度1,在機器2計算梯度2....,而後經過一個Reduce把這些合成一個完整的梯度向量。
  • 計算方向 能夠並行化,一樣能夠經過把數據分區,而後Map算各自分區上的值,Reduce合起來獲得方向。
  • 計算損失 能夠並行化,一樣能夠經過把數據分區,而後Map算各自分區上的值,Reduce合起來獲得損失。

Alink中,使用AllReduce功能而非Flink原生Map / Reduce來完成了以上三點的並行計算和通訊。

3.2 CalcGradient

線搜索只有兩步,肯定方向、肯定步長。肯定方向和模擬Hessian矩陣都須要計算梯度。目標函數的梯度向量計算中只須要進行向量間的點乘和相加,能夠很容易將每一個迭代過程拆分紅相互獨立的計算步驟,由不一樣的節點進行獨立計算,而後歸併計算結果。

Alink將樣本特徵向量分佈到不一樣的計算節點,由各計算節點完成本身所負責樣本的點乘與求和計算,而後將計算結果進行歸併,則實現了按行並行的LR。

實際狀況中也會存在針對高維特徵向量進行邏輯迴歸的場景,僅僅按行進行並行處理,沒法知足這類場景的需求,所以還須要按列將高維的特徵向量拆分紅若干小的向量進行求解。這個也許是Alink之後須要優化的一個點吧 。

CalcGradient是Alink迭代算子的派生類,函數總結以下:

  • 獲取通過處理的輸入數據labledVectors,
  • 從靜態內存中獲取 "迭代參數coef","優化函數objFunc" 和 "梯度";
  • 計算局部梯度 objFunc.calcGradient(labledVectors, coef, grad.f0); 這裏調用到了目標函數的梯度相關API;objFunc.calcGradient 根據採樣點計算梯度。Alink這裏就是把x, y代入,求損失函數的梯度就是對 Coef求偏導數。具體咱們在前文已經提到。
    • 對計算樣本中的每個樣本,分別計算不一樣特徵的計算梯度。
  • 將新計算出來的梯度乘以權重以後,存入靜態內存 gradAllReduce 中。
  • 後續會經過聚合函數,對全部計算樣本的特徵的梯度進行累加,獲得每個特徵的累積梯度以及損失。
public class CalcGradient extends ComputeFunction {

    /**
     * object function class, it supply the functions to calc local gradient (or loss).
     */
    private OptimObjFunc objFunc;

    @Override
    public void calc(ComContext context) {
        Iterable<Tuple3<Double, Double, Vector>> labledVectors = context.getObj(OptimVariable.trainData);
      
// 通過處理的輸入數據      
labledVectors = {ArrayList@9877}  size = 4
 0 = {Tuple3@9895} "(1.0,16.8,1.0 1.0 1.4657097546055162 1.4770978917519928)"
 1 = {Tuple3@9896} "(1.0,6.7,1.0 1.0 -0.338240712601273 -0.7385489458759964)"
 2 = {Tuple3@9897} "(1.0,6.9,1.0 1.0 -0.7892283294029703 -0.3692744729379982)"
 3 = {Tuple3@9898} "(1.0,8.0,1.0 1.0 -0.338240712601273 -0.3692744729379982)"
      
        // get iterative coefficient from static memory.
        Tuple2<DenseVector, Double> state = context.getObj(OptimVariable.currentCoef);
        int size = state.f0.size();
      
// 是Coef,1.7976931348623157E308是默認最大值
state = {Tuple2@9878} "(0.001 0.0 0.0 0.0,1.7976931348623157E308)"
 f0 = {DenseVector@9879} "0.001 0.0 0.0 0.0"
 f1 = {Double@9889} 1.7976931348623157E308
  
        DenseVector coef = state.f0;
        if (objFunc == null) {
            objFunc = ((List<OptimObjFunc>)context.getObj(OptimVariable.objFunc)).get(0);
        }
      
// 變量以下       
objFunc = {UnaryLossObjFunc@9882} 
 unaryLossFunc = {SquareLossFunc@9891} 
 l1 = 0.0
 l2 = 0.0
 params = {Params@9892} "Params {featureCols=["f0","f1","f2"], labelCol="label", predictionCol="linpred"}"      
      
        Tuple2<DenseVector, double[]> grad = context.getObj(OptimVariable.dir);

// 變量以下      
grad = {Tuple2@9952} "(0.0 0.0 0.0 0.0,[0.0, 0.1])"
 f0 = {DenseVector@9953} "0.0 0.0 0.0 0.0"
 f1 = {double[2]@9969}       
coef = {DenseVector@9951} "0.001 0.0 0.0 0.0"
 data = {double[4]@9982} 
      
        // calculate local gradient,使用目標函數
        Double weightSum = objFunc.calcGradient(labledVectors, coef, grad.f0);

        // prepare buffer vec for allReduce. the last element of vec is the weight Sum.
        double[] buffer = context.getObj(OptimVariable.gradAllReduce);
        if (buffer == null) {
            buffer = new double[size + 1];
            context.putObj(OptimVariable.gradAllReduce, buffer);
        }

        for (int i = 0; i < size; ++i) {
            buffer[i] = grad.f0.get(i) * weightSum;
        }

      /* the last element is the weight value */
        buffer[size] = weightSum;
    }
}

// 最後結果是
buffer = {double[5]@9910} 
 0 = -38.396
 1 = -38.396
 2 = -14.206109929253465
 3 = -14.364776997288134
 4 = 0.0

3.3 AllReduce

這裏是前面提到的 "經過聚合函數,對全部計算樣本的特徵的梯度進行累加,獲得每個特徵的累積梯度以及損失"。

具體關於AllReduce如何運做,能夠參見文章 [Alink漫談之三] AllReduce通訊模型

.add(new AllReduce(OptimVariable.gradAllReduce))

3.4 CalDirection

此時獲得的梯度,已是聚合以後的,因此能夠開始計算方向。

3.4.1 預先分配

OptimVariable.grad 是預先分配的。

public class PreallocateSkyk extends ComputeFunction {
    private int numCorrections;

    /**
     * prepare hessian matrix of lbfgs method. we allocate memory fo sK, yK at first iteration step.
     *
     * @param context context of iteration.
     */
    @Override
    public void calc(ComContext context) {
        if (context.getStepNo() == 1) {
            Tuple2<DenseVector, double[]> grad = context.getObj(OptimVariable.grad);
            int size = grad.f0.size();
            DenseVector[] sK = new DenseVector[numCorrections];
            DenseVector[] yK = new DenseVector[numCorrections];
            for (int i = 0; i < numCorrections; ++i) {
                sK[i] = new DenseVector(size);
                yK[i] = new DenseVector(size);
            }
            context.putObj(OptimVariable.sKyK, Tuple2.of(sK, yK));
        }
    }
}

3.4.2 計算方向

在計算的過程當中,須要不斷的計算和存儲歷史的Hessian矩陣,在L-BFGS算法,但願只保留最近的m次迭代信息,便可以擬合Hessian矩陣。在L-BFGS算法中,再也不保存完整的Hk,而是存儲向量序列{sk}和{yk},須要矩陣時Hk,使用向量序列{sk}和{yk}計算就能夠獲得,而向量序列{sk}和{yk}也不是全部都要保存,只要保存最新的m步向量便可。

具體原理和公式這裏再也不贅述,網上不少文章講解很是好。

重點說明,dir的各個數據用途是

dir = {Tuple2@9931} "(-9.599 -9.599 -3.5515274823133662 -3.5911942493220335,[4.0, 0.1])"
 f0 = {DenseVector@9954} "-9.599 -9.599 -3.5515274823133662 -3.5911942493220335" //梯度
 f1 = {double[2]@9938} 
  0 = 4.0 //權重
  1 = 0.1 //學習率 learning rate,0.1是初始化數值,後續UpdateModel時候會更新

代碼摘要以下:

@Override
public void calc(ComContext context) {
   Tuple2 <DenseVector, double[]> grad = context.getObj(OptimVariable.grad);
   Tuple2 <DenseVector, double[]> dir = context.getObj(OptimVariable.dir);
   Tuple2 <DenseVector[], DenseVector[]> hessian = context.getObj(OptimVariable.sKyK);
   int size = grad.f0.size();
   // gradarr是上一階段CalcGradient的結果
   double[] gradarr = context.getObj(OptimVariable.gradAllReduce);
// 變量爲
gradarr = {double[5]@9962} 
 0 = -38.396
 1 = -38.396
 2 = -14.206109929253465
 3 = -14.364776997288134
 4 = 4.0
   
   if (this.oldGradient == null) {
      oldGradient = new DenseVector(size);
   }
   // hessian用來看成隊列,存儲sK,yK,只保留最近m個
   DenseVector[] sK = hessian.f0;
   DenseVector[] yK = hessian.f1;
   for (int i = 0; i < size; ++i) {
      //gradarr[size]是權重
      grad.f0.set(i, gradarr[i] / gradarr[size]); //size = 4
   }
// 賦值梯度,這裏都除以權重
grad = {Tuple2@9930} "(-9.599 -9.599 -3.5515274823133662 -3.5911942493220335,[0.0])"
 f0 = {DenseVector@9937} "-9.599 -9.599 -3.5515274823133662 -3.5911942493220335"
  data = {double[4]@9963} 
   0 = -9.599
   1 = -9.599
   2 = -3.5515274823133662
   3 = -3.5911942493220335
 f1 = {double[1]@9961} 
  0 = 0.0  
  
   dir.f1[0] = gradarr[size]; //權重
   int k = context.getStepNo() - 1;

   if (k == 0) { //首次迭代
      dir.f0.setEqual(grad.f0); // 梯度賦予在這裏
      oldGradient.setEqual(grad.f0);
   } else {
      yK[(k - 1) % m].setEqual(grad.f0);
      yK[(k - 1) % m].minusEqual(oldGradient);
      oldGradient.setEqual(grad.f0);
   }
   // copy g_k and store in qL

   dir.f0.setEqual(grad.f0); //拷貝梯度到這裏
//  
dir = {Tuple2@9931} "(-9.599 -9.599 -3.5515274823133662 -3.5911942493220335,[4.0, 0.1])"
 f0 = {DenseVector@9954} "-9.599 -9.599 -3.5515274823133662 -3.5911942493220335" //梯度
 f1 = {double[2]@9938} 
  0 = 4.0 //權重
  1 = 0.1 //學習率 learning rate,0.1是初始化數值
  
   // compute H^-1 * g_k
   int delta = k > m ? k - m : 0;
   int l = k <= m ? k : m; // m = 10
   if (alpha == null) {
      alpha = new double[m];
   }
   // two-loop的過程,經過擬牛頓法計算Hessian矩陣       
   for (int i = l - 1; i >= 0; i--) {
      int j = (i + delta) % m;
      double dot = sK[j].dot(yK[j]);
      if (Math.abs(dot) > 0.0) {
         double rhoJ = 1.0 / dot;
         alpha[i] = rhoJ * (sK[j].dot(dir.f0)); // 計算alpha
         dir.f0.plusScaleEqual(yK[j], -alpha[i]); // 從新修正d
      }
   }
   for (int i = 0; i < l; i++) {
      int j = (i + delta) % m;
      double dot = sK[j].dot(yK[j]);
      if (Math.abs(dot) > 0.0) {
         double rhoJ = 1.0 / dot;
         double betaI = rhoJ * (yK[j].dot(dir.f0)); // 乘以rho
         dir.f0.plusScaleEqual(sK[j], (alpha[i] - betaI));// 從新修正d
      }
   }
}

//最後是存儲在 OptimVariable.dir

3.5 CalcLosses

根據更新的 dir 和 當前係數 計算損失函數偏差值,這個損失是爲後續的線性搜索準備的。目的是若是損失函數偏差值達到容許的範圍,那麼中止迭代,不然重複迭代。

CalcLosses基本邏輯以下:

  • 1)獲得本次步長 Double beta = dir.f1[1] / numSearchStep;
    • 後續UpdateModel 中會對下一次計算的步長(learning rate)進行更新,好比 dir.f1[1] *= 1.0 / (numSearchStep * numSearchStep); 或者 dir.f1[1] = Math.min(dir.f1[1], numSearchStep);
  • 2)調用目標函數的 calcSearchValues 來計算當前係數對應的損失;
  • 3)calcSearchValues 遍歷輸入labelVectors,對於每一個 labelVector 按照線性搜索的步驟進行計算損失。vec[i] += weight * this.unaryLossFunc.loss(etaCoef - i * etaDelta, labelVector.f1); 循環內部以下:
    • 3.1)用x-vec和coef計算出來的 Y ,etaCoef = getEta(labelVector, coefVector);
    • 3.2)以x-vec和dirVec計算出來的 deltaY,etaDelta = getEta(labelVector, dirVec) * beta;
    • 3.3)按照線性搜索的步驟進行計算損失。vec[i] += weight * this.unaryLossFunc.loss(etaCoef - i * etaDelta, labelVector.f1); 聯繫損失函數可知,etaCoef - i * etaDelta, labelVector.f1 是 訓練數據預測值 與 實際類別 的誤差;
  • 4)爲後續聚合 lossAllReduce 準備數據;

代碼以下:

public class CalcLosses extends ComputeFunction {

    @Override
    public void calc(ComContext context) {
        Iterable<Tuple3<Double, Double, Vector>> labledVectors = context.getObj(OptimVariable.trainData);
        Tuple2<DenseVector, double[]> dir = context.getObj(OptimVariable.dir);
        Tuple2<DenseVector, Double> coef = context.getObj(OptimVariable.currentCoef);
        if (objFunc == null) {
            objFunc = ((List<OptimObjFunc>)context.getObj(OptimVariable.objFunc)).get(0);
        }
        /**
         *  calculate losses of current coefficient.
         *  if optimizer is owlqn, constraint search will used, else without constraint.
         */
        Double beta = dir.f1[1] / numSearchStep;
        double[] vec = method.equals(OptimMethod.OWLQN) ?
            objFunc.constraintCalcSearchValues(labledVectors, coef.f0, dir.f0, beta, numSearchStep)
            : objFunc.calcSearchValues(labledVectors, coef.f0, dir.f0, beta, numSearchStep);

// 變量爲
dir = {Tuple2@9988} "(-9.599 -9.599 -3.5515274823133662 -3.5911942493220335,[4.0, 0.1])"
coef = {Tuple2@9989} "(0.001 0.0 0.0 0.0,1.7976931348623157E308)"
beta = {Double@10014} 0.025
vec = {double[5]@10015} 
 0 = 0.0
 1 = 0.0
 2 = 0.0
 3 = 0.0
 4 = 0.0  
      
        // prepare buffer vec for allReduce.
        double[] buffer = context.getObj(OptimVariable.lossAllReduce);
        if (buffer == null) {
            buffer = vec.clone();
            context.putObj(OptimVariable.lossAllReduce, buffer);
        } else {
            System.arraycopy(vec, 0, buffer, 0, vec.length);
        }
    }
}

其中搜索是一元目標函數提供的,其又調用了損失函數。

public class UnaryLossObjFunc extends OptimObjFunc {
   /**
     * Calculate loss values for line search in optimization.
     *
     * @param labelVectors train data.
     * @param coefVector   coefficient of current time.
     * @param dirVec       descend direction of optimization problem.
     * @param beta         step length of line search.
     * @param numStep      num of line search step.
     * @return double[] losses.
     */
    @Override
    public double[] calcSearchValues(Iterable<Tuple3<Double, Double, Vector>> labelVectors,
                                     DenseVector coefVector,
                                     DenseVector dirVec,
                                     double beta,
                                     int numStep) {
        double[] vec = new double[numStep + 1];
      
// labelVector是三元組Tuple3<weight, label, feature vector>      
labelVectors = {ArrayList@10007}  size = 4
 0 = {Tuple3@10027} "(1.0,16.8,1.0 1.0 1.4657097546055162 1.4770978917519928)"
 1 = {Tuple3@10034} "(1.0,6.7,1.0 1.0 -0.338240712601273 -0.7385489458759964)"
 2 = {Tuple3@10035} "(1.0,6.9,1.0 1.0 -0.7892283294029703 -0.3692744729379982)"
 3 = {Tuple3@10036} "(1.0,8.0,1.0 1.0 -0.338240712601273 -0.3692744729379982)"
coefVector = {DenseVector@10008} "0.001 0.0 0.0 0.0"
dirVec = {DenseVector@10009} "-9.599 -9.599 -3.5515274823133662 -3.5911942493220335"
beta = 0.025
numStep = 4
vec = {double[5]@10026} 
 0 = 0.0
 1 = 0.0
 2 = 0.0
 3 = 0.0
 4 = 0.0     
   
        for (Tuple3<Double, Double, Vector> labelVector : labelVectors) {
            double weight = labelVector.f0;
            //用x-vec和coef計算出來的 Y
            double etaCoef = getEta(labelVector, coefVector); 
            //以x-vec和dirVec計算出來的 deltaY
            double etaDelta = getEta(labelVector, dirVec) * beta;
weight = 1.0
etaCoef = 0.001
etaDelta = -0.7427013482280431          
            for (int i = 0; i < numStep + 1; ++i) {
                //labelVector.f1就是label y
                //聯繫下面損失函數可知,etaCoef - i * etaDelta, labelVector.f1 是 訓練數據預測值 與 實際類別 的誤差
                vec[i] += weight * this.unaryLossFunc.loss(etaCoef - i * etaDelta, labelVector.f1); 
            }
        }
        return vec;
    }  
  
    private double getEta(Tuple3<Double, Double, Vector> labelVector, DenseVector coefVector) {
      //labelVector.f2 = {DenseVector@9972} "1.0 1.0 1.4657097546055162 1.4770978917519928"
        return MatVecOp.dot(labelVector.f2, coefVector);
    }
}

vec = {double[5]@10026} 
 0 = 219.33160199999998
 1 = 198.85962729259512
 2 = 179.40202828917856
 3 = 160.95880498975038
 4 = 143.52995739431051

回顧損失函數以下

/**
 * Squared loss function.
 * https://en.wikipedia.org/wiki/Loss_functions_for_classification#Square_loss
 */
public class SquareLossFunc implements UnaryLossFunc {
   public SquareLossFunc() { }

   @Override
   public double loss(double eta, double y) {
      return 0.5 * (eta - y) * (eta - y);
   }

   @Override
   public double derivative(double eta, double y) {
      return eta - y;
   }

   @Override
   public double secondDerivative(double eta, double y) {
      return 1;
   }
}

3.6 UpdateModel

本模塊作兩件事

  • 基於dir和step length來更新coefficient,即依據方向和步長計算。
    • 若是簡化理解,參數更新公式爲 :下一刻參數 = 上一時刻參數 - 學習率 * (損失函數對這個參數的導數)。
  • 判斷循環的收斂。

由於變量太多,因此有時候就忘記誰是誰了,因此再次標示。

  • OptimVariable.dir是CalcGradient計算出來的梯度作修正以後的結果
  • OptimVariable.lossAllReduce 這個會變化,此時是上一階段計算的損失

代碼邏輯大體以下:

  • 1)得出"最新損失"的最小值位置pos
  • 2)得出學習率 beta = dir.f1[1] / numSearchStep;
  • 3)根據"最新損失pos"和 上一個最小值 last loss value判斷來進行分別處理:
    • 3.1)若是全部損失都比 last loss value 大,則
      • 3.1.1)縮減學習率 multiply 1.0 / (numSearchStep*numSearchStep)
      • 3.1.2)把 eta 設置爲 0;這個就是步長了
      • 3.1.3)把當前 loss 設置爲 last loss value
    • 3.2)若是losses[numSearchStep] 比 last loss value 小,則
      • 3.2.1)增大學習率 multiply numSearchStep
      • 3.2.2)設置eta是smallest value pos,eta = beta * pos; 這個eta就是步長了
      • 3.2.3)把當前 loss 設置爲當前loss最小值 losses[numSearchStep]
    • 3.3)不然
      • 3.3.1)學習率不更改
      • 3.3.2)設置eta是smallest value pos,eta = beta * pos; 這個eta就是步長了
      • 3.3.3)把當前 loss 設置爲當前loss最小值 losses[pos]
  • 4)修正Hessian矩陣
  • 5)用方向向量和步長來來更新系數向量 curCoef.f0.plusScaleEqual(dir.f0, -eta);
  • 6)若是當前loss比 min loss 小,則 用 current loss 更新 min loss
    • 6.1)minCoef.f1 = curCoef.f1; 更新最小loss
    • 6.2)minCoef.f0.setEqual(curCoef.f0); 更新最小loss所對應的Coef,即線性模型最後須要的係數

在這裏求步長,我沒有發現 Wolf-Powell 準則的使用,Alink作了某種優化。若是有朋友能看出來Wolf-Powell如何使用,還請留言指點 ,謝謝。

代碼以下:

public class UpdateModel extends ComputeFunction {
    @Override
    public void calc(ComContext context) {
        double[] losses = context.getObj(OptimVariable.lossAllReduce);
        Tuple2<DenseVector, double[]> dir = context.getObj(OptimVariable.dir);
        Tuple2<DenseVector, double[]> pseGrad = context.getObj(OptimVariable.pseGrad);
        Tuple2<DenseVector, Double> curCoef = context.getObj(OptimVariable.currentCoef);
        Tuple2<DenseVector, Double> minCoef = context.getObj(OptimVariable.minCoef);

        double lossChangeRatio = 1.0;
        double[] lossCurve = context.getObj(OptimVariable.lossCurve);
        for (int j = 0; j < losses.length; ++j) {
            losses[j] /= dir.f1[0]; //dir.f1[0]是權重
        }
        int pos = -1;
        //get the min value of losses, and remember the position.
        for (int j = 0; j < losses.length; ++j) {
            if (losses[j] < losses[0]) {
                losses[0] = losses[j];
                pos = j;
            }
        }

        // adaptive learningRate strategy
        double beta = dir.f1[1] / numSearchStep;
        double eta;
      
// 變量以下      
losses = {double[5]@10001} 
 0 = 35.88248934857763
 1 = 49.71490682314878
 2 = 44.85050707229464
 3 = 40.239701247437594
 4 = 35.88248934857763
dir = {Tuple2@10002} "(-9.599 -9.599 -3.5515274823133662 -3.5911942493220335,[4.0, 0.1])"
pseGrad = null
curCoef = {Tuple2@10003} "(0.001 0.0 0.0 0.0,1.7976931348623157E308)"
minCoef = {Tuple2@10004} "(0.001 0.0 0.0 0.0,1.7976931348623157E308)"
lossChangeRatio = 1.0
lossCurve = {double[100]@10005} 
pos = 4
beta = 0.025
curCoef.f1 = {Double@10006} 1.7976931348623157E308 
      
        if (pos == -1) {
            /**
             * if all losses larger than last loss value. we'll do the below things:
             * 1. reduce learning rate by multiply 1.0 / (numSearchStep*numSearchStep).
             * 2. set eta with zero.
             * 3. set current loss equals last loss value.
             */
            eta = 0;
            dir.f1[1] *= 1.0 / (numSearchStep * numSearchStep);  // 學習率 
            curCoef.f1 = losses[0]; // 最小loss
        } else if (pos == numSearchStep) {
            /**
             * if losses[numSearchStep] smaller than last loss value. we'll do the below things:
             * 1. enlarge learning rate by multiply numSearchStep.
             * 2. set eta with the smallest value pos.
             * 3. set current loss equals smallest loss value.
             */
            eta = beta * pos;
            dir.f1[1] *= numSearchStep;
            dir.f1[1] = Math.min(dir.f1[1], numSearchStep); // 學習率
            lossChangeRatio = Math.abs((curCoef.f1 - losses[pos]) / curCoef.f1);
            curCoef.f1 = losses[numSearchStep]; // 最小loss
          
// 當前數值
numSearchStep = 4 是NUM_SEARCH_STEP缺省值,是線性搜索的參數,Line search parameter, which define the search value num of one step.        
lossChangeRatio = 1.0
pos = 4
eta = 0.1
curCoef.f1 = {Double@10049} 35.88248934857763   
dir.f1[1] = 0.4  
  
        } else {
            /**
             * else :
             * 1. learning rate not changed.
             * 2. set eta with the smallest value pos.
             * 3. set current loss equals smallest loss value.
             */
            eta = beta * pos;
            lossChangeRatio = Math.abs((curCoef.f1 - losses[pos]) / curCoef.f1);
            curCoef.f1 = losses[pos]; // 最小loss
        }

		/* update loss value in loss curve at this step */
        lossCurve[context.getStepNo() - 1] = curCoef.f1;
      
lossCurve = {double[100]@9998} 
 0 = 35.88248934857763
 1 = Infinity
   
        if (method.equals(OptimMethod.OWLQN)) {
           .....          
        } else if (method.equals(OptimMethod.LBFGS)) {          
            Tuple2<DenseVector[], DenseVector[]> sKyK = context.getObj(OptimVariable.sKyK);
            int size = dir.f0.size();
            int k = context.getStepNo() - 1;
            DenseVector[] sK = sKyK.f0;
            for (int s = 0; s < size; ++s) {
                // 修正矩陣
                sK[k % OptimVariable.numCorrections].set(s, dir.f0.get(s) * (-eta));
            }
            curCoef.f0.plusScaleEqual(dir.f0, -eta); // 這裏是用方向向量和步長來更新系數向量
          
sKyK = {Tuple2@10043} "([0.9599000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0],[0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0])"
 f0 = {DenseVector[10]@10044} 
  0 = {DenseVector@10074} "0.9599000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034"
    
        } 
      
        /**
         * if current loss is smaller than min loss, then update the min loss and min coefficient by current.
         */
        if (curCoef.f1 < minCoef.f1) {
            minCoef.f1 = curCoef.f1; // 最小loss
            minCoef.f0.setEqual(curCoef.f0); // 最小loss所對應的Coef,即線性模型最後須要的係數
        }

curCoef = {Tuple2@9996} "(0.9609000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034,35.88248934857763)"
 f0 = {DenseVector@10059} "0.9609000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034"
 f1 = {Double@10048} 35.88248934857763
minCoef = {Tuple2@9997} "(0.9609000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034,35.88248934857763)"
 f0 = {DenseVector@10059} "0.9609000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034"
 f1 = {Double@10048} 35.88248934857763
      
        // judge the convergence of iteration.
        filter(dir, curCoef, minCoef, context, lossChangeRatio);
    }
}

filter 判斷是否收斂,裏面的打印log很清晰的說明了函數邏輯。

/**
 * judge the convergence of iteration.
 */
public void filter(Tuple2<DenseVector, double[]> grad,
                   Tuple2<DenseVector, Double> c,
                   Tuple2<DenseVector, Double> m,
                   ComContext context,
                   double lossChangeRatio) {
    double epsilon = params.get(HasEpsilonDv0000001.EPSILON);
    int maxIter = params.get(HasMaxIterDefaultAs100.MAX_ITER);
    double gradNorm = ((Tuple2<DenseVector, double[]>)context.getObj(gradName)).f0.normL2();
    if (c.f1 < epsilon || gradNorm < epsilon) {
        printLog(" method converged at step : ", c.f1, m.f1, grad.f1[1], gradNorm, context, lossChangeRatio);
        grad.f1[0] = -1.0;
    } else if (context.getStepNo() > maxIter - 1) {
        printLog(" method stop at max step : ", c.f1, m.f1, grad.f1[1], gradNorm, context, lossChangeRatio);
        grad.f1[0] = -1.0;
    } else if (grad.f1[1] < EPS) {
        printLog(" learning rate is too small, method stops at step : ", c.f1, m.f1, grad.f1[1], gradNorm,
            context, lossChangeRatio);
        grad.f1[0] = -1.0;
    } else if (lossChangeRatio < epsilon && gradNorm < Math.sqrt(epsilon)) {
        printLog(" loss change ratio is too small, method stops at step : ", c.f1, m.f1, grad.f1[1], gradNorm,
            context, lossChangeRatio);
        grad.f1[0] = -1.0;
    } else {
        printLog(" method continue at step : ", c.f1, m.f1, grad.f1[1], gradNorm, context, lossChangeRatio);
    }
}

3.7 OutputModel

.closeWith(new OutputModel()) 這部分是每次迭代結束,臨時輸出模型,把數據轉換成Flink通用的Row類型,Transfer the state to model rows。

public class OutputModel extends CompleteResultFunction {

   @Override
   public List <Row> calc(ComContext context) {
      // get the coefficient of min loss.
      Tuple2 <DenseVector, double[]> minCoef = context.getObj(OptimVariable.minCoef);
      double[] lossCurve = context.getObj(OptimVariable.lossCurve);

      int effectiveSize = lossCurve.length;
      for (int i = 0; i < lossCurve.length; ++i) {
         if (Double.isInfinite(lossCurve[i])) {
            effectiveSize = i;
            break;
         }
      }

      double[] effectiveCurve = new double[effectiveSize];
      System.arraycopy(lossCurve, 0, effectiveCurve, 0, effectiveSize);

      Params params = new Params();
      params.set(ModelParamName.COEF, minCoef.f0);// 重點在這裏,minCoef是咱們真正須要的
      params.set(ModelParamName.LOSS_CURVE, effectiveCurve);
      List <Row> model = new ArrayList <>(1);
      model.add(Row.of(params.toJson()));
      return model;
   }
}

0x04 準備模型元數據

這裏設置了並行度爲1。

// Prepare the meta info of linear model.
DataSet<Params> meta = labelInfo.f0
    .mapPartition(new CreateMeta(modelName, linearModelType, isRegProc, params))
    .setParallelism(1);

具體代碼

public static class CreateMeta implements MapPartitionFunction<Object, Params> {
        @Override
        public void mapPartition(Iterable<Object> rows, Collector<Params> metas) throws Exception {
            Object[] labels = null;
            if (!this.isRegProc) {
                labels = orderLabels(rows);
            }

            Params meta = new Params();
            meta.set(ModelParamName.MODEL_NAME, this.modelName);
            meta.set(ModelParamName.LINEAR_MODEL_TYPE, this.modelType);
            meta.set(ModelParamName.LABEL_VALUES, labels);
            meta.set(ModelParamName.HAS_INTERCEPT_ITEM, this.hasInterceptItem);
            meta.set(ModelParamName.VECTOR_COL_NAME, vectorColName);
            meta.set(LinearTrainParams.LABEL_COL, labelName);
            metas.collect(meta);
        }  
}

// 變量爲
meta = {Params@9667} "Params {hasInterceptItem=true, vectorColName=null, modelName="Linear Regression", labelValues=null, labelCol="label", linearModelType="LinearReg"}"

0x05 創建模型

當迭代循環結束以後,Alink就根據Coef數據來創建模型。

/**
 * build the linear model rows, the format to be output.
 */
public static class BuildModelFromCoefs extends AbstractRichFunction implements
        @Override
        public void mapPartition(Iterable<Tuple2<DenseVector, double[]>> iterable,
                                 Collector<Row> collector) throws Exception {
            for (Tuple2<DenseVector, double[]> coefVector : iterable) {
                LinearModelData modelData = buildLinearModelData(meta,
                    featureNames,
                    labelType,
                    meanVar,
                    hasIntercept,
                    standardization,
                    coefVector);

                new LinearModelDataConverter(this.labelType).save(modelData, collector);
            }
        }  
}

獲得模型數據爲,裏面coef就是 f(x)=w^Tx+b 中的 w, b。是最終用來計算的。

modelData = {LinearModelData@10584} 
 featureNames = {String[3]@9787} 
 featureTypes = null
 vectorColName = null
 coefVector = {DenseVector@10485} "-3.938937407856857 4.799499941426075 0.8929571907809862 1.078169576770847"
 coefVectors = null
 vectorSize = 3
 modelName = "Linear Regression"
 labelName = null
 labelValues = null
 linearModelType = {LinearModelType@4674} "LinearReg"
 hasInterceptItem = true
 lossCurve = {double[12]@10593} 
  0 = 35.88248934857763
  1 = 12.807366842002144
  2 = 0.5228366663917704
  3 = 0.031112070740366038
  4 = 0.01098914933042993
  5 = 0.009765757443537283
  6 = 0.008750523231785415
  7 = 0.004210085397869248
  8 = 0.0039042232755530704
  9 = 0.0038821509860327537
  10 = 0.003882042680010676
  11 = 0.0038820422536391033
 labelType = {FractionalTypeInfo@9790} "Double"

0x06 使用模型預測

預測時候,使用的是LinearModelMapper,其內部部分變量打印出來以下,可以看出來模型數據。

this = {LinearModelMapper@10704} 
 vectorColIndex = -1
model = {LinearModelData@10597} 
 featureNames = {String[3]@10632} 
 featureTypes = null
 vectorColName = null
 coefVector = {DenseVector@10612} "-3.938937407856857 4.799499941426075 0.8929571907809862 1.078169576770847"
 coefVectors = null
 vectorSize = 0
 modelName = "Linear Regression"
 labelName = null
 labelValues = {Object[0]@10613} 
 linearModelType = {LinearModelType@10611} "LinearReg"
 hasInterceptItem = true
 lossCurve = null
 labelType = null

具體預測是在 LinearModelMapper.predict 中完成,具體以下:

  • 對應原始輸入 Row.of("$3$0:1.0 1:7.0 2:9.0", "1.0 7.0 9.0", 1.0, 7.0, 9.0, 16.8),
  • 經過 FeatureLabelUtil.getFeatureVector 處理以後,
  • 獲得的四元組是 "1.0 1.0 7.0 9.0",其中第一個 1.0 是經過 aVector.set(0, 1.0); 專門設定的固定值。好比模型是 f(x) = ax + b,這個固定值 1.0 就是 b 的初始化值,隨着優化過程會獲得 b。因此這裏仍是須要有一個 1.0 來進行預測。
  • 模型係數是:"-3.938937407856857 4.799499941426075 0.8929571907809862 1.078169576770847"
  • 四元組 和 模型係數 點積的結果就是 dotValue = 16.814789059973744

這樣就能看出來模型係數如何使用的了。

public Object predict(Vector vector) throws Exception {
   double dotValue = MatVecOp.dot(vector, model.coefVector);

   switch (model.linearModelType) {
      case LR:
      case SVM:
      case Perceptron:
         return dotValue >= 0 ? model.labelValues[0] : model.labelValues[1];
      case LinearReg:
      case SVR:
         return dotValue;
   }
}

vector = {DenseVector@10610} "1.0 1.0 7.0 9.0"
 data = {double[4]@10619} 
  0 = 1.0
  1 = 1.0
  2 = 7.0
  3 = 9.0
model.coefVector = {DenseVector@10612} "-3.938937407856857 4.799499941426075 0.8929571907809862 1.078169576770847"
 data = {double[4]@10620} 
  0 = -3.938937407856857
  1 = 4.799499941426075
  2 = 0.8929571907809862
  3 = 1.078169576770847

0x07 本系列其餘文章

Alink漫談(一) : 從KMeans算法實現不一樣看Alink設計思想

Alink漫談(二) : 從源碼看機器學習平臺Alink設計和架構

[Alink漫談之三] AllReduce通訊模型

Alink漫談(四) : 模型的前因後果

Alink漫談(五) : 迭代計算和Superstep

Alink漫談(六) : TF-IDF算法的實現

Alink漫談(七) : 如何劃分訓練數據集和測試數據集

Alink漫談(八) : 二分類評估 AUC、K-S、PRC、Precision、Recall、LiftChart 如何實現

Alink漫談(九) :特徵工程 之 特徵哈希/標準化縮放

Alink漫談(十) :線性迴歸實現 之 數據預處理

0xFF 參考

http://www.mamicode.com/info-detail-2508527.html)

機器學習算法實現解析——liblbfgs之L-BFGS算法

深刻機器學習系列之BFGS & L-BFGS

擬牛頓法公式推導以及python代碼實現(一)

淺顯易懂——泰勒展開式

泰勒展開 — Taylor Expansion

從牛頓插值法到泰勒公式

怎樣更好地理解並記憶泰勒展開式?

優化算法——牛頓法(Newton Method)

優化算法——擬牛頓法之L-BFGS算法

一文讀懂L-BFGS算法

《分佈式機器學習算法、理論與實踐_劉鐵巖》

LogisticRegressionWithLBFGS--邏輯迴歸

邏輯迴歸優化方法-L-BFGS

機器學習與運籌優化(三)從牛頓法到L-BFGS

機器學習中牛頓法凸優化的通俗解釋

深刻機器學習系列17-BFGS & L-BFGS

優化方法基礎系列-精確的一維搜索技術

優化方法基礎系列-非精確的一維搜索技術

[原創]用「人話」解釋不精確線搜索中的Armijo-Goldstein準則及Wolfe-Powell準則

https://www.zhihu.com/question/36425542

一維搜索算法介紹及其實現

數值優化|筆記整理(1)——引入,線搜索:步長選取條件

https://zhuanlan.zhihu.com/p/32821110

線搜索(一):步長的選取

最優化問題——一維搜索(二)

CRF L-BFGS Line Search原理及代碼分析

步長與學習率

機器學習優化算法L-BFGS及其分佈式實現

L-BFGS算法詳解(邏輯迴歸的默認優化算法)

線性迴歸的原理及實踐(牛頓法)

線性迴歸、梯度降低(Linear Regression、Gradient Descent)

L-BFGS算法

並行邏輯迴歸

相關文章
相關標籤/搜索