Alink漫談(一) : 從KMeans算法實現不一樣看Alink設計思想

Alink漫談(一) : 從KMeans算法實現不一樣看Alink設計思想

0x00 摘要

Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平臺,是業界首個同時支持批式算法、流式算法的機器學習平臺。本文將帶領你們從多重角度出發來分析推測Alink的設計思路。html

由於Alink的公開資料太少,因此如下均爲自行揣測,確定會有疏漏錯誤,但願你們指出,我會隨時更新。java

Apache Flink是由Apache軟件基金會開發的開源流處理框架,它經過實現了 Google Dataflow 流式計算模型實現了高吞吐、低延遲、高性能兼具實時流式計算框架。git

其核心是用Java和Scala編寫的分佈式流數據流引擎。Flink以數據並行和流水線方式執行任意流數據程序,Flink的流水線運行時系統能夠執行批處理和流處理程序。此外,Flink的運行時自己也支持迭代算法的執行。程序員

Alink 是阿里巴巴計算平臺事業部PAI團隊從2017年開始基於實時計算引擎 Flink 研發的新一代機器學習算法平臺,提供豐富的算法組件庫和便捷的操做框架,開發者能夠一鍵搭建覆蓋數據處理、特徵工程、模型訓練、模型預測的算法模型開發全流程。項目之因此定爲Alink,是取自相關名稱(Alibaba, Algorithm, AI, Flink, Blink)的公共部分。web

藉助Flink在批流一體化方面的優點,Alink可以爲批流任務提供一致性的操做。在2017年初,阿里團隊經過調研團隊看到了Flink在批流一體化方面的優點及底層引擎的優秀性能,因而基於Flink從新設計研發了機器學習算法庫,即Alink平臺。該平臺於2018年在阿里集團內部上線,隨後不斷改進完善,在阿里內部錯綜複雜的業務場景中鍛鍊成長。算法

0x03 Alink設計思路

由於目前關於Alink設計的公開資料比較少,咱們手頭只有其源碼,看起來只能從代碼反推。可是世界上的事物都不是孤立的,咱們還有其餘角度來幫助咱們判斷推理。因此下面就讓咱們來進行推斷。sql

1. 白手起家

FlinkML 是 Flink 社區現存的一套機器學習算法庫,這一套算法庫已經存在好久並且更新比較緩慢。apache

Alink團隊起初面臨的抉擇是:是否要基於 Flink ML 進行開發,或者對 Flink ML進行更新。編程

通過研究,Alink團隊發現,Flink ML 其僅支持10餘種算法,支持的數據結構也不夠通用,在算法性能方面作的優化也比較少,並且其代碼也好久沒有更新。因此,他們放棄了基於舊版FlinkML進行改進、升級的想法,決定基於Flink從新設計研發機器學習算法庫。api

因此咱們要分析的就是如何從無到有設計出一個新的機器學習平臺/框架

2. 替代品如何形成威脅

由於Alink是市場的新進入者,因此Alink的最大問題就是如何替代市場上的現有產品

邁克爾·波特用 「替代品威脅」 來解釋用戶的整個替代邏輯,當新產品能緊緊掌握住這一點,就有可能在市場上得到很是好的表現,戰勝競爭對手。

假如如今想從0到1構建一個機器學習庫或者機器學習框架,那麼咱們須要從商業意識和商業邏輯出發,來思考這個產品的價值所在,就能對這個產品作個比較精確的定義,從而可以肯定產品路線。

產品須要解決應用環境下的綜合性問題,產品的價值體現,能夠分拆了三個維度。

  • 用戶的角度:價值體如今用戶使用,獲取產品的意願。這個就是換用成本的問題,一旦換用成本太高,這個產品就很難成功。
  • 競爭對手的角度: 產品的競爭力,最終都體現爲用戶爲了獲取該產品願意支付的最高成本上限,當一個替代品進入市場,必須有能給用戶足夠的洞理驅使用戶換用替代品。
  • 企業的角度:站在企業的角度,實際就是成本結構和收益的規模性問題 。

