Alink is a next-generation machine learning algorithm platform developed by Alibaba on top of the real-time computing engine Flink, and the first platform in the industry to support both batch and streaming algorithms. This is the second article in this series; starting from the source code, it walks through what Alink's design philosophy and architecture actually are.
Since there is very little public material about Alink, what follows is largely my own inference. There are bound to be omissions and mistakes; please point them out and I will keep this updated.
In the previous article, Alink漫談(一) : 從KMeans算法實現看Alink設計思想, we inferred and summarized some of Alink's design principles:
Algorithms belong to algorithms, Flink belongs to Flink: hide the coupling between AI algorithms and Flink as much as possible.
Use the simplest and most common development language and way of thinking.
Borrow as much as possible from the common machine learning design ideas and development patterns in the industry, so that developers can switch over seamlessly.
Build a set of tactics (middleware or adapters) that both shields Flink and makes good use of it, while letting users develop algorithms quickly.
Below, guided by these design principles, we look from top to bottom at how Alink builds this set of tactics.
To make things easier to follow, here is an overview first. Alink can roughly be divided into three layers (the top-level pipeline, the middle-level algorithm components, and the bottom-level iterative computation framework), plus the Flink runtime; the diagram below therefore shows the program execution flow from each of these four perspectives.
// How to view pipeline.fit(data).transform(data).print();

// From the top-level pipeline perspective
Training pipeline   +-----> [VectorAssembler(Transformer)] -----> [KMeans(Estimator)]
                    |        // after KMeans.fit, a KMeansModel is produced for the transform stage
                    |
Transform pipeline  +-----> [VectorAssembler(Transformer)] -----> [KMeansModel(Transformer)]

// From the middle-layer algorithm-component perspective
Training components  +-----> [MapBatchOp] -----> [KMeansTrainBatchOp]
                     |        // VectorAssemblerMapper in MapBatchOp is the business logic
                     |
Transform components +-----> [MapBatchOp] -----> [ModelMapBatchOp]
                              // VectorAssemblerMapper in MapBatchOp is the business logic
                              // KMeansModelMapper in ModelMapBatchOp is the business logic

// From the bottom-layer iterative computation framework perspective
Training via framework +-----> [VectorAssemblerMapper] -----> [KMeansPreallocateCentroid / KMeansAssignCluster / AllReduce / KMeansUpdateCentroids in IterativeComQueue]
                       |        // mapped onto Flink operators for training
                       |
Transform (direct)     +-----> [VectorAssemblerMapper] -----> [KMeansModelMapper]
                                // mapped onto Flink operators for transformation

// From the Flink runtime perspective
Training  +-----> map, mapPartition...
          |        // VectorAssemblerMapper.map and friends are invoked here
          |
Transform +-----> map, mapPartition...
                   // e.g. KMeansModelMapper.map is called to do the transformation
The sample code is still the KMeans modules used before.
public class KMeansExample {
    public static void main(String[] args) throws Exception {
        ......
        BatchOperator data = new CsvSourceBatchOp().setFilePath(URL).setSchemaStr(SCHEMA_STR);

        VectorAssembler va = new VectorAssembler()
            .setSelectedCols(new String[]{"sepal_length", "sepal_width", "petal_length", "petal_width"})
            .setOutputCol("features");

        KMeans kMeans = new KMeans().setVectorCol("features").setK(3)
            .setPredictionCol("prediction_result")
            .setPredictionDetailCol("prediction_detail")
            .setReservedCols("category")
            .setMaxIter(100);

        Pipeline pipeline = new Pipeline().add(va).add(kMeans);
        pipeline.fit(data).transform(data).print();
    }
}
public final class KMeansTrainBatchOp extends BatchOperator<KMeansTrainBatchOp>
    implements KMeansTrainParams<KMeansTrainBatchOp> {

    static DataSet<Row> iterateICQ(...omitted...) {
        return new IterativeComQueue()
            .initWithPartitionedData(TRAIN_DATA, data)
            .initWithBroadcastData(INIT_CENTROID, initCentroid)
            .initWithBroadcastData(KMEANS_STATISTICS, statistics)
            .add(new KMeansPreallocateCentroid())
            .add(new KMeansAssignCluster(distance))
            .add(new AllReduce(CENTROID_ALL_REDUCE))
            .add(new KMeansUpdateCentroids(distance))
            .setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol))
            .closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
            .setMaxIter(maxIter)
            .exec();
    }
}
Compute the new cluster centers based on the point counts and coordinate sums.
// Update the centroids based on the sum of points and point number belonging to the same cluster.
public class KMeansUpdateCentroids extends ComputeFunction {
    @Override
    public void calc(ComContext context) {
        Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
        Integer k = context.getObj(KMeansTrainBatchOp.K);
        double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);

        Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }

        stepNumCentroids.f0 = context.getStepNo();
        context.putObj(KMeansTrainBatchOp.K,
            updateCentroids(stepNumCentroids.f1, k, vectorSize, sumMatrixData, distance));
    }
}
The design principle realized in this part is: borrow as much as possible from the common design ideas and development patterns in the industry, so that developers can switch over seamlessly.
A typical machine learning process starts with data collection and goes through multiple steps before producing the desired output. This is very much like an assembly line: it usually includes ETL (extract, transform, load) of source data, data preprocessing, feature extraction, model training and cross-validation, prediction on new data, and so on.
先來講一下幾個重要的概念:
From Alink's directory structure we can see that Alink provides these common concepts (some of the code borrows from Flink ML).
./java/com/alibaba/alink:
    common    operator    params    pipeline

./java/com/alibaba/alink/params:
    associationrule   evaluation   nlp              regression       statistics
    classification    feature      onlinelearning   shared           tuning
    clustering        io           outlier          similarity       udf
    dataproc          mapper       recommendation   sql              validators

./java/com/alibaba/alink/pipeline:
    EstimatorBase.java       ModelBase.java            Trainer.java            feature
    LocalPredictable.java    ModelExporterUtils.java   TransformerBase.java    nlp
    LocalPredictor.java      Pipeline.java             classification          recommendation
    MapModel.java            PipelineModel.java        clustering              regression
    MapTransformer.java      PipelineStageBase.java    dataproc                tuning
The most basic pieces are three interfaces: PipelineStage, Transformer, and Estimator. Transformer and Estimator correspond exactly to the two common machine learning concepts, transformer and estimator, while PipelineStage is the base interface of both.
// Base class for a stage in a pipeline. The interface is only a concept, and does not have any actual
// functionality. Its subclasses must be either Estimator or Transformer. No other classes should inherit
// this interface directly.
public interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable

// A transformer is a PipelineStage that transforms an input Table to a result Table.
public interface Transformer<T extends Transformer<T>> extends PipelineStage<T>

// Estimators are PipelineStages responsible for training and generating machine learning models.
public interface Estimator<E extends Estimator<E, M>, M extends Model<M>> extends PipelineStage<E>
Next come three abstract class definitions: PipelineStageBase, EstimatorBase, and TransformerBase, corresponding to the three interfaces above. They define basic operations such as fit and transform.
// The base class for a stage in a pipeline, either an EstimatorBase or a TransformerBase.
public abstract class PipelineStageBase<S extends PipelineStageBase<S>>
    implements WithParams<S>, HasMLEnvironmentId<S>, Cloneable

// The base class for estimator implementations.
public abstract class EstimatorBase<E extends EstimatorBase<E, M>, M extends ModelBase<M>>
    extends PipelineStageBase<E> implements Estimator<E, M>

// The base class for transformer implementations.
public abstract class TransformerBase<T extends TransformerBase<T>>
    extends PipelineStageBase<T> implements Transformer<T>
Then there is the Pipeline base class, which ties Transformers and Estimators together.
// A pipeline is a linear workflow which chains EstimatorBases and TransformerBases to execute an algorithm
public class Pipeline extends EstimatorBase<Pipeline, PipelineModel> {

    private ArrayList<PipelineStageBase> stages = new ArrayList<>();

    public Pipeline add(PipelineStageBase stage) {
        this.stages.add(stage);
        return this;
    }
}
Finally, an example related to the Parameter concept, such as the VectorAssemblerParams used in the sample.
// Parameters for MISOMapper.
public interface MISOMapperParams<T> extends
    HasSelectedCols<T>,
    HasOutputCol<T>,
    HasReservedCols<T> {}

// parameters of vector assembler.
public interface VectorAssemblerParams<T> extends MISOMapperParams<T> {
    ParamInfo<String> HANDLE_INVALID = ParamInfoFactory
        .createParamInfo("handleInvalid", String.class)
        .setDescription("parameter for how to handle invalid data (NULL values)")
        .setHasDefaultValue("error")
        .build();
}
綜合來講,由於模型和數據,在Alink運行時候,都統一轉化爲Table類型,因此能夠整理以下:
First, some base abstract classes, for example:
// Abstract class for a flat map TransformerBase.
public abstract class MapTransformer<T extends MapTransformer<T>>
    extends TransformerBase<T> implements LocalPredictable

// The base class for a machine learning model.
public abstract class ModelBase<M extends ModelBase<M>>
    extends TransformerBase<M> implements Model<M>

// Abstract class for a trainer that train a machine learning model.
public abstract class Trainer<T extends Trainer<T, M>, M extends ModelBase<M>>
    extends EstimatorBase<T, M>
Then come the two type definitions used in our example.
// This is an EstimatorBase type
public class KMeans extends Trainer<KMeans, KMeansModel>
    implements KMeansTrainParams<KMeans>, KMeansPredictParams<KMeans> {

    @Override
    protected BatchOperator train(BatchOperator in) {
        return new KMeansTrainBatchOp(this.getParams()).linkFrom(in);
    }
}

// This is a TransformerBase type
public class VectorAssembler extends MapTransformer<VectorAssembler>
    implements VectorAssemblerParams<VectorAssembler> {

    public VectorAssembler(Params params) {
        super(VectorAssemblerMapper::new, params);
    }
}
In the example, two pipeline stages are built, and these two instances are then chained onto the pipeline.
VectorAssembler va = new VectorAssembler()
KMeans kMeans = new KMeans()
Pipeline pipeline = new Pipeline().add(va).add(kMeans);

// We can see that the pipeline has two stages: VectorAssembler and KMeans.
pipeline = {Pipeline@1201}
 stages = {ArrayList@2853}  size = 2
  0 = {VectorAssembler@1199}
   mapperBuilder = {VectorAssembler$lambda@2859}
   params = {Params@2860} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
  1 = {KMeans@1200}
   params = {Params@2857} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
Algorithm components are a middle-layer concept; they can be regarded as the modules/layer where the algorithms are actually implemented. Their main role is to bridge the layers above and below.
For example:
public final class KMeansTrainBatchOp extends BatchOperator<KMeansTrainBatchOp>
    implements KMeansTrainParams<KMeansTrainBatchOp>

// class for a flat map BatchOperator.
public class MapBatchOp<T extends MapBatchOp<T>> extends BatchOperator<T>
Whether we call Estimator.fit or Transformer.transform, in essence the operators are connected through the linkFrom function, which chains the data flow together. From there everything can be mapped step by step onto Flink's concrete execution plan.
AlgoOperator is the base class of the operator components; its subclasses are BatchOperator and StreamOperator, corresponding to batch and stream processing respectively.
// Base class for algorithm operators.
public abstract class AlgoOperator<T extends AlgoOperator<T>>
    implements WithParams<T>, HasMLEnvironmentId<T>, Serializable

// Base class of batch algorithm operators.
public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {

    // Link this object to BatchOperator using the BatchOperators as its input.
    public abstract T linkFrom(BatchOperator<?>... inputs);

    public <B extends BatchOperator<?>> B linkTo(B next) {
        return link(next);
    }

    public BatchOperator print() throws Exception {
        return linkTo(new PrintBatchOp().setMLEnvironmentId(getMLEnvironmentId()));
    }
}

public abstract class StreamOperator<T extends StreamOperator<T>> extends AlgoOperator<T>
The sample code is as follows:
// The input csv file is turned into a BatchOperator
BatchOperator data = new CsvSourceBatchOp().setFilePath(URL).setSchemaStr(SCHEMA_STR);
...
pipeline.fit(data).transform(data).print();
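To make the linkFrom relationship concrete, here is a minimal hand-wired sketch of what the pipeline does for us under the hood. It is my own illustration, not code from the Alink source; the constructor arguments of KMeansTrainBatchOp are assumed for illustration.

// Hypothetical direct wiring of two operators, bypassing the Pipeline.
BatchOperator source = new CsvSourceBatchOp()
        .setFilePath(URL)
        .setSchemaStr(SCHEMA_STR);

KMeansTrainBatchOp train = new KMeansTrainBatchOp(new Params());  // params assumed for illustration

// linkFrom takes the upstream operator(s) as its input ...
train.linkFrom(source);
// ... while link/linkTo is just the fluent form of the same wiring (link ends up calling linkFrom,
// as the call stack at the end of this article shows).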
Mapper is part of the bottom-level iterative computation framework; it is the business-logic component, as can be seen from the directory structure. It is explained here in advance because it comes up so often in the pipeline walkthrough.
./java/com/alibaba/alink/common:
    linalg    mapper    model
    comqueue  utils     io
The main Mapper classes are defined as follows. They are used widely: a Mapper can map input to output, and it can also map a model to concrete values.
// Abstract class for mappers.
public abstract class Mapper implements Serializable {}

// Abstract class for mappers with model.
public abstract class ModelMapper extends Mapper {}

// Find the closest cluster center for every point.
public class KMeansModelMapper extends ModelMapper {}

// Mapper with Multi-Input columns and Single Output column(MISO).
public abstract class MISOMapper extends Mapper {}

// This mapper maps many columns to one vector. the columns should be vector or numerical columns.
public class VectorAssemblerMapper extends MISOMapper {}
A Mapper's business logic runs inside an algorithm component, e.g. [ VectorAssemblerMapper in MapBatchOp ] and [ KMeansModelMapper in ModelMapBatchOp ].
To actually run, a ModelMapper relies on ModelMapperAdapter to connect with the Flink runtime. ModelMapperAdapter extends RichMapFunction; the ModelMapper is held as its member variable and executes the business logic inside the map operation, while ModelSource is the source of the model data.
In this example, KMeansModelMapper is the mapper held by the final transforming BatchOperator (the ModelMapBatchOp); its map function performs the conversion.
The system has a number of commonly used algorithm components built in, such as MapBatchOp and ModelMapBatchOp.
Let's use ModelMapBatchOp as an example to illustrate their role. As the comments in the code below show, linkFrom does the following:
public class ModelMapBatchOp<T extends ModelMapBatchOp<T>> extends BatchOperator<T> {
    @Override
    public T linkFrom(BatchOperator<?>... inputs) {
        checkOpSize(2, inputs);
        try {
            BroadcastVariableModelSource modelSource =
                new BroadcastVariableModelSource(BROADCAST_MODEL_TABLE_NAME);

            // mapper is the mapping function
            ModelMapper mapper = this.mapperBuilder.apply(
                inputs[0].getSchema(),
                inputs[1].getSchema(),
                this.getParams());

            // modelRows is the model
            DataSet<Row> modelRows = inputs[0].getDataSet().rebalance();

            // resultRows is the mapping of the input data
            DataSet<Row> resultRows = inputs[1].getDataSet()
                .map(new ModelMapperAdapter(mapper, modelSource))
                // the model is registered as a broadcast variable, used later in ModelMapperAdapter
                .withBroadcastSet(modelRows, BROADCAST_MODEL_TABLE_NAME);

            TableSchema outputSchema = mapper.getOutputSchema();
            this.setOutput(resultRows, outputSchema);
            return (T) this;
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }
}
ModelMapperAdapter is the adapter implementation used to run the business-logic Mapper on Flink. As the code shows, ModelMapperAdapter retrieves the previously stored mapper and model data, and on that basis performs the actual algorithm work.
/**
 * Adapt a {@link ModelMapper} to run within flink.
 * This adapter class hold the target {@link ModelMapper} and it's {@link ModelSource}. Upon open(),
 * it will load model rows from {@link ModelSource} into {@link ModelMapper}.
 */
public class ModelMapperAdapter extends RichMapFunction<Row, Row> implements Serializable {

    /**
     * The ModelMapper to adapt.
     */
    private final ModelMapper mapper;

    /**
     * Load model data from ModelSource when open().
     */
    private final ModelSource modelSource;

    public ModelMapperAdapter(ModelMapper mapper, ModelSource modelSource) {
        this.mapper = mapper;            // mapper is the business logic, executed inside map()
        this.modelSource = modelSource;  // modelSource is the broadcast source of the model
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Fetch the model data from the broadcast variable
        List<Row> modelRows = this.modelSource.getModelRows(getRuntimeContext());
        this.mapper.loadModel(modelRows);
    }

    @Override
    public Row map(Row row) throws Exception {
        // Execute the business logic, transforming the incoming data
        return this.mapper.map(row);
    }
}
In pipeline.fit(data), the stages are executed one by one along the pipeline. If the next stage is a Transformer, its transform is called; if it is an EstimatorBase, fit is called first to turn the EstimatorBase into a Transformer, and then transform is called on that resulting Transformer. The stages are processed one after another in this way.
If the next stage on the pipeline is an EstimatorBase, its fit is processed, turning the Estimator on the pipeline into a TransformerBase. Estimator.fit takes feature data and produces a transformer.
(If this stage is not the last stage of the pipeline) the resulting TransformerBase is processed further; only then does execution move on to the next stage.
(If this stage is the last stage of the pipeline) the TransformerBase is not processed further, and the pipeline's fit operation ends here.
If the next stage on the pipeline is a TransformerBase, its transform function is called directly.
For every TransformerBase that needs processing, whether converted from an EstimatorBase or originally on the Pipeline, its transform function is called to transform the input: input = transformers[i].transform(input);. The output of each transformation is assigned back to input, which becomes the input of the next pipeline stage.
In the end we obtain a PipelineModel (which is itself a Transformer); it belongs to the next stage, the transform pipeline.
This example has two stages: VectorAssembler is a Transformer, and KMeans is an EstimatorBase.
At this point, the Pipeline's internal variables are:
this = {Pipeline@1195}
 stages = {ArrayList@2851}  size = 2
  0 = {VectorAssembler@1198}
   mapperBuilder = {VectorAssembler$lambda@2857}
   params = {Params@2858} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
  1 = {KMeans@2856}
   params = {Params@2860} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
    params = {HashMap@2862}  size = 6
The new pipeline flow of the PipelineModel is: read from csv / map (handled by VectorAssembler), then transform with KMeansModel (covered in detail in the next section).
The concrete code of fit is:
public class Pipeline extends EstimatorBase<Pipeline, PipelineModel> {
    // Train the pipeline with batch data.
    public PipelineModel fit(BatchOperator input) {
        int lastEstimatorIdx = getIndexOfLastEstimator();
        TransformerBase[] transformers = new TransformerBase[stages.size()];
        for (int i = 0; i < stages.size(); i++) {
            PipelineStageBase stage = stages.get(i);
            if (i <= lastEstimatorIdx) {
                if (stage instanceof EstimatorBase) {
                    // The concrete algorithm operators on the pipeline are chained together via linkFrom.
                    transformers[i] = ((EstimatorBase) stage).fit(input);
                } else if (stage instanceof TransformerBase) {
                    transformers[i] = (TransformerBase) stage;
                }
                // Note: if this is the last estimator stage of the pipeline, no transform is performed.
                if (i < lastEstimatorIdx) {
                    // This calls the concrete Transformer's transform, which again chains the concrete
                    // algorithm operators via linkFrom.
                    input = transformers[i].transform(input);
                }
            } else {
                transformers[i] = (TransformerBase) stage;
            }
        }
        // A PipelineModel is created here, with the transformers passed in as its parameter.
        return new PipelineModel(transformers).setMLEnvironmentId(input.getMLEnvironmentId());
    }
}

// MapTransformer is the base class of VectorAssembler. transform creates a MapBatchOp, then calls MapBatchOp.linkFrom.
public abstract class MapTransformer<T extends MapTransformer<T>>
    extends TransformerBase<T> implements LocalPredictable {

    @Override
    public BatchOperator transform(BatchOperator input) {
        return new MapBatchOp(this.mapperBuilder, this.params).linkFrom(input);
    }
}

// Trainer is the base class of KMeans.
public abstract class Trainer<T extends Trainer<T, M>, M extends ModelBase<M>> {
    @Override
    public M fit(BatchOperator input) {
        // KMeans.train calls KMeansTrainBatchOp(this.getParams()).linkFrom(in);
        // createModel creates a new model; in this example it is com.alibaba.alink.pipeline.clustering.KMeansModel
        return createModel(train(input).getOutputTable());
    }
}
The two phases are discussed one by one below.
This part converts the csv data into the data type needed for KMeans training.
VectorAssembler.transform ends up calling MapBatchOp.linkFrom. linkFrom first transforms the csv input into a DataSet.
public class MapBatchOp<T extends MapBatchOp<T>> extends BatchOperator<T> {
    public T linkFrom(BatchOperator<?>... inputs) {
        BatchOperator in = checkAndGetFirst(inputs);
        try {
            Mapper mapper = (Mapper) this.mapperBuilder.apply(in.getSchema(), this.getParams());
            // The csv input is mapped here. This only builds the logical execution plan;
            // the actual work is done only after print().
            DataSet<Row> resultRows = in.getDataSet().map(new MapperAdapter(mapper));
            TableSchema resultSchema = mapper.getOutputSchema();
            this.setOutput(resultRows, resultSchema);
            return this;
        } catch (Exception var6) {
            throw new RuntimeException(var6);
        }
    }
}

// The MapBatchOp itself
this = {MapBatchOp@3748} "UnnamedTable$1"
 mapperBuilder = {VectorAssembler$lambda@3744}
 params = {Params@3754} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
 output = {TableImpl@5862} "UnnamedTable$1"
 sideOutputs = null

// mapper is the business-logic module
mapper = {VectorAssemblerMapper@5785}
 handleInvalid = {VectorAssemblerMapper$HandleType@5813} "ERROR"
 outputColsHelper = {OutputColsHelper@5814}
 colIndices = {int[4]@5815}
 dataFieldNames = {String[5]@5816}
 dataFieldTypes = {DataType[5]@5817}
 params = {Params@5818} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"

// The returned value is as follows
resultRows = {MapOperator@5788}
 function = {MapperAdapter@5826}
  mapper = {VectorAssemblerMapper@5785}
 defaultName = "linkFrom(MapBatchOp.java:35)"

// The call stack is as follows
linkFrom:31, MapBatchOp (com.alibaba.alink.operator.batch.utils)
transform:34, MapTransformer (com.alibaba.alink.pipeline)
fit:122, Pipeline (com.alibaba.alink.pipeline)
main:31, KMeansExample (com.alibaba.alink)
This part trains the model.
KMeans is a Trainer, which in turn is an EstimatorBase, so the pipeline calls its fit function.
KMeans.fit simply calls Trainer.fit.
Because KMeans is the last stage of the pipeline, input = transformers[i].transform(input); is not called at this point, so we are still in training, which produces a model, KMeansModel.
// Relevant part of the actual code
Trainer.fit(BatchOperator input) {
    return createModel(train(input).getOutputTable());
}

public final class KMeansTrainBatchOp extends BatchOperator<KMeansTrainBatchOp>
    implements KMeansTrainParams<KMeansTrainBatchOp> {

    public KMeansTrainBatchOp linkFrom(BatchOperator<?>... inputs) {
        DataSet<Row> finalCentroid = iterateICQ(initCentroid, data, vectorSize, maxIter, tol,
            distance, distanceType, vectorColName, null, null);
        this.setOutput(finalCentroid, new KMeansModelDataConverter().getModelSchema());
        return this;
    }
}

// Variable contents
this = {KMeansTrainBatchOp@5887}
 params = {Params@5895} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
 output = null
 sideOutputs = null

inputs = {BatchOperator[1]@5888}
 0 = {MapBatchOp@3748} "UnnamedTable$1"
  mapperBuilder = {VectorAssembler$lambda@3744}
  params = {Params@3754} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
  output = {TableImpl@5862} "UnnamedTable$1"
  sideOutputs = null

// The call stack is as follows
linkFrom:84, KMeansTrainBatchOp (com.alibaba.alink.operator.batch.clustering)
train:31, KMeans (com.alibaba.alink.pipeline.clustering)
fit:34, Trainer (com.alibaba.alink.pipeline)
fit:117, Pipeline (com.alibaba.alink.pipeline)
main:31, KMeansExample (com.alibaba.alink)
KMeansTrainBatchOp.linkFrom is the heart of the algorithm. It essentially sets up everything the algorithm needs and wires up the various Flink operators. We will come back to it later.
The fit function produces a KMeansModel; its transform function is implemented in the base class MapModel and will be invoked in the subsequent transform stage. This is the trained KMeans model, and it is itself a Transformer.
// Find the closest cluster center for every point.
public class KMeansModel extends MapModel<KMeansModel> implements KMeansPredictParams<KMeansModel> {
    public KMeansModel(Params params) {
        super(KMeansModelMapper::new, params);
    }
}
As mentioned earlier, Pipeline's fit function returns a PipelineModel. This PipelineModel will later have transform called on it to complete the transformation phase.
return new PipelineModel(transformers).setMLEnvironmentId(input.getMLEnvironmentId());
The transformation-phase pipeline still starts from VectorAssembler, reading the csv and doing the map processing, and then calls KMeansModel.
PipelineModel then calls its transform function, whose job is to turn the Transformers into BatchOperators. Its internal variables at this point are shown below; you can see that the mixture of types in the initial pipeline has been converted into a uniform set of transform instances.
this = {PipelineModel@5016}
 transformers = {TransformerBase[2]@5017}
  0 = {VectorAssembler@1198}
   mapperBuilder = {VectorAssembler$lambda@2855}
   params = {Params@2856} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
  1 = {KMeansModel@5009}
   mapperBuilder = {KMeansModel$lambda@5011}
   modelData = {TableImpl@4984} "UnnamedTable$2"
   params = {Params@5012} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
 modelData = null
 params = {Params@5018} "Params {MLEnvironmentId=0}"
The first transform call reaches MapBatchOp.linkFrom, invoked through VectorAssembler.transform; it plays the same role as it did in the fit pipeline, as explained in the comments below.
The second transform call reaches ModelMapBatchOp.linkFrom, invoked indirectly through KMeansModel.transform, also explained in the comments below.
These two transform calls produce a chain of BatchOperators. The final result is a ModelMapBatchOp, i.e. a BatchOperator, and the actual conversion will be carried out by this ModelMapBatchOp.
// The model fitted by Pipeline.
public class PipelineModel extends ModelBase<PipelineModel> implements LocalPredictable {
    @Override
    public BatchOperator<?> transform(BatchOperator input) {
        for (TransformerBase transformer : this.transformers) {
            input = transformer.transform(input);
        }
        return input;
    }
}

// After the transformations we get the final BatchOperator, which does the conversion.
// {KMeansModel$lambda@5050} is KMeansModelMapper, the conversion logic.
input = {ModelMapBatchOp@5047} "UnnamedTable$3"
 mapperBuilder = {KMeansModel$lambda@5050}
 params = {Params@5051} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
  params = {HashMap@5058}  size = 6
   "vectorCol" -> ""features""
   "maxIter" -> "100"
   "reservedCols" -> "["category"]"
   "k" -> "3"
   "predictionCol" -> ""prediction_result""
   "predictionDetailCol" -> ""prediction_detail""
 output = {TableImpl@5052} "UnnamedTable$3"
  tableEnvironment = {BatchTableEnvironmentImpl@5054}
  operationTree = {DataSetQueryOperation@5055}
  operationTreeBuilder = {OperationTreeBuilder@5056}
  lookupResolver = {LookupCallResolver@5057}
  tableName = "UnnamedTable$3"
 sideOutputs = null

// MapTransformer is the base class of VectorAssembler. transform creates a MapBatchOp, then calls MapBatchOp.linkFrom.
public abstract class MapTransformer<T extends MapTransformer<T>>
    extends TransformerBase<T> implements LocalPredictable {

    @Override
    public BatchOperator transform(BatchOperator input) {
        return new MapBatchOp(this.mapperBuilder, this.params).linkFrom(input);
    }
}

// MapModel is the base class of KMeansModel. transform creates a ModelMapBatchOp, then calls ModelMapBatchOp.linkFrom.
public abstract class MapModel<T extends MapModel<T>> extends ModelBase<T> implements LocalPredictable {
    @Override
    public BatchOperator transform(BatchOperator input) {
        return new ModelMapBatchOp(this.mapperBuilder, this.params)
            .linkFrom(BatchOperator.fromTable(this.getModelData())
                .setMLEnvironmentId(input.getMLEnvironmentId()), input);
    }
}
In these two linkFroms, two MapOperators are again created and concatenated, forming a chain of BatchOperators. From the code above we can see that ModelMapBatchOp, the component corresponding to KMeansModel, applies a ModelMapperAdapter in its linkFrom. ModelMapperAdapter is a RichMapFunction, and it keeps the KMeansModelMapper as the RichMapFunction's function member variable. Then .map(new ModelMapperAdapter(mapper, modelSource)) is called; map is a Flink operator, and this is how the transformation algorithm is connected to Flink.
In the end, the KMeans transformation work is carried out by KMeansModelMapper.map.
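The source of KMeansModelMapper.map is not listed here, but conceptually it assigns each incoming row to the nearest centroid loaded from the model. A simplified sketch of that idea follows; it is my own illustration (plain arrays and Euclidean distance), not the actual Alink implementation, which works on Row objects and FastDistance structures.

// Hypothetical helper: given a point and the centroids loaded from the model,
// return the index of the closest centroid (plain squared Euclidean distance).
static int findClosestCentroid(double[] point, double[][] centroids) {
    int best = -1;
    double bestDist = Double.MAX_VALUE;
    for (int i = 0; i < centroids.length; i++) {
        double dist = 0.0;
        for (int d = 0; d < point.length; d++) {
            double diff = point[d] - centroids[i][d];
            dist += diff * diff;
        }
        if (dist < bestDist) {
            bestDist = dist;
            best = i;
        }
    }
    return best;  // an index like this is what ends up in the "prediction_result" column
}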
As we all know, in a Flink program we need getExecutionEnvironment() to obtain the environment and env.execute("KMeans Example"); to actually run the job. Alink is, after all, just a Flink application, only far more complex than an ordinary one.
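For contrast, a bare-bones Flink DataSet job looks roughly like the following sketch (standard Flink API, nothing Alink-specific):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PlainFlinkJob {
    public static void main(String[] args) throws Exception {
        // A plain Flink application obtains the execution environment explicitly ...
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> words = env.fromElements("alink", "flink");
        // print() on a DataSet triggers execution by itself; with only lazy sinks attached
        // we would call env.execute("job name") instead.
        words.map(String::toUpperCase).print();
    }
}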
But in the sample code we see no such calls. This shows how well Alink is encapsulated; still, as curious programmers, we want to know where exactly these calls are hidden.
Alink obtains the runtime environment while the Pipeline is executing. Specifically, since the csv file is the initial input, the runtime environment is obtained when transform calls its in.getSchema().
public final class CsvSourceBatchOp extends BaseSourceBatchOp<CsvSourceBatchOp>
    implements CsvSourceParams<CsvSourceBatchOp> {

    @Override
    public Table initializeDataSource() {
        ExecutionEnvironment execEnv =
            MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment();
    }
}

initializeDataSource:77, CsvSourceBatchOp (com.alibaba.alink.operator.batch.source)
getOutputTable:52, BaseSourceBatchOp (com.alibaba.alink.operator.batch.source)
getSchema:180, AlgoOperator (com.alibaba.alink.operator)
linkFrom:34, MapBatchOp (com.alibaba.alink.operator.batch.utils)
transform:34, MapTransformer (com.alibaba.alink.pipeline)
fit:122, Pipeline (com.alibaba.alink.pipeline)
main:31, KMeansExample (com.alibaba.alink)
Up to this point Alink has done a lot of work and has mapped everything onto Flink operators, but where does it really connect to Flink?
print calls BatchOperator.print; from here execution really starts, calling down layer by layer until we finally reach:
package com.alibaba.alink.operator.batch.utils;

public class PrintBatchOp extends BaseSinkBatchOp<PrintBatchOp> {
    @Override
    protected PrintBatchOp sinkFrom(BatchOperator in) {
        this.setOutputTable(in.getOutputTable());
        if (null != this.getOutputTable()) {
            try {
                // After this collect, execution enters the Flink runtime.
                List<Row> rows = DataSetConversionUtil
                    .fromTable(getMLEnvironmentId(), this.getOutputTable()).collect();
                batchPrintStream.println(TableUtil.formatTitle(this.getColNames()));
                for (Row row : rows) {
                    batchPrintStream.println(TableUtil.formatRows(row));
                }
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }
        return this;
    }
}
It is in LocalEnvironment that the Alink and Flink runtime environments are connected.
public class LocalEnvironment extends ExecutionEnvironment {
    @Override
    public String getExecutionPlan() throws Exception {
        Plan p = createProgramPlan(null, false);
        // From here on, the actual connection with Flink happens.
        if (executor != null) {
            return executor.getOptimizerPlanAsJSON(p);
        } else {
            PlanExecutor tempExecutor = PlanExecutor.createLocalExecutor(configuration);
            return tempExecutor.getOptimizerPlanAsJSON(p);
        }
    }
}

// The call stack is as follows
execute:91, LocalEnvironment (org.apache.flink.api.java)
execute:820, ExecutionEnvironment (org.apache.flink.api.java)
collect:413, DataSet (org.apache.flink.api.java)
sinkFrom:40, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
sinkFrom:18, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
linkFrom:31, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
linkFrom:17, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
link:89, BatchOperator (com.alibaba.alink.operator.batch)
linkTo:239, BatchOperator (com.alibaba.alink.operator.batch)
print:337, BatchOperator (com.alibaba.alink.operator.batch)
main:31, KMeansExample (com.alibaba.alink)
This part corresponds to the design principles listed earlier.
Let's think about roughly what groundwork such a framework needs to provide.
The most important concept here is IterativeComQueue: communication and computation are abstracted as ComQueueItems, and the ComQueueItems are chained into a queue. This forms an iterative communication/computation framework oriented towards iterative computation scenarios.
Here is the directory structure again:
./java/com/alibaba/alink/common:
    MLEnvironment.java          linalg
    MLEnvironmentFactory.java   mapper
    VectorTypes.java            model
    comqueue                    utils
    io
裏面大體有 :
In its linkFrom function, an algorithm component performs the following operations:
These are described one by one below.
MLEnvironment is an important class. It wraps the runtime contexts required for Flink development. Through it, users can obtain the various actual execution environments, create tables, and run SQL statements.
/**
 * The MLEnvironment stores the necessary context in Flink.
 * Each MLEnvironment will be associated with a unique ID.
 * The operations associated with the same MLEnvironment ID
 * will share the same Flink job context.
 */
public class MLEnvironment {
    private ExecutionEnvironment env;
    private StreamExecutionEnvironment streamEnv;
    private BatchTableEnvironment batchTableEnv;
    private StreamTableEnvironment streamTableEnv;
}
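As a small usage sketch (assuming the default MLEnvironmentId 0 that appeared in the Params dumps earlier; the getter for the batch environment is the one we saw used in CsvSourceBatchOp above):

// Fetch the shared MLEnvironment by id and get the underlying Flink batch environment.
MLEnvironment mlEnv = MLEnvironmentFactory.get(0L);
ExecutionEnvironment batchEnv = mlEnv.getExecutionEnvironment();
// Operators created with the same MLEnvironmentId end up in this same Flink job context.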
A Function is the smallest module of computation or communication business logic in the computation framework. The definitions are as follows.
In what follows they are collectively referred to as Functions.
/**
 * Basic build block in {@link BaseComQueue}, for either communication or computation.
 */
public interface ComQueueItem extends Serializable {}

/**
 * An BaseComQueue item for computation.
 */
public abstract class ComputeFunction implements ComQueueItem {
    /**
     * Perform the computation work.
     *
     * @param context to get input object and update output object.
     */
    public abstract void calc(ComContext context);
}

/**
 * An BaseComQueue item for communication.
 */
public abstract class CommunicateFunction implements ComQueueItem {
    /**
     * Perform communication work.
     *
     * @param input output of previous queue item.
     * @param sessionId session id for shared objects.
     * @param <T> Type of dataset.
     * @return result dataset.
     */
    public abstract <T> DataSet<T> communicateWith(DataSet<T> input, int sessionId);
}
Looking at our code, part of what the KMeansTrainBatchOp algorithm component does is split the KMeans algorithm into several ComputeFunctions (plus an AllReduce CommunicateFunction) and add them to the computation/communication queue.
In the code below, the concrete Items are: KMeansPreallocateCentroid, KMeansAssignCluster and KMeansUpdateCentroids (ComputeFunctions), AllReduce (a CommunicateFunction), KMeansIterTermination (the termination criterion) and KMeansOutputModel (the final output function).
That is, the main work of implementing the algorithm is to split it into such Functions and add them, in order, onto the IterativeComQueue.
static DataSet<Row> iterateICQ(...omitted...) {
    return new IterativeComQueue()
        .initWithPartitionedData(TRAIN_DATA, data)
        .initWithBroadcastData(INIT_CENTROID, initCentroid)
        .initWithBroadcastData(KMEANS_STATISTICS, statistics)
        .add(new KMeansPreallocateCentroid())
        .add(new KMeansAssignCluster(distance))
        .add(new AllReduce(CENTROID_ALL_REDUCE))
        .add(new KMeansUpdateCentroids(distance))
        .setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol))
        .closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
        .setMaxIter(maxIter)
        .exec();
}
BaseComQueue is the foundation of this iterative framework. It maintains a List<ComQueueItem> queue; when building an algorithm module, the user adds the various Functions to this queue.
IterativeComQueue is the default implementation of BaseComQueue; it additionally implements the two functions setMaxIter and setCompareCriterionOfNode0.
The two important functions of BaseComQueue are optimize and exec. optimize merges adjacent ComputeFunctions into a ChainedComputation; exec maps each queue item onto Flink operators and, inside the generated RichMapPartitionFunction, invokes computation.calc(context);. BaseComQueue can be seen as a logical concept that lets algorithm engineers organize their own business language better, while its exec function maps the algorithm logic onto Flink operators. To some extent this decouples the algorithm from Flink.
The definition (excerpting part of the function bodies) is as follows:
// Base class for the com(Computation && Communicate) queue.
public class BaseComQueue<Q extends BaseComQueue<Q>> implements Serializable {

    /**
     * All computation or communication functions.
     */
    private final List<ComQueueItem> queue = new ArrayList<>();

    /**
     * The function executed to decide whether to break the loop.
     */
    private CompareCriterionFunction compareCriterion;

    /**
     * The function executed when closing the iteration
     */
    private CompleteResultFunction completeResult;

    private void optimize() {
        if (queue.isEmpty()) {
            return;
        }
        int current = 0;
        for (int ahead = 1; ahead < queue.size(); ++ahead) {
            ComQueueItem curItem = queue.get(current);
            ComQueueItem aheadItem = queue.get(ahead);

            // If two adjacent items are both ComputeFunctions, merge them into a ChainedComputation.
            if (aheadItem instanceof ComputeFunction && curItem instanceof ComputeFunction) {
                if (curItem instanceof ChainedComputation) {
                    queue.set(current, ((ChainedComputation) curItem).add((ComputeFunction) aheadItem));
                } else {
                    queue.set(current, new ChainedComputation()
                        .add((ComputeFunction) curItem)
                        .add((ComputeFunction) aheadItem)
                    );
                }
            } else {
                queue.set(++current, aheadItem);
            }
        }
        queue.subList(current + 1, queue.size()).clear();
    }

    /**
     * Execute the BaseComQueue and get the result dataset.
     *
     * @return result dataset.
     */
    public DataSet<Row> exec() {
        optimize();

        IterativeDataSet<byte[]> loop = loopStartDataSet(executionEnvironment)
            .iterate(maxIter);

        DataSet<byte[]> input = loop
            .mapPartition(new DistributeData(cacheDataObjNames, sessionId))
            .withBroadcastSet(loop, "barrier")
            .name("distribute data");

        for (ComQueueItem com : queue) {
            if ((com instanceof CommunicateFunction)) {
                CommunicateFunction communication = ((CommunicateFunction) com);
                // For example AllReduce.communicateWith is called here; it wraps the allReduce and assigns
                // the result back to input. When the loop reaches the next ComputeFunction
                // (KMeansUpdateCentroids), that input is handed to it. E.g. input = {MapPartitionOperator@5248},
                // input.function = {AllReduce$AllReduceRecv@5260}, and input.mapPartition is what indirectly
                // invokes KMeansUpdateCentroids.
                input = communication.communicateWith(input, sessionId);
            } else if (com instanceof ComputeFunction) {
                final ComputeFunction computation = (ComputeFunction) com;

                // This is where we reach Flink: each ComputeFunction in the queue is mapped to a
                // Flink RichMapPartitionFunction.
                input = input
                    .mapPartition(new RichMapPartitionFunction<byte[], byte[]>() {
                        @Override
                        public void mapPartition(Iterable<byte[]> values, Collector<byte[]> out) {
                            ComContext context = new ComContext(
                                sessionId, getIterationRuntimeContext()
                            );
                            // Flink invokes the concrete computation here, i.e. the algorithm fragment
                            // the algorithm engineer wrote.
                            computation.calc(context);
                        }
                    })
                    .withBroadcastSet(input, "barrier")
                    .name(com instanceof ChainedComputation
                        ? ((ChainedComputation) com).name()
                        : "computation@" + computation.getClass().getSimpleName());
            } else {
                throw new RuntimeException("Unsupported op in iterative queue.");
            }
        }

        return serializeModel(clearObjs(loopEnd));
    }
}
Mapper is part of the bottom-level iterative computation framework and can be regarded as a Mapper Function. Because it is business logic, it was explained in advance.
Initialization happens in KMeansTrainBatchOp.linkFrom. We can see that during initialization it is fine to call Flink's various operators directly (e.g. .rebalance().map()), because at this point nothing has been attached to the framework yet; this computation is fully controlled by the user and does not need to be added onto the IterativeComQueue.
If a computation were both added onto the IterativeComQueue and playing with Flink operators on its own, the framework would have no idea how to handle it. So this kind of free-form user operation can only happen before anything is hooked into the framework.
@Override
public KMeansTrainBatchOp linkFrom(BatchOperator<?>... inputs) {
    DataSet<FastDistanceVectorData> data = statistics.f0.rebalance().map(
        new MapFunction<Vector, FastDistanceVectorData>() {
            @Override
            public FastDistanceVectorData map(Vector value) {
                return distance.prepareVectorData(Row.of(value), 0);
            }
        });
    ......
}
The framework also provides initialization functions for caching a DataSet in memory, in two forms: Partition and Broadcast. The former caches the DataSet in memory in slices; the latter caches the whole DataSet in every worker's memory.
return new IterativeComQueue()
    .initWithPartitionedData(TRAIN_DATA, data)
    .initWithBroadcastData(INIT_CENTROID, initCentroid)
    .initWithBroadcastData(KMEANS_STATISTICS, statistics)
    ......
These are the concrete computation modules of the algorithm. Algorithm engineers should split the algorithm into modules that can be processed in parallel and implement each as a ComputeFunction, so that Flink's distributed computing power can be exploited.
Here is an example. This code computes, for every point, the closest cluster center, and accumulates the count and the coordinate sums of the points belonging to each cluster center:
/**
 * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
 */
public class KMeansAssignCluster extends ComputeFunction {

    private FastDistance fastDistance;
    private transient DenseMatrix distanceMatrix;

    @Override
    public void calc(ComContext context) {
        Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
        Integer k = context.getObj(KMeansTrainBatchOp.K);

        // get iterative coefficient from static memory.
        Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        }

        if (null == distanceMatrix) {
            distanceMatrix = new DenseMatrix(k, 1);
        }

        double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);
        if (sumMatrixData == null) {
            sumMatrixData = new double[k * (vectorSize + 1)];
            context.putObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE, sumMatrixData);
        }

        Iterable<FastDistanceVectorData> trainData = context.getObj(KMeansTrainBatchOp.TRAIN_DATA);
        if (trainData == null) {
            return;
        }

        Arrays.fill(sumMatrixData, 0.0);
        for (FastDistanceVectorData sample : trainData) {
            KMeansUtil.updateSumMatrix(sample, 1, stepNumCentroids.f1, vectorSize,
                sumMatrixData, k, fastDistance, distanceMatrix);
        }
    }
}
Here we can see that inside a ComputeFunction an imperative programming style is used, which matches how most programmers work today and greatly boosts productivity.
A key spot in the earlier code is .add(new AllReduce(CENTROID_ALL_REDUCE)). This piece of code plays a connecting role: the preceding KMeansPreallocateCentroid and KMeansAssignCluster and the subsequent KMeansUpdateCentroids communicate through it via a reduce / broadcast.
As the comments indicate, AllReduce is an implementation of the corresponding MPI communication primitive; here it mainly performs reduce / broadcast on double[] objects.
public class AllReduce extends CommunicateFunction {

    public static <T> DataSet<T> allReduce(
        DataSet<T> input,
        final String bufferName,
        final String lengthName,
        final SerializableBiConsumer<double[], double[]> op,
        final int sessionId) {

        final String transferBufferName = UUID.randomUUID().toString();

        return input
            .mapPartition(new AllReduceSend<T>(bufferName, lengthName, transferBufferName, sessionId))
            .withBroadcastSet(input, "barrier")
            .returns(new TupleTypeInfo<>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO))
            .name("AllReduceSend")
            .partitionCustom(new Partitioner<Integer>() {
                @Override
                public int partition(Integer key, int numPartitions) {
                    return key;
                }
            }, 0)
            .name("AllReduceBroadcastRaw")
            .mapPartition(new AllReduceSum(bufferName, lengthName, sessionId, op))
            .returns(new TupleTypeInfo<>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO))
            .name("AllReduceSum")
            .partitionCustom(new Partitioner<Integer>() {
                @Override
                public int partition(Integer key, int numPartitions) {
                    return key;
                }
            }, 0)
            .name("AllReduceBroadcastSum")
            .mapPartition(new AllReduceRecv<T>(bufferName, lengthName, sessionId))
            .returns(input.getType())
            .name("AllReduceRecv");
    }
}
Debugging shows that AllReduceSum calls SUM inside its own mapPartition implementation.
/**
 * The all-reduce operation which does elementwise sum operation.
 */
public final static SerializableBiConsumer<double[], double[]> SUM =
    new SerializableBiConsumer<double[], double[]>() {
        @Override
        public void accept(double[] a, double[] b) {
            for (int i = 0; i < a.length; ++i) {
                a[i] += b[i];
            }
        }
    };

private static class AllReduceSum extends RichMapPartitionFunction<
        Tuple3<Integer, Integer, double[]>, Tuple3<Integer, Integer, double[]>> {

    @Override
    public void mapPartition(Iterable<Tuple3<Integer, Integer, double[]>> values,
                             Collector<Tuple3<Integer, Integer, double[]>> out) {
        // Various initialization steps omitted, e.g. determining transfer positions and targets
        ......
        do {
            Tuple3<Integer, Integer, double[]> val = it.next();
            int localPos = val.f1 - startPos;
            if (sum[localPos] == null) {
                sum[localPos] = val.f2;
                agg[localPos]++;
            } else {
                // SUM is invoked here
                op.accept(sum[localPos], val.f2);
            }
        } while (it.hasNext());

        for (int i = 0; i < numOfSubTasks; ++i) {
            for (int j = 0; j < cnt; ++j) {
                out.collect(Tuple3.of(i, startPos + j, sum[j]));
            }
        }
    }
}

// Call stack
accept:129, AllReduce$3 (com.alibaba.alink.common.comqueue.communication)
accept:126, AllReduce$3 (com.alibaba.alink.common.comqueue.communication)
mapPartition:314, AllReduce$AllReduceSum (com.alibaba.alink.common.comqueue.communication)
run:103, MapPartitionDriver (org.apache.flink.runtime.operators)
run:504, BatchTask (org.apache.flink.runtime.operators)
run:157, AbstractIterativeTask (org.apache.flink.runtime.iterative.task)
run:107, IterationIntermediateTask (org.apache.flink.runtime.iterative.task)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:705, Task (org.apache.flink.runtime.taskmanager)
run:530, Task (org.apache.flink.runtime.taskmanager)
run:745, Thread (java.lang)
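A tiny usage sketch of the SUM consumer (assuming it is accessible as the public static member of AllReduce shown above):

double[] a = {1.0, 2.0, 3.0};
double[] b = {4.0, 5.0, 6.0};
// Element-wise sum: the result is accumulated into the first argument.
AllReduce.SUM.accept(a, b);
// a is now {5.0, 7.0, 9.0}; b is unchanged.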
To sum up so far, this iterative computation framework is very well designed. But Alink does not force everyone to implement algorithms with it; if you are a Flink expert, you are entirely free to implement things your own way.
Alink's examples already include such an implementation, ALSExample. Its core class, AlsTrainBatchOp, directly uses Flink operators, IterativeDataSet, and so on.
It is rather like Wu Song the constable: with a pair of sabres he can cut down corrupt officials, and bare-handed he can still beat the white-browed tiger to death.
public final class AlsTrainBatchOp extends BatchOperator<AlsTrainBatchOp>
    implements AlsTrainParams<AlsTrainBatchOp> {

    @Override
    public AlsTrainBatchOp linkFrom(BatchOperator<?>... inputs) {
        BatchOperator<?> in = checkAndGetFirst(inputs);
        ......
        AlsTrain als = new AlsTrain(rank, numIter, lambda, implicitPrefs, alpha,
            numMiniBatches, nonNegative);
        DataSet<Tuple3<Byte, Long, float[]>> factors = als.fit(alsInput);

        DataSet<Row> output = factors.mapPartition(
            new RichMapPartitionFunction<Tuple3<Byte, Long, float[]>, Row>() {
                @Override
                public void mapPartition(Iterable<Tuple3<Byte, Long, float[]>> values, Collector<Row> out) {
                    new AlsModelDataConverter(userColName, itemColName).save(values, out);
                }
            });
        return this;
    }
}
As an aside, Flink ML also has an ALS algorithm, implemented in Scala. Algorithm engineers without Scala experience will grind their teeth reading that code.
After the inference and verification in these two articles, we can now summarize as follows.
Some of Alink's design principles:
Algorithms belong to algorithms, Flink belongs to Flink: hide the coupling between AI algorithms and Flink as much as possible.
Use the simplest and most common development language and way of thinking.
Borrow as much as possible from the common machine learning design ideas and development patterns in the industry, so that developers can switch over seamlessly.
Build a set of tactics (middleware or adapters) that both shields Flink and makes good use of it, while letting users develop algorithms quickly on top of it.
To follow these principles, Alink implemented the pipeline layer, the algorithm-component layer, and the iterative communication/computation framework discussed above.
In this way Alink can enjoy the advantages Flink brings to the greatest possible extent, while also fitting the current landscape and making algorithm engineers' work easier, achieving a double boost in system performance and productivity.
The next article will try to cover the concrete implementation of AllReduce.