Alink漫談(二) : 從源碼看機器學習平臺Alink設計和架構

Alink漫談(二) : 從源碼看機器學習平臺Alink設計和架構

0x00 摘要

Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平臺,是業界首個同時支持批式算法、流式算法的機器學習平臺。本文是漫談系列的第二篇,將從源碼入手,帶領你們具體剖析Alink設計思想和架構爲什麼。html

由於Alink的公開資料太少,因此均爲自行揣測,確定會有疏漏錯誤,但願你們指出,我會隨時更新。java

0x01 Alink設計原則

前文中 Alink漫談(一) : 從KMeans算法實現看Alink設計思想 咱們推測總結出Alink部分設計原則git

  • 算法的歸算法,Flink的歸Flink,儘可能屏蔽AI算法和Flink之間的聯繫。程序員

  • 採用最簡單,最多見的開發語言和思惟方式。算法

  • 儘可能借鑑市面上通用的機器學習設計思路和開發模式,讓開發者無縫切換。sql

  • 構建一套戰術打法(middleware或adapter),即屏蔽了Flink,又能夠利用好Flink,還能讓用戶快速開發算法。apache

下面咱們就針對這些設計原則,從上至下看看Alink如何設計本身這套戰術打法。編程

爲了能讓你們更好理解,先整理一個概要圖。由於Alink系統主要能夠分紅三個層面(頂層流水線, 中間層算法組件, 底層迭代計算框架),再加上一個Flink runtime,因此下圖就是分別從這四個層面出發來看程序執行流程。api

如何看待 pipeline.fit(data).transform(data).print();

// 從頂層流水線角度看
訓練流水線  +-----> [VectorAssembler(Transformer)] -----> [KMeans(Estimator)]
          |      // KMeans.fit以後,會生成一個KMeansModel用來轉換
          |      
轉換流水線  +-----> [VectorAssembler(Transformer)] -----> [KMeansModel(Transformer)]


// 從中間層算法組件角度看    
訓練算法組件 +-----> [MapBatchOp] -----> [KMeansTrainBatchOp]  
           |       // VectorAssemblerMapper in MapBatchOp 是業務邏輯
           |      
轉換算法組件 +-----> [MapBatchOp] -----> [ModelMapBatchOp]
                   // VectorAssemblerMapper in MapBatchOp 是業務邏輯
                   // KMeansModelMapper in ModelMapBatchOp 是業務邏輯
 
  
// 從底層迭代計算框架角度看
訓練by框架 +-----> [VectorAssemblerMapper] -----> [KMeansPreallocateCentroid / KMeansAssignCluster / AllReduce / KMeansUpdateCentroids in IterativeComQueue]   
          |       // 映射到Flink的各類算子進行訓練
          |      
轉換(直接) +-----> [VectorAssemblerMapper] -----> [KMeansModelMapper]    
                  // 映射到Flink的各類算子進行轉換 
  
// 從Flink runtime角度看  
訓練 +-----> map, mapPartiiton...
    |       // VectorAssemblerMapper.map等會被調用
    |      
轉換 +-----> map, mapPartiiton...
            // 好比調用 KMeansModelMapper.map 來轉換

0x02 Alink實例代碼

示例代碼仍是用以前的KMeans算法部分模塊。數組

算法調用

public class KMeansExample {
	    public static void main(String[] args) throws Exception {
        ......

        BatchOperator data = new CsvSourceBatchOp().setFilePath(URL).setSchemaStr(SCHEMA_STR);

        VectorAssembler va = new VectorAssembler()
            .setSelectedCols(new String[]{"sepal_length", "sepal_width", "petal_length", "petal_width"})
            .setOutputCol("features");

        KMeans kMeans = new KMeans().setVectorCol("features").setK(3)
            .setPredictionCol("prediction_result")
            .setPredictionDetailCol("prediction_detail")
            .setReservedCols("category")
            .setMaxIter(100);

        Pipeline pipeline = new Pipeline().add(va).add(kMeans);
        pipeline.fit(data).transform(data).print();
    }
}

算法主函數

public final class KMeansTrainBatchOp extends BatchOperator <KMeansTrainBatchOp>
	implements KMeansTrainParams <KMeansTrainBatchOp> {

	static DataSet <Row> iterateICQ(...省略...) {

		return new IterativeComQueue()
			.initWithPartitionedData(TRAIN_DATA, data)
			.initWithBroadcastData(INIT_CENTROID, initCentroid)
			.initWithBroadcastData(KMEANS_STATISTICS, statistics)
			.add(new KMeansPreallocateCentroid())
			.add(new KMeansAssignCluster(distance))
			.add(new AllReduce(CENTROID_ALL_REDUCE))
			.add(new KMeansUpdateCentroids(distance))
			.setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol))
			.closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
			.setMaxIter(maxIter)
			.exec();
	}
}

算法模塊舉例

基於點計數和座標,計算新的聚類中心。

// Update the centroids based on the sum of points and point number belonging to the same cluster.
public class KMeansUpdateCentroids extends ComputeFunction {
    @Override
    public void calc(ComContext context) {

        Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
        Integer k = context.getObj(KMeansTrainBatchOp.K);
        double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);

        Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }

        stepNumCentroids.f0 = context.getStepNo();
        context.putObj(KMeansTrainBatchOp.K,
            updateCentroids(stepNumCentroids.f1, k, vectorSize, sumMatrixData, distance));
    }
}

0x03 頂層 -- 流水線

本部分實現的設計原則是 :儘可能借鑑市面上通用的設計思路和開發模式,讓開發者無縫切換。

1. 機器學習重要概念

一個典型的機器學習過程從數據收集開始,要經歷多個步驟,才能獲得須要的輸出。這很是相似於流水線式工做,即一般會包含源數據ETL(抽取、轉化、加載),數據預處理,指標提取,模型訓練與交叉驗證,新數據預測等步驟。

先來講一下幾個重要的概念:

  • Transformer:轉換器,是一種能夠將一個數據轉換爲另外一個數據的算法。好比一個模型就是一個 Transformer。它能夠把一個不包含轉換標籤的測試數據集 打上標籤,轉化成另外一個包含轉換標籤的特徵數據。Transformer能夠理解爲特徵工程,即:特徵標準化、特徵正則化、特徵離散化、特徵平滑、onehot編碼等。該類型有一個transform方法,用於fit數據以後,輸入新的數據,進行特徵變換。
  • Estimator:評估器,它是學習算法或在訓練數據上的訓練方法的概念抽象。全部的機器學習算法模型,都被稱爲估計器。在 Pipeline 裏一般是被用來操做 數據並生產一個 Transformer。從技術上講,Estimator實現了一個方法fit(),它接受一個特徵數據併產生一個轉換器。好比一個隨機森林算法就是一個 Estimator,它能夠調用fit(),經過訓練特徵數據而獲得一個隨機森林模型。
  • PipeLine:工做流或者管道。工做流將多個工做流階段(轉換器和估計器)鏈接在一塊兒,造成機器學習的工做流,並得到結果輸出。
  • Parameter:Parameter 被用來設置 Transformer 或者 Estimator 的參數。

2. Alink中概念實現

從 Alink的目錄結構中 ,咱們能夠看出,Alink提供了這些常見概念(其中有些代碼借鑑了Flink ML)。

./java/com/alibaba/alink:
common		operator	params		pipeline
  
./java/com/alibaba/alink/params:
associationrule	evaluation	nlp		regression	statistics
classification	feature		onlinelearning	shared		tuning
clustering	io		outlier		similarity	udf
dataproc	mapper		recommendation	sql		validators