下面就讓咱們逐一分析。

3. 用戶角度看設計

這個就是換用成本的問題,一旦換用成本太高,這個產品就很難成功。

Alink大略有兩種用戶:算法工程師,應用工程師。

Alink算法工程師特指實現機器學習算法的工程師。Alink應用工程師就是應用Alink AI算法作業務的工程師。這兩類用戶的換用成本都是Alink須要考慮的。

新產品對於用戶來講,有兩個大的問題:產品底層邏輯和開發工具。一個優秀的新產品絕對不能在這兩個問題上增長用戶的換用成本。

Flink這個平臺博大精深,不管是熟悉其API仍是深刻理解系統架構都不是容易的事情。若是Alink用戶還須要熟悉Flink,那勢必形成ALink用戶的換用成本,因此這點應該儘可能避免。

  • 對於算法工程師,他們應該主要把思路集中在算法上,而儘可能不用關心Flink內部的細節,若是必定要熟悉Flink,那麼越少越好;

  • 對於應用工程師,他們主要的需求就是API接口越簡單越好,他們最理想的狀態應該是:徹底感受不到Flink的存在。

綜上所述,Alink的原則之一應該是 :算法的歸算法,Flink的歸Flink,儘可能屏蔽AI算法和Flink之間的聯繫

開發工具

開發工具就是究竟用什麼語言開發。Flink的開發語言主要是JAVA,SCALA,Python。而機器學習世界中主要仍是Python。

  • 首先要排除SCALA。由於Scala 是一門很難掌握的語言,它的規則是基於數學類型理論的,學習曲線至關陡峭。一個可以領會規則和語言特性的優秀程序員,使用 Scala 會比使用 Java 更高效,可是一個普通程序員的生產力,從功能實現上來看,效率則會相反。

    讓咱們看看基於Flink的原生KMeans SCALA代碼,不少人看了以後恐怕都會懵圈。

    val finalCentroids = centroids.iterate(params.getInt("iterations", 10)) { currentCentroids => val newCentroids = points
            .map(new SelectNearestCenter).withBroadcastSet(currentCentroids, "centroids")
            .map { x => (x._1, x._2, 1L) }.withForwardedFields("_1; _2")
            .groupBy(0)
            .reduce { (p1, p2) => (p1._1, p1._2.add(p2._2), p1._3 + p2._3) }.withForwardedFields("_1")
            .map { x => new Centroid(x._1, x._2.div(x._3)) }.withForwardedFields("_1->id")
          newCentroids
        }
  • 其次是選擇JAVA仍是Python開發具體算法。Alink內部確定進行了不少權宜和抉擇。由於這個不僅僅是哪一個語言自己更合適,也涉及到Alink團隊內部有哪些資源,好比是JAVA工程師更多仍是Python更多。最終Alink選擇了JAVA來開發算法。

  • 最後是API。這個就沒有什麼疑問了,Alink提供了Python和JAVA兩種語言的API,直接可參見GitHub的介紹。

在 PyAlink 中,算法組件提供的接口基本與 Java API 一致,即經過默認構造方法建立一個算法組件,而後經過 setXXX 設置參數,經過 link/linkTo/linkFrom 與其餘組件相連。 這裏利用 Jupyter 的自動補全機制能夠提供書寫便利。

另外,若是採用JAVA或者Python,確定有大量現有代碼能夠修改複用。若是採用SCALA,就難以複用以前的積累。

綜上所述,Alink的原則之一應該是 :採用最簡單,最多見的開發語言和設計思惟

4. 競爭對手角度看設計

Alink的競爭對手大略能夠認爲是Spark ML, Flink ML, Scikit-learn。

他們是市場上的現有力量,擁有大量的用戶。用戶已經熟悉了這些競爭對手的設計思路,開發策略,基本概念和API。除非Alink可以提供一種神奇簡便的API,不然Alink應該在設計上最大程度借鑑這些競爭對手。

