機器學習及flinkML算法學習

參考文章:機器學習及flinkML算法學習javascript

基於Flink流處理的動態實時億級全端用戶畫像系統html

 

機器學習概念

機器學習算法根據訓練數據(training data)使得表示算法行爲的數學目標最大化,並以此來進行預測或者作出決定。機器學習分爲分類、迴歸、聚類等,每種都有不同的目標。java

應用場景和處理流程

  • 全部的算法都須要定義每一個數據點的特徵(feature)集->輸入;
  • 正確的定義特徵纔是機器學習中最有挑戰的部分。
  • 大多數算法都是專爲數據特徵(就是一個表明各個特徵值的數字向量)定義的,所以提取特徵並轉化爲特徵向量是機器學習過程當中重要的一步。
  • 輸入數據分爲「訓練集」和「測試集」,而且只使用前者進行訓練,這樣就能夠用後者來檢驗模型是否過分擬合了訓練數據。
  • 機器學習流水線會訓練出多個不一樣版本的模型,而後分別對其進行評估。Ml提供幾個算法進行模型評估。

常見的算法

分類算法 

基於已經被標註的其餘數據點做爲例子來識別一個數據點屬於幾個類別中的哪種;好比判斷一封郵件是否爲垃圾郵件。
垃圾郵件分類作法:
算法

  1. HashingTF

文本數據構建詞頻特徵向量apache

  1. LogisticRegressionWithSGD

使用隨機梯度降低法實現邏輯迴歸。api


監督學習

  • SVM使用通訊高效的分佈式雙座標上升(CoCoA)
  • 多元線性迴歸
  • 優化框架
  • L-BFGS
  • Generalized Linear Models
  • Multiple linear regression
  • LASSO, Ridge regression
  • Multi-class Logistic regression
  • Random forests
  • Support Vector Machines
  • Decision trees

無監督學習

  • k-最近鄰居關聯
  • Unsupervised learning
  • Clustering
  • K-means clustering
  • Principal Components Analysis
  • Recommendation
  • ALS
  • Text analytics
  • LDA

數據預處理

  • 多項式特徵
  • 標準尺度
    MinMax Scaler

降維處理

  • 模型選擇和性能評估
  • 多種評分功能模型評估
  • 模型選擇和評估的交叉驗證

flinkML目前支持的算法

監督學習

  • 基於通訊效率的SVM分佈式雙座標提高(CoCoA)
  • 多元線性迴歸
  • 優化框架

非監督學習

  • kNN(K最鄰近算法)算法

數據預處理

  • 多項式特徵
  • 標準定標器
  • 極大極小標量

推薦算法

  • 交替最小二乘法(ALS)

離羣值選擇

  • 隨機離羣點選擇

公用事業公司

  • 距離度量
  • 交叉驗證

分類代碼開始

  • 添加FlinkML依賴到 pom.xml.
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-ml_2.11</artifactId>
  <version>1.8.0</version>
</dependency>

1. 導入數據

格式化數據 好比 LibSVN ,監督學習問題通常使用LabeledVector類來表示(Label,Features)安全

  • 下載UCI ML倉庫數據集,該數據集「包含了一項關於乳腺癌手術患者存活率的研究的案例」,數據格式以下所示:
30,64,1,1
30,62,3,1
30,65,0,2
31,59,2,1

前三列表明特徵,最後一列表明分類,the 4th column indicates whether the patient survived 5 years or longer (label 1), or died within 5 years (label 2)bash

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val survival = env.readCsvFile[(String, String, String, String)]("/path/to/haberman.data")

數據轉換爲DataSet[LabeledVector],用FlinkML分類算法,第四列是分類標籤,構建LabeledVector元素以下框架

import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector

val survivalLV = survival
  .map{tuple =>
    val list = tuple.productIterator.toList
    val numList = list.map(_.asInstanceOf[String].toDouble)
    LabeledVector(numList(3), DenseVector(numList.take(3).toArray))
  }