./java/com/alibaba/alink/pipeline:
EstimatorBase.java	ModelBase.java		Trainer.java		feature
LocalPredictable.java	ModelExporterUtils.java	TransformerBase.java	nlp
LocalPredictor.java	Pipeline.java		classification		recommendation
MapModel.java		PipelineModel.java	clustering		regression
MapTransformer.java	PipelineStageBase.java	dataproc		tuning

比較基礎的是三個接口:PipelineStages,Transformer,Estimator,分別剛好對應了機器學習的兩個通用概念 :轉換器 ,評估器。PipelineStages是這兩個的基礎接口。

// Base class for a stage in a pipeline. The interface is only a concept, and does not have any actual functionality. Its subclasses must be either Estimator or Transformer. No other classes should inherit this interface directly.
public interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable 

// A transformer is a PipelineStage that transforms an input Table to a result Table.
public interface Transformer<T extends Transformer<T>> extends PipelineStage<T> 
 
// Estimators are PipelineStages responsible for training and generating machine learning models.
public interface Estimator<E extends Estimator<E, M>, M extends Model<M>> extends PipelineStage<E>

其次是三個抽象類定義:PipelineStageBase,EstimatorBase,TransformerBase,分別就對應了以上的三個接口。其中定義了一些基礎操做,好比 fit,transform。

// The base class for a stage in a pipeline, either an EstimatorBase or a TransformerBase.
public abstract class PipelineStageBase<S extends PipelineStageBase<S>>
    implements WithParams<S>, HasMLEnvironmentId<S>, Cloneable 
  
// The base class for estimator implementations.
public abstract class EstimatorBase<E extends EstimatorBase<E, M>, M extends ModelBase<M>>
    extends PipelineStageBase<E> implements Estimator<E, M>   
  
// The base class for transformer implementations.
public abstract class TransformerBase<T extends TransformerBase<T>>
    extends PipelineStageBase<T> implements Transformer<T>

而後是Pipeline基礎類,這個類就能夠把Transformer,Estimator聯繫起來 。

// A pipeline is a linear workflow which chains EstimatorBases and TransformerBases to execute an algorithm 
public class Pipeline extends EstimatorBase<Pipeline, PipelineModel> {
	private ArrayList<PipelineStageBase> stages = new ArrayList<>();
  
  	public Pipeline add(PipelineStageBase stage) {
		this.stages.add(stage);
		return this;
	}
}

最後是 Parameter 概念相關舉例,好比實例中用到的 VectorAssemblerParams。

// Parameters for MISOMapper.
public interface MISOMapperParams<T> extends HasSelectedCols <T>,  HasOutputCol <T>,
	HasReservedCols <T> {}

// parameters of vector assembler.
public interface VectorAssemblerParams<T> extends MISOMapperParams<T> {
ParamInfo <String> HANDLE_INVALID = ParamInfoFactory
		.createParamInfo("handleInvalid", String.class)
		.setDescription("parameter for how to handle invalid data (NULL values)")
		.setHasDefaultValue("error")
		.build();
}

綜合來講,由於模型和數據,在Alink運行時候,都統一轉化爲Table類型,因此能夠整理以下:

  • Transformer: 將input table轉換爲output table。
  • Estimator:將input table轉換爲模型。
  • 模型:將input table轉換爲output table。

3. 結合實例看流水線

首先是一些基礎抽象類,好比:

  • MapTransformer是 flat map 的Transformer。
  • ModelBase是模型定義,也是一個Transformer。
  • Trainer是訓練模型定義,是EstimatorBase。