好比機器學習開發中有以下常見概念:Transformer,Estimator,PipeLine,Parameter。這些概念 Alink 應該儘可能提供。

綜上所述,**Alink的原則之一應該是 :儘可能借鑑市面上通用的設計思路和開發模式,讓開發者無縫切換 **

從 Alink的目錄結構中 ,咱們能夠看出,Alink確實提供了這些常見概念。

好比 Pipeline,Trainer,Model,Estimator。咱們會在後續文章中再詳細介紹這些概念。

./java/com/alibaba/alink:
common		operator	params		pipeline
  
./java/com/alibaba/alink/params:
associationrule	evaluation	nlp		regression	statistics
classification	feature		onlinelearning	shared		tuning
clustering	io		outlier		similarity	udf
dataproc	mapper		recommendation	sql		validators

./java/com/alibaba/alink/pipeline:
EstimatorBase.java	ModelBase.java		Trainer.java		feature
LocalPredictable.java	ModelExporterUtils.java	TransformerBase.java	nlp
LocalPredictor.java	Pipeline.java		classification		recommendation
MapModel.java		PipelineModel.java	clustering		regression
MapTransformer.java	PipelineStageBase.java	dataproc		tuning

5. 企業角度看設計

這是成本結構和收益的規模性問題。從而決定了Alink在開發時候,必須儘可能提升開發工程師的效率,提升生產力。前面提到的棄用SCALA,部分也出於這個考慮。

挑戰集中在:

  • 如何在對開發者最大程度屏蔽Flink的狀況下,依然利用好Flink的各類能力。
  • 如何構建一套相應打法和戰術體系,即middleware或者adapter,讓用戶基於此能夠快速開發算法

舉個例子:

  • 確定有個別開發者,其對Flink特別熟悉,他們能夠運用各類Flink API和函數編程思惟開發出高效率的算法。這種開發者,咱們能夠稱爲是武松武都頭。他們相似特種兵,能上戰場衝鋒陷陣,也能吊打白額大蟲。

  • 可是絕大多數開發者對Flink不熟悉,他們更熟悉AI算法和命令式編程思路。這種開發者咱們能夠認爲他們屬於八十萬禁軍或者是玄甲軍,北府兵,魏武卒,背嵬軍。這種纔是實際開發中的主力部隊和常規套路。

咱們須要針對八十萬禁軍,讓林沖林教頭設計出一套適合正規做戰的槍棒打法。或者針對背嵬軍,讓岳飛嶽元帥設計一套馬軍衝陣機制。

所以,**Alink的原則之一應該是 :構建一套戰術打法(middleware或者adapter),即屏蔽了Flink,又能夠利用好Flink,還可讓用戶基於此能夠快速開發算法 **

咱們想一想看大概有哪些基礎工做須要作:

  • 如何初始化
  • 若是通訊
  • 如何分割代碼,如何廣播代碼
  • 若是分割數據,如何廣播數據
  • 如何迭代算法
  • ......

讓咱們看看Alink作了哪些努力,這點從其目錄結構能夠看出有queue,operator,mapper等等構建架構所必須的數據結構:

./java/com/alibaba/alink/common:
MLEnvironment.java		linalg MLEnvironmentFactory.java	mapper
VectorTypes.java		model comqueue			utils io

./java/com/alibaba/alink/operator:
AlgoOperator.java 	common  batch	 stream

其中最重要的概念是BaseComQueue,這是把通訊或者計算抽象成ComQueueItem,而後把ComQueueItem串聯起來造成隊列。這樣就造成了面向迭代計算場景的一套迭代通訊計算框架。其餘數據結構大可能是圍繞着BaseComQueue來具體運做。

/**
 * Base class for the com(Computation && Communicate) queue.
 */