後咱們能夠用這些數據來訓練學習者。不過,咱們將使用另外一個數據集來舉例說明如何構建學習者;如何導入其餘數據集格式,舉例以下:
LibSVM 文件
ML數據集的一種常見格式是LibSVM格式。FlinkML經過MLUtils對象提供的readLibSVM函數,提供了使用LibSVM格式加載數據集的實用程序。您還可使用writeLibSVM函數以LibSVM格式保存數據集。
讓咱們導入svmguide1數據集。你能夠在這裏下載訓練集和測試集。這是一個天文粒子二分類數據集,由Hsu等人在他們的實用支持向量機(SVM)指南中使用。它包含4個數值特徵,以及類標籤。
導入數據集合以下:



dom

import org.apache.flink.ml.MLUtils

val astroTrainLibSVM: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "/path/to/svmguide1")
val astroTestLibSVM: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "/path/to/svmguide1.t")

2. 數據分類

導入訓練和測試數據集後,爲分類作好準備。因爲Flink SVM只支持閾值爲+1.0和-1.0的二進制值,加載LibSVM數據集後須要進行轉換,由於它使用了1和0進行標記。
轉換能夠用一個簡單的歸一化映射函數:

import org.apache.flink.ml.math.Vector

def normalizer : LabeledVector => LabeledVector = { 
    lv => LabeledVector(if (lv.label > 0.0) 1.0 else -1.0, lv.vector)
}
val astroTrain: DataSet[LabeledVector] = astroTrainLibSVM.map(normalizer)
val astroTest: DataSet[(Vector, Double)] = astroTestLibSVM.map(normalizer).map(x => (x.vector, x.label))

換了數據集後,開始訓練一個預測器,如線性SVM分類器。咱們能夠爲分類器設置一些參數。在這裏,咱們設置Blocks參數,它被使用的底層CoCoA算法用於分割輸入。正則化參數決定了l2正則化的應用數量,用於避免過擬合。步驟大小決定權重向量更新到下一個權重向量值的貢獻。此參數設置初始步長。

import org.apache.flink.ml.classification.SVM

val svm = SVM()
  .setBlocks(env.getParallelism)
  .setIterations(100)
  .setRegularization(0.001)
  .setStepsize(0.1)
  .setSeed(42)

svm.fit(astroTrain)

在咱們能夠對測試集進行預測,並使用evaluate函數建立(真值、預測)對。

val evaluationPairs: DataSet[(Double, Double)] = svm.evaluate(astroTest)

接下來,咱們將瞭解如何預處理數據,並使用FlinkML的ML管道功能。

3. 數據預處理和管道(pipelines)

當使用SVM分類時,一般增益的預處理步驟是將輸入特徵縮放到[0,1]範圍,以免極值特徵占主導地位。FlinkML有許多轉換器(如用於預處理數據的MinMaxScaler),其中一個關鍵特性是可以將轉換器和預測器連接在一塊兒。這容許咱們運行相同的轉換管道,並以一種直接且類型安全的方式對火車和測試數據進行預測。您能夠在pipeline文檔中閱讀更多關於FlinkML管道系統的信息。
首先爲數據集中的特徵建立一個歸一化轉換器,並將其連接到一個新的SVM分類器。

import org.apache.flink.ml.preprocessing.MinMaxScaler

val scaler = MinMaxScaler()

val scaledSVM = scaler.chainPredictor(svm)

如今,咱們可使用新建立的管道對測試集進行預測。
首先,咱們再次調用fit,來訓練定標器和SVM分類器。而後將測試集的數據自動縮放,而後將其傳遞給SVM進行預測。

scaledSVM.fit(astroTrain)

val evaluationPairsScaled: DataSet[(Double, Double)] = scaledSVM.evaluate(astroTest)

按比例的輸入應該能給咱們更好的預測性能。

FlinkML文檔


FlinkML算法二-交替最小二乘法(Alternating Least Squares)