// Abstract class for a flat map TransformerBase. 
public abstract class MapTransformer<T extends MapTransformer <T>>
		extends TransformerBase<T> implements LocalPredictable {  

// The base class for a machine learning model.
public abstract class ModelBase<M extends ModelBase<M>> extends TransformerBase<M>
    implements Model<M> 

// Abstract class for a trainer that train a machine learning model.
public abstract class Trainer<T extends Trainer <T, M>, M extends ModelBase<M>>
	extends EstimatorBase<T, M>

而後就是咱們實例用到的兩個類型定義。

  • KMeans 是一個Trainer,其實現了EstimatorBase;
  • VectorAssembler 是一個TransformerBase。
// 這是一個 EstimatorBase 類型
public class KMeans extends Trainer <KMeans, KMeansModel> implements
	KMeansTrainParams <KMeans>, KMeansPredictParams <KMeans> {
	@Override
	protected BatchOperator train(BatchOperator in) {
		return new KMeansTrainBatchOp(this.getParams()).linkFrom(in);
	}
}

// 這是一個 TransformerBase 類型
public class VectorAssembler extends MapTransformer<VectorAssembler>
	implements VectorAssemblerParams <VectorAssembler> {
	public VectorAssembler(Params params) {
		super(VectorAssemblerMapper::new, params);
	}
}

實例中,分別構建了兩個流水線階段,而後這兩個實例就被連接到流水線上。

VectorAssembler va = new VectorAssembler()
KMeans kMeans = new KMeans()  
Pipeline pipeline = new Pipeline().add(va).add(kMeans);

// 能看出來,流水線上有兩個階段,分別是VectorAssembler和KMeans。

pipeline = {Pipeline@1201} 
 stages = {ArrayList@2853}  size = 2
   
  0 = {VectorAssembler@1199} 
   mapperBuilder = {VectorAssembler$lambda@2859} 
   params = {Params@2860} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
     
  1 = {KMeans@1200} 
   params = {Params@2857} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"

0x04 中間層 -- 算法組件

算法組件是中間層的概念,能夠認爲是真正實現算法的模塊/層次。主要做用是承上啓下。

  • 其上層是流水線各個階段,流水線的生成結果就是一個算法組件。算法組件的做用是把流水線的Estimator或者Transformer翻譯成具體算法。算法組件彼此是經過 linkFrom 串聯在一塊兒
  • 其下層是"迭代計算框架",算法組件把具體算法邏輯中的計算/通訊分紅一個個小模塊,映射到Mapper Function 或者具體"迭代計算框架"的計算/通訊 Function 上,這樣才能更好的利用Flink的各類優點。
  • "迭代計算框架" 中,主要兩個部分是 Mapper Function 和 計算/通訊 Function,其在代碼中分別對應Mapper,ComQueueItem。
  • Mapper Function 是映射Function(系統寫好了部分Mapper,用戶也能夠根據算法來寫本身的Mapper);
  • 計算/通訊 Function是專門爲算法寫的專用Function(也分紅 系統內置的,算法自定義的)。
  • 能夠這麼理解:各類Function是業務邏輯(組件)。算法組件只是提供運行規則,業務邏輯(組件)做爲運行在算法組件上的插件。
  • 也能夠這麼理解 :算法組件就是框架,其把部分業務邏輯委託給Mapper或者ComQueueItem。

好比

  • KMeans 是 Estimator,其對應算法組件是 KMeansTrainBatchOp。其業務邏輯(組件)也在這個類中,是由IterativeComQueue爲基礎串聯起來的一系列算法類(ComQueueItem)。
  • VectorAssembler 是 Transformer,其對應算法組件是 MapBatchOp。其業務邏輯(組件)是VectorAssemblerMapper(其 map 函數會作業務邏輯,把將多個數值列按順序彙總成一個向量列)。
public final class KMeansTrainBatchOp extends BatchOperator <KMeansTrainBatchOp>   implements KMeansTrainParams <KMeansTrainBatchOp> 
    
// class for a flat map BatchOperator.
public class MapBatchOp<T extends MapBatchOp<T>> extends BatchOperator<T>

不管是調用Estimator.fit 仍是 Transformer.transform,其本質都是經過linkFrom函數,把各個Operator聯繫起來,這樣就把數據流串聯起來。而後就能夠逐步映射到Flink具體運行計劃上

1. Algorithm operators

AlgoOperator是算子組件的基類,其子類有BatchOperator和StreamOperator,分別對應了批處理和流處理。

// Base class for algorithm operators.
public abstract class AlgoOperator<T extends AlgoOperator<T>>
    implements WithParams<T>, HasMLEnvironmentId<T>, Serializable 

// Base class of batch algorithm operators.
public abstract class BatchOperator<T extends BatchOperator <T>> extends AlgoOperator <T> {
    // Link this object to BatchOperator using the BatchOperators as its input.
  	public abstract T linkFrom(BatchOperator <?>... inputs);
  
    public <B extends BatchOperator <?>> B linkTo(B next) {
      return link(next);
    }
    public BatchOperator print() throws Exception {
      return linkTo(new PrintBatchOp().setMLEnvironmentId(getMLEnvironmentId()));
    }  
}

public abstract class StreamOperator<T extends StreamOperator <T>> extends AlgoOperator <T>

示例代碼以下:

// 輸入csv文件被轉化爲一個BatchOperator
BatchOperator data = new CsvSourceBatchOp().setFilePath(URL).setSchemaStr(SCHEMA_STR);

...

pipeline.fit(data).transform(data).print();

2. Mapper(提早說明)

Mapper是底層迭代計算框架的一部分,是業務邏輯(組件)。從目錄結構能看出。這裏提早說明,是由於在流水線講解過程當中大量涉及,因此就提早放在這裏說明

./java/com/alibaba/alink/common
linalg mapper model comqueue utils io

Mapper的幾個主要類定義以下,其做用普遍,便可以映射輸入到輸出,也能夠映射模型到具體數值

// Abstract class for mappers.
public abstract class Mapper implements Serializable {}

// Abstract class for mappers with model.
public abstract class ModelMapper extends Mapper {}

// Find  the closest cluster center for every point.
public class KMeansModelMapper extends ModelMapper {}

// Mapper with Multi-Input columns and Single Output column(MISO).
public abstract class MISOMapper extends Mapper {}

// This mapper maps many columns to one vector. the columns should be vector or numerical columns.
public class VectorAssemblerMapper extends MISOMapper {}

Mapper的業務邏輯依賴於算法組件來運做,好比 [ VectorAssemblerMapper in MapBatchOp ] ,[ KMeansModelMapper in ModelMapBatchOp ]。

ModelMapper具體運行則須要依賴 ModelMapperAdapter 來和Flink runtime聯繫起來。ModelMapperAdapter繼承了RichMapFunction,ModelMapper做爲其成員變量,在map操做中執行業務邏輯,ModelSource則是數據來源

對應本實例,KMeansModelMapper 就是最後轉換的 BatchOperator,其map函數用來轉換

3. 系統內置算法組件

系統內置了一些經常使用的算法組件,好比:

  • MapBatchOp 功能是基於輸入來flat map,是 VectorAssembler 返回的算法組件。
  • ModelMapBatchOp 功能是基於模型進行flat map,是 KMeans 返回的算法組件。

以 ModelMapBatchOp 爲例給你們說明其做用,從下面代碼註釋中能夠看出,linkFrom做用是:

  • 把inputs"算法組件" 和 自己"算法組件" 聯繫起來,這就造成了一個算法邏輯鏈
  • 把業務邏輯映射成 "Flink算子",這就造成了一個 "Flink算子鏈"
public class ModelMapBatchOp<T extends ModelMapBatchOp<T>> extends BatchOperator<T> {
	@Override
	public T linkFrom(BatchOperator<?>... inputs) {
		checkOpSize(2, inputs);

		try {
			BroadcastVariableModelSource modelSource = new BroadcastVariableModelSource(BROADCAST_MODEL_TABLE_NAME);
      // mapper是映射函數
			ModelMapper mapper = this.mapperBuilder.apply(
					inputs[0].getSchema(),
					inputs[1].getSchema(),
					this.getParams());
      // modelRows 是模型
			DataSet<Row> modelRows = inputs[0].getDataSet().rebalance();
      // resultRows 是輸入數據的映射變化
			DataSet<Row> resultRows = inputs[1].getDataSet()
					.map(new ModelMapperAdapter(mapper, modelSource))
           // 把模型做爲廣播變量,後續會在 ModelMapperAdapter 中使用
					.withBroadcastSet(modelRows, BROADCAST_MODEL_TABLE_NAME);

			TableSchema outputSchema = mapper.getOutputSchema();
			this.setOutput(resultRows, outputSchema);
			return (T) this;
		} catch (Exception ex) {
			throw new RuntimeException(ex);
		}
	}
}

ModelMapperAdapter

ModelMapperAdapter 是適配器的實現,用來在flink上運行業務邏輯Mapper。從代碼能夠看出,ModelMapperAdapter取出以前存儲的mapper和模型數據,而後基於此來進行具體算法業務。

/**
 * Adapt a {@link ModelMapper} to run within flink.

 * This adapter class hold the target {@link ModelMapper} and it's {@link ModelSource}. Upon open(), it will load model rows from {@link ModelSource} into {@link ModelMapper}.
 */
public class ModelMapperAdapter extends RichMapFunction<Row, Row> implements Serializable {

    /**
     * The ModelMapper to adapt.
     */
    private final ModelMapper mapper;

    /**
     * Load model data from ModelSource when open().
     */
    private final ModelSource modelSource;

    public ModelMapperAdapter(ModelMapper mapper, ModelSource modelSource) {
        // mapper是業務邏輯,modelSource是模型Broadcast source
        this.mapper = mapper; // 在map操做中執行業務邏輯
        this.modelSource = modelSource; // 數據來源
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // 從廣播變量中獲取模型數據
        List<Row> modelRows = this.modelSource.getModelRows(getRuntimeContext());
        this.mapper.loadModel(modelRows);
    }

    @Override
    public Row map(Row row) throws Exception {
        // 執行業務邏輯,在數據來源上轉換
        return this.mapper.map(row);
    }
}

4. 訓練階段fit

pipeline.fit(data) 之中,會沿着流水線依次執行。若是流水線下一個階段遇到了是Transformer,就調用其transform;若是遇到的是EstimatorBase,就先調用其fit,把EstimatorBase轉換爲Transformer,而後再次調用這個轉換出來的Transformer.transform。就這樣一個一個階段執行。

4.1 具體流水線處理

  1. 若是流水線下一階段遇到EstimatorBase,會處理EstimatorBase的fit,把流水線上的Estimator轉換爲 TransformerBase。Estimator.fit 接受一個特徵數據併產生一個轉換器。

    若是這個階段 不是 流水線最後一個階段)會對這個 TransformerBase繼續處理。處理以後才能進入到流水線下一階段。

    若是這個階段 是 流水線最後一個階段)不會對這個 TransformerBase 作處理,直接結束流水線 fit 操做。

  2. 若是流水線下一階段遇到TransformerBase,就直接調用其transform函數。

  3. 對於全部須要處理的TransformerBase,不管是從EstimatorBase轉換出來的,仍是Pipeline原有的 ,都調用其transform函數,轉換其input。input = transformers[i].transform(input); 。這樣每次轉換後的輸出再次賦值給input,做爲流水線下一階段的輸入。

  4. 最後獲得一個PipelineModel (其自己也是一個Transformer) ,這個屬於下一階段轉換流水線

4.2 結合本實例概述

本實例有兩個stage。VectorAssembler是Transformer,KMeans是EstimatorBase。

這時候Pipeline其內部變量是:

this = {Pipeline@1195} 
 stages = {ArrayList@2851}  size = 2
     
  0 = {VectorAssembler@1198} 
   mapperBuilder = {VectorAssembler$lambda@2857} 
   params = {Params@2858} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
       
  1 = {KMeans@2856} 
   params = {Params@2860} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
    params = {HashMap@2862}  size = 6
  • Pipeline 先調用Transformer類型的VectorAssembler,來處理其input (就是csv的BatchOperator)。這個處理csv是經過linkFrom(input)來構建的。處理以後再包裝成一個MapBatchOp返回賦值給input。
  • 其次調用EstimatorBase類型的Kmeans.fit函數,對input (就是 VectorAssembler 返回的MapBatchOp) 進行fit。fit過程當中調用了KMeansTrainBatchOp.linkFrom來設置,fit生成了一個KMeansModel (Transformer)。由於這時候已是流水線最後一步,因此不作後續的KMeansModel.transform操做。KMeansModel 就是訓練出來的判斷模型
  • 在上述調用過程當中,會在transformers數組中記錄運算過的TransformerBase和EstimatorBase適配出來的Transformer。
  • 最後以這個transformers數組爲參數,生成一個 PipelineModel (其也是一個Transformer類型)。生成 PipelineModel 的目的是:PipelineModel是後續轉換中的新流水線

PipelineMode 的新流水線處理流程是:從 csv 讀入/ 映射(VectorAssembler 處理),而後 KMeansModel 作轉換(下一節會具體介紹)。

fit 具體代碼是

public class Pipeline extends EstimatorBase<Pipeline, PipelineModel> {
  
    // Train the pipeline with batch data.
  	public PipelineModel fit(BatchOperator input) {
      
      int lastEstimatorIdx = getIndexOfLastEstimator();
      TransformerBase[] transformers = new TransformerBase[stages.size()];
      for (int i = 0; i < stages.size(); i++) {
        PipelineStageBase stage = stages.get(i);
        if (i <= lastEstimatorIdx) {
          if (stage instanceof EstimatorBase) {
            // 這裏會把流水線上的具體 Algorithm operators 經過 linkFrom 函數串聯起來。
            transformers[i] = ((EstimatorBase) stage).fit(input); 
          } else if (stage instanceof TransformerBase) {
            transformers[i] = (TransformerBase) stage;
          }
          // 注意,若是是流水線最後一個階段,則不作transform處理。
          if (i < lastEstimatorIdx) {
            // 這裏會調用到具體Transformer的transform函數,其會把流水線上的具體 Algorithm operators 經過 linkFrom 函數串聯起來。
            input = transformers[i].transform(input);
          }
        } else {
          transformers[i] = (TransformerBase) stage;
        }
      }
      // 這裏生成了一個PipelineModel,transformers會做爲參數傳給他
      return new PipelineModel(transformers).setMLEnvironmentId(input.getMLEnvironmentId());   
    }
}

// MapTransformer是VectorAssembler的基類。transform會生成一個MapBatchOp,而後再調用MapBatchOp.linkFrom。
public abstract class MapTransformer<T extends MapTransformer <T>>
		extends TransformerBase<T> implements LocalPredictable {
	@Override
	public BatchOperator transform(BatchOperator input) {
		return new MapBatchOp(this.mapperBuilder, this.params).linkFrom(input);
	}
}

// Trainer是KMeans的基類。
public abstract class Trainer<T extends Trainer <T, M>, M extends ModelBase<M>>
	@Override
	public M fit(BatchOperator input) {
    // KMeans.train 會調用 KMeansTrainBatchOp(this.getParams()).linkFrom(in);
    // createModel會生成一個新的model,本示例中是 com.alibaba.alink.pipeline.clustering.KMeansModel
  		return createModel(train(input).getOutputTable()); 
	}
}

下面會逐一論述這兩個環節。

4.3 VectorAssembler.transform

這部分做用是把csv數據轉化爲KMeans訓練所須要的數據類型。

VectorAssembler.transform會調用到MapBatchOp.linkFrom。linkFrom首先把 csv input 進行了轉換,變成DataSet ,而後以此爲參數生成一個MapBatchOp返回,這個返回的 MapBatchOp。其業務邏輯是在 VectorAssemblerMapper 中實現的(將多個數值列按順序彙總成一個向量列)。

public class MapBatchOp<T extends MapBatchOp<T>> extends BatchOperator<T> {
    public T linkFrom(BatchOperator<?>... inputs) {
        BatchOperator in = checkAndGetFirst(inputs);

        try {
            Mapper mapper = (Mapper)this.mapperBuilder.apply(in.getSchema(), this.getParams());
            // 這裏對csv輸入進行了map,這裏只是生成邏輯執行計劃,具體操做會在print以後才作的。
            DataSet<Row> resultRows = in.getDataSet().map(new MapperAdapter(mapper));
            TableSchema resultSchema = mapper.getOutputSchema();
            this.setOutput(resultRows, resultSchema);
            return this;
        } catch (Exception var6) {
            throw new RuntimeException(var6);
        }
    }    
}

// MapBatchOp自己
this = {MapBatchOp@3748} "UnnamedTable$1"
 mapperBuilder = {VectorAssembler$lambda@3744} 
 params = {Params@3754} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
 output = {TableImpl@5862} "UnnamedTable$1"
 sideOutputs = null
     
// mapper就是業務邏輯模塊
mapper = {VectorAssemblerMapper@5785} 
 handleInvalid = {VectorAssemblerMapper$HandleType@5813} "ERROR"
 outputColsHelper = {OutputColsHelper@5814} 
 colIndices = {int[4]@5815} 
 dataFieldNames = {String[5]@5816} 
 dataFieldTypes = {DataType[5]@5817} 
 params = {Params@5818} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
     
// 返回數值以下
resultRows = {MapOperator@5788} 
 function = {MapperAdapter@5826} 
  mapper = {VectorAssemblerMapper@5785} 
 defaultName = "linkFrom(MapBatchOp.java:35)"      
     
// 調用棧以下

linkFrom:31, MapBatchOp (com.alibaba.alink.operator.batch.utils)
transform:34, MapTransformer (com.alibaba.alink.pipeline)
fit:122, Pipeline (com.alibaba.alink.pipeline)
main:31, KMeansExample (com.alibaba.alink)

4.4 KMeans.fit

這部分就是訓練模型

KMeans是一個Trainer,其進而實現了EstimatorBase類型,因此流水線就調用到了其fit函數

KMeans.fit就是調用了Trainer.fit。

  • Trainer.fit首先調用train函數,最終調用KMeansTrainBatchOp.linkFrom,這樣就和VectorAssembler串聯起來。KMeansTrainBatchOp 把VectorAssembler返回的 MapBatchOp進行處理。最後返回一個一樣類型KMeansTrainBatchOp。
  • Trainer.fit其次調用Trainer.createModel,該函數會根據this的類型決定應該生成什麼Model。對於 KMeans,就生成了KMeansModel。

由於KMeans是流水線最後一個階段,這時候不調用 input = transformers[i].transform(input); 因此目前仍是訓練,生成一個模型 KMeansModel。

// 實際部分代碼    

Trainer.fit(BatchOperator input) {
		return createModel(train(input).getOutputTable());
}
  
public final class KMeansTrainBatchOp extends BatchOperator <KMeansTrainBatchOp>
	implements KMeansTrainParams <KMeansTrainBatchOp> {	
    
    	public KMeansTrainBatchOp linkFrom(BatchOperator <?>... inputs) {
            DataSet <Row> finalCentroid = iterateICQ(initCentroid, data,
                vectorSize, maxIter, tol, distance, distanceType, vectorColName, null, null);
            this.setOutput(finalCentroid, new KMeansModelDataConverter().getModelSchema());
            return this;            
        }
}