public class BaseComQueue<Q extends BaseComQueue<Q>> implements Serializable {
	/**
	 * All computation or communication functions.
	 */
	private final List<ComQueueItem> queue = new ArrayList<>();
	/**
	 * sessionId for shared objects within this BaseComQueue.
	 */
	private final int sessionId = SessionSharedObjs.getNewSessionId();
	/**
	 * The function executed to decide whether to break the loop.
	 */
	private CompareCriterionFunction compareCriterion;
	/**
	 * The function executed when closing the iteration
	 */
	private CompleteResultFunction completeResult;
	/**
	 * Max iteration count.
	 */
	private int maxIter = Integer.MAX_VALUE;

	private transient ExecutionEnvironment executionEnvironment;
}

MLEnvironment 是另一個重要的類。其封裝了Flink開發所必需要的運行上下文。用戶能夠經過這個類來獲取各類實際運行環境,能夠創建table,能夠運行SQL語句。

/**
 * The MLEnvironment stores the necessary context in Flink.
 * Each MLEnvironment will be associated with a unique ID.
 * The operations associated with the same MLEnvironment ID
 * will share the same Flink job context.
 */
public class MLEnvironment {
    private ExecutionEnvironment env;
    private StreamExecutionEnvironment streamEnv;
    private BatchTableEnvironment batchTableEnv;
    private StreamTableEnvironment streamTableEnv;
}

6. 設計原則總結

下面咱們能夠總結下Alink部分設計原則

  • 算法的歸算法,Flink的歸Flink,儘可能屏蔽AI算法和Flink之間的聯繫。

  • 採用最簡單,最多見的開發語言。

  • 儘可能借鑑市面上通用的設計思路和開發模式,讓開發者無縫切換。

  • 構建一套戰術打法(middleware或者adapter),即屏蔽了Flink,又能夠利用好Flink,還可讓用戶基於此能夠快速開發算法。

0x04 KMeans算法實現看設計

Flink和Alink源碼中,都提供了KMeans算法例子,因此咱們就從KMeans入手看看Flink原生算法和Alink算法實現的區別。爲了統一標準,咱們都選用JAVA版本的算法實現。

1. KMeans算法

KMeans算法的思想比較簡單,假設咱們要把數據分紅K個類,大概能夠分爲如下幾個步驟:

  • 隨機選取k個點,做爲聚類中心;
  • 計算每一個點分別到k個聚類中心的聚類,而後將該點分到最近的聚類中心,這樣就行成了k個簇;
  • 再從新計算每一個簇的質心(均值);
  • 重複以上2~4步,直到質心的位置再也不發生變化或者達到設定的迭代次數。

K-Means 是迭代的聚類算法,初始設置K個聚類中心

  1. 在每一次迭代過程當中,算法計算每一個數據點到每一個聚類中心的歐式距離
  2. 每一個點被分配到它最近的聚類中心
  3. 隨後每一個聚類中心被移動到全部被分配的點
  4. 移動的聚類中心被分配到下一次迭代
  5. 算法在固定次數的迭代以後終止(在本實現中,參數設置)
  6. 或者聚類中心在迭代中不在移動
  7. 本項目是工做在二維平面的數據點上
  8. 它計算分配給集羣中心的數據點
  9. 每一個數據點都使用其所屬的最終集羣(中心)的id進行註釋。

下面給出部分代碼,具體算法解釋能夠在註釋中看到。

這裏主要採用了Flink的批量迭代。其調用 DataSet 的 iterate(int) 方法建立一個 BulkIteration,迭代以此爲起點,返回一個 IterativeDataSet,能夠用常規運算符進行轉換。迭代調用的參數 int 指定最大迭代次數。

IterativeDataSet 調用 closeWith(DataSet) 方法來指定哪一個轉換應該反饋到下一個迭代,能夠選擇使用 closeWith(DataSet,DataSet) 指定終止條件。若是該 DataSet 爲空,則它將評估第二個 DataSet 並終止迭代。若是沒有指定終止條件,則迭代在給定的最大次數迭代後終止。

