Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平臺,是業界首個同時支持批式算法、流式算法的機器學習平臺。本文將帶領你們從多重角度出發來分析推測Alink的設計思路。html
由於Alink的公開資料太少,因此如下均爲自行揣測,確定會有疏漏錯誤,但願你們指出,我會隨時更新。java
Apache Flink是由Apache軟件基金會開發的開源流處理框架,它經過實現了 Google Dataflow 流式計算模型實現了高吞吐、低延遲、高性能兼具實時流式計算框架。git
其核心是用Java和Scala編寫的分佈式流數據流引擎。Flink以數據並行和流水線方式執行任意流數據程序,Flink的流水線運行時系統能夠執行批處理和流處理程序。此外,Flink的運行時自己也支持迭代算法的執行。程序員
Alink 是阿里巴巴計算平臺事業部PAI團隊從2017年開始基於實時計算引擎 Flink 研發的新一代機器學習算法平臺,提供豐富的算法組件庫和便捷的操做框架,開發者能夠一鍵搭建覆蓋數據處理、特徵工程、模型訓練、模型預測的算法模型開發全流程。項目之因此定爲Alink,是取自相關名稱(Alibaba, Algorithm, AI, Flink, Blink)的公共部分。web
藉助Flink在批流一體化方面的優點,Alink可以爲批流任務提供一致性的操做。在2017年初,阿里團隊經過調研團隊看到了Flink在批流一體化方面的優點及底層引擎的優秀性能,因而基於Flink從新設計研發了機器學習算法庫,即Alink平臺。該平臺於2018年在阿里集團內部上線,隨後不斷改進完善,在阿里內部錯綜複雜的業務場景中鍛鍊成長。算法
由於目前關於Alink設計的公開資料比較少,咱們手頭只有其源碼,看起來只能從代碼反推。可是世界上的事物都不是孤立的,咱們還有其餘角度來幫助咱們判斷推理。因此下面就讓咱們來進行推斷。sql
FlinkML 是 Flink 社區現存的一套機器學習算法庫,這一套算法庫已經存在好久並且更新比較緩慢。apache
Alink團隊起初面臨的抉擇是:是否要基於 Flink ML 進行開發,或者對 Flink ML進行更新。編程
通過研究,Alink團隊發現,Flink ML 其僅支持10餘種算法,支持的數據結構也不夠通用,在算法性能方面作的優化也比較少,並且其代碼也好久沒有更新。因此,他們放棄了基於舊版FlinkML進行改進、升級的想法,決定基於Flink從新設計研發機器學習算法庫。api
因此咱們要分析的就是如何從無到有設計出一個新的機器學習平臺/框架。
由於Alink是市場的新進入者,因此Alink的最大問題就是如何替代市場上的現有產品。
邁克爾·波特用 「替代品威脅」 來解釋用戶的整個替代邏輯,當新產品能緊緊掌握住這一點,就有可能在市場上得到很是好的表現,戰勝競爭對手。
假如如今想從0到1構建一個機器學習庫或者機器學習框架,那麼咱們須要從商業意識和商業邏輯出發,來思考這個產品的價值所在,就能對這個產品作個比較精確的定義,從而可以肯定產品路線。
產品須要解決應用環境下的綜合性問題,產品的價值體現,能夠分拆了三個維度。
下面就讓咱們逐一分析。
這個就是換用成本的問題,一旦換用成本太高,這個產品就很難成功。
Alink大略有兩種用戶:算法工程師,應用工程師。
Alink算法工程師特指實現機器學習算法的工程師。Alink應用工程師就是應用Alink AI算法作業務的工程師。這兩類用戶的換用成本都是Alink須要考慮的。
新產品對於用戶來講,有兩個大的問題:產品底層邏輯和開發工具。一個優秀的新產品絕對不能在這兩個問題上增長用戶的換用成本。
Flink這個平臺博大精深,不管是熟悉其API仍是深刻理解系統架構都不是容易的事情。若是Alink用戶還須要熟悉Flink,那勢必形成ALink用戶的換用成本,因此這點應該儘可能避免。
對於算法工程師,他們應該主要把思路集中在算法上,而儘可能不用關心Flink內部的細節,若是必定要熟悉Flink,那麼越少越好;
對於應用工程師,他們主要的需求就是API接口越簡單越好,他們最理想的狀態應該是:徹底感受不到Flink的存在。
綜上所述,Alink的原則之一應該是 :算法的歸算法,Flink的歸Flink,儘可能屏蔽AI算法和Flink之間的聯繫。
開發工具就是究竟用什麼語言開發。Flink的開發語言主要是JAVA,SCALA,Python。而機器學習世界中主要仍是Python。
首先要排除SCALA。由於Scala 是一門很難掌握的語言,它的規則是基於數學類型理論的,學習曲線至關陡峭。一個可以領會規則和語言特性的優秀程序員,使用 Scala 會比使用 Java 更高效,可是一個普通程序員的生產力,從功能實現上來看,效率則會相反。
讓咱們看看基於Flink的原生KMeans SCALA代碼,不少人看了以後恐怕都會懵圈。
val finalCentroids = centroids.iterate(params.getInt("iterations", 10)) { currentCentroids => val newCentroids = points .map(new SelectNearestCenter).withBroadcastSet(currentCentroids, "centroids") .map { x => (x._1, x._2, 1L) }.withForwardedFields("_1; _2") .groupBy(0) .reduce { (p1, p2) => (p1._1, p1._2.add(p2._2), p1._3 + p2._3) }.withForwardedFields("_1") .map { x => new Centroid(x._1, x._2.div(x._3)) }.withForwardedFields("_1->id") newCentroids }
其次是選擇JAVA仍是Python開發具體算法。Alink內部確定進行了不少權宜和抉擇。由於這個不僅僅是哪一個語言自己更合適,也涉及到Alink團隊內部有哪些資源,好比是JAVA工程師更多仍是Python更多。最終Alink選擇了JAVA來開發算法。
最後是API。這個就沒有什麼疑問了,Alink提供了Python和JAVA兩種語言的API,直接可參見GitHub的介紹。
在 PyAlink 中,算法組件提供的接口基本與 Java API 一致,即經過默認構造方法建立一個算法組件,而後經過
setXXX
設置參數,經過link/linkTo/linkFrom
與其餘組件相連。 這裏利用 Jupyter 的自動補全機制能夠提供書寫便利。
另外,若是採用JAVA或者Python,確定有大量現有代碼能夠修改複用。若是採用SCALA,就難以複用以前的積累。
綜上所述,Alink的原則之一應該是 :採用最簡單,最多見的開發語言和設計思惟。
Alink的競爭對手大略能夠認爲是Spark ML, Flink ML, Scikit-learn。
他們是市場上的現有力量,擁有大量的用戶。用戶已經熟悉了這些競爭對手的設計思路,開發策略,基本概念和API。除非Alink可以提供一種神奇簡便的API,不然Alink應該在設計上最大程度借鑑這些競爭對手。
好比機器學習開發中有以下常見概念:Transformer,Estimator,PipeLine,Parameter。這些概念 Alink 應該儘可能提供。
綜上所述,**Alink的原則之一應該是 :儘可能借鑑市面上通用的設計思路和開發模式,讓開發者無縫切換 **。
從 Alink的目錄結構中 ,咱們能夠看出,Alink確實提供了這些常見概念。
好比 Pipeline,Trainer,Model,Estimator。咱們會在後續文章中再詳細介紹這些概念。
./java/com/alibaba/alink: common operator params pipeline ./java/com/alibaba/alink/params: associationrule evaluation nlp regression statistics classification feature onlinelearning shared tuning clustering io outlier similarity udf dataproc mapper recommendation sql validators ./java/com/alibaba/alink/pipeline: EstimatorBase.java ModelBase.java Trainer.java feature LocalPredictable.java ModelExporterUtils.java TransformerBase.java nlp LocalPredictor.java Pipeline.java classification recommendation MapModel.java PipelineModel.java clustering regression MapTransformer.java PipelineStageBase.java dataproc tuning
這是成本結構和收益的規模性問題。從而決定了Alink在開發時候,必須儘可能提升開發工程師的效率,提升生產力。前面提到的棄用SCALA,部分也出於這個考慮。
挑戰集中在:
舉個例子:
確定有個別開發者,其對Flink特別熟悉,他們能夠運用各類Flink API和函數編程思惟開發出高效率的算法。這種開發者,咱們能夠稱爲是武松武都頭。他們相似特種兵,能上戰場衝鋒陷陣,也能吊打白額大蟲。
可是絕大多數開發者對Flink不熟悉,他們更熟悉AI算法和命令式編程思路。這種開發者咱們能夠認爲他們屬於八十萬禁軍或者是玄甲軍,北府兵,魏武卒,背嵬軍。這種纔是實際開發中的主力部隊和常規套路。
咱們須要針對八十萬禁軍,讓林沖林教頭設計出一套適合正規做戰的槍棒打法。或者針對背嵬軍,讓岳飛嶽元帥設計一套馬軍衝陣機制。
所以,**Alink的原則之一應該是 :構建一套戰術打法(middleware或者adapter),即屏蔽了Flink,又能夠利用好Flink,還可讓用戶基於此能夠快速開發算法 **。
咱們想一想看大概有哪些基礎工做須要作:
讓咱們看看Alink作了哪些努力,這點從其目錄結構能夠看出有queue,operator,mapper等等構建架構所必須的數據結構:
./java/com/alibaba/alink/common: MLEnvironment.java linalg MLEnvironmentFactory.java mapper VectorTypes.java model comqueue utils io ./java/com/alibaba/alink/operator: AlgoOperator.java common batch stream
其中最重要的概念是BaseComQueue,這是把通訊或者計算抽象成ComQueueItem,而後把ComQueueItem串聯起來造成隊列。這樣就造成了面向迭代計算場景的一套迭代通訊計算框架。其餘數據結構大可能是圍繞着BaseComQueue來具體運做。
/** * Base class for the com(Computation && Communicate) queue. */ public class BaseComQueue<Q extends BaseComQueue<Q>> implements Serializable { /** * All computation or communication functions. */ private final List<ComQueueItem> queue = new ArrayList<>(); /** * sessionId for shared objects within this BaseComQueue. */ private final int sessionId = SessionSharedObjs.getNewSessionId(); /** * The function executed to decide whether to break the loop. */ private CompareCriterionFunction compareCriterion; /** * The function executed when closing the iteration */ private CompleteResultFunction completeResult; /** * Max iteration count. */ private int maxIter = Integer.MAX_VALUE; private transient ExecutionEnvironment executionEnvironment; }
MLEnvironment 是另一個重要的類。其封裝了Flink開發所必需要的運行上下文。用戶能夠經過這個類來獲取各類實際運行環境,能夠創建table,能夠運行SQL語句。
/** * The MLEnvironment stores the necessary context in Flink. * Each MLEnvironment will be associated with a unique ID. * The operations associated with the same MLEnvironment ID * will share the same Flink job context. */ public class MLEnvironment { private ExecutionEnvironment env; private StreamExecutionEnvironment streamEnv; private BatchTableEnvironment batchTableEnv; private StreamTableEnvironment streamTableEnv; }
下面咱們能夠總結下Alink部分設計原則
算法的歸算法,Flink的歸Flink,儘可能屏蔽AI算法和Flink之間的聯繫。
採用最簡單,最多見的開發語言。
儘可能借鑑市面上通用的設計思路和開發模式,讓開發者無縫切換。
構建一套戰術打法(middleware或者adapter),即屏蔽了Flink,又能夠利用好Flink,還可讓用戶基於此能夠快速開發算法。
Flink和Alink源碼中,都提供了KMeans算法例子,因此咱們就從KMeans入手看看Flink原生算法和Alink算法實現的區別。爲了統一標準,咱們都選用JAVA版本的算法實現。
KMeans算法的思想比較簡單,假設咱們要把數據分紅K個類,大概能夠分爲如下幾個步驟:
K-Means 是迭代的聚類算法,初始設置K個聚類中心
- 在每一次迭代過程當中,算法計算每一個數據點到每一個聚類中心的歐式距離
- 每一個點被分配到它最近的聚類中心
- 隨後每一個聚類中心被移動到全部被分配的點
- 移動的聚類中心被分配到下一次迭代
- 算法在固定次數的迭代以後終止(在本實現中,參數設置)
- 或者聚類中心在迭代中不在移動
- 本項目是工做在二維平面的數據點上
- 它計算分配給集羣中心的數據點
- 每一個數據點都使用其所屬的最終集羣(中心)的id進行註釋。
下面給出部分代碼,具體算法解釋能夠在註釋中看到。
這裏主要採用了Flink的批量迭代。其調用 DataSet 的 iterate(int)
方法建立一個 BulkIteration,迭代以此爲起點,返回一個 IterativeDataSet,能夠用常規運算符進行轉換。迭代調用的參數 int 指定最大迭代次數。
IterativeDataSet 調用 closeWith(DataSet)
方法來指定哪一個轉換應該反饋到下一個迭代,能夠選擇使用 closeWith(DataSet,DataSet)
指定終止條件。若是該 DataSet 爲空,則它將評估第二個 DataSet 並終止迭代。若是沒有指定終止條件,則迭代在給定的最大次數迭代後終止。
public class KMeans { public static void main(String[] args) throws Exception { // Checking input parameters final ParameterTool params = ParameterTool.fromArgs(args); // set up execution environment ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); // make parameters available in the web interface // get input data: // read the points and centroids from the provided paths or fall back to default data DataSet<Point> points = getPointDataSet(params, env); DataSet<Centroid> centroids = getCentroidDataSet(params, env); // set number of bulk iterations for KMeans algorithm IterativeDataSet<Centroid> loop = centroids.iterate(params.getInt("iterations", 10)); DataSet<Centroid> newCentroids = points // compute closest centroid for each point .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids") // count and sum point coordinates for each centroid .map(new CountAppender()) .groupBy(0).reduce(new CentroidAccumulator()) // compute new centroids from point counts and coordinate sums .map(new CentroidAverager()); // feed new centroids back into next iteration DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids); DataSet<Tuple2<Integer, Point>> clusteredPoints = points // assign points to final clusters .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids"); // emit result if (params.has("output")) { clusteredPoints.writeAsCsv(params.get("output"), "\n", " "); // since file sinks are lazy, we trigger the execution explicitly env.execute("KMeans Example"); } else { System.out.println("Printing result to stdout. Use --output to specify output path."); clusteredPoints.print(); } }
Alink中,Kmeans是分佈在若干文件中,這裏咱們提取部分代碼來對照。
這裏是算法主程序,這裏卻是看起來十分清爽乾淨,但其實是沒有這麼簡單,Alink在其背後作了大量的基礎工做。
能夠看出,算法實現的主要工做是:
public final class KMeansTrainBatchOp extends BatchOperator <KMeansTrainBatchOp> implements KMeansTrainParams <KMeansTrainBatchOp> { static DataSet <Row> iterateICQ(...省略...) { return new IterativeComQueue() .initWithPartitionedData(TRAIN_DATA, data) .initWithBroadcastData(INIT_CENTROID, initCentroid) .initWithBroadcastData(KMEANS_STATISTICS, statistics) .add(new KMeansPreallocateCentroid()) .add(new KMeansAssignCluster(distance)) .add(new AllReduce(CENTROID_ALL_REDUCE)) .add(new KMeansUpdateCentroids(distance)) .setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol)) .closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName)) .setMaxIter(maxIter) .exec(); } }
預先分配聚類中心
public class KMeansPreallocateCentroid extends ComputeFunction { public void calc(ComContext context) { if (context.getStepNo() == 1) { List<FastDistanceMatrixData> initCentroids = (List)context.getObj("initCentroid"); List<Integer> list = (List)context.getObj("statistics"); Integer vectorSize = (Integer)list.get(0); context.putObj("vectorSize", vectorSize); FastDistanceMatrixData centroid = (FastDistanceMatrixData)initCentroids.get(0); Preconditions.checkArgument(centroid.getVectors().numRows() == vectorSize, "Init centroid error, size not equal!"); context.putObj("centroid1", Tuple2.of(context.getStepNo() - 1, centroid)); context.putObj("centroid2", Tuple2.of(context.getStepNo() - 1, new FastDistanceMatrixData(centroid))); context.putObj("k", centroid.getVectors().numCols()); } } }
爲每一個點(point)計算最近的聚類中心,爲每一個聚類中心的點座標的計數和求和
/** * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster. */ public class KMeansAssignCluster extends ComputeFunction { @Override public void calc(ComContext context) { Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE); Integer k = context.getObj(KMeansTrainBatchOp.K); // get iterative coefficient from static memory. Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids; if (context.getStepNo() % 2 == 0) { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1); } else { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2); } if (null == distanceMatrix) { distanceMatrix = new DenseMatrix(k, 1); } double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE); if (sumMatrixData == null) { sumMatrixData = new double[k * (vectorSize + 1)]; context.putObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE, sumMatrixData); } Iterable<FastDistanceVectorData> trainData = context.getObj(KMeansTrainBatchOp.TRAIN_DATA); if (trainData == null) { return; } Arrays.fill(sumMatrixData, 0.0); for (FastDistanceVectorData sample : trainData) { KMeansUtil.updateSumMatrix(sample, 1, stepNumCentroids.f1, vectorSize, sumMatrixData, k, fastDistance, distanceMatrix); } } }
基於點計數和座標,計算新的聚類中心。
/** * Update the centroids based on the sum of points and point number belonging to the same cluster. */ public class KMeansUpdateCentroids extends ComputeFunction { @Override public void calc(ComContext context) { Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE); Integer k = context.getObj(KMeansTrainBatchOp.K); double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE); Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids; if (context.getStepNo() % 2 == 0) { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2); } else { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1); } stepNumCentroids.f0 = context.getStepNo(); context.putObj(KMeansTrainBatchOp.K, updateCentroids(stepNumCentroids.f1, k, vectorSize, sumMatrixData, distance)); } }
經過下面的分析能夠看出,從實際業務代碼量角度說,其實差異不大。
因此Alink代碼只能說比Flink原生實現略大。
這裏指的是與Flink的耦合度。能看出來Flink的KMeans算法須要大量的Flink類。而Alink被最大限度屏蔽了。
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.operators.IterativeDataSet; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.ml.api.misc.param.Params; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions;
這是一個主要的區別。
在下一期文章中,將從源碼角度分析驗證本文的設計思路。
Spark ML簡介之Pipeline,DataFrame,Estimator,Transformer
斬獲GitHub 2000+ Star,阿里雲開源的 Alink 機器學習平臺如何跑贏雙11數據「博弈」?|AI 技術生態論