// 變量內容
			
this = {KMeansTrainBatchOp@5887} 
 params = {Params@5895} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
 output = null
 sideOutputs = null
inputs = {BatchOperator[1]@5888} 
 0 = {MapBatchOp@3748} "UnnamedTable$1"
  mapperBuilder = {VectorAssembler$lambda@3744} 
  params = {Params@3754} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
  output = {TableImpl@5862} "UnnamedTable$1"
  sideOutputs = null			
			
// 調用棧以下			
			
linkFrom:84, KMeansTrainBatchOp (com.alibaba.alink.operator.batch.clustering)
train:31, KMeans (com.alibaba.alink.pipeline.clustering)
fit:34, Trainer (com.alibaba.alink.pipeline)
fit:117, Pipeline (com.alibaba.alink.pipeline)
main:31, KMeansExample (com.alibaba.alink)

KMeansTrainBatchOp.linkFrom是算法重點。這裏其實就是生成了算法所須要的一切前提,把各類Flink算子搭建好。後續會再提到。

fit函數生成了 KMeansModel,其transform函數在基類MapModel中實現,會在下一個transform階段完成調用。這個就是訓練出來的KMeans模型,其也是一個Transformer。

// Find  the closest cluster center for every point.
public class KMeansModel extends MapModel<KMeansModel>
	implements KMeansPredictParams <KMeansModel> {

	public KMeansModel(Params params) {
		super(KMeansModelMapper::new, params);
	}
}

4.5 生成新的轉換流水線

前面說到了,Pipeline的fit函數,返回一個PipelineModel。這個PipelineModel在後續會繼續調用transform,完成轉換階段。

return new PipelineModel(transformers).setMLEnvironmentId(input.getMLEnvironmentId());

5. 轉換階段transform

轉換階段的流水線,依然要從VectorAssembler入手來讀取csv,進行map處理。而後調用 KMeansModel。

PipelineModel會繼續調用transform函數。其做用是把Transformer轉化爲BatchOperator。這時候其內部變量以下,看出來已經從最初流水線各類類型參雜 轉換爲 統一transform實例。

this = {PipelineModel@5016} 
 transformers = {TransformerBase[2]@5017} 

  0 = {VectorAssembler@1198} 
   mapperBuilder = {VectorAssembler$lambda@2855} 
   params = {Params@2856} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
     
  1 = {KMeansModel@5009} 
   mapperBuilder = {KMeansModel$lambda@5011} 
   modelData = {TableImpl@4984} "UnnamedTable$2"
   params = {Params@5012} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
 modelData = null
 params = {Params@5018} "Params {MLEnvironmentId=0}"
  • 第一次transform調用到了MapBatchOp.linkFrom,就是VectorAssembler.transform調用到的,其做用和 在 fit 流水線中起到的做用同樣,下面註釋中有解釋。

  • 第二次transform調用到了ModelMapBatchOp.linkFrom,就是KMeansModel.transform間接調用到的。下面註釋中有解釋。

這兩次 transform 的調用生成了 BatchOperator 的串聯。最終返回結果是 ModelMapBatchOp,即一個BatchOperator。轉換將由ModelMapBatchOp來轉換。

// The model fitted by Pipeline.
public class PipelineModel extends ModelBase<PipelineModel> implements LocalPredictable {
    @Override
    public BatchOperator<?> transform(BatchOperator input) {
        for (TransformerBase transformer : this.transformers) {
            input = transformer.transform(input);
        }
        return input;
    }  
}
   
// 通過變化後,獲得一個最終的轉化結果 BatchOperator,以此來轉換
// {KMeansModel$lambda@5050} 就是 KMeansModelMapper,轉換邏輯。

input = {ModelMapBatchOp@5047} "UnnamedTable$3"
 mapperBuilder = {KMeansModel$lambda@5050} 
 params = {Params@5051} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
  params = {HashMap@5058}  size = 6
   "vectorCol" -> ""features""
   "maxIter" -> "100"
   "reservedCols" -> "["category"]"
   "k" -> "3"
   "predictionCol" -> ""prediction_result""
   "predictionDetailCol" -> ""prediction_detail""
 output = {TableImpl@5052} "UnnamedTable$3"
  tableEnvironment = {BatchTableEnvironmentImpl@5054} 
  operationTree = {DataSetQueryOperation@5055} 
  operationTreeBuilder = {OperationTreeBuilder@5056} 
  lookupResolver = {LookupCallResolver@5057} 
  tableName = "UnnamedTable$3"
 sideOutputs = null
    
// MapTransformer是VectorAssembler的基類。transform會生成一個MapBatchOp,而後再調用MapBatchOp.linkFrom。
public abstract class MapTransformer<T extends MapTransformer <T>>
		extends TransformerBase<T> implements LocalPredictable {
	@Override
	public BatchOperator transform(BatchOperator input) {
		return new MapBatchOp(this.mapperBuilder, this.params).linkFrom(input);
	}  
}
    
// MapModel是KMeansModel的基類,transform會生成一個ModelMapBatchOp,而後再調用ModelMapBatchOp.linkFrom。
public abstract class MapModel<T extends MapModel<T>>
		extends ModelBase<T> implements LocalPredictable {
	@Override
	public BatchOperator transform(BatchOperator input) {
		return new ModelMapBatchOp(this.mapperBuilder, this.params)
				.linkFrom(BatchOperator.fromTable(this.getModelData())
					.setMLEnvironmentId(input.getMLEnvironmentId()), input);
	}
}

在這兩個linkFrom中,仍是分別生成了兩個MapOperator,而後拼接起來,構成了一個 BatchOperator 串。從上面代碼中能夠看出,KMeansModel對應的ModelMapBatchOp,其linkFrom會返回一個ModelMapperAdapter。ModelMapperAdapter是一個RichMapFunction類型,它會把KMeansModelMapper做爲RichMapFunction.function成員變量保存起來。而後會調用 .map(new ModelMapperAdapter(mapper, modelSource)),map就是Flink算子,這樣轉換算法就和Flink聯繫起來了

最後 Keans 算法的轉換工做是經過 KMeansModelMapper.map 來完成的

6. 運行

咱們都知道,Flink程序中,爲了讓程序運行,須要

  • 獲取execution environment : 調用相似 getExecutionEnvironment() 來獲取environment;
  • 觸發程序執行 : 調用相似 env.execute("KMeans Example"); 來真正執行。

Alink其實就是一個Flink應用,只不過要比普通Flink應用複雜太多。

可是從實例代碼中,咱們沒有看到相似調用。這說明Alink封裝的很是好,可是做爲好奇的程序員,咱們須要知道究竟這些調用隱藏在哪裏。

獲取執行環境

Alink是在Pipeline執行的時候,獲取到運行環境。具體來講,由於csv文件是最初的輸入,因此當transform調用其 in.getSchema() 時候,會獲取運行環境。

public final class CsvSourceBatchOp extends BaseSourceBatchOp<CsvSourceBatchOp>
    implements CsvSourceParams<CsvSourceBatchOp> {
    @Override
    public Table initializeDataSource() {
      ExecutionEnvironment execEnv = MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment();
    }
}

initializeDataSource:77, CsvSourceBatchOp (com.alibaba.alink.operator.batch.source)
getOutputTable:52, BaseSourceBatchOp (com.alibaba.alink.operator.batch.source)
getSchema:180, AlgoOperator (com.alibaba.alink.operator)
linkFrom:34, MapBatchOp (com.alibaba.alink.operator.batch.utils)
transform:34, MapTransformer (com.alibaba.alink.pipeline)
fit:122, Pipeline (com.alibaba.alink.pipeline)
main:31, KMeansExample (com.alibaba.alink)