public class KMeans {

	public static void main(String[] args) throws Exception {

		// Checking input parameters
		final ParameterTool params = ParameterTool.fromArgs(args);

		// set up execution environment
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.getConfig().setGlobalJobParameters(params); // make parameters available in the web interface

		// get input data:
		// read the points and centroids from the provided paths or fall back to default data
		DataSet<Point> points = getPointDataSet(params, env);
		DataSet<Centroid> centroids = getCentroidDataSet(params, env);

		// set number of bulk iterations for KMeans algorithm
		IterativeDataSet<Centroid> loop = centroids.iterate(params.getInt("iterations", 10));

		DataSet<Centroid> newCentroids = points
			// compute closest centroid for each point
			.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
			// count and sum point coordinates for each centroid
			.map(new CountAppender())
			.groupBy(0).reduce(new CentroidAccumulator())
			// compute new centroids from point counts and coordinate sums
			.map(new CentroidAverager());

		// feed new centroids back into next iteration
		DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);

		DataSet<Tuple2<Integer, Point>> clusteredPoints = points
			// assign points to final clusters
			.map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");

		// emit result
		if (params.has("output")) {
			clusteredPoints.writeAsCsv(params.get("output"), "\n", " ");
			// since file sinks are lazy, we trigger the execution explicitly
			env.execute("KMeans Example");
		} else {
			System.out.println("Printing result to stdout. Use --output to specify output path.");
			clusteredPoints.print();
		}
	}

Alink中,Kmeans是分佈在若干文件中,這裏咱們提取部分代碼來對照。

KMeansTrainBatchOp

這裏是算法主程序,這裏卻是看起來十分清爽乾淨,但其實是沒有這麼簡單,Alink在其背後作了大量的基礎工做。

能夠看出,算法實現的主要工做是:

  • 構建了一個IterativeComQueue(BaseComQueue的缺省實現)。
  • 初始化數據,這裏有兩種辦法:initWithPartitionedData將DataSet分片緩存至內存。initWithBroadcastData將DataSet總體緩存至每一個worker的內存。
  • 將計算分割爲若干ComputeFunction,好比KMeansPreallocateCentroid / KMeansAssignCluster / KMeansUpdateCentroids ...,串聯在IterativeComQueue。
  • 運用AllReduce通訊模型完成了數據同步。
public final class KMeansTrainBatchOp extends BatchOperator <KMeansTrainBatchOp>
	implements KMeansTrainParams <KMeansTrainBatchOp> {

	static DataSet <Row> iterateICQ(...省略...) {

		return new IterativeComQueue()
			.initWithPartitionedData(TRAIN_DATA, data)
			.initWithBroadcastData(INIT_CENTROID, initCentroid)
			.initWithBroadcastData(KMEANS_STATISTICS, statistics)
			.add(new KMeansPreallocateCentroid())
			.add(new KMeansAssignCluster(distance))
			.add(new AllReduce(CENTROID_ALL_REDUCE))
			.add(new KMeansUpdateCentroids(distance))
			.setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol))
			.closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
			.setMaxIter(maxIter)
			.exec();
	}
}

KMeansPreallocateCentroid

預先分配聚類中心

public class KMeansPreallocateCentroid extends ComputeFunction {
    public void calc(ComContext context) {
        if (context.getStepNo() == 1) {
            List<FastDistanceMatrixData> initCentroids = (List)context.getObj("initCentroid");
            List<Integer> list = (List)context.getObj("statistics");
            Integer vectorSize = (Integer)list.get(0);
            context.putObj("vectorSize", vectorSize);
            FastDistanceMatrixData centroid = (FastDistanceMatrixData)initCentroids.get(0);
            Preconditions.checkArgument(centroid.getVectors().numRows() == vectorSize, "Init centroid error, size not equal!");
            context.putObj("centroid1", Tuple2.of(context.getStepNo() - 1, centroid));
            context.putObj("centroid2", Tuple2.of(context.getStepNo() - 1, new FastDistanceMatrixData(centroid)));
            context.putObj("k", centroid.getVectors().numCols());
        }
    }
}