交替最小二乘法(ALS)算法將一個給定的R矩陣因式分解爲 U 和 V 兩個因子,例如 R≈UTV。 未知的行的維度被用做算法的參數,叫作潛在因子。 因爲矩陣因式分解能夠用在推薦系統的場景,U和V矩陣能夠分別稱爲用戶和商品矩陣。 用戶矩陣的第i列用ui表示,商品矩陣的第i列用vi表示。 R 矩陣稱爲評價矩陣能夠用(R)i,j=ri,j 表示。 爲了找到用戶和商品矩陣,以下問題獲得瞭解決:
argminarg minargmin
argminU,V∑{i,j∣ri,j≠0}(ri,j−uTivj)2+λ(∑inui∥ui∥2+∑jnvj∥vj∥2)
λ做爲正則化因子,nui做爲用戶i評過度的商品數量, nvj做爲商品j被評分的次數。 這個因式分解方案避免了稱做加權λ​因式分解的過擬合。


經過修復U 和 V矩陣,咱們得到能夠直接解析的二次形式。 問題的解決辦法是保證總消耗函數的單調遞減。經過對U 或 V矩陣的這一步操做,咱們逐步的改進了矩陣的因式分解。

R 矩陣做爲 (i,j,r) 元組的疏鬆表示。i 爲行索引,j 爲列索引,r 爲 (i,j) 位置上的矩陣值。

操做

ALS 是一個預測模型(Predictor)。 所以,它支持擬合(fit)與預測(predict)兩種操做。

擬合

ALS用於評價矩陣的疏鬆表示過程的訓練:
fit: DataSet[(Int, Int, Double)] => Unit

預測

ALS會對每一個元組行列的全部索引進行評分預測:
predict: DataSet[(Int, Int)] => DataSet[(Int, Int, Double)]

參數

ALS的實現能夠經過下面的參數進行控制:

參數 描述
NumFactors 底層模型中使用的潛在因子數目。等價於計算用戶和商品向量的維度。 (默認值:10)
Lambda 因式分解的因子。 該值用於避免過擬合或者因爲強生成致使的低性能。 (默認值:1)
Iterations 最大迭代次數。 (默認值:10)
Blocks 設定用戶和商品矩陣被分組後的塊數量。塊越少,發送的冗餘數據越少。然而,塊越大意味着堆中須要存儲的更新消息越大。若是因爲OOM致使算法失敗,試着下降塊的數量。 (默認值:None)
Seed 用於算法生成初始矩陣的隨機種子。 (默認值:0)
TemporaryPath 致使結果被當即存儲到臨時目錄的路徑。若是該值被設定,算法會被分爲兩個預處理階段,ALS迭代和計算最後ALS半階段的處理中階段。預處理階段計算給定評分矩陣的OutBlockInformation和InBlockInformation。每步的結果存儲在特定的目錄。經過將算法分爲更多小的步驟,Flink不會在多個算子中分割可用的內存。這讓系統能夠處理更大的獨立消息並提高總性能。 (默認值:None)

例子

// 從CSV文件讀取輸入數據集
val inputDS: DataSet[(Int, Int, Double)] = env.readCsvFile[(Int, Int, Double)](
  pathToTrainingFile)

// 設定ALS學習器
val als = ALS()
.setIterations(10)
.setNumFactors(10)
.setBlocks(100)
.setTemporaryPath("hdfs://tempPath")

// 經過一個map參數設置其餘參數
val parameters = ParameterMap()
.add(ALS.Lambda, 0.9)
.add(ALS.Seed, 42L)

// 計算因式分解
als.fit(inputDS, parameters)

// 從CSV文件讀取測試數據
val testingDS: DataSet[(Int, Int)] = env.readCsvFile[(Int, Int)](pathToData)

// 經過矩陣因式分解計算評分
val predictedRatings = als.predict(testingDS)
發佈了89 篇原創文章 · 獲贊 155 · 訪問量 74萬+
相關文章
相關標籤/搜索