觸發程序運行

截止到如今,Alink已經作了不少東西,也映射到了 Flink算子上,那麼究竟什麼地方纔真正和Flink聯繫起來呢?

print 調用的是BatchOperator.print,真正從這裏開始,會一層一層調用下去,最後來到

package com.alibaba.alink.operator.batch.utils;

public class PrintBatchOp extends BaseSinkBatchOp<PrintBatchOp> {
	@Override
	protected PrintBatchOp sinkFrom(BatchOperator in) {
		this.setOutputTable(in.getOutputTable());
		if (null != this.getOutputTable()) {
			try {
                // 在這個 collect 以後,會進入到 Flink 的runtime之中。
				List <Row> rows = DataSetConversionUtil.fromTable(getMLEnvironmentId(), this.getOutputTable()).collect();
				batchPrintStream.println(TableUtil.formatTitle(this.getColNames()));
				for (Row row : rows) {
					batchPrintStream.println(TableUtil.formatRows(row));
				}
			} catch (Exception ex) {
				throw new RuntimeException(ex);
			}
		}
		return this;
	}    
}

在 LocalEnvironment 這裏把Alink和Flink的運行環境聯繫起來。

public class LocalEnvironment extends ExecutionEnvironment {
	@Override
	public String getExecutionPlan() throws Exception {
		Plan p = createProgramPlan(null, false);
        
        // 下面會真正的和Flink聯繫起來。
		if (executor != null) {
			return executor.getOptimizerPlanAsJSON(p);
		}
		else {
			PlanExecutor tempExecutor = PlanExecutor.createLocalExecutor(configuration);
			return tempExecutor.getOptimizerPlanAsJSON(p);
		}
	}    
}

// 調用棧以下

execute:91, LocalEnvironment (org.apache.flink.api.java)
execute:820, ExecutionEnvironment (org.apache.flink.api.java)
collect:413, DataSet (org.apache.flink.api.java)
sinkFrom:40, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
sinkFrom:18, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
linkFrom:31, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
linkFrom:17, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
link:89, BatchOperator (com.alibaba.alink.operator.batch)
linkTo:239, BatchOperator (com.alibaba.alink.operator.batch)
print:337, BatchOperator (com.alibaba.alink.operator.batch)
main:31, KMeansExample (com.alibaba.alink)

0x05 底層--迭代計算框架

這裏對應以下設計原則:

  • 構建一套戰術打法(middleware或者adapter),即屏蔽了Flink,又能夠利用好Flink,還可讓用戶基於此能夠快速開發算法
  • 採用最簡單,最多見的開發語言和開發模式。

讓咱們想一想看,大概有哪些基礎工做須要作:

  • 如何初始化
  • 如何通訊
  • 如何分割代碼,如何廣播代碼
  • 如何分割數據,如何廣播數據
  • 如何迭代算法

其中最重要的概念是IterativeComQueue,這是把通訊或者計算抽象成ComQueueItem,而後把ComQueueItem串聯起來造成隊列。這樣就造成了面向迭代計算場景的一套迭代通訊計算框架。

再次把目錄結構列在這裏:

./java/com/alibaba/alink/common:
MLEnvironment.java		linalg MLEnvironmentFactory.java	mapper
VectorTypes.java		model comqueue			utils io

裏面大體有 :

  • Flink 封裝模塊 :MLEnvironment.java, MLEnvironmentFactory.java。
  • 線性代數模塊:linalg。
  • 計算/通信隊列模塊:comqueue,其中ComputeFunction進行計算,好比訓練算法。
  • 映射模塊:mapper,其中Mapper進行各類映射,好比 ModelMapper 把模型映射爲數值(就是轉換算法)。
  • 模型 :model,主要是用來讀取model source。
  • 基礎模塊:utils,io。

算法組件在其linkFrom函數中,會作以下操做:

  • 先進行部分初始化,此時會調用部分Flink算子,好比groupBy等等。
  • 再將算法邏輯剝離出來,委託給Mapper或者ComQueueItem。
  • Mapper或者ComQueueItem會調用Flink map算子或者mapPartition算子等。
  • 調用Flink算子過程就是把算法分割而後適配到Flink上的過程。

下面就一一闡述。

1. Flink上下文封裝

MLEnvironment 是個重要的類。其封裝了Flink開發所必需要的運行上下文。用戶能夠經過這個類來獲取各類實際運行環境,能夠創建table,能夠運行SQL語句。

/**
 * The MLEnvironment stores the necessary context in Flink.
 * Each MLEnvironment will be associated with a unique ID.
 * The operations associated with the same MLEnvironment ID
 * will share the same Flink job context.
 */
public class MLEnvironment {
    private ExecutionEnvironment env;
    private StreamExecutionEnvironment streamEnv;
    private BatchTableEnvironment batchTableEnv;
    private StreamTableEnvironment streamTableEnv;
}

2. Function

Function是計算框架中,對於計算和通信等業務邏輯的最小模塊。具體定義以下。

  • ComputeFunction 是計算模塊。
  • CommunicateFunction 是通信模塊。CommunicateFunction和ComputeFunction都是ComQueueItem子類,它們是業務邏輯實現者。
  • CompareCriterionFunction 是判斷模塊,用來判斷什麼時候結束循環。這就容許用戶指定迭代終止條件。
  • CompleteResultFunction 用來在結束循環時候調用,做爲循環結果。
  • Mapper也是一種Funciton,即Mapper Function。

後續將統稱爲 Function。

/**
 * Basic build block in {@link BaseComQueue}, for either communication or computation.
 */
public interface ComQueueItem extends Serializable {}

/**
 * An BaseComQueue item for computation.
 */
public abstract class ComputeFunction implements ComQueueItem {

	/**
	 * Perform the computation work.
	 *
	 * @param context to get input object and update output object.
	 */
	public abstract void calc(ComContext context);
}

/**
 * An BaseComQueue item for communication.
 */
public abstract class CommunicateFunction implements ComQueueItem {

    /**
     * Perform communication work.
     *
     * @param input     output of previous queue item.
     * @param sessionId session id for shared objects.
     * @param <T>       Type of dataset.
     * @return result dataset.
     */
	public abstract <T> DataSet <T> communicateWith(DataSet <T> input, int sessionId);
}

結合咱們代碼來看,KMeansTrainBatchOp算法組件的部分做用是:KMeans算法被分割成若干CommunicateFunction。而後被添加到計算通信隊列上。

下面代碼中,具體 Item 以下:

  • ComputeFunction :KMeansPreallocateCentroid,KMeansAssignCluster,KMeansUpdateCentroids
  • CommunicateFunction :AllReduce
  • CompareCriterionFunction :KMeansIterTermination
  • CompleteResultFunction : KMeansOutputModel

即算法實現的主要工做是:

  • 構建了一個IterativeComQueue。
  • 初始化數據,這裏有兩種辦法:initWithPartitionedData將DataSet分片緩存至內存。initWithBroadcastData將DataSet總體緩存至每一個worker的內存。
  • 將計算分割爲若干ComputeFunction,串聯在IterativeComQueue
  • 運用AllReduce通訊模型完成了數據同步
