Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平臺,是業界首個同時支持批式算法、流式算法的機器學習平臺。本文和下文將介紹在線學習算法FTRL在Alink中是如何實現的,但願對你們有所幫助。html
由於 Alink 實現的是 LR + FTRL,因此咱們須要從邏輯迴歸 LR 開始介紹。java
Logistic Regression 雖然被稱爲迴歸,但其其實是分類模型,並經常使用於二分類。Logistic 迴歸的本質是:假設數據服從這個分佈,而後使用極大似然估計作參數的估計。python
邏輯迴歸的思路是,先擬合決策邊界(不侷限於線性,還能夠是多項式),再創建這個邊界與分類的機率聯繫,從而獲得了二分類狀況下的機率。算法
咱們從線性迴歸開始提及。某些狀況下,使用線性的函數來擬合規律後取閾值的辦法是行不通的。行不通的緣由在於擬合的函數太直,離羣值(也叫異常值)對結果的影響過大。可是咱們的總體思路是沒有錯的,錯的是用太"直"的擬合函數,若是咱們用來擬合的函數是非線性的,不這麼直,是否是就好一些呢?機器學習
因此咱們下面來作兩件事:分佈式
對於第一件事,咱們用sigmod函數把迴歸函數掰彎。ide
對於二分類問題,1表示正例,0表示負例。邏輯迴歸是在線性函數輸出預測實際值的基礎上,尋找一個假設函數函數h_θ(x) = g(θ,x),將實際值映射到到0,1之間。邏輯迴歸中選擇對數概率函數(logistic function)做爲激活函數,對數概率函數是Sigmoid函數(形狀爲S的函數)的重要表明。函數
對於第二件事,咱們選定閾值是0.5。學習
意思就是,當我選閾值爲0.5,那麼小於0.5的必定是負例,哪怕他是0.49。此時咱們判斷一個樣本爲負例必定是準確的嗎?其實不必定,由於它仍是有49%的機率爲正例的。可是,即使它是正例的機率爲0.1,則咱們隨機選擇1w個樣原本作預測,仍是會有接近100個預測它是負例結果它實際是正例的偏差。不管怎麼選,偏差都是存在的,因此咱們選定閾值就是在選擇能夠接受偏差的程度。測試
到這裏,邏輯迴歸的由來咱們就基本理清楚了,咱們知道邏輯迴歸的判別函數就是
如何求解邏輯迴歸?也就是如何找到一組可讓 h(z) 全都預測正確機率最大的W。
求解邏輯迴歸的方法有很是多,咱們這裏主要聊下梯度降低和牛頓法。優化的主要目標是找到一個方向,參數朝這個方向移動以後使得損失函數的值可以減少,這個方向每每由一階偏導或者二階偏導各類組合求得。
梯度降低是經過 J(w) 對 w 的一階導數來找降低方向,而且以迭代的方式來更新參數。
牛頓法的基本思路是,在現有極小點估計值的附近對 J(w) 作二階泰勒展開,進而找到極小點的下一個估計值。
當樣本數據裏N很大的時候,一般採用的是隨機梯度降低法,算法以下所示:
while { for i in range(0,m): w_j = w_j + a * g_j }
隨機梯度降低的好處是能夠實現分佈式並行化,具體計算流程是:
從邏輯迴歸的求解方法中咱們能夠發現這些算法都是須要計算梯度的,所以邏輯迴歸的並行化最主要的就是對目標函數梯度計算的並行化。
咱們看到目標函數的梯度向量計算中只須要進行向量間的點乘和相加,能夠很容易將每一個迭代過程拆分紅相互獨立的計算步驟,由不一樣的節點進行獨立計算,而後歸併計算結果。
因此並行 LR 實際上就是在求解損失函數最優解的過程當中,針對尋找損失函數降低方向中的梯度方向計算做了並行化處理,而在利用梯度肯定降低方向的過程當中也能夠採用並行化。
若是將樣本矩陣按行劃分,將樣本特徵向量分佈到不一樣的計算節點,由各計算節點完成本身所負責樣本的點乘與求和計算,而後將計算結果進行歸併,則實現了「 按行 並行的LR」。
按行並行的LR解決了樣本數量的問題,可是實際狀況中會存在針對高維特徵向量進行邏輯迴歸的場景(如廣告系統中的特徵維度高達上億),僅僅按行進行並行處理,沒法知足這類場景的需求,所以還須要 按列 將高維的特徵向量拆分紅若干小的向量進行求解。
傳統的機器學習開發流程基本是如下步驟:
這種方式主要存在兩個瓶頸:
好比,傳統Batch算法中每次迭代對全體訓練數據集進行計算(例如計算全局梯度),優勢是精度和收斂還能夠,缺點是沒法有效處理大數據集(此時全局梯度計算代價太大),且無法應用於數據流作在線學習。
針對這些問題,通常而言有兩種解決方式:
在線學習 ( OnlineLearningOnlineLearning ) 表明了一系列機器學習算法,特色是每來一個樣本就能訓練,可以根據線上反饋數據,實時快速地進行模型調整,使得模型及時反映線上的變化,提升線上預測的準確率。
傳統的訓練方法在模型訓練上線後,通常是靜態的,不會與線上的情況有任何的互動,加入預測錯誤,只能在下一次更新的時候完成修正,可是這個更新的時間通常比較長。
Online Learning訓練方法不一樣,會根據線上的預測結果動態調整模型,加入模型預測錯誤,從而及時作出修正,所以Online Learning可以更加及時地反應線上變化。
Online Learning的優化目標是使得總體的損失函數最小化,它須要快速求解目標函數的最優解。
在線學習算法的特色是:每來一個訓練樣本,就用該樣本產生的loss和梯度對模型迭代一次,一個一個數據地進行訓練,所以能夠處理大數據量訓練和在線訓練。經常使用的有在線梯度降低(OGD)和隨機梯度降低(SGD)等,本質思想是對上面【問題描述】中的未加和的單個數據的loss函數 L(w,zi)作梯度降低,由於每一步的方向並非全局最優的,因此總體呈現出來的會是一個看似隨機的降低路線。
FTR是FTRL的前身,思想是每次找到讓以前全部樣本的損失函數之和最小的參數。
FTRL,即 Follow The Regularized Leader,是在以前的幾個工做上產生的,主要出發點就是爲了提升稀疏度且知足精度要求。FTRL 在FTL的優化目標的基礎上,加入了正則化,防止過擬合。
FTRL的損失函數通常也不容易求解,這種狀況下,通常須要找一個代理的損失函數。
代理損失函數須要知足如下條件:
爲了衡量條件2中的兩個解的差距,引入regret的概念。
通常對於在線學習來講,咱們致力於解決兩個問題: 下降 regret 和提升 sparsity。其中 regret 的定義爲:
其中 t 表示總共 T 輪中的第 t 輪迭代,ℓt 表示損失函數,w 表示要學習的參數。Regret 表示 "代理函數求出來的解" 離 "真正損失函數求出來的解" 的損失差距。
第二項 表示獲得了全部樣本後損失函數的最優解,由於在線學習一次只能根據少數幾個樣本更新參數,隨機性較大,因此須要一種穩健的優化方式,而 regret 字面意思是 「後悔度」,意即更新完不後悔。
在理論上能夠證實,若是一個在線學習算法能夠保證其 regret 是 t 的次線性函數,則:
那麼隨着訓練樣本的增多,在線學習出來的模型無限接近於最優模型。即隨着訓練樣本的增長,代理損失函數和原損失函數求出來的參數的實際損失值差距愈來愈小。而絕不意外的,FTRL 正是知足這一特性。
另外一方面,現實中對於 sparsity,也就是模型的稀疏性也很看重。上億的特徵並不鮮見,模型越複雜,須要的存儲、時間資源也隨之升高,而稀疏的模型會大大減小預測時的內存和複雜度。另外稀疏的模型相對可解釋性也較好,這也正是一般所說的 L1 正則化的優勢。
Per-Coordinate 意思是FTRL是對w每一維分開訓練更新的,每一維使用的是不一樣的學習速率,也是上面代碼中lamda2以前的那一項。與w全部特徵維度使用統一的學習速率相比,這種方法考慮了訓練樣本自己在不一樣特徵上分佈的不均勻性,若是包含w某一個維度特徵的訓練樣本不多,每個樣本都很珍貴,那麼該特徵維度對應的訓練速率能夠獨自保持比較大的值,每來一個包含該特徵的樣本,就能夠在該樣本的梯度上前進一大步,而不須要與其餘特徵維度的前進步調強行保持一致。
咱們再看看下一時刻的特徵權重的更新公式,增長理解(我我的以爲找到的這個解釋相對易於理解):
式中第一項是對損失函數的貢獻的一個估計,第二項是控制w(也就是model)在每次迭代中變化不要太大,第三項表明L1正則(得到稀疏解)。
咱們採用的就是Alink官方示例代碼。咱們能夠看到大體分爲幾部分:
你大概已經看出來了,爲了剖析FTRL,我前面作了不少工做......
public class FTRLExample { public static void main(String[] args) throws Exception { ...... // setup feature engineering pipeline Pipeline featurePipeline = new Pipeline() .add( new StandardScaler() // 標準縮放 .setSelectedCols(numericalColNames) ) .add( new FeatureHasher() // 特徵哈希 .setSelectedCols(selectedColNames) .setCategoricalCols(categoryColNames) .setOutputCol(vecColName) .setNumFeatures(numHashFeatures) ); // 構建特徵工程流水線 // fit feature pipeline model PipelineModel featurePipelineModel = featurePipeline.fit(trainBatchData); // prepare stream train data CsvSourceStreamOp data = new CsvSourceStreamOp() .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-ctr-train-8M.csv") .setSchemaStr(schemaStr) .setIgnoreFirstLine(true); // 對於流數據源進行實時切分獲得原始訓練數據和原始預測數據 // split stream to train and eval data SplitStreamOp splitter = new SplitStreamOp().setFraction(0.5).linkFrom(data); // 訓練出一個邏輯迴歸模型做爲FTRL算法的初始模型,這是爲了系統冷啓動的須要。 // train initial batch model LogisticRegressionTrainBatchOp lr = new LogisticRegressionTrainBatchOp() .setVectorCol(vecColName) .setLabelCol(labelColName) .setWithIntercept(true) .setMaxIter(10); BatchOperator<?> initModel = featurePipelineModel.transform(trainBatchData).link(lr); // 在初始模型基礎上進行FTRL在線訓練 // ftrl train FtrlTrainStreamOp model = new FtrlTrainStreamOp(initModel) .setVectorCol(vecColName) .setLabelCol(labelColName) .setWithIntercept(true) .setAlpha(0.1) .setBeta(0.1) .setL1(0.01) .setL2(0.01) .setTimeInterval(10) .setVectorSize(numHashFeatures) .linkFrom(featurePipelineModel.transform(splitter)); // 在FTRL在線模型的基礎上,鏈接預測數據進行預測 // ftrl predict FtrlPredictStreamOp predictResult = new FtrlPredictStreamOp(initModel) .setVectorCol(vecColName) .setPredictionCol("pred") .setReservedCols(new String[]{labelColName}) .setPredictionDetailCol("details") .linkFrom(model, featurePipelineModel.transform(splitter.getSideOutput(0))); // 對預測結果流進行評估 // ftrl eval predictResult .link( new EvalBinaryClassStreamOp() .setLabelCol(labelColName) .setPredictionCol("pred") .setPredictionDetailCol("details") .setTimeInterval(10) ) .link( new JsonValueStreamOp() .setSelectedCol("Data") .setReservedCols(new String[]{"Statistics"}) .setOutputCols(new String[]{"Accuracy", "AUC", "ConfusionMatrix"}) .setJsonPath(new String[]{"$.Accuracy", "$.AUC", "$.ConfusionMatrix"}) ) .print(); StreamOperator.execute(); } }
用問題來引導剖析比較好,如下是咱們容易想到的一些問題。
後續咱們會一一探究這些問題。
在線訓練是在 FtrlTrainStreamOp 類中實現的,其 linkFrom 函數實現了基本邏輯。
主要邏輯是:
代碼摘要是:
@Override public FtrlTrainStreamOp linkFrom(StreamOperator<?>... inputs) { ...... // 3)獲取切分信息 final int[] splitInfo = getSplitInfo(featureSize, hasInterceptItem, parallelism); DataStream<Row> initData = inputs[0].getDataStream(); // 4)切分高維向量。 // Tuple5<SampleId, taskId, numSubVec, SubVec, label> DataStream<Tuple5<Long, Integer, Integer, Vector, Object>> input = initData.flatMap(new SplitVector(splitInfo, hasInterceptItem, vectorSize, vectorTrainIdx, featureIdx, labelIdx)) .partitionCustom(new CustomBlockPartitioner(), 1); // train data format = <sampleId, subSampleTaskId, subNum, SparseVector(subSample), label> // feedback format = Tuple7<sampleId, subSampleTaskId, subNum, SparseVector(subSample), label, wx, timeStamps> // 5)構建一個 IterativeStream.ConnectedIterativeStreams iteration,這樣會構建(或者說鏈接)兩個數據流:反饋流和訓練流; IterativeStream.ConnectedIterativeStreams<Tuple5<Long, Integer, Integer, Vector, Object>, Tuple7<Long, Integer, Integer, Vector, Object, Double, Long>> iteration = input.iterate(Long.MAX_VALUE) .withFeedbackType(TypeInformation .of(new TypeHint<Tuple7<Long, Integer, Integer, Vector, Object, Double, Long>>() {})); // 6)用iteration來構建迭代體 iterativeBody,其包括兩部分:CalcTask,ReduceTask; DataStream iterativeBody = iteration.flatMap( new CalcTask(dataBridge, splitInfo, getParams())) .keyBy(0) .flatMap(new ReduceTask(parallelism, splitInfo)) .partitionCustom(new CustomBlockPartitioner(), 1); // 7)result = iterativeBody.filter;基本是以時間間隔爲標準來判斷(也能夠認爲是時間驅動),"時間未過時&向量有意義" 的數據將被髮送回反饋數據流,繼續迭代,回到步驟 6),進入flatMap2; DataStream<Tuple7<Long, Integer, Integer, Vector, Object, Double, Long>> result = iterativeBody.filter( new FilterFunction<Tuple7<Long, Integer, Integer, Vector, Object, Double, Long>>() { @Override public boolean filter(Tuple7<Long, Integer, Integer, Vector, Object, Double, Long> t3) throws Exception { // if t3.f0 > 0 && t3.f2 > 0 then feedback return (t3.f0 > 0 && t3.f2 > 0); } }); // 8)output = iterativeBody.filter;符合標準(時間過時了)的數據將跳出迭代,而後算法會調用WriteModel將LineModelData轉換爲多條Row,轉發給下游operator(也就是在線預測階段);即定時把模型更新給在線預測階段。 DataStream<Row> output = iterativeBody.filter( new FilterFunction<Tuple7<Long, Integer, Integer, Vector, Object, Double, Long>>() { @Override public boolean filter(Tuple7<Long, Integer, Integer, Vector, Object, Double, Long> value) throws Exception { /* if value.f0 small than 0, then output */ return value.f0 < 0; } }).flatMap(new WriteModel(labelType, getVectorCol(), featureCols, hasInterceptItem)); // 指定了某個流將成爲迭代程序的結束,而且這個流將做爲輸入的第二部分(second input)被反饋回迭代 iteration.closeWith(result); TableSchema schema = new LinearModelDataConverter(labelType).getModelSchema(); ...... this.setOutput(output, names, types); return this; }
爲了方便閱讀,咱們給出流程圖以下(這裏省略了split 訓練數據集/測試數據集):
原諒我用這種辦法畫圖,由於我最討厭看到一篇好文,結果發現圖沒了…
-------------------------------------------------------------------------------------------- │ 初始模型訓練階段 │ │ │ ┌─────────────────┐ ┌─────────────────┐ │ trainBatchData │ │ trainStreamData │ └─────────────────┘ └─────────────────┘ │ │ │ │ ┌──────────────────┐ │ │ featurePipeline │ │ └──────────────────┘ │ │ │ │ │ ┌─────────────┐ │ │ 線性迴歸模型 │ │ └─────────────┘ │ │ │ │ │ -------------------------------------------------------------------------------------------- │ 在線訓練階段 │ │ │ ┌─────────────┐ ┌──────────────────┐ │ dataBridge │ 加載初始化模型 │ featurePipeline │ └─────────────┘ └──────────────────┘ │ │ │ │ │ │ ┌─────────────┐ ┌──────────────────────────┐ │ 獲取切分信息 │ getSplitInfo │ inputs[0].getDataStream()│ └─────────────┘ └──────────────────────────┘ │ │ │ │ │ │ │ SplitInfo │ │ │ │ │ ┌──────────────────────────┐ 特徵向量 │ │ SplitVector │ <--------------------------│ └──────────────────────────┘ │ │ 解析輸入,獲得DataStream<Tuple5<SampleId, taskId, numSubVec, SubVec, label>> input │ │ ┌───────────────────────────┐ │ <Tuple5,Tuple7> iteration │ 迭代構建,兩個輸入train data Tuple5<>,feedback data Tuple7<> └───────────────────────────┘ │ │ CalcTask從邏輯上分紅兩個模塊:flatMap1, flatMap2 │ │ ┌───────────────────┐ ┌───────────────────┐ │ CalcTask.flatMap1 │ 輸入Tuple5<> │CalcTask.flatMap2 │ 輸入Tuple7 <--------------- └───────────────────┘ └───────────────────┘ │ │ 分佈計算FTRL算法中的predict部分 │ 分佈處理反饋數據/更新參數/累積參數到期後發出 │ │ │ │ │ │ │ │<----------------------------------------- │ │ 以上兩個flatmap都輸出到下面ReduceTask │ │ │ │ │ ┌──────────────────────┐ │ │ ReduceTask.flatMap │ 1. 若是時間過時&所有收集完成,歸併/輸出模型(value.f0 < 0) │ └──────────────────────┘ 2. 未過時,歸併每一個CalcTask計算的predict,造成一個 lable y │ │ │ │ │ ┌────────────────────┐ │ │ result = filter │ if t3.f0 > 0 && t3.f2 > 0 or not ? │ └────────────────────┘ │ │ │ │ │ │ │ │ if t3.f0 > 0 && t3.f2 > 0 then ┌───────────────────┐ │ │------------------------------------------>│CalcTask.flatMap2 │輸出Tuple7 --------- │ "時間未過時&向量有意義" 將送回反饋,繼續迭代 └───────────────────┘ │ │ │ 若是未造成反饋數據流,則繼續過濾 │ │ ┌────────────────────┐ │ output = filter │ if value.f0 small than 0 or not ? └────────────────────┘ │ │ │ if value.f0 small than 0, then output │ 符合標準(時間過時了)的數據將跳出迭代,輸出模型 │ │ ┌────────────┐ │ WriteModel │ 由於filter out,因此按期輸出模型 └────────────┘ │ │ -------------------------------------------------------------------------------------------- │ 在線預測階段 │ │ ┌─────────────────┐ │ │ testStreamData │ │ └─────────────────┘ │ │ │ │ │ │ ┌──────────────┐ ┌──────────────────┐ │ FTRL Predict │ <----------------------------│ featurePipeline │ └──────────────┘ └──────────────────┘
由於上圖在手機上會變形,如下這個圖爲你們在手機上瀏覽。
在線機器學習FTRL(Follow-the-regularized-Leader)算法介紹