Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平臺,是業界首個同時支持批式算法、流式算法的機器學習平臺。迭代算法在不少數據分析領域會用到,好比機器學習或者圖計算。本文將經過Superstep入手看看Alink是如何利用Flink迭代API來實現具體算法。html
由於Alink的公開資料太少,因此如下均爲自行揣測,確定會有疏漏錯誤,但願你們指出,我會隨時更新。java
爲何提到 Superstep 這個概念,是由於在擼KMeans代碼的時候,發現幾個很奇怪的地方,好比如下三個步驟中,都用到了context.getStepNo(),並且會根據其數值的不一樣進行不一樣業務操做:git
public class KMeansPreallocateCentroid extends ComputeFunction { public void calc(ComContext context) { LOG.info("liuhao KMeansPreallocateCentroid "); if (context.getStepNo() == 1) { /** 具體業務邏輯代碼 * Allocate memory for pre-round centers and current centers. */ } } } public class KMeansAssignCluster extends ComputeFunction { public void calc(ComContext context) { ...... if (context.getStepNo() % 2 == 0) { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1); } else { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2); } /** 具體業務邏輯代碼 * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster. */ } } public class KMeansUpdateCentroids extends ComputeFunction { public void calc(ComContext context) { if (context.getStepNo() % 2 == 0) { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2); } else { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1); } /** 具體業務邏輯代碼 * Update the centroids based on the sum of points and point number belonging to the same cluster. */ }
查看ComContext的源碼,發現stepNo的來源竟然是runtimeContext.getSuperstepNumber()
。web
public class ComContext { private final int taskId; private final int numTask; private final int stepNo; // 對,就是這裏 private final int sessionId; public ComContext(int sessionId, IterationRuntimeContext runtimeContext) { this.sessionId = sessionId; this.numTask = runtimeContext.getNumberOfParallelSubtasks(); this.taskId = runtimeContext.getIndexOfThisSubtask(); this.stepNo = runtimeContext.getSuperstepNumber(); // 這裏進行了變量初始化 } /** * Get current iteration step number, the same as {@link IterationRuntimeContext#getSuperstepNumber()}. * @return iteration step number. */ public int getStepNo() { return stepNo; // 這裏是使用 } }
看到這裏有的兄弟可能會虎軀一震,這不是BSP模型的概念嘛。我就是想寫個KMeans算法,怎麼除了MPI模型,還要考慮BSP模型。下面就讓咱們一步一步挖掘究竟Alink都作了什麼工做。算法
在 Flink 中的執行圖能夠分爲四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖apache
由於某種緣由,Flink內部對這兩個概念的使用自己就有些混亂:在Task Manager裏這個subtask的概念由一個叫Task的類來實現。Task Manager裏談論的Task對象實際上對應的是ExecutionGraph裏的一個subtask。編程
因此這兩個概念須要理清楚。api
Flink 中的程序本質上是並行的。在執行期間,每個算子 Operator (Transformation)都有一個或多個算子subTask(Operator SubTask),每一個算子的 subTask 之間都是彼此獨立,並在不一樣的線程中執行,而且可能在不一樣的機器或容器上執行。sass
Task( SubTask) 是一個Runnable 對象, Task Manager接受到TDD 後會用它實例化成一個Task對象, 並啓動一個線程執行Task的Run方法。網絡
TaskDeploymentDescriptor(TDD) : 是Task Manager在submitTask是提交給TM的數據結構。 他包含了關於Task的全部描述信息。好比:
在如下狀況下會從新劃分task
好比有以下操做
DataStream<String> text = env.socketTextStream(hostname, port); DataStream counts = text .filter(new FilterClass()) .map(new LineSplitter()) .keyBy(0) .timeWindow(Time.seconds(10)) .sum(2)
那麼StreamGraph的轉換流是:
Source --> Filter --> Map --> Timestamps/Watermarks --> Window(SumAggregator) --> Sink
其task是四個:
其中每一個task又會被分紅分若干subtask。在執行時,一個Task會被並行化成若干個subTask實例進行執行,一個subTask對應一個執行線程。
以上說了這麼多,就是要說jobGraph和subtask,由於本文中咱們在分析源碼和調試時候,主要是從jobGraph這裏開始入手來看subtask。
JobGraph是在StreamGraph的基礎之上,對StreamNode進行了關聯合並的操做,好比對於source -> flatMap -> reduce -> sink 這樣一個數據處理鏈,當source和flatMap知足連接的條件時,能夠能夠將兩個操做符的操做放到一個線程並行執行,這樣能夠減小網絡中的數據傳輸,因爲在source和flatMap之間的傳輸的數據也不用序列化和反序列化,因此也提升了程序的執行效率。
相比流圖(StreamGraph)以及批處理優化計劃(OptimizedPlan),JobGraph發生了一些變化,已經不徹底是「靜態」的數據結構了,由於它加入了中間結果集(IntermediateDataSet)這一「動態」概念。
做業頂點(JobVertex)、中間數據集(IntermediateDataSet)、做業邊(JobEdge)是組成JobGraph的基本元素。這三個對象彼此之間互爲依賴:
那麼JobGraph是怎麼組織並存儲這些元素的呢?其實JobGraph只以Map的形式存儲了全部的JobVertex,鍵是JobVertexID:
private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();
至於其它的元素,經過JobVertex均可以根據關係找尋到。須要注意的是,用於迭代的反饋邊(feedback edge)當前並不體如今JobGraph中,而是被內嵌在特殊的JobVertex中經過反饋信道(feedback channel)在它們之間創建關係。
BSP模型是並行計算模型的一種。並行計算模型一般指從並行算法的設計和分析出發,將各類並行計算機(至少某一類並行計算機)的基本特徵抽象出來,造成一個抽象的計算模型。
BSP模型是一種異步MIMD-DM模型(DM: distributed memory,SM: shared memory),BSP模型支持消息傳遞系統,塊內異步並行,塊間顯式同步,該模型基於一個master協調,全部的worker同步(lock-step)執行, 數據從輸入的隊列中讀取。
BSP計算模型不只是一種體系結構模型,也是設計並行程序的一種方法。BSP程序設計準則是總體同步(bulk synchrony),其獨特之處在於超步(superstep)概念的引入。一個BSP程序同時具備水平和垂直兩個方面的結構。從垂直上看,一個BSP程序由一系列串行的超步(superstep)組成。
BSP模型的實現大概舉例以下:
Flink-Gelly利用Flink的高效迭代算子來支持海量數據的迭代式圖處理。目前,Flink Gelly提供了「Vertex-Centric」,「Scatter-Gather」以及「Gather-Sum-Apply」等計算模型的實現。
「Vertex-Centric」迭代模型也就是咱們常常聽到的「Pregel」,是一種從Vertex角度出發的圖計算方式。其中,同步地迭代計算的步驟稱之爲「superstep」。在每一個「superstep」中,每一個頂點都執行一個用戶自定義的函數,且頂點之間經過消息進行通訊,當一個頂點知道圖中其餘任意頂點的惟一ID時,該頂點就能夠向其發送一條消息。
可是實際上,KMeans不是圖處理,Alink也沒有基於Flink-Gelly來構建。也許只是借鑑了其概念。因此咱們還須要再探尋。
迭代算法在不少數據分析領域會用到,好比機器學習或者圖計算。爲了從大數據中抽取有用信息,這個時候每每會須要在處理的過程當中用到迭代計算。
所謂迭代運算,就是給定一個初值,用所給的算法公式計算初值獲得一箇中間結果,而後將中間結果做爲輸入參數進行反覆計算,在知足必定條件的時候獲得計算結果。
大數據處理框架不少,好比spark,mr。實際上這些實現迭代計算都是很困難的。
Flink直接支持迭代計算。Flink實現迭代的思路也是很簡單,就是實現一個step函數,而後將其嵌入到迭代算子中去。有兩種迭代操做算子: Iterate和Delta Iterate。兩個操做算子都是在未收到終止迭代信號以前一直調用step函數。
這種迭代方式稱爲全量迭代,它會將整個數據輸入,通過必定的迭代次數,最終獲得你想要的結果。
迭代操做算子包括了簡單的迭代形式:每次迭代,step函數會消費全量數據(本次輸入和上次迭代的結果),而後計算獲得下輪迭代的輸出(例如,map,reduce,join等)
迭代過程主要分爲如下幾步:
它迭代的結束條件是:
編程的時候,須要調用iterate(int),該函數返回的是一個IterativeDataSet,固然咱們能夠對它進行一些操做,好比map等。Iterate函數惟一的參數是表明最大迭代次數。
迭代是一個環。咱們須要進行閉環操做,那麼這時候就要用到closeWith(Dataset)操做了,參數就是須要循環迭代的dataset。也能夠可選的指定一個終止標準,操做closeWith(DataSet, DataSet),能夠經過判斷第二個dataset是否爲空,來終止迭代。若是不指定終止迭代條件,迭代就會在迭代了最大迭代次數後終止。
DataSet API引進了獨特的同步迭代機制(superstep-based),僅限於用在有界的流。
咱們將迭代操做算子的每一個步驟函數的執行稱爲單個迭代。在並行設置中,在迭代狀態的不一樣分區上並行計算step函數的多個實例。在許多設置中,對全部並行實例上的step函數的一次評估造成了所謂的superstep,這也是同步的粒度。所以,迭代的全部並行任務都須要在初始化下一個superstep以前完成superstep。終止準則也將被評估爲superstep同步屏障。
下面是Apache原文
We referred to each execution of the step function of an iteration operator as a single iteration. In parallel setups, multiple instances of the step function are evaluated in parallel on different partitions of the iteration state. In many settings, one evaluation of the step function on all parallel instances forms a so called superstep, which is also the granularity of synchronization. Therefore, all parallel tasks of an iteration need to complete the superstep, before a next superstep will be initialized. Termination criteria will also be evaluated at superstep barriers.
下面是apache原圖
歸納以下:
每次迭代都是一個superstep 每次迭代中有若干subtask在不一樣的partition上分別執行step 每一個step有一個HeadTask,若干IntermediateTask,一個TailTask 每一個superstep有一個SynchronizationSinkTask 同步,由於迭代的全部並行任務須要在下一個迭代前完成
由此咱們能夠知道,superstep這是Flink DataSet API的概念,可是你從這裏可以看到BSP模型的影子,好比:
KMeansTrainBatchOp.iterateICQ函數中,生成了一個IterativeComQueue,而IterativeComQueue之中就用到了superstep-based迭代。
return new IterativeComQueue() .initWithPartitionedData(TRAIN_DATA, data) .initWithBroadcastData(INIT_CENTROID, initCentroid) .initWithBroadcastData(KMEANS_STATISTICS, statistics) .add(new KMeansPreallocateCentroid()) .add(new KMeansAssignCluster(distance)) .add(new AllReduce(CENTROID_ALL_REDUCE)) .add(new KMeansUpdateCentroids(distance)) .setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol)) // 終止條件 .closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName)) .setMaxIter(maxIter) // 迭代最大次數 .exec();
而BaseComQueue.exec函數中則有:
public DataSet<Row> exec() { IterativeDataSet<byte[]> loop // Flink 迭代API = loopStartDataSet(executionEnvironment) .iterate(maxIter); // 後續操做能看出來,以前添加在queue上的好比KMeansPreallocateCentroid,都是在loop之上運行的。 if (null == compareCriterion) { loopEnd = loop.closeWith... } else { // compare Criterion. DataSet<Boolean> criterion = input ... compareCriterion loopEnd = loop.closeWith( ... criterion ... ) } }
再仔細研究代碼,咱們能夠看出:
superstep包括:
.add(new KMeansPreallocateCentroid())
.add(new KMeansAssignCluster(distance))
.add(new AllReduce(CENTROID_ALL_REDUCE))
.add(new KMeansUpdateCentroids(distance))
終止標準就是
利用KMeansIterTermination構建了一個RichMapPartitionFunction做爲終止標準。最後結束時候調用 KMeansOutputModel完成業務操做。
最大循環就是
.setMaxIter(maxIter)
因而咱們能夠得出結論,superstep-based Bulk Iterate 迭代算子是用來實現總體KMeans算法,KMeans算法就是一個superstep進行迭代。可是在superstep內容若是須要通信或者柵欄同步,則採用了MPI的allReduce。
咱們須要深刻到Flink內部去挖掘驗證,若是你們有興趣,能夠參見下面調用棧,本身添加斷點來研究。
execute:56, LocalExecutor (org.apache.flink.client.deployment.executors) executeAsync:944, ExecutionEnvironment (org.apache.flink.api.java) execute:860, ExecutionEnvironment (org.apache.flink.api.java) execute:844, ExecutionEnvironment (org.apache.flink.api.java) collect:413, DataSet (org.apache.flink.api.java) sinkFrom:44, PrintBatchOp (com.alibaba.alink.operator.batch.utils) sinkFrom:20, PrintBatchOp (com.alibaba.alink.operator.batch.utils) linkFrom:31, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink) linkFrom:17, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink) link:89, BatchOperator (com.alibaba.alink.operator.batch) linkTo:239, BatchOperator (com.alibaba.alink.operator.batch) print:337, BatchOperator (com.alibaba.alink.operator.batch) main:35, KMeansExample (com.alibaba.alink)
Alink和Flink構建聯繫,是在print調用中完成的。由於是本地調試,Flink會啓動一個miniCluster,而後會作以下操做。
當咱們看到了submitJob
調用,就知道KMeans代碼已經和Flink構建了聯繫。
@Internal public class LocalExecutor implements PipelineExecutor { public static final String NAME = "local"; @Override public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception { // we only support attached execution with the local executor. checkState(configuration.getBoolean(DeploymentOptions.ATTACHED)); final JobGraph jobGraph = getJobGraph(pipeline, configuration); final MiniCluster miniCluster = startMiniCluster(jobGraph, configuration); final MiniClusterClient clusterClient = new MiniClusterClient(configuration, miniCluster); CompletableFuture<JobID> jobIdFuture = clusterClient.submitJob(jobGraph); jobIdFuture .thenCompose(clusterClient::requestJobResult) .thenAccept((jobResult) -> clusterClient.shutDownCluster()); return jobIdFuture.thenApply(jobID -> new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID)); }
生成jobGraph的具體流程是:
if (dataSet instanceof BulkIterationResultSet)
,則調用translateBulkIteration(bulkIterationResultSet);
if (c instanceof BulkIterationBase)
,以生成BulkIterationNode前面代碼中,getJobGraph函數做用是生成了job graph。
而後 JobManager 根據 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的並行化版本,是調度層最核心的數據結構。
最後 JobManager 根據 ExecutionGraph 對 Job 進行調度後,在各個TaskManager 上部署 Task。
因此咱們須要看看最終運行時候,迭代API對應着哪些Task。
針對IterativeDataSet,即superstep-based Bulk Iterate,Flink生成了以下的task。
IterationHeadTask主要做用是協調一次迭代。
它會讀取初始輸入,和迭代Tail創建一個BlockingBackChannel。在成功處理輸入以後,它會發送EndOfSuperstep事件給本身的輸出。它在每次superstep以後會聯繫 synchronization task,等到本身收到一個用來同步的AllWorkersDoneEvent。AllWorkersDoneEvent表示全部其餘的heads已經完成了本身的迭代。
下一次迭代時候,上一次迭代中tail的輸出就經由backchannel傳輸,造成了head的輸入。什麼時候進入到下一個迭代,是由HeadTask完成的。一旦迭代完成,head將發送TerminationEvent給全部和它關聯的task,告訴他們shutdown。
barrier.waitForOtherWorkers(); if (barrier.terminationSignaled()) { requestTermination(); nextStepKickoff.signalTermination(); } else { incrementIterationCounter(); String[] globalAggregateNames = barrier.getAggregatorNames(); Value[] globalAggregates = barrier.getAggregates(); aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates); // 在這裏發起下一次Superstep。 nextStepKickoff.triggerNextSuperstep(); } }
IterationHeadTask是在JobGraphGenerator.createBulkIterationHead中構建的。其例子以下:
"PartialSolution (Bulk Iteration) (org.apache.flink.runtime.iterative.task.IterationHeadTask)"
IterationIntermediateTask是superstep中間段的task,其將傳輸EndOfSuperstepEvent和TerminationEvent給全部和它關聯的tasks。此外,IterationIntermediateTask能更新the workset或者the solution set的迭代狀態。
若是迭代狀態被更新,本task的輸出將傳送回IterationHeadTask,在這種狀況下,本task將做爲head再次被安排。
IterationIntermediateTask的例子以下:
"MapPartition (computation@KMeansUpdateCentroids) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)" "Combine (SUM(0), at kMeansPlusPlusInit(KMeansInitCentroids.java:135) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)" "MapPartition (AllReduceSend) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)" "Filter (Filter at kMeansPlusPlusInit(KMeansInitCentroids.java:130)) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
IterationTailTask是迭代的最末尾。若是迭代狀態被更新,本task的輸出將經過BlockingBackChannel傳送回IterationHeadTask,反饋給迭代頭就意味着一個迭代完整邏輯的完成,那麼就能夠關閉這個迭代閉合環了。這種狀況下,本task將在head所在的實例上從新被調度。
這裏有幾個關鍵點須要注意:
Flink有一個BlockingQueueBroker類,這是一個阻塞式的隊列代理,它的做用是對迭代併發進行控制。Broker是單例的,迭代頭任務和尾任務會生成一樣的broker ID,因此頭尾在同一個JVM中會基於相同的dataChannel進行通訊。dataChannel由迭代頭建立。
IterationHeadTask中會生成BlockingBackChannel,這是一個容量爲1的阻塞隊列。
// 生成channel BlockingBackChannel backChannel = new BlockingBackChannel(new SerializedUpdateBuffer(segments, segmentSize, this.getIOManager())); // 而後block在這裏,等待Tail superstepResult = backChannel.getReadEndAfterSuperstepEnded();
IterationTailTask則是以下:
// 在基類獲得channel,由於是單例,因此會獲得同一個 worksetBackChannel = BlockingBackChannelBroker.instance().getAndRemove(brokerKey()); // notify iteration head if responsible for workset update 在這裏通知Head worksetBackChannel.notifyOfEndOfSuperstep();
而二者都是利用以下辦法來創建聯繫,在同一個subtask中會使用同一個brokerKey,這樣首尾就聯繫起來了。
public String brokerKey() { if (this.brokerKey == null) { int iterationId = this.config.getIterationId(); this.brokerKey = this.getEnvironment().getJobID().toString() + '#' + iterationId + '#' + this.getEnvironment().getTaskInfo().getIndexOfThisSubtask(); } return this.brokerKey; }
這是經過output.collect來完成的。
首先,在Tail初始化時候,會生成一個outputCollector,這個outputCollector會被設置爲本task的輸出outputCollector。這樣就保證了用戶函數的輸出都會轉流到outputCollector。
而outputCollector的輸出就是worksetBackChannel的輸出,這裏設置爲同一個instance。這樣用戶輸出就輸出到backChannel中。
@Override protected void initialize() throws Exception { super.initialize(); // set the last output collector of this task to reflect the iteration tail state update: // a) workset update, // b) solution set update, or // c) merged workset and solution set update Collector<OT> outputCollector = null; if (isWorksetUpdate) { // 生成一個outputCollector outputCollector = createWorksetUpdateOutputCollector(); // we need the WorksetUpdateOutputCollector separately to count the collected elements if (isWorksetIteration) { worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector; } } ...... // 把outputCollector設置爲本task的輸出 setLastOutputCollector(outputCollector); }
outputCollector的輸出就是worksetBackChannel的輸出buffer,這裏設置爲同一個instance。
protected Collector<OT> createWorksetUpdateOutputCollector(Collector<OT> delegate) { DataOutputView outputView = worksetBackChannel.getWriteEnd(); TypeSerializer<OT> serializer = getOutputSerializer(); return new WorksetUpdateOutputCollector<OT>(outputView, serializer, delegate); }
運行時候以下:
@Override public void run() throws Exception { SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey()); while (this.running && !terminationRequested()) { // 用戶在這裏輸出,最後會輸出到output.collect,也就是worksetBackChannel的輸出buffer。 super.run(); // 這時候以及輸出到channel完畢,只是通知head進行讀取。 if (isWorksetUpdate) { // notify iteration head if responsible for workset update worksetBackChannel.notifyOfEndOfSuperstep(); } else if (isSolutionSetUpdate) { // notify iteration head if responsible for solution set update solutionSetUpdateBarrier.notifySolutionSetUpdate(); } ... }
IterationTailTask例子以下:
"Pipe (org.apache.flink.runtime.iterative.task.IterationTailTask)"
IterationSynchronizationSinkTask做用是同步全部的iteration heads,IterationSynchronizationSinkTask被是實現成一個 output task。其只是用來協調,不處理任何數據。
在每一次superstep,IterationSynchronizationSinkTask只是等待直到它從每個head都收到一個WorkerDoneEvent。這表示下一次superstep能夠開始了。
這裏須要注意的是 SynchronizationSinkTask 如何等待各個並行度的headTask。好比Flink的並行度是5,那麼SynchronizationSinkTask怎麼作到等待這5個headTask。
在IterationSynchronizationSinkTask中,註冊了SyncEventHandler來等待head的WorkerDoneEvent。
this.eventHandler = new SyncEventHandler(numEventsTillEndOfSuperstep, this.aggregators, this.getEnvironment().getUserClassLoader()); this.headEventReader.registerTaskEventListener(this.eventHandler, WorkerDoneEvent.class);
在SyncEventHandler中,咱們能夠看到,在構建時候,numberOfEventsUntilEndOfSuperstep就被設置爲並行度,每次收到一個WorkerDoneEvent,workerDoneEventCounter就遞增,當等於numberOfEventsUntilEndOfSuperstep,即並行度時候,就說明本次superstep中,全部headtask都成功了。
private void onWorkerDoneEvent(WorkerDoneEvent workerDoneEvent) { if (this.endOfSuperstep) { throw new RuntimeException("Encountered WorderDoneEvent when still in End-of-Superstep status."); } else { // 每次遞增 ++this.workerDoneEventCounter; String[] aggNames = workerDoneEvent.getAggregatorNames(); Value[] aggregates = workerDoneEvent.getAggregates(this.userCodeClassLoader); if (aggNames.length != aggregates.length) { throw new RuntimeException("Inconsistent WorkerDoneEvent received!"); } else { for(int i = 0; i < aggNames.length; ++i) { Aggregator<Value> aggregator = (Aggregator)this.aggregators.get(aggNames[i]); aggregator.aggregate(aggregates[i]); } // numberOfEventsUntilEndOfSuperstep就是並行度,等於並行度時候就說明全部head都成功了。 if (this.workerDoneEventCounter % this.numberOfEventsUntilEndOfSuperstep == 0) { this.endOfSuperstep = true; Thread.currentThread().interrupt(); } } } }
IterationSynchronizationSinkTask的例子以下:
"Sync (BulkIteration (Bulk Iteration)) (org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask)"
綜上所述,咱們最終獲得superstep以下:
***** 文字描述以下 ***** 每次迭代都是一個superstep 每次迭代中有若干subtask在不一樣的partition上分別執行step 每一個step有一個HeadTask,若干IntermediateTask,一個TailTask 每一個superstep有一個SynchronizationSinkTask ***** 僞代碼大體以下 ***** for maxIter : begin superstep for maxSubTask : begin step IterationHeadTask IterationIntermediateTask IterationIntermediateTask ... IterationIntermediateTask IterationIntermediateTask IterationTailTask end step IterationSynchronizationSinkTask end superstep
K-means算法的過程,爲了儘可能不用數學符號,因此描述的不是很嚴謹,大概就是這個意思,「物以類聚、人以羣分」:
KMeansPreallocateCentroid也是superstep一員,可是隻有context.getStepNo() == 1
的時候,纔會進入實際業務邏輯,預分配Centroid。當superstep爲大於1的時候,本task會執行,但不會進入具體業務代碼。
public class KMeansPreallocateCentroid extends ComputeFunction { private static final Logger LOG = LoggerFactory.getLogger(KMeansPreallocateCentroid.class); @Override public void calc(ComContext context) { // 每次superstep都會進到這裏 LOG.info(" KMeansPreallocateCentroid 我每次都會進的呀 "); if (context.getStepNo() == 1) { // 實際預分配業務只進入一次 } } }
KMeansAssignCluster 做用是爲每一個點(point)計算最近的聚類中心,爲每一個聚類中心的點座標的計數和求和。
KMeansUpdateCentroids 做用是基於計算出來的點計數和座標,計算新的聚類中心。
Alink在整個計算過程當中維護一個特殊節點來記住待求中心點當前的結果。
這就是爲啥迭代時候須要區分奇數次和偶數次的緣由了。奇數次就表示老大哥,偶數次就表示新大哥。每次superstep只會計算一批大哥,留下另一批大哥作距離比對。
另外要注意的一點是:普通的迭代計算,是經過Tail給Head回傳用戶數據,可是KMeans這裏的實現並無採用這個辦法,而是把計算出來的中心點都存在共享變量中,在各個intermediate之間互相交互。
public class KMeansAssignCluster extends ComputeFunction { public void calc(ComContext context) { ...... if (context.getStepNo() % 2 == 0) { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1); } else { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2); } /** 具體業務邏輯代碼 * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster. */ } } public class KMeansUpdateCentroids extends ComputeFunction { public void calc(ComContext context) { if (context.getStepNo() % 2 == 0) { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2); } else { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1); } /** 具體業務邏輯代碼 * Update the centroids based on the sum of points and point number belonging to the same cluster. */ }
這裏要特殊說明,由於KMeansOutputModel是最終輸出模型,而KMeans算法的實現是:全部subtask都擁有全部中心點,就是說全部subtask都會有相同的模型,就沒有必要所有輸出,因此這裏限定了第一個subtask才能輸出,其餘的都不輸出。
@Override public List <Row> calc(ComContext context) { // 只有第一個subtask才輸出模型數據。 if (context.getTaskId() != 0) { return null; } .... modelData.params = new KMeansTrainModelData.ParamSummary(); modelData.params.k = k; modelData.params.vectorColName = vectorColName; modelData.params.distanceType = distanceType; modelData.params.vectorSize = vectorSize; modelData.params.latitudeColName = latitudeColName; modelData.params.longtitudeColName = longtitudeColName; RowCollector collector = new RowCollector(); new KMeansModelDataConverter().save(modelData, collector); return collector.getRows(); }
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/iterations.html
聚類、K-Means、例子、細節
Flink-Gelly:Iterative Graph Processing