KMeansAssignCluster

爲每一個點(point)計算最近的聚類中心,爲每一個聚類中心的點座標的計數和求和

/**
 * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
 */
public class KMeansAssignCluster extends ComputeFunction {
    @Override
    public void calc(ComContext context) {

        Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
        Integer k = context.getObj(KMeansTrainBatchOp.K);
        // get iterative coefficient from static memory.
        Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        }

        if (null == distanceMatrix) {
            distanceMatrix = new DenseMatrix(k, 1);
        }

        double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);
        if (sumMatrixData == null) {
            sumMatrixData = new double[k * (vectorSize + 1)];
            context.putObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE, sumMatrixData);
        }

        Iterable<FastDistanceVectorData> trainData = context.getObj(KMeansTrainBatchOp.TRAIN_DATA);
        if (trainData == null) {
            return;
        }

        Arrays.fill(sumMatrixData, 0.0);
        for (FastDistanceVectorData sample : trainData) {
            KMeansUtil.updateSumMatrix(sample, 1, stepNumCentroids.f1, vectorSize, sumMatrixData, k, fastDistance,
                distanceMatrix);
        }
    }
}

KMeansUpdateCentroids

基於點計數和座標,計算新的聚類中心。

/**
 * Update the centroids based on the sum of points and point number belonging to the same cluster.
 */
public class KMeansUpdateCentroids extends ComputeFunction {
    @Override
    public void calc(ComContext context) {

        Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
        Integer k = context.getObj(KMeansTrainBatchOp.K);
        double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);

        Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }

        stepNumCentroids.f0 = context.getStepNo();
        context.putObj(KMeansTrainBatchOp.K,
            updateCentroids(stepNumCentroids.f1, k, vectorSize, sumMatrixData, distance));
    }
}

4. 區別

代碼量

經過下面的分析能夠看出,從實際業務代碼量角度說,其實差異不大。

  • Flink的代碼量少;
  • Alink的代碼量雖然大,但其本質就是把Flink版本的一些用戶定義類分離到本身不一樣類中,而且有不少讀取環境變量的代碼;

因此Alink代碼只能說比Flink原生實現略大。

耦合度

這裏指的是與Flink的耦合度。能看出來Flink的KMeans算法須要大量的Flink類。而Alink被最大限度屏蔽了。

  • Flink 算法須要引入的flink類以下
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
  • Alink 算法須要引入的flink類以下,能夠看出來ALink使用的都是基本設施,不涉及算子和複雜API,這樣就減小了用戶的負擔。
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

編程模式

這是一個主要的區別。

  • Flink 使用的是函數式編程。這種範式相對新穎,不少工程師不熟悉。
  • Alink 依然使用了命令式編程。這樣的好處在於,大量現有算法代碼能夠複用,也更符合絕大多數工程師的習慣。
  • Flink 經過Flink的各類算子完成了操做,好比IterativeDataSet實現了迭代。但這種實現對於不熟悉Flink的工程師是個折磨。
  • Alink 基於本身的框架,把計算代碼總結成了若干ComputeFunction,而後經過IterativeComQueue完成了具體算法的迭代。這樣用戶其實對Flink是不須要過多深刻理解。

在下一期文章中,將從源碼角度分析驗證本文的設計思路

0x05 參考

商業模式的定義——作產品究竟是作什麼

k-means聚類算法原理簡析

flink kmeans聚類算法實現

Spark ML簡介之Pipeline,DataFrame,Estimator,Transformer

開源 | 全球首個批流一體機器學習平臺

斬獲GitHub 2000+ Star,阿里雲開源的 Alink 機器學習平臺如何跑贏雙11數據「博弈」?|AI 技術生態論

Flink DataSet API

相關文章
相關標籤/搜索