static DataSet <Row> iterateICQ(...省略...) {

		return new IterativeComQueue()
			.initWithPartitionedData(TRAIN_DATA, data)
			.initWithBroadcastData(INIT_CENTROID, initCentroid)
			.initWithBroadcastData(KMEANS_STATISTICS, statistics)
			.add(new KMeansPreallocateCentroid())
			.add(new KMeansAssignCluster(distance))
			.add(new AllReduce(CENTROID_ALL_REDUCE))
			.add(new KMeansUpdateCentroids(distance))
			.setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol))
			.closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
			.setMaxIter(maxIter)
			.exec();
	}

3. 計算/通信隊列

BaseComQueue 就是這個迭代框架的基礎。它維持了一個 List<ComQueueItem> queue。用戶在生成算法模塊時候,會把各類 Function 添加到隊列中。

IterativeComQueue 是 BaseComQueue 的缺省實現,具體實現了setMaxIter,setCompareCriterionOfNode0兩個函數。

BaseComQueue兩個重要函數是:

  • optimize 函數:把隊列上相鄰的 ComputeFunction串聯起來,造成一個 ChainedComputation。在框架中進行優化,就是Alink的一個優點所在
  • exec 函數:運行隊列上的各個 Function,返回最終的 Dataset。實際上,這裏才真正到了 Flink,好比把計算隊列上的各個 ComputeFunction 映射到 Flink 的 RichMapPartitionFunction。而後在mapPartition函數調用中,會調用真實算法邏輯片段 computation.calc(context);

能夠認爲,BaseComQueue 是個邏輯概念,讓算法工程師能夠更好的組織本身的業務語言。而經過其exec函數把算法邏輯映射到Flink算子上。這樣在某種程度上起到了與Flink解耦合的做用。

具體定義(摘取函數內部分代碼)以下:

// Base class for the com(Computation && Communicate) queue.
public class BaseComQueue<Q extends BaseComQueue<Q>> implements Serializable {

	/**
	 * All computation or communication functions.
	 */
	private final List<ComQueueItem> queue = new ArrayList<>();
    
	/**
	 * The function executed to decide whether to break the loop.
	 */
	private CompareCriterionFunction compareCriterion;

	/**
	 * The function executed when closing the iteration
	 */
	private CompleteResultFunction completeResult;    
    
	private void optimize() {
		if (queue.isEmpty()) {
			return;
		}

		int current = 0;
		for (int ahead = 1; ahead < queue.size(); ++ahead) {
			ComQueueItem curItem = queue.get(current);
			ComQueueItem aheadItem = queue.get(ahead);

            // 這裏進行判斷,是不是先後都是 ComputeFunction,而後合併成 ChainedComputation
			if (aheadItem instanceof ComputeFunction && curItem instanceof ComputeFunction) {
				if (curItem instanceof ChainedComputation) {
					queue.set(current, ((ChainedComputation) curItem).add((ComputeFunction) aheadItem));
				} else {
					queue.set(current, new ChainedComputation()
						.add((ComputeFunction) curItem)
						.add((ComputeFunction) aheadItem)
					);
				}
			} else {
				queue.set(++current, aheadItem);
			}
		}

		queue.subList(current + 1, queue.size()).clear();
	}    
    
	/**
	 * Execute the BaseComQueue and get the result dataset.
	 *
	 * @return result dataset.
	 */
	public DataSet<Row> exec() {
        
		optimize();

		IterativeDataSet<byte[]> loop
			= loopStartDataSet(executionEnvironment)
			.iterate(maxIter);

		DataSet<byte[]> input = loop
			.mapPartition(new DistributeData(cacheDataObjNames, sessionId))
			.withBroadcastSet(loop, "barrier")
			.name("distribute data");

		for (ComQueueItem com : queue) {
			if ((com instanceof CommunicateFunction)) {
				CommunicateFunction communication = ((CommunicateFunction) com);         
         // 這裏會調用好比 AllReduce.communication, 其會返回allReduce包裝後賦值給input,當循環遇到了下一個ComputeFunction(KMeansUpdateCentroids)時候,會把input賦給它處理。好比input = {MapPartitionOperator@5248},input.function = {AllReduce$AllReduceRecv@5260},input調用mapPartition,去間接調用KMeansUpdateCentroids。              
				input = communication.communicateWith(input, sessionId);
			} else if (com instanceof ComputeFunction) {
				final ComputeFunction computation = (ComputeFunction) com;

        // 這裏纔到了 Flink,把計算隊列上的各個 ComputeFunction 映射到 Flink 的RichMapPartitionFunction。
				input = input
						.mapPartition(new RichMapPartitionFunction<byte[], byte[]>() {

						@Override
						public void mapPartition(Iterable<byte[]> values, Collector<byte[]> out) {
							ComContext context = new ComContext(
								sessionId, getIterationRuntimeContext()
							);
              // 在這裏會被Flink調用具體計算函數,就是以前算法工程師拆分的算法片斷。
							computation.calc(context);
						}
					})
					.withBroadcastSet(input, "barrier")
					.name(com instanceof ChainedComputation ?
						((ChainedComputation) com).name()
						: "computation@" + computation.getClass().getSimpleName());
			} else {
				throw new RuntimeException("Unsupported op in iterative queue.");
			}
		}

		return serializeModel(clearObjs(loopEnd));
	}
}

4. Mapper(Function)

Mapper是底層迭代計算框架的一部分,能夠認爲是 Mapper Function。由於涉及到業務邏輯,因此提早說明。

5. 初始化

初始化發生在 KMeansTrainBatchOp.linkFrom 中。咱們能夠看到在初始化時候,是能夠調用 Flink 各類算子(好比.rebalance().map()) ,由於這時候尚未和框架相關聯,這時候的計算是用戶自行控制,不須要加到 IterativeComQueue 之上。

若是某一個計算既要加到 IterativeComQueue 之上,還要本身玩 Flink 算子,那框架就懵圈了,不知道該如何處理。因此用戶自由操做只能發生在沒有和框架聯繫以前

@Override
	public KMeansTrainBatchOp linkFrom(BatchOperator <?>... inputs) {
		DataSet <FastDistanceVectorData> data = statistics.f0.rebalance().map(
			new MapFunction <Vector, FastDistanceVectorData>() {
				@Override
				public FastDistanceVectorData map(Vector value) {
					return distance.prepareVectorData(Row.of(value), 0);
				}
			});
		......     
    }

框架也提供了初始化功能,用於將DataSet緩存到內存中,緩存的形式包括Partition和Broadcast兩種形式。前者將DataSet分片緩存至內存,後者將DataSet總體緩存至每一個worker的內存。

return new IterativeComQueue()
			.initWithPartitionedData(TRAIN_DATA, data)
			.initWithBroadcastData(INIT_CENTROID, initCentroid)
			.initWithBroadcastData(KMEANS_STATISTICS, statistics)
            ......

6. ComputeFunction

這是算法的具體計算模塊,算法工程師應該把算法拆分紅各個能夠並行處理的模塊,分別用 ComputeFunction 實現,這樣能夠利用 Flnk 的分佈式計算效力。

下面舉出一個例子以下,這段代碼爲每一個點(point)計算最近的聚類中心,爲每一個聚類中心的點座標的計數和求和:

/**
 * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
 */
public class KMeansAssignCluster extends ComputeFunction {
    private FastDistance fastDistance;
    private transient DenseMatrix distanceMatrix;

    @Override
    public void calc(ComContext context) {
        Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
        Integer k = context.getObj(KMeansTrainBatchOp.K);
        // get iterative coefficient from static memory.
        Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        }

        if (null == distanceMatrix) {
            distanceMatrix = new DenseMatrix(k, 1);
        }

