Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平臺,是業界首個同時支持批式算法、流式算法的機器學習平臺。本文介紹了線性迴歸的L-BFGS優化在Alink是如何實現的,但願能夠做爲你們看線性迴歸代碼的Roadmap。html
由於Alink的公開資料太少,因此如下均爲自行揣測,確定會有疏漏錯誤,但願你們指出,我會隨時更新。java
本系列目前已有十一篇,歡迎你們指點python
Alink漫談(一) : 從KMeans算法實現不一樣看Alink設計思想web
Alink漫談(二) : 從源碼看機器學習平臺Alink設計和架構算法
Alink漫談(七) : 如何劃分訓練數據集和測試數據集機器學習
Alink漫談(八) : 二分類評估 AUC、K-S、PRC、Precision、Recall、LiftChart 如何實現
到目前爲止,已經處理完畢輸入,接下來就是優化。優化的主要目標是找到一個方向,參數朝這個方向移動以後使得損失函數的值可以減少,這個方向每每由一階偏導或者二階偏導各類組合求得。 因此咱們再次複習下基本思路。
對於線性迴歸模型 f(x) = w'x+e,咱們構造一個Cost函數(損失函數)J(θ),而且經過找到 J(θ) 函數的最小值,就能夠肯定線性模型的係數 w 了。
最終的優化函數是:min(L(Y, f(x)) + J(x)) ,即最優化經驗風險和結構風險,而這個函數就被稱爲目標函數。
咱們要作的是依據咱們的訓練集,選取最優的θ,在咱們的訓練集中讓f(x)儘量接近真實的值。咱們定義了一個函數來描述 「f(x)和真實的值y之間的差距「,這個函數稱爲目標函數,表達式以下:
這裏的目標函數就是著名的最小二乘函數。
咱們要選擇最優的θ,使得f(x)最近進真實值。這個問題就轉化爲求解最優的θ,使目標函數 J(θ) 取最小值。
尋找合適的W令目標函數f(W) 最小,是一個無約束最優化問題,解決這個問題的通用作法是隨機給定一個初始的W0,經過迭代,在每次迭代中計算目標函數的降低方向並更新W,直到目標函數穩定在最小的點。
不一樣的優化算法的區別就在於目標函數降低方向Dt的計算。降低方向是經過對目標函數在當前的W下求一階倒數(梯度,Gradient)和求二階導數(海森矩陣,Hessian Matrix)獲得。常見的算法有梯度降低法、牛頓法、擬牛頓法。
Alink中,UnaryLossObjFunc是目標函數,SquareLossFunc 是損失函數,使用L-BFGS算法對模型進行優化。
即 優化方向由擬牛頓法L-BFGS搞定(具體如何弄就是看f(x)的泰勒二階展開),損失函數最小值是平方損失函數來計算。
由於L-BFGS算法是擬牛頓法的一種,因此咱們先從牛頓法的本質泰勒展開開始介紹。
泰勒展開是但願基於某區間一點x_0展開,用一組簡單的冪函數來近似一個複雜的函數f(x)在該區間的局部。泰勒展開的應用場景例如:咱們很難直接求得sin(1)的值,但咱們能夠經過sin的泰勒級數求得sin(1)的近似值,且展開項越多,精度越高。計算機通常都是把 sin(x)進行泰勒展開進行計算的。
泰勒當年爲何要發明這條公式?
由於當時數學界對簡單函數的研究和應用已經趨於成熟,而複雜函數,好比:f(x) = sin(x)ln(1+x) 這種一看就頭疼的函數,還有那種根本就找不到表達式的曲線。除了代入一個x能夠獲得它的y,就啥事都很難幹了。因此泰勒同窗就迎難而上!決定讓這些式子通通現出原形,通通變簡單。
要讓一個複雜函數變簡單,能不能把它轉換成別的表達式?泰勒公式一句話描述:就是用多項式函數去逼近光滑函數。即,根據「以直代曲、化整爲零」的數學思想,產生了泰勒公式。
泰勒公式經過把【任意函數表達式】轉換(重寫)爲【多項式】形式,是一種極其強大的函數近似工具。爲何說它強大呢?
泰勒公式乾的事情就是:使用多項式表達式估計(近似) 在 附近的值。
當咱們想要仿造一個東西的時候,無形之中都會按照以下思路,即先保證大致上類似,再保證局部類似,再保證細節類似,再保證更細微的地方類似……不斷地細化下去,無窮次細化之後,仿造的東西將無限接近真品。真假難辨。
物理學家得出結論:把生活中關於「仿造」的經驗運用到運動學問題中,若是想仿造一段曲線,那麼首先應該保證曲線的起始點同樣,其次保證起始點處位移隨時間的變化率同樣(速度相同),再次應該保證前二者相等的同時關於時間的二階變化率同樣(加速度相同)……若是隨時間每一階變化率(每一階導數)都同樣,那這倆曲線確定是徹底等價的。
因此若是泰勒想一個辦法讓本身避免接觸 sin(x)這類函數,即把這類函數替換掉。 就能夠根據這類函數的圖像,仿造一個圖像,與原來的圖像相相似,這種行爲在數學上叫近似。不扯這個名詞。講講如何仿造圖像。
仿造的第一步,就是讓仿造的曲線也過這個點。
完成了仿造的第一步,很粗糙,甚至徹底看不出來這倆有什麼類似的地方,那就繼續細節化。開始考慮曲線的變化趨勢,即導數,保證在此處的導數相等。
經歷了第二步,如今起始點相同了,總體變化趨勢相近了,可能看起來有那麼點意思了。想進一步精確化,應該考慮凹凸性。高中學過:表徵圖像的凹凸性的參數爲「導數的導數」。因此,下一步就讓兩者的導數的導數相等。
起始點相同,增減性相同,凹凸性相同後,仿造的函數更像了。若是再繼續細化下去,應該會無限接近。因此泰勒認爲「仿造一段曲線,要先保證起點相同,再保證在此處導數相同,繼續保證在此處的導數的導數相同……」
泰勒展開式就是把一個三角函數或者指數函數或者其餘比較難纏的函數用多項式替換掉。
也就是說,有一個原函數f(x) ,我再造一個圖像與原函數圖像類似的多項式函數 g(x) ,爲了保證類似,我只須要保證這倆函數在某一點的初始值相等,1階導數相等,2階導數相等,……n階導數相等。
牛頓法的基本思路是,在現有極小點估計值的附近對 f(x) 作二階泰勒展開,進而找到極小點的下一個估計值。其核心思想是利用迭代點 x_k 處的一階導數(梯度)和二階導數(Hessen矩陣)對目標函數進行二次函數近似,而後把二次模型的極小點做爲新的迭代點,並不斷重複這一過程,直至求得知足精度的近似極小值。
梯度降低算法是將函數在 xn 位置進行一次函數近似,也就是一條直線。計算梯度,從而決定下一步優化的方向是梯度的反方向。而牛頓法是將函數在 xn 位置進行二階函數近似,也就是二次曲線。計算梯度和二階導數,從而決定下一步的優化方向。
咱們要優化的都是多元函數,x每每不是一個實數,而是一個向量。因此f(x) 的一階導數也是一個向量,再求導的二階導數是一個矩陣,就是Hessian矩陣。
牛頓法求根能夠按照泰勒一階展開。例如對於方程 f(x) = 0,咱們在任意一點 x0 處,進行一階泰勒展開:
令 f(x) = 0,帶入上式,便可獲得:
注意,這裏使用了近似,獲得的 x 並非方程的根,只是近似解。可是,x 比 x0 更接近於方程的根。
而後,利用迭代方法求解,以 x 爲 x0,求解下一個距離方程的根更近的位置。迭代公式能夠寫成:
機器學習、深度學習中,損失函數的優化問題通常是基於一階導數梯度降低的。如今,從另外一個角度來看,想要讓損失函數最小化,這實際上是一個最值問題,對應函數的一階導數 f'(x) = 0。也就是說,若是咱們找到了能讓 f'(x) = 0 的點 x,損失函數取得最小值,也就實現了模型優化目標。
如今的目標是計算 f’(x) = 0 對應的 x,即 f’(x) = 0 的根。轉化爲求根問題,就能夠利用上一節的牛頓法了。
與上一節有所不一樣,首先,對 f(x) 在 x0 處進行二階泰勒展開:
上式成立的條件是 f(x) 近似等於 f(x0)。令 f(x) = f(x0),並對 (x - x0) 求導,可得:
一樣,雖然 x 並非最優解點,可是 x 比 x0 更接近 f'(x) = 0 的根。這樣,就能獲得最優化的迭代公式:
經過迭代公式,就能不斷地找到 f'(x) = 0 的近似根,從而也就實現了損失函數最小化的優化目標。
在機器學習的優化問題中,咱們要優化的都是多元函數,因此x每每不是一個實數,而是一個向量。因此將牛頓求根法利用到機器學習中時,x是一個向量,f(x) 的一階導數也是一個向量,再求導的二階導數是一個矩陣,就是Hessian矩陣。
在高維空間,咱們用梯度替代一階導數,用Hessian矩陣替代二階導數,牛頓法的迭代公式不變:
其中 J 定義爲 雅克比矩陣,對應一階偏導數。
推導以下 :
咱們假設f(x)是二階可微實函數,把f(x)在xk處Taylor展開並取二階近似爲
咱們的目標是求f(x)的最小值,而導數爲0的點極有可能爲極值點,故在此對f(x)求導,並令其導數爲0,即∇f(x)=0,可得
設∇2f(x)可逆,由(2)能夠獲得牛頓法的迭代公式
當原函數存在一階二階連續可導時,能夠採用牛頓法進行一維搜索,收斂速度快,具備局部二階收斂速度。
總結(模仿)一下使用牛頓法求根的步驟:
a.已知函數的狀況下,隨機產生點.
b.由已知的點按照的公式進行k次迭代.
c.若是與上一次的迭代結果相同或者相差結果小於一個閾值時,本次結果就是函數的根.
僞代碼以下
def newton(feature, label, iterMax, sigma, delta): '''牛頓法 input: feature(mat):特徵 label(mat):標籤 iterMax(int):最大迭代次數 sigma(float), delta(float):牛頓法中的參數 output: w(mat):迴歸係數 ''' n = np.shape(feature)[1] w = np.mat(np.zeros((n, 1))) it = 0 while it <= iterMax: g = first_derivativ(feature, label, w) # 一階導數 G = second_derivative(feature) # 二階導數 d = -G.I * g m = get_min_m(feature, label, sigma, delta, d, w, g) # 獲得步長中最小的值m w = w + pow(sigma, m) * d it += 1 return w
牛頓法不只須要計算 Hessian 矩陣,並且須要計算 Hessian 矩陣的逆。當數據量比較少的時候,運算速度不會受到大的影響。可是,當數據量很大,特別在深度神經網絡中,計算 Hessian 矩陣和它的逆矩陣是很是耗時的。從總體效果來看,牛頓法優化速度沒有梯度降低算法那麼快。因此,目前神經網絡損失函數的優化策略大多都是基於梯度降低。
另外一個問題是,若是某個點的Hessian矩陣接近奇異(條件數過大),逆矩陣會致使數值不穩定,甚至迭代可能不會收斂。
當x的維度特別多的時候,咱們想求得f(x) 的二階導數很困難,而牛頓法求駐點又是一個迭代算法,因此這個困難咱們還要面臨無限屢次,致使了牛頓法求駐點在機器學習中沒法使用。有沒有什麼解決辦法呢?
實際應用中,咱們一般不去求解逆矩陣,而是考慮求解Hessian矩陣的近似矩陣,最好是隻利用一階導數近似地獲得二階導數的信息,從而在較少的計算量下獲得接近二階的收斂速率。這就是擬牛頓法。
擬牛頓法的想法其實很簡單,就像是函數值的兩點之差能夠逼近導數同樣,一階導數的兩點之差也能夠逼近二階導數。幾何意義是求一階導數的「割線」,取極限時,割線會逼近切線,矩陣B就會逼近Hessian矩陣。
擬牛頓法就是模擬出Hessen矩陣的構造過程,經過迭代來逼近。主要包括DFP擬牛頓法,BFGS擬牛頓法。擬牛頓法只要求每一步迭代時知道目標函數的梯度。
各類擬牛頓法使用迭代法分別近似海森矩陣的逆和它自身;
在各類擬牛頓法中,通常的構造Hk+1的策略是,H1一般選擇任意的一個n階對稱正定矩陣(通常爲I),而後經過不斷的修正Hk給出Hk+1,即
好比:BFGS法每次更新矩陣H(Hessian矩陣的逆矩陣)須要的是第k步的迭代點差s和梯度差y,第k+1步的H至關於須要從開始到第k步的所用s和y的值。
咱們要經過牛頓求駐點法和BFGS算法來求得一個函數的根,兩個算法都須要迭代,因此咱們乾脆讓他倆一塊兒迭代就行了。兩個算法都是慢慢逼近函數根,因此通過k次迭代之後,所獲得的解就是機器學習中目標函數導函數的根。這種兩個算法共同迭代的計算方式,咱們稱之爲On The Fly.
在BFGS算法迭代的第一步,單位矩陣與梯度 g 相乘,就等於梯度 g,形式上同梯度降低的表達式是相同的。因此BFGS算法能夠理解爲從梯度降低逐步轉換爲牛頓法求函數解的一個算法。
雖然咱們使用了BFGS算法來利用單位矩陣逐步逼近H矩陣,可是每次計算的時候都要存儲D矩陣,D矩陣有多大。呢。假設咱們的數據集有十萬個維度(不算特別大),那麼每次迭代所要存儲D矩陣的結果是74.5GB。咱們沒法保存如此巨大的矩陣內容,如何解決呢?使用L-BFGS算法.
L-BFGS算法的基本思想是:算法只保存並利用最近m次迭代的曲率信息來構造海森矩陣的近似矩陣。
咱們要經過牛頓求駐點法和BFGS算法來求得一個函數的根,兩個算法都須要迭代,因此咱們乾脆讓他倆一塊兒迭代就行了。兩個算法都是慢慢逼近函數根,因此通過k次迭代之後,所獲得的解就是機器學習中目標函數導函數的根。這種兩個算法共同迭代的計算方式,咱們稱之爲On The Fly.
在BFGS算法迭代的第一步,單位矩陣與梯度g相乘,就等於梯度g,形式上同梯度降低的表達式是相同的。因此BFGS算法能夠理解爲從梯度降低逐步轉換爲牛頓法求函數解的一個算法。
雖然咱們使用了BFGS算法來利用單位矩陣逐步逼近H矩陣,可是每次計算的時候都要存儲D矩陣,D矩陣有多大。呢。假設咱們的數據集有十萬個維度(不算特別大),那麼每次迭代所要存儲D矩陣的結果是74.5GB。咱們沒法保存如此巨大的矩陣內容,如何解決呢?使用L-BFGS算法.
咱們每一次對D矩陣的迭代,都是經過迭代計算sk和yk獲得的。咱們的內存存不下時候只能丟掉一些存不下的數據。假設咱們設置的存儲向量數爲100,當s和y迭代超過100時,就會扔掉第一個s和y。每多一次迭代就對應的扔掉最前邊的s和y。這樣雖然損失了精度,但確能夠保證使用有限的內存將函數的解經過BFGS算法求獲得。 因此L-BFGS算法能夠理解爲對BFGS算法的又一次近似或者逼近。
這裏不介紹數學論證,由於網上優秀文章有不少,這裏只是介紹工程實現。總結L-BFGS算法的大體步驟以下:
Step1: 選初始點x_0,存儲最近迭代次數m; Step2: k=0, H_0=I, r=f(x_0); Step3: 根據更新的參數計算梯度和損失值,若是達到閾值,則返回最優解x_{k+1},不然轉Step4; Step4: 計算本次迭代的可行梯度降低方向 p_k=-r_k; Step5: 計算步長alpha_k,進行一維搜索; Step6: 更新權重x; Step7: 只保留最近m次的向量對; Step8: 計算並保存 s_k, y_k Step9: 用two-loop recursion算法求r_k; k=k+1,轉Step3。
回到代碼,BaseLinearModelTrainBatchOp.optimize
函數調用的是
return new Lbfgs(objFunc, trainData, coefficientDim, params).optimize();
優化後將返回線性模型的係數。
/** * optimizer api. * * @return the coefficient of linear problem. */ @Override public DataSet <Tuple2 <DenseVector, double[]>> optimize() { /** * solving problem using iteration. * trainData is the distributed samples. * initCoef is the initial model coefficient, which will be broadcast to every worker. * objFuncSet is the object function in dataSet format * .add(new PreallocateCoefficient(OptimName.currentCoef)) allocate memory for current coefficient * .add(new PreallocateCoefficient(OptimName.minCoef)) allocate memory for min loss coefficient * .add(new PreallocateLossCurve(OptimVariable.lossCurve)) allocate memory for loss values * .add(new PreallocateVector(OptimName.dir ...)) allocate memory for dir * .add(new PreallocateVector(OptimName.grad)) allocate memory for grad * .add(new PreallocateSkyk()) allocate memory for sK yK * .add(new CalcGradient(objFunc)) calculate local sub gradient * .add(new AllReduce(OptimName.gradAllReduce)) sum all sub gradient with allReduce * .add(new CalDirection()) get summed gradient and use it to calc descend dir * .add(new CalcLosses(objFunc, OptimMethod.GD)) calculate local losses for line search * .add(new AllReduce(OptimName.lossAllReduce)) sum all losses with allReduce * .add(new UpdateModel(maxIter, epsilon ...)) update coefficient * .setCompareCriterionOfNode0(new IterTermination()) judge stop of iteration */ DataSet <Row> model = new IterativeComQueue() .initWithPartitionedData(OptimVariable.trainData, trainData) .initWithBroadcastData(OptimVariable.model, coefVec) .initWithBroadcastData(OptimVariable.objFunc, objFuncSet) .add(new PreallocateCoefficient(OptimVariable.currentCoef)) .add(new PreallocateCoefficient(OptimVariable.minCoef)) .add(new PreallocateLossCurve(OptimVariable.lossCurve, maxIter)) .add(new PreallocateVector(OptimVariable.dir, new double[] {0.0, OptimVariable.learningRate})) .add(new PreallocateVector(OptimVariable.grad)) .add(new PreallocateSkyk(OptimVariable.numCorrections)) .add(new CalcGradient()) .add(new AllReduce(OptimVariable.gradAllReduce)) .add(new CalDirection(OptimVariable.numCorrections)) .add(new CalcLosses(OptimMethod.LBFGS, numSearchStep)) .add(new AllReduce(OptimVariable.lossAllReduce)) .add(new UpdateModel(params, OptimVariable.grad, OptimMethod.LBFGS, numSearchStep)) .setCompareCriterionOfNode0(new IterTermination()) .closeWith(new OutputModel()) .setMaxIter(maxIter) .exec(); return model.mapPartition(new ParseRowModel()); }
因此咱們接下來的就是看Lbfgs,其重點就是參數更新的降低方向和搜索步長。
若是因爲全部輸入的數據都是相同維度的;算法過程當中不會對輸入修改,就能夠將這些輸入數據進行切分。這樣的話,應該能夠經過一次map-reduce來計算。
咱們理一下L-BFGS中能夠分佈式並行計算的步驟:
Alink中,使用AllReduce功能而非Flink原生Map / Reduce來完成了以上三點的並行計算和通訊。
線搜索只有兩步,肯定方向、肯定步長。肯定方向和模擬Hessian矩陣都須要計算梯度。目標函數的梯度向量計算中只須要進行向量間的點乘和相加,能夠很容易將每一個迭代過程拆分紅相互獨立的計算步驟,由不一樣的節點進行獨立計算,而後歸併計算結果。
Alink將樣本特徵向量分佈到不一樣的計算節點,由各計算節點完成本身所負責樣本的點乘與求和計算,而後將計算結果進行歸併,則實現了按行並行的LR。
實際狀況中也會存在針對高維特徵向量進行邏輯迴歸的場景,僅僅按行進行並行處理,沒法知足這類場景的需求,所以還須要按列將高維的特徵向量拆分紅若干小的向量進行求解。這個也許是Alink之後須要優化的一個點吧 。
CalcGradient是Alink迭代算子的派生類,函數總結以下:
objFunc.calcGradient
根據採樣點計算梯度。Alink這裏就是把x, y代入,求損失函數的梯度就是對 Coef求偏導數。具體咱們在前文已經提到。
public class CalcGradient extends ComputeFunction { /** * object function class, it supply the functions to calc local gradient (or loss). */ private OptimObjFunc objFunc; @Override public void calc(ComContext context) { Iterable<Tuple3<Double, Double, Vector>> labledVectors = context.getObj(OptimVariable.trainData); // 通過處理的輸入數據 labledVectors = {ArrayList@9877} size = 4 0 = {Tuple3@9895} "(1.0,16.8,1.0 1.0 1.4657097546055162 1.4770978917519928)" 1 = {Tuple3@9896} "(1.0,6.7,1.0 1.0 -0.338240712601273 -0.7385489458759964)" 2 = {Tuple3@9897} "(1.0,6.9,1.0 1.0 -0.7892283294029703 -0.3692744729379982)" 3 = {Tuple3@9898} "(1.0,8.0,1.0 1.0 -0.338240712601273 -0.3692744729379982)" // get iterative coefficient from static memory. Tuple2<DenseVector, Double> state = context.getObj(OptimVariable.currentCoef); int size = state.f0.size(); // 是Coef,1.7976931348623157E308是默認最大值 state = {Tuple2@9878} "(0.001 0.0 0.0 0.0,1.7976931348623157E308)" f0 = {DenseVector@9879} "0.001 0.0 0.0 0.0" f1 = {Double@9889} 1.7976931348623157E308 DenseVector coef = state.f0; if (objFunc == null) { objFunc = ((List<OptimObjFunc>)context.getObj(OptimVariable.objFunc)).get(0); } // 變量以下 objFunc = {UnaryLossObjFunc@9882} unaryLossFunc = {SquareLossFunc@9891} l1 = 0.0 l2 = 0.0 params = {Params@9892} "Params {featureCols=["f0","f1","f2"], labelCol="label", predictionCol="linpred"}" Tuple2<DenseVector, double[]> grad = context.getObj(OptimVariable.dir); // 變量以下 grad = {Tuple2@9952} "(0.0 0.0 0.0 0.0,[0.0, 0.1])" f0 = {DenseVector@9953} "0.0 0.0 0.0 0.0" f1 = {double[2]@9969} coef = {DenseVector@9951} "0.001 0.0 0.0 0.0" data = {double[4]@9982} // calculate local gradient,使用目標函數 Double weightSum = objFunc.calcGradient(labledVectors, coef, grad.f0); // prepare buffer vec for allReduce. the last element of vec is the weight Sum. double[] buffer = context.getObj(OptimVariable.gradAllReduce); if (buffer == null) { buffer = new double[size + 1]; context.putObj(OptimVariable.gradAllReduce, buffer); } for (int i = 0; i < size; ++i) { buffer[i] = grad.f0.get(i) * weightSum; } /* the last element is the weight value */ buffer[size] = weightSum; } } // 最後結果是 buffer = {double[5]@9910} 0 = -38.396 1 = -38.396 2 = -14.206109929253465 3 = -14.364776997288134 4 = 0.0
這裏是前面提到的 "經過聚合函數,對全部計算樣本的特徵的梯度進行累加,獲得每個特徵的累積梯度以及損失"。
具體關於AllReduce如何運做,能夠參見文章 [Alink漫談之三] AllReduce通訊模型
.add(new AllReduce(OptimVariable.gradAllReduce))
此時獲得的梯度,已是聚合以後的,因此能夠開始計算方向。
OptimVariable.grad 是預先分配的。
public class PreallocateSkyk extends ComputeFunction { private int numCorrections; /** * prepare hessian matrix of lbfgs method. we allocate memory fo sK, yK at first iteration step. * * @param context context of iteration. */ @Override public void calc(ComContext context) { if (context.getStepNo() == 1) { Tuple2<DenseVector, double[]> grad = context.getObj(OptimVariable.grad); int size = grad.f0.size(); DenseVector[] sK = new DenseVector[numCorrections]; DenseVector[] yK = new DenseVector[numCorrections]; for (int i = 0; i < numCorrections; ++i) { sK[i] = new DenseVector(size); yK[i] = new DenseVector(size); } context.putObj(OptimVariable.sKyK, Tuple2.of(sK, yK)); } } }
在計算的過程當中,須要不斷的計算和存儲歷史的Hessian矩陣,在L-BFGS算法,但願只保留最近的m次迭代信息,便可以擬合Hessian矩陣。在L-BFGS算法中,再也不保存完整的Hk,而是存儲向量序列{sk}和{yk},須要矩陣時Hk,使用向量序列{sk}和{yk}計算就能夠獲得,而向量序列{sk}和{yk}也不是全部都要保存,只要保存最新的m步向量便可。
具體原理和公式這裏再也不贅述,網上不少文章講解很是好。
重點說明,dir的各個數據用途是:
dir = {Tuple2@9931} "(-9.599 -9.599 -3.5515274823133662 -3.5911942493220335,[4.0, 0.1])" f0 = {DenseVector@9954} "-9.599 -9.599 -3.5515274823133662 -3.5911942493220335" //梯度 f1 = {double[2]@9938} 0 = 4.0 //權重 1 = 0.1 //學習率 learning rate,0.1是初始化數值,後續UpdateModel時候會更新
代碼摘要以下:
@Override public void calc(ComContext context) { Tuple2 <DenseVector, double[]> grad = context.getObj(OptimVariable.grad); Tuple2 <DenseVector, double[]> dir = context.getObj(OptimVariable.dir); Tuple2 <DenseVector[], DenseVector[]> hessian = context.getObj(OptimVariable.sKyK); int size = grad.f0.size(); // gradarr是上一階段CalcGradient的結果 double[] gradarr = context.getObj(OptimVariable.gradAllReduce); // 變量爲 gradarr = {double[5]@9962} 0 = -38.396 1 = -38.396 2 = -14.206109929253465 3 = -14.364776997288134 4 = 4.0 if (this.oldGradient == null) { oldGradient = new DenseVector(size); } // hessian用來看成隊列,存儲sK,yK,只保留最近m個 DenseVector[] sK = hessian.f0; DenseVector[] yK = hessian.f1; for (int i = 0; i < size; ++i) { //gradarr[size]是權重 grad.f0.set(i, gradarr[i] / gradarr[size]); //size = 4 } // 賦值梯度,這裏都除以權重 grad = {Tuple2@9930} "(-9.599 -9.599 -3.5515274823133662 -3.5911942493220335,[0.0])" f0 = {DenseVector@9937} "-9.599 -9.599 -3.5515274823133662 -3.5911942493220335" data = {double[4]@9963} 0 = -9.599 1 = -9.599 2 = -3.5515274823133662 3 = -3.5911942493220335 f1 = {double[1]@9961} 0 = 0.0 dir.f1[0] = gradarr[size]; //權重 int k = context.getStepNo() - 1; if (k == 0) { //首次迭代 dir.f0.setEqual(grad.f0); // 梯度賦予在這裏 oldGradient.setEqual(grad.f0); } else { yK[(k - 1) % m].setEqual(grad.f0); yK[(k - 1) % m].minusEqual(oldGradient); oldGradient.setEqual(grad.f0); } // copy g_k and store in qL dir.f0.setEqual(grad.f0); //拷貝梯度到這裏 // dir = {Tuple2@9931} "(-9.599 -9.599 -3.5515274823133662 -3.5911942493220335,[4.0, 0.1])" f0 = {DenseVector@9954} "-9.599 -9.599 -3.5515274823133662 -3.5911942493220335" //梯度 f1 = {double[2]@9938} 0 = 4.0 //權重 1 = 0.1 //學習率 learning rate,0.1是初始化數值 // compute H^-1 * g_k int delta = k > m ? k - m : 0; int l = k <= m ? k : m; // m = 10 if (alpha == null) { alpha = new double[m]; } // two-loop的過程,經過擬牛頓法計算Hessian矩陣 for (int i = l - 1; i >= 0; i--) { int j = (i + delta) % m; double dot = sK[j].dot(yK[j]); if (Math.abs(dot) > 0.0) { double rhoJ = 1.0 / dot; alpha[i] = rhoJ * (sK[j].dot(dir.f0)); // 計算alpha dir.f0.plusScaleEqual(yK[j], -alpha[i]); // 從新修正d } } for (int i = 0; i < l; i++) { int j = (i + delta) % m; double dot = sK[j].dot(yK[j]); if (Math.abs(dot) > 0.0) { double rhoJ = 1.0 / dot; double betaI = rhoJ * (yK[j].dot(dir.f0)); // 乘以rho dir.f0.plusScaleEqual(sK[j], (alpha[i] - betaI));// 從新修正d } } } //最後是存儲在 OptimVariable.dir
根據更新的 dir 和 當前係數 計算損失函數偏差值,這個損失是爲後續的線性搜索準備的。目的是若是損失函數偏差值達到容許的範圍,那麼中止迭代,不然重複迭代。
CalcLosses基本邏輯以下:
代碼以下:
public class CalcLosses extends ComputeFunction { @Override public void calc(ComContext context) { Iterable<Tuple3<Double, Double, Vector>> labledVectors = context.getObj(OptimVariable.trainData); Tuple2<DenseVector, double[]> dir = context.getObj(OptimVariable.dir); Tuple2<DenseVector, Double> coef = context.getObj(OptimVariable.currentCoef); if (objFunc == null) { objFunc = ((List<OptimObjFunc>)context.getObj(OptimVariable.objFunc)).get(0); } /** * calculate losses of current coefficient. * if optimizer is owlqn, constraint search will used, else without constraint. */ Double beta = dir.f1[1] / numSearchStep; double[] vec = method.equals(OptimMethod.OWLQN) ? objFunc.constraintCalcSearchValues(labledVectors, coef.f0, dir.f0, beta, numSearchStep) : objFunc.calcSearchValues(labledVectors, coef.f0, dir.f0, beta, numSearchStep); // 變量爲 dir = {Tuple2@9988} "(-9.599 -9.599 -3.5515274823133662 -3.5911942493220335,[4.0, 0.1])" coef = {Tuple2@9989} "(0.001 0.0 0.0 0.0,1.7976931348623157E308)" beta = {Double@10014} 0.025 vec = {double[5]@10015} 0 = 0.0 1 = 0.0 2 = 0.0 3 = 0.0 4 = 0.0 // prepare buffer vec for allReduce. double[] buffer = context.getObj(OptimVariable.lossAllReduce); if (buffer == null) { buffer = vec.clone(); context.putObj(OptimVariable.lossAllReduce, buffer); } else { System.arraycopy(vec, 0, buffer, 0, vec.length); } } }
其中搜索是一元目標函數提供的,其又調用了損失函數。
public class UnaryLossObjFunc extends OptimObjFunc { /** * Calculate loss values for line search in optimization. * * @param labelVectors train data. * @param coefVector coefficient of current time. * @param dirVec descend direction of optimization problem. * @param beta step length of line search. * @param numStep num of line search step. * @return double[] losses. */ @Override public double[] calcSearchValues(Iterable<Tuple3<Double, Double, Vector>> labelVectors, DenseVector coefVector, DenseVector dirVec, double beta, int numStep) { double[] vec = new double[numStep + 1]; // labelVector是三元組Tuple3<weight, label, feature vector> labelVectors = {ArrayList@10007} size = 4 0 = {Tuple3@10027} "(1.0,16.8,1.0 1.0 1.4657097546055162 1.4770978917519928)" 1 = {Tuple3@10034} "(1.0,6.7,1.0 1.0 -0.338240712601273 -0.7385489458759964)" 2 = {Tuple3@10035} "(1.0,6.9,1.0 1.0 -0.7892283294029703 -0.3692744729379982)" 3 = {Tuple3@10036} "(1.0,8.0,1.0 1.0 -0.338240712601273 -0.3692744729379982)" coefVector = {DenseVector@10008} "0.001 0.0 0.0 0.0" dirVec = {DenseVector@10009} "-9.599 -9.599 -3.5515274823133662 -3.5911942493220335" beta = 0.025 numStep = 4 vec = {double[5]@10026} 0 = 0.0 1 = 0.0 2 = 0.0 3 = 0.0 4 = 0.0 for (Tuple3<Double, Double, Vector> labelVector : labelVectors) { double weight = labelVector.f0; //用x-vec和coef計算出來的 Y double etaCoef = getEta(labelVector, coefVector); //以x-vec和dirVec計算出來的 deltaY double etaDelta = getEta(labelVector, dirVec) * beta; weight = 1.0 etaCoef = 0.001 etaDelta = -0.7427013482280431 for (int i = 0; i < numStep + 1; ++i) { //labelVector.f1就是label y //聯繫下面損失函數可知,etaCoef - i * etaDelta, labelVector.f1 是 訓練數據預測值 與 實際類別 的誤差 vec[i] += weight * this.unaryLossFunc.loss(etaCoef - i * etaDelta, labelVector.f1); } } return vec; } private double getEta(Tuple3<Double, Double, Vector> labelVector, DenseVector coefVector) { //labelVector.f2 = {DenseVector@9972} "1.0 1.0 1.4657097546055162 1.4770978917519928" return MatVecOp.dot(labelVector.f2, coefVector); } } vec = {double[5]@10026} 0 = 219.33160199999998 1 = 198.85962729259512 2 = 179.40202828917856 3 = 160.95880498975038 4 = 143.52995739431051
回顧損失函數以下
/** * Squared loss function. * https://en.wikipedia.org/wiki/Loss_functions_for_classification#Square_loss */ public class SquareLossFunc implements UnaryLossFunc { public SquareLossFunc() { } @Override public double loss(double eta, double y) { return 0.5 * (eta - y) * (eta - y); } @Override public double derivative(double eta, double y) { return eta - y; } @Override public double secondDerivative(double eta, double y) { return 1; } }
本模塊作兩件事
由於變量太多,因此有時候就忘記誰是誰了,因此再次標示。
代碼邏輯大體以下:
在這裏求步長,我沒有發現 Wolf-Powell 準則的使用,Alink作了某種優化。若是有朋友能看出來Wolf-Powell如何使用,還請留言指點 ,謝謝。
代碼以下:
public class UpdateModel extends ComputeFunction { @Override public void calc(ComContext context) { double[] losses = context.getObj(OptimVariable.lossAllReduce); Tuple2<DenseVector, double[]> dir = context.getObj(OptimVariable.dir); Tuple2<DenseVector, double[]> pseGrad = context.getObj(OptimVariable.pseGrad); Tuple2<DenseVector, Double> curCoef = context.getObj(OptimVariable.currentCoef); Tuple2<DenseVector, Double> minCoef = context.getObj(OptimVariable.minCoef); double lossChangeRatio = 1.0; double[] lossCurve = context.getObj(OptimVariable.lossCurve); for (int j = 0; j < losses.length; ++j) { losses[j] /= dir.f1[0]; //dir.f1[0]是權重 } int pos = -1; //get the min value of losses, and remember the position. for (int j = 0; j < losses.length; ++j) { if (losses[j] < losses[0]) { losses[0] = losses[j]; pos = j; } } // adaptive learningRate strategy double beta = dir.f1[1] / numSearchStep; double eta; // 變量以下 losses = {double[5]@10001} 0 = 35.88248934857763 1 = 49.71490682314878 2 = 44.85050707229464 3 = 40.239701247437594 4 = 35.88248934857763 dir = {Tuple2@10002} "(-9.599 -9.599 -3.5515274823133662 -3.5911942493220335,[4.0, 0.1])" pseGrad = null curCoef = {Tuple2@10003} "(0.001 0.0 0.0 0.0,1.7976931348623157E308)" minCoef = {Tuple2@10004} "(0.001 0.0 0.0 0.0,1.7976931348623157E308)" lossChangeRatio = 1.0 lossCurve = {double[100]@10005} pos = 4 beta = 0.025 curCoef.f1 = {Double@10006} 1.7976931348623157E308 if (pos == -1) { /** * if all losses larger than last loss value. we'll do the below things: * 1. reduce learning rate by multiply 1.0 / (numSearchStep*numSearchStep). * 2. set eta with zero. * 3. set current loss equals last loss value. */ eta = 0; dir.f1[1] *= 1.0 / (numSearchStep * numSearchStep); // 學習率 curCoef.f1 = losses[0]; // 最小loss } else if (pos == numSearchStep) { /** * if losses[numSearchStep] smaller than last loss value. we'll do the below things: * 1. enlarge learning rate by multiply numSearchStep. * 2. set eta with the smallest value pos. * 3. set current loss equals smallest loss value. */ eta = beta * pos; dir.f1[1] *= numSearchStep; dir.f1[1] = Math.min(dir.f1[1], numSearchStep); // 學習率 lossChangeRatio = Math.abs((curCoef.f1 - losses[pos]) / curCoef.f1); curCoef.f1 = losses[numSearchStep]; // 最小loss // 當前數值 numSearchStep = 4 是NUM_SEARCH_STEP缺省值,是線性搜索的參數,Line search parameter, which define the search value num of one step. lossChangeRatio = 1.0 pos = 4 eta = 0.1 curCoef.f1 = {Double@10049} 35.88248934857763 dir.f1[1] = 0.4 } else { /** * else : * 1. learning rate not changed. * 2. set eta with the smallest value pos. * 3. set current loss equals smallest loss value. */ eta = beta * pos; lossChangeRatio = Math.abs((curCoef.f1 - losses[pos]) / curCoef.f1); curCoef.f1 = losses[pos]; // 最小loss } /* update loss value in loss curve at this step */ lossCurve[context.getStepNo() - 1] = curCoef.f1; lossCurve = {double[100]@9998} 0 = 35.88248934857763 1 = Infinity if (method.equals(OptimMethod.OWLQN)) { ..... } else if (method.equals(OptimMethod.LBFGS)) { Tuple2<DenseVector[], DenseVector[]> sKyK = context.getObj(OptimVariable.sKyK); int size = dir.f0.size(); int k = context.getStepNo() - 1; DenseVector[] sK = sKyK.f0; for (int s = 0; s < size; ++s) { // 修正矩陣 sK[k % OptimVariable.numCorrections].set(s, dir.f0.get(s) * (-eta)); } curCoef.f0.plusScaleEqual(dir.f0, -eta); // 這裏是用方向向量和步長來更新系數向量 sKyK = {Tuple2@10043} "([0.9599000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0],[0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0])" f0 = {DenseVector[10]@10044} 0 = {DenseVector@10074} "0.9599000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034" } /** * if current loss is smaller than min loss, then update the min loss and min coefficient by current. */ if (curCoef.f1 < minCoef.f1) { minCoef.f1 = curCoef.f1; // 最小loss minCoef.f0.setEqual(curCoef.f0); // 最小loss所對應的Coef,即線性模型最後須要的係數 } curCoef = {Tuple2@9996} "(0.9609000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034,35.88248934857763)" f0 = {DenseVector@10059} "0.9609000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034" f1 = {Double@10048} 35.88248934857763 minCoef = {Tuple2@9997} "(0.9609000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034,35.88248934857763)" f0 = {DenseVector@10059} "0.9609000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034" f1 = {Double@10048} 35.88248934857763 // judge the convergence of iteration. filter(dir, curCoef, minCoef, context, lossChangeRatio); } }
filter 判斷是否收斂,裏面的打印log很清晰的說明了函數邏輯。
/** * judge the convergence of iteration. */ public void filter(Tuple2<DenseVector, double[]> grad, Tuple2<DenseVector, Double> c, Tuple2<DenseVector, Double> m, ComContext context, double lossChangeRatio) { double epsilon = params.get(HasEpsilonDv0000001.EPSILON); int maxIter = params.get(HasMaxIterDefaultAs100.MAX_ITER); double gradNorm = ((Tuple2<DenseVector, double[]>)context.getObj(gradName)).f0.normL2(); if (c.f1 < epsilon || gradNorm < epsilon) { printLog(" method converged at step : ", c.f1, m.f1, grad.f1[1], gradNorm, context, lossChangeRatio); grad.f1[0] = -1.0; } else if (context.getStepNo() > maxIter - 1) { printLog(" method stop at max step : ", c.f1, m.f1, grad.f1[1], gradNorm, context, lossChangeRatio); grad.f1[0] = -1.0; } else if (grad.f1[1] < EPS) { printLog(" learning rate is too small, method stops at step : ", c.f1, m.f1, grad.f1[1], gradNorm, context, lossChangeRatio); grad.f1[0] = -1.0; } else if (lossChangeRatio < epsilon && gradNorm < Math.sqrt(epsilon)) { printLog(" loss change ratio is too small, method stops at step : ", c.f1, m.f1, grad.f1[1], gradNorm, context, lossChangeRatio); grad.f1[0] = -1.0; } else { printLog(" method continue at step : ", c.f1, m.f1, grad.f1[1], gradNorm, context, lossChangeRatio); } }
.closeWith(new OutputModel())
這部分是每次迭代結束,臨時輸出模型,把數據轉換成Flink通用的Row類型,Transfer the state to model rows。
public class OutputModel extends CompleteResultFunction { @Override public List <Row> calc(ComContext context) { // get the coefficient of min loss. Tuple2 <DenseVector, double[]> minCoef = context.getObj(OptimVariable.minCoef); double[] lossCurve = context.getObj(OptimVariable.lossCurve); int effectiveSize = lossCurve.length; for (int i = 0; i < lossCurve.length; ++i) { if (Double.isInfinite(lossCurve[i])) { effectiveSize = i; break; } } double[] effectiveCurve = new double[effectiveSize]; System.arraycopy(lossCurve, 0, effectiveCurve, 0, effectiveSize); Params params = new Params(); params.set(ModelParamName.COEF, minCoef.f0);// 重點在這裏,minCoef是咱們真正須要的 params.set(ModelParamName.LOSS_CURVE, effectiveCurve); List <Row> model = new ArrayList <>(1); model.add(Row.of(params.toJson())); return model; } }
這裏設置了並行度爲1。
// Prepare the meta info of linear model. DataSet<Params> meta = labelInfo.f0 .mapPartition(new CreateMeta(modelName, linearModelType, isRegProc, params)) .setParallelism(1);
具體代碼
public static class CreateMeta implements MapPartitionFunction<Object, Params> { @Override public void mapPartition(Iterable<Object> rows, Collector<Params> metas) throws Exception { Object[] labels = null; if (!this.isRegProc) { labels = orderLabels(rows); } Params meta = new Params(); meta.set(ModelParamName.MODEL_NAME, this.modelName); meta.set(ModelParamName.LINEAR_MODEL_TYPE, this.modelType); meta.set(ModelParamName.LABEL_VALUES, labels); meta.set(ModelParamName.HAS_INTERCEPT_ITEM, this.hasInterceptItem); meta.set(ModelParamName.VECTOR_COL_NAME, vectorColName); meta.set(LinearTrainParams.LABEL_COL, labelName); metas.collect(meta); } } // 變量爲 meta = {Params@9667} "Params {hasInterceptItem=true, vectorColName=null, modelName="Linear Regression", labelValues=null, labelCol="label", linearModelType="LinearReg"}"
當迭代循環結束以後,Alink就根據Coef數據來創建模型。
/** * build the linear model rows, the format to be output. */ public static class BuildModelFromCoefs extends AbstractRichFunction implements @Override public void mapPartition(Iterable<Tuple2<DenseVector, double[]>> iterable, Collector<Row> collector) throws Exception { for (Tuple2<DenseVector, double[]> coefVector : iterable) { LinearModelData modelData = buildLinearModelData(meta, featureNames, labelType, meanVar, hasIntercept, standardization, coefVector); new LinearModelDataConverter(this.labelType).save(modelData, collector); } } }
獲得模型數據爲,裏面coef就是 f(x)=w^Tx+b 中的 w, b。是最終用來計算的。
modelData = {LinearModelData@10584} featureNames = {String[3]@9787} featureTypes = null vectorColName = null coefVector = {DenseVector@10485} "-3.938937407856857 4.799499941426075 0.8929571907809862 1.078169576770847" coefVectors = null vectorSize = 3 modelName = "Linear Regression" labelName = null labelValues = null linearModelType = {LinearModelType@4674} "LinearReg" hasInterceptItem = true lossCurve = {double[12]@10593} 0 = 35.88248934857763 1 = 12.807366842002144 2 = 0.5228366663917704 3 = 0.031112070740366038 4 = 0.01098914933042993 5 = 0.009765757443537283 6 = 0.008750523231785415 7 = 0.004210085397869248 8 = 0.0039042232755530704 9 = 0.0038821509860327537 10 = 0.003882042680010676 11 = 0.0038820422536391033 labelType = {FractionalTypeInfo@9790} "Double"
預測時候,使用的是LinearModelMapper,其內部部分變量打印出來以下,可以看出來模型數據。
this = {LinearModelMapper@10704} vectorColIndex = -1 model = {LinearModelData@10597} featureNames = {String[3]@10632} featureTypes = null vectorColName = null coefVector = {DenseVector@10612} "-3.938937407856857 4.799499941426075 0.8929571907809862 1.078169576770847" coefVectors = null vectorSize = 0 modelName = "Linear Regression" labelName = null labelValues = {Object[0]@10613} linearModelType = {LinearModelType@10611} "LinearReg" hasInterceptItem = true lossCurve = null labelType = null
具體預測是在 LinearModelMapper.predict
中完成,具體以下:
Row.of("$3$0:1.0 1:7.0 2:9.0", "1.0 7.0 9.0", 1.0, 7.0, 9.0, 16.8),
,FeatureLabelUtil.getFeatureVector
處理以後,"1.0 1.0 7.0 9.0"
,其中第一個 1.0 是經過 aVector.set(0, 1.0);
專門設定的固定值。好比模型是 f(x) = ax + b,這個固定值 1.0 就是 b 的初始化值,隨着優化過程會獲得 b。因此這裏仍是須要有一個 1.0 來進行預測。"-3.938937407856857 4.799499941426075 0.8929571907809862 1.078169576770847"
。dotValue = 16.814789059973744
。這樣就能看出來模型係數如何使用的了。
public Object predict(Vector vector) throws Exception { double dotValue = MatVecOp.dot(vector, model.coefVector); switch (model.linearModelType) { case LR: case SVM: case Perceptron: return dotValue >= 0 ? model.labelValues[0] : model.labelValues[1]; case LinearReg: case SVR: return dotValue; } } vector = {DenseVector@10610} "1.0 1.0 7.0 9.0" data = {double[4]@10619} 0 = 1.0 1 = 1.0 2 = 7.0 3 = 9.0 model.coefVector = {DenseVector@10612} "-3.938937407856857 4.799499941426075 0.8929571907809862 1.078169576770847" data = {double[4]@10620} 0 = -3.938937407856857 1 = 4.799499941426075 2 = 0.8929571907809862 3 = 1.078169576770847
Alink漫談(一) : 從KMeans算法實現不一樣看Alink設計思想
Alink漫談(二) : 從源碼看機器學習平臺Alink設計和架構
Alink漫談(八) : 二分類評估 AUC、K-S、PRC、Precision、Recall、LiftChart 如何實現
http://www.mamicode.com/info-detail-2508527.html)
LogisticRegressionWithLBFGS--邏輯迴歸
[原創]用「人話」解釋不精確線搜索中的Armijo-Goldstein準則及Wolfe-Powell準則
https://www.zhihu.com/question/36425542
https://zhuanlan.zhihu.com/p/32821110