        double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);
        if (sumMatrixData == null) {
            sumMatrixData = new double[k * (vectorSize + 1)];
            context.putObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE, sumMatrixData);
        }

        Iterable<FastDistanceVectorData> trainData = context.getObj(KMeansTrainBatchOp.TRAIN_DATA);
        if (trainData == null) {
            return;
        }

        Arrays.fill(sumMatrixData, 0.0);
        for (FastDistanceVectorData sample : trainData) {
            KMeansUtil.updateSumMatrix(sample, 1, stepNumCentroids.f1, vectorSize, sumMatrixData, k, fastDistance,
                distanceMatrix);
        }
    }
}

這裏可以看出,在 ComputeFunction 中,使用的是 命令式編程模式,這樣可以最大的契合目前程序員現狀,極高提高生產力

7. CommunicateFunction

前面代碼中有一個關鍵處 .add(new AllReduce(CENTROID_ALL_REDUCE))。這部分代碼起到了承前啓後的做用。以前的 KMeansPreallocateCentroid,KMeansAssignCluster和其後的 KMeansUpdateCentroids經過它作了一個 reduce / broadcast 通信。

具體從註解中能夠看到,AllReduce 是 MPI 相關通信原語的一個實現。這裏主要是對 double[] object 進行 reduce / broadcast。

public class AllReduce extends CommunicateFunction {
	public static <T> DataSet <T> allReduce(
		DataSet <T> input,
		final String bufferName,
		final String lengthName,
		final SerializableBiConsumer <double[], double[]> op,
		final int sessionId) {
		final String transferBufferName = UUID.randomUUID().toString();

		return input
			.mapPartition(new AllReduceSend <T>(bufferName, lengthName, transferBufferName, sessionId))
			.withBroadcastSet(input, "barrier")
			.returns(
				new TupleTypeInfo <>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO))
			.name("AllReduceSend")
			.partitionCustom(new Partitioner <Integer>() {
				@Override
				public int partition(Integer key, int numPartitions) {
					return key;
				}
			}, 0)
			.name("AllReduceBroadcastRaw")
			.mapPartition(new AllReduceSum(bufferName, lengthName, sessionId, op))
			.returns(
				new TupleTypeInfo <>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO))
			.name("AllReduceSum")
			.partitionCustom(new Partitioner <Integer>() {
				@Override
				public int partition(Integer key, int numPartitions) {
					return key;
				}
			}, 0)
			.name("AllReduceBroadcastSum")
			.mapPartition(new AllReduceRecv <T>(bufferName, lengthName, sessionId))
			.returns(input.getType())
			.name("AllReduceRecv");
	}    
}

通過調試咱們能看出來,AllReduceSum 是在本身mapPartition實現中,調用了 SUM。

/**
	 * The all-reduce operation which does elementwise sum operation.
	 */
	public final static SerializableBiConsumer <double[], double[]> SUM
		= new SerializableBiConsumer <double[], double[]>() {
		@Override
		public void accept(double[] a, double[] b) {
			for (int i = 0; i < a.length; ++i) {
				a[i] += b[i];
			}
		}
	};

private static class AllReduceSum extends RichMapPartitionFunction <Tuple3 <Integer, Integer, double[]>, Tuple3 <Integer, Integer, double[]>> {
		@Override
		public void mapPartition(Iterable <Tuple3 <Integer, Integer, double[]>> values,
								 Collector <Tuple3 <Integer, Integer, double[]>> out) {

      // 省略各類初始化操做,好比肯定傳輸位置,傳輸目標等
      ......
	
			do {
				Tuple3 <Integer, Integer, double[]> val = it.next();
				int localPos = val.f1 - startPos;
				if (sum[localPos] == null) {
					sum[localPos] = val.f2;
					agg[localPos]++;
				} else {
          // 這裏會調用 SUM 
					op.accept(sum[localPos], val.f2);
				}
			} while (it.hasNext());

			for (int i = 0; i < numOfSubTasks; ++i) {
				for (int j = 0; j < cnt; ++j) {
					out.collect(Tuple3.of(i, startPos + j, sum[j]));
				}
			}
		}
	}

accept:129, AllReduce$3 (com.alibaba.alink.common.comqueue.communication)
accept:126, AllReduce$3 (com.alibaba.alink.common.comqueue.communication)
mapPartition:314, AllReduce$AllReduceSum (com.alibaba.alink.common.comqueue.communication)
run:103, MapPartitionDriver (org.apache.flink.runtime.operators)
run:504, BatchTask (org.apache.flink.runtime.operators)
run:157, AbstractIterativeTask (org.apache.flink.runtime.iterative.task)
run:107, IterationIntermediateTask (org.apache.flink.runtime.iterative.task)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:705, Task (org.apache.flink.runtime.taskmanager)
run:530, Task (org.apache.flink.runtime.taskmanager)
run:745, Thread (java.lang)

0x06 另外一種打法

總結到如今,咱們發現這個迭代計算框架設計的很是優秀。可是Alink並無限定你們只能使用這個框架來實現算法。若是你是Flink高手,你徹底能夠爲所欲爲的實現。

Alink例子中自己就有一個這樣的實現 ALSExample。其核心類 AlsTrainBatchOp 就是直接使用了 Flink 算子,IterativeDataSet 等。

這就比如是武松武都頭,一雙戒刀搠得倒貪官佞臣,赤手空拳也打得死吊睛白額大蟲

public final class AlsTrainBatchOp
    extends BatchOperator<AlsTrainBatchOp>
    implements AlsTrainParams<AlsTrainBatchOp> {

    @Override
    public AlsTrainBatchOp linkFrom(BatchOperator<?>... inputs) {
        BatchOperator<?> in = checkAndGetFirst(inputs);

 		......

        AlsTrain als = new AlsTrain(rank, numIter, lambda, implicitPrefs, alpha, numMiniBatches, nonNegative);
        DataSet<Tuple3<Byte, Long, float[]>> factors = als.fit(alsInput);

        DataSet<Row> output = factors.mapPartition(new RichMapPartitionFunction<Tuple3<Byte, Long, float[]>, Row>() {
            @Override
            public void mapPartition(Iterable<Tuple3<Byte, Long, float[]>> values, Collector<Row> out) {
                new AlsModelDataConverter(userColName, itemColName).save(values, out);
            }
        });
 
        return this;
    }
}

多提一點,Flink ML中也有ALS算法,是一個Scala實現。沒有Scala經驗的算法工程師看代碼會咬碎鋼牙。

0x07 總結

通過這兩篇文章的推測和驗證,如今咱們總結以下。

Alink的部分設計原則

  • 算法的歸算法,Flink的歸Flink,儘可能屏蔽AI算法和Flink之間的聯繫。

  • 採用最簡單,最多見的開發語言和思惟方式。

  • 儘可能借鑑市面上通用的機器學習設計思路和開發模式,讓開發者無縫切換。

  • 構建一套戰術打法(middleware或者adapter),即屏蔽了Flink,又能夠利用好Flink,還可讓用戶基於此能夠快速開發算法。

針對這些原則,Alink實現了

  • 頂層流水線,Estimator, Transformer...
  • 算法組件中間層
  • 底層迭代計算框架

這樣Alink便可以最大限度的享受Flink帶來的各類優點,也能順應目前形勢,讓算法工程師工做更方便。從而達到系統性能和生產力的雙重提高。

下一篇文章爭取介紹 AllReduce 的具體實現。

0x08 參考

k-means聚類算法原理簡析

flink kmeans聚類算法實現

Spark ML簡介之Pipeline,DataFrame,Estimator,Transformer

開源 | 全球首個批流一體機器學習平臺

斬獲GitHub 2000+ Star,阿里雲開源的 Alink 機器學習平臺如何跑贏雙11數據「博弈」?|AI 技術生態論

Flink DataSet API

相關文章
相關標籤/搜索