Alink漫談(五) : 迭代計算和Superstep

Alink漫談(五) : 迭代計算和Superstep

0x00 摘要

Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平臺,是業界首個同時支持批式算法、流式算法的機器學習平臺。迭代算法在不少數據分析領域會用到,好比機器學習或者圖計算。本文將經過Superstep入手看看Alink是如何利用Flink迭代API來實現具體算法。html

由於Alink的公開資料太少,因此如下均爲自行揣測,確定會有疏漏錯誤,但願你們指出,我會隨時更新。java

0x01 原因

爲何提到 Superstep 這個概念,是由於在擼KMeans代碼的時候,發現幾個很奇怪的地方,好比如下三個步驟中,都用到了context.getStepNo(),並且會根據其數值的不一樣進行不一樣業務操做:git

public class KMeansPreallocateCentroid extends ComputeFunction {
    public void calc(ComContext context) {
        LOG.info("liuhao  KMeansPreallocateCentroid ");
        if (context.getStepNo() == 1) {
          /** 具體業務邏輯代碼
           * Allocate memory for pre-round centers and current centers.
           */        
        }
    }
}  

public class KMeansAssignCluster extends ComputeFunction {
    public void calc(ComContext context) {
        ......
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        }
      /** 具體業務邏輯代碼
       * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
       */
    }
}

public class KMeansUpdateCentroids extends ComputeFunction {
    public void calc(ComContext context) {
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }
      /** 具體業務邏輯代碼
       * Update the centroids based on the sum of points and point number belonging to the same cluster.
       */
    }

查看ComContext的源碼,發現stepNo的來源竟然是runtimeContext.getSuperstepNumber()web

public class ComContext {
   private final int taskId;
   private final int numTask;
   private final int stepNo; // 對,就是這裏
   private final int sessionId;
	public ComContext(int sessionId, IterationRuntimeContext runtimeContext) {
		this.sessionId = sessionId;
		this.numTask = runtimeContext.getNumberOfParallelSubtasks();
		this.taskId = runtimeContext.getIndexOfThisSubtask();
		this.stepNo = runtimeContext.getSuperstepNumber(); // 這裏進行了變量初始化
	}  
	/**
	 * Get current iteration step number, the same as {@link IterationRuntimeContext#getSuperstepNumber()}.
	 * @return iteration step number.
	 */
	public int getStepNo() {
		return stepNo; // 這裏是使用
	}  
}

看到這裏有的兄弟可能會虎軀一震,這不是BSP模型的概念嘛。我就是想寫個KMeans算法,怎麼除了MPI模型,還要考慮BSP模型。下面就讓咱們一步一步挖掘究竟Alink都作了什麼工做。算法

0x02 背景概念

2.1 四層執行圖

在 Flink 中的執行圖能夠分爲四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖apache

  • StreamGraph:Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓撲結構。
  • JobGraph:StreamGraph 通過優化後生成了 JobGraph, JobGraph是提交給 JobManager 的數據結構。主要的優化爲,將多個符合條件的節點 chain 在一塊兒做爲一個節點,這樣能夠減小數據在節點之間流動所須要的序列化/反序列化/傳輸消耗。JobGraph是惟一被Flink的數據流引擎所識別的表述做業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。
  • ExecutionGraph:JobManager 根據 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的並行化版本,是調度層最核心的數據結構。
  • 物理執行圖:JobManager 根據 ExecutionGraph 對 Job 進行調度後,在各個TaskManager 上部署 Task 後造成的「圖」,並非一個具體的數據結構。

2.2 Task和SubTask

由於某種緣由,Flink內部對這兩個概念的使用自己就有些混亂:在Task Manager裏這個subtask的概念由一個叫Task的類來實現。Task Manager裏談論的Task對象實際上對應的是ExecutionGraph裏的一個subtask。編程

因此這兩個概念須要理清楚。api

  • Task(任務) :Task對應JobGraph的一個節點,是一個算子Operator。Task 是一個階段多個功能相同 subTask 的集合,相似於 Spark 中的 TaskSet。
  • subTask(子任務) :subTask 是 Flink 中任務最小執行單元,是一個 Java 類的實例,這個 Java 類中有屬性和方法,完成具體的計算邏輯。在ExecutionGraph裏Task被分解爲多個並行執行的subtask 。每一個subtask做爲一個excution分配到Task Manager裏執行。
  • Operator Chains(算子鏈) :沒有 shuffle 的多個算子合併在一個 subTask 中,就造成了 Operator Chains,相似於 Spark 中的 Pipeline。Operator subTask 的數量指的就是算子的並行度。同一程序的不一樣算子也可能具備不一樣的並行度(由於能夠經過 setParallelism() 方法來修改並行度)。

Flink 中的程序本質上是並行的。在執行期間,每個算子 Operator (Transformation)都有一個或多個算子subTask(Operator SubTask),每一個算子的 subTask 之間都是彼此獨立,並在不一樣的線程中執行,而且可能在不一樣的機器或容器上執行。sass

Task( SubTask) 是一個Runnable 對象, Task Manager接受到TDD 後會用它實例化成一個Task對象, 並啓動一個線程執行Task的Run方法。網絡

TaskDeploymentDescriptor(TDD) : 是Task Manager在submitTask是提交給TM的數據結構。 他包含了關於Task的全部描述信息。好比:

  • TaskInfo : 包含該Task 執行的java 類,該類是某個 AbstractInvokable的實現類 , 固然也是某個operator的實現類 (好比DataSourceTask, DataSinkTask, BatchTask,StreamTask 等)。
  • IG描述 :一般包含一個或兩個InputGateDeploymentDescriptor(IGD)。
  • 目標RP的描述: ParitionId, PartitionType, RS個數等等。

2.3 如何劃分 Task 的依據

在如下狀況下會從新劃分task

  • 並行度發生變化時
  • keyBy() /window()/apply() 等發生 Rebalance 從新分配;
  • 調用 startNewChain() 方法,開啓一個新的算子鏈;
  • 調用 diableChaining()方法,即:告訴當前算子操做不使用 算子鏈 操做。

好比有以下操做

DataStream<String> text = env.socketTextStream(hostname, port);

DataStream counts = text
    .filter(new FilterClass())
    .map(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(10))
    .sum(2)

那麼StreamGraph的轉換流是:

Source --> Filter --> Map --> Timestamps/Watermarks --> Window(SumAggregator) --> Sink

其task是四個:

  • Source --> Filter --> Map
  • keyBy
  • timeWindow
  • Sink

其中每一個task又會被分紅分若干subtask。在執行時,一個Task會被並行化成若干個subTask實例進行執行,一個subTask對應一個執行線程。

2.4 JobGraph

以上說了這麼多,就是要說jobGraph和subtask,由於本文中咱們在分析源碼和調試時候,主要是從jobGraph這裏開始入手來看subtask

JobGraph是在StreamGraph的基礎之上,對StreamNode進行了關聯合並的操做,好比對於source -> flatMap -> reduce -> sink 這樣一個數據處理鏈,當source和flatMap知足連接的條件時,能夠能夠將兩個操做符的操做放到一個線程並行執行,這樣能夠減小網絡中的數據傳輸,因爲在source和flatMap之間的傳輸的數據也不用序列化和反序列化,因此也提升了程序的執行效率。

相比流圖(StreamGraph)以及批處理優化計劃(OptimizedPlan),JobGraph發生了一些變化,已經不徹底是「靜態」的數據結構了,由於它加入了中間結果集(IntermediateDataSet)這一「動態」概念。

做業頂點(JobVertex)、中間數據集(IntermediateDataSet)、做業邊(JobEdge)是組成JobGraph的基本元素。這三個對象彼此之間互爲依賴:

  • 一個JobVertex關聯着若干個JobEdge做爲輸入端以及若干個IntermediateDataSet做爲其生產的結果集;每一個JobVertex都有諸如並行度和執行代碼等屬性。
  • 一個IntermediateDataSet關聯着一個JobVertex做爲生產者以及若干個JobEdge做爲消費者;
  • 一個JobEdge關聯着一個IntermediateDataSet可認爲是源以及一個JobVertex可認爲是目標消費者;

那麼JobGraph是怎麼組織並存儲這些元素的呢?其實JobGraph只以Map的形式存儲了全部的JobVertex,鍵是JobVertexID:

private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();

至於其它的元素,經過JobVertex均可以根據關係找尋到。須要注意的是,用於迭代的反饋邊(feedback edge)當前並不體如今JobGraph中,而是被內嵌在特殊的JobVertex中經過反饋信道(feedback channel)在它們之間創建關係。

2.5 BSP模型和Superstep

BSP模型

BSP模型是並行計算模型的一種。並行計算模型一般指從並行算法的設計和分析出發,將各類並行計算機(至少某一類並行計算機)的基本特徵抽象出來,造成一個抽象的計算模型。

BSP模型是一種異步MIMD-DM模型(DM: distributed memory,SM: shared memory),BSP模型支持消息傳遞系統,塊內異步並行,塊間顯式同步,該模型基於一個master協調,全部的worker同步(lock-step)執行, 數據從輸入的隊列中讀取。

BSP計算模型不只是一種體系結構模型,也是設計並行程序的一種方法。BSP程序設計準則是總體同步(bulk synchrony),其獨特之處在於超步(superstep)概念的引入。一個BSP程序同時具備水平和垂直兩個方面的結構。從垂直上看,一個BSP程序由一系列串行的超步(superstep)組成。

BSP模型的實現

BSP模型的實現大概舉例以下:

  • Pregel :Google的大規模圖計算框架,首次提出了將BSP模型應用於圖計算,具體請看Pregel——大規模圖處理系統,不過至今未開源。
  • Apache Giraph :ASF社區的Incubator項目,由Yahoo!貢獻,是BSP的java實現,專一於迭代圖計算(如pagerank,最短鏈接等),每個job就是一個沒有reducer過程的hadoop job。
  • Apache Hama :也是ASF社區的Incubator項目,與Giraph不一樣的是它是一個純粹的BSP模型的java實現,而且不僅僅是用於圖計算,意在提供一個通用的BSP模型的應用框架。

Flink-Gelly利用Flink的高效迭代算子來支持海量數據的迭代式圖處理。目前,Flink Gelly提供了「Vertex-Centric」,「Scatter-Gather」以及「Gather-Sum-Apply」等計算模型的實現。

「Vertex-Centric」迭代模型也就是咱們常常聽到的「Pregel」,是一種從Vertex角度出發的圖計算方式。其中,同步地迭代計算的步驟稱之爲「superstep」。在每一個「superstep」中,每一個頂點都執行一個用戶自定義的函數,且頂點之間經過消息進行通訊,當一個頂點知道圖中其餘任意頂點的惟一ID時,該頂點就能夠向其發送一條消息。

可是實際上,KMeans不是圖處理,Alink也沒有基於Flink-Gelly來構建。也許只是借鑑了其概念。因此咱們還須要再探尋。

0x03 Flink的迭代算法(superstep-based)

迭代算法在不少數據分析領域會用到,好比機器學習或者圖計算。爲了從大數據中抽取有用信息,這個時候每每會須要在處理的過程當中用到迭代計算。

所謂迭代運算,就是給定一個初值,用所給的算法公式計算初值獲得一箇中間結果,而後將中間結果做爲輸入參數進行反覆計算,在知足必定條件的時候獲得計算結果。

大數據處理框架不少,好比spark,mr。實際上這些實現迭代計算都是很困難的。

Flink直接支持迭代計算。Flink實現迭代的思路也是很簡單,就是實現一個step函數,而後將其嵌入到迭代算子中去。有兩種迭代操做算子: Iterate和Delta Iterate。兩個操做算子都是在未收到終止迭代信號以前一直調用step函數。

3.1 Bulk Iterate

這種迭代方式稱爲全量迭代,它會將整個數據輸入,通過必定的迭代次數,最終獲得你想要的結果。

迭代操做算子包括了簡單的迭代形式:每次迭代,step函數會消費全量數據(本次輸入和上次迭代的結果),而後計算獲得下輪迭代的輸出(例如,map,reduce,join等)

迭代過程主要分爲如下幾步:

  • Iteration Input(迭代輸入):是初始輸入值或者上一次迭代計算的結果。
  • Step Function(step函數):每次迭代都會執行step函數。它迭代計算DataSet,由一系列的operator組成,好比map,flatMap,join等,取決於具體的業務邏輯。
  • Next Partial Solution(中間結果):每一次迭代計算的結果,被髮送到下一次迭代計算中。
  • Iteration Result(迭代結果):最後一次迭代輸出的結果,被輸出到datasink或者發送到下游處理。

它迭代的結束條件是:

  • 達到最大迭代次數
  • 自定義收斂聚合函數

編程的時候,須要調用iterate(int),該函數返回的是一個IterativeDataSet,固然咱們能夠對它進行一些操做,好比map等。Iterate函數惟一的參數是表明最大迭代次數。

迭代是一個環。咱們須要進行閉環操做,那麼這時候就要用到closeWith(Dataset)操做了,參數就是須要循環迭代的dataset。也能夠可選的指定一個終止標準,操做closeWith(DataSet, DataSet),能夠經過判斷第二個dataset是否爲空,來終止迭代。若是不指定終止迭代條件,迭代就會在迭代了最大迭代次數後終止。

3.2 迭代機制

DataSet API引進了獨特的同步迭代機制(superstep-based),僅限於用在有界的流。

咱們將迭代操做算子的每一個步驟函數的執行稱爲單個迭代。在並行設置中,在迭代狀態的不一樣分區上並行計算step函數的多個實例。在許多設置中,對全部並行實例上的step函數的一次評估造成了所謂的superstep,這也是同步的粒度。所以,迭代的全部並行任務都須要在初始化下一個superstep以前完成superstep。終止準則也將被評估爲superstep同步屏障。

下面是Apache原文

We referred to each execution of the step function of an iteration operator as a single iteration. In parallel setups, multiple instances of the step function are evaluated in parallel on different partitions of the iteration state. In many settings, one evaluation of the step function on all parallel instances forms a so called superstep, which is also the granularity of synchronization. Therefore, all parallel tasks of an iteration need to complete the superstep, before a next superstep will be initialized. Termination criteria will also be evaluated at superstep barriers.

下面是apache原圖

Supersteps

歸納以下:

每次迭代都是一個superstep
    每次迭代中有若干subtask在不一樣的partition上分別執行step
      	 每一個step有一個HeadTask,若干IntermediateTask,一個TailTask
    每一個superstep有一個SynchronizationSinkTask 同步,由於迭代的全部並行任務須要在下一個迭代前完成

由此咱們能夠知道,superstep這是Flink DataSet API的概念,可是你從這裏可以看到BSP模型的影子,好比:

  • 在傳統的BSP模型中,一個superstep被分爲3步: 本地的計算, 消息的傳遞, 同步的barrier.
  • Barrier Synchronization又叫障礙同步或柵欄同步。每一次同步也是一個超步的完成和下一個超步的開始;
  • Superstep超步 是一次計算迭代,從起始每往前步進一層對應一個超步。
  • 程序該何時結束是程序本身控制

0x04 Alink如何使用迭代

KMeansTrainBatchOp.iterateICQ函數中,生成了一個IterativeComQueue,而IterativeComQueue之中就用到了superstep-based迭代。

return new IterativeComQueue()
   .initWithPartitionedData(TRAIN_DATA, data)
   .initWithBroadcastData(INIT_CENTROID, initCentroid)
   .initWithBroadcastData(KMEANS_STATISTICS, statistics)
   .add(new KMeansPreallocateCentroid())
   .add(new KMeansAssignCluster(distance))
   .add(new AllReduce(CENTROID_ALL_REDUCE))
   .add(new KMeansUpdateCentroids(distance))
   .setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol)) // 終止條件
   .closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName)) 
   .setMaxIter(maxIter) // 迭代最大次數
   .exec();

而BaseComQueue.exec函數中則有:

public DataSet<Row> exec() {
   IterativeDataSet<byte[]> loop // Flink 迭代API
      = loopStartDataSet(executionEnvironment)
      .iterate(maxIter);
     // 後續操做能看出來,以前添加在queue上的好比KMeansPreallocateCentroid,都是在loop之上運行的。
  		if (null == compareCriterion) {
        loopEnd = loop.closeWith...
     	} else {     
        // compare Criterion.
        DataSet<Boolean> criterion = input ... compareCriterion
        loopEnd = loop.closeWith( ... criterion ... )
      }   
}

再仔細研究代碼,咱們能夠看出:

superstep包括:

.add(new KMeansPreallocateCentroid())
.add(new KMeansAssignCluster(distance))
.add(new AllReduce(CENTROID_ALL_REDUCE))
.add(new KMeansUpdateCentroids(distance))

終止標準就是

利用KMeansIterTermination構建了一個RichMapPartitionFunction做爲終止標準。最後結束時候調用 KMeansOutputModel完成業務操做。

最大循環就是

.setMaxIter(maxIter)

因而咱們能夠得出結論,superstep-based Bulk Iterate 迭代算子是用來實現總體KMeans算法,KMeans算法就是一個superstep進行迭代。可是在superstep內容若是須要通信或者柵欄同步,則採用了MPI的allReduce。

0x05 深刻Flink源碼和runtime來驗證

咱們須要深刻到Flink內部去挖掘驗證,若是你們有興趣,能夠參見下面調用棧,本身添加斷點來研究。

execute:56, LocalExecutor (org.apache.flink.client.deployment.executors)
executeAsync:944, ExecutionEnvironment (org.apache.flink.api.java)
execute:860, ExecutionEnvironment (org.apache.flink.api.java)
execute:844, ExecutionEnvironment (org.apache.flink.api.java)
collect:413, DataSet (org.apache.flink.api.java)
sinkFrom:44, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
sinkFrom:20, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
linkFrom:31, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
linkFrom:17, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
link:89, BatchOperator (com.alibaba.alink.operator.batch)
linkTo:239, BatchOperator (com.alibaba.alink.operator.batch)
print:337, BatchOperator (com.alibaba.alink.operator.batch)
main:35, KMeansExample (com.alibaba.alink)

5.1 向Flink提交Job

Alink和Flink構建聯繫,是在print調用中完成的。由於是本地調試,Flink會啓動一個miniCluster,而後會作以下操做。

  • 首先生成執行計劃Plan。Plan以數據流形式來表示批處理程序,但它只是批處理程序最初的表示,而後計劃會被優化以生成更高效的方案OptimizedPlan。
  • 而後,計劃被編譯生成JobGraph。這個圖是要交給flink去生成task的圖。
  • 生成一系列配置。
  • 將JobGraph和配置交給flink集羣去運行。若是不是本地運行的話,還會把jar文件經過網絡發給其餘節點。
  • 以本地模式運行的話,能夠看到啓動過程,如啓動性能度量、web模塊、JobManager、ResourceManager、taskManager等等。

當咱們看到了submitJob調用,就知道KMeans代碼已經和Flink構建了聯繫

@Internal
public class LocalExecutor implements PipelineExecutor {

   public static final String NAME = "local";

   @Override
   public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {

      // we only support attached execution with the local executor.
      checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));

      final JobGraph jobGraph = getJobGraph(pipeline, configuration);
      final MiniCluster miniCluster = startMiniCluster(jobGraph, configuration);
      final MiniClusterClient clusterClient = new MiniClusterClient(configuration, miniCluster);

      CompletableFuture<JobID> jobIdFuture = clusterClient.submitJob(jobGraph);

      jobIdFuture
            .thenCompose(clusterClient::requestJobResult)
            .thenAccept((jobResult) -> clusterClient.shutDownCluster());

      return jobIdFuture.thenApply(jobID ->
            new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
   }

5.2 生成JobGraph

生成jobGraph的具體流程是:

  • IterativeDataSet.closeWith會生成一個BulkIterationResultSet。
  • PrintBatchOp.sinkFrom中會調用到ExecutionEnvironment.executeAsync
  • 調用createProgramPlan構建一個Plan
  • OperatorTranslation.translate函數發現if (dataSet instanceof BulkIterationResultSet),則調用translateBulkIteration(bulkIterationResultSet);
  • 這時候生成了執行計劃Plan
  • ExecutionEnvironment.executeAsync調用LocalExecutor.execute
  • 而後調用FlinkPipelineTranslationUtil.getJobGraph來生成jobGraph
  • GraphCreatingVisitor.preVisit中會判斷 if (c instanceof BulkIterationBase),以生成BulkIterationNode
  • PlanTranslator.translateToJobGraph會調用到JobGraphGenerator.compileJobGraph,最終調用到createBulkIterationHead就生成了迭代處理的Head。
  • 最後將jobGraph提交給Cluster ,jobGraph 變形爲 ExceutionGraph在JM和TM上執行。

5.3 迭代對應的Task

前面代碼中,getJobGraph函數做用是生成了job graph。

而後 JobManager 根據 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的並行化版本,是調度層最核心的數據結構。

最後 JobManager 根據 ExecutionGraph 對 Job 進行調度後,在各個TaskManager 上部署 Task。

因此咱們須要看看最終運行時候,迭代API對應着哪些Task。

針對IterativeDataSet,即superstep-based Bulk Iterate,Flink生成了以下的task。

  • IterationHeadTask
  • IterationIntermediateTask
  • IterationTailTask
  • IterationSynchronizationSinkTask

5.3.1 IterationHeadTask

IterationHeadTask主要做用是協調一次迭代。

它會讀取初始輸入,和迭代Tail創建一個BlockingBackChannel。在成功處理輸入以後,它會發送EndOfSuperstep事件給本身的輸出。它在每次superstep以後會聯繫 synchronization task,等到本身收到一個用來同步的AllWorkersDoneEvent。AllWorkersDoneEvent表示全部其餘的heads已經完成了本身的迭代。

下一次迭代時候,上一次迭代中tail的輸出就經由backchannel傳輸,造成了head的輸入。什麼時候進入到下一個迭代,是由HeadTask完成的。一旦迭代完成,head將發送TerminationEvent給全部和它關聯的task,告訴他們shutdown。

barrier.waitForOtherWorkers();

				if (barrier.terminationSignaled()) {
					requestTermination();
					nextStepKickoff.signalTermination();
				} else {
					incrementIterationCounter();
					String[] globalAggregateNames = barrier.getAggregatorNames();
					Value[] globalAggregates = barrier.getAggregates();
					aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);
          // 在這裏發起下一次Superstep。
					nextStepKickoff.triggerNextSuperstep();
				}
			}

IterationHeadTask是在JobGraphGenerator.createBulkIterationHead中構建的。其例子以下:

"PartialSolution (Bulk Iteration) (org.apache.flink.runtime.iterative.task.IterationHeadTask)"

5.3.2 IterationIntermediateTask

IterationIntermediateTask是superstep中間段的task,其將傳輸EndOfSuperstepEvent和TerminationEvent給全部和它關聯的tasks。此外,IterationIntermediateTask能更新the workset或者the solution set的迭代狀態。

若是迭代狀態被更新,本task的輸出將傳送回IterationHeadTask,在這種狀況下,本task將做爲head再次被安排。

IterationIntermediateTask的例子以下:

"MapPartition (computation@KMeansUpdateCentroids) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
   
 "Combine (SUM(0), at kMeansPlusPlusInit(KMeansInitCentroids.java:135) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
   
 "MapPartition (AllReduceSend) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
   
"Filter (Filter at kMeansPlusPlusInit(KMeansInitCentroids.java:130)) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"

5.3.3 IterationTailTask

IterationTailTask是迭代的最末尾。若是迭代狀態被更新,本task的輸出將經過BlockingBackChannel傳送回IterationHeadTask,反饋給迭代頭就意味着一個迭代完整邏輯的完成,那麼就能夠關閉這個迭代閉合環了。這種狀況下,本task將在head所在的實例上從新被調度。

這裏有幾個關鍵點須要注意:

如何和Head創建聯繫

Flink有一個BlockingQueueBroker類,這是一個阻塞式的隊列代理,它的做用是對迭代併發進行控制。Broker是單例的,迭代頭任務和尾任務會生成一樣的broker ID,因此頭尾在同一個JVM中會基於相同的dataChannel進行通訊。dataChannel由迭代頭建立。

IterationHeadTask中會生成BlockingBackChannel,這是一個容量爲1的阻塞隊列。

// 生成channel
BlockingBackChannel backChannel = new BlockingBackChannel(new SerializedUpdateBuffer(segments, segmentSize, this.getIOManager())); 

// 而後block在這裏,等待Tail
superstepResult = backChannel.getReadEndAfterSuperstepEnded();

IterationTailTask則是以下:

// 在基類獲得channel,由於是單例,因此會獲得同一個
worksetBackChannel = BlockingBackChannelBroker.instance().getAndRemove(brokerKey());

// notify iteration head if responsible for workset update 在這裏通知Head
worksetBackChannel.notifyOfEndOfSuperstep();

而二者都是利用以下辦法來創建聯繫,在同一個subtask中會使用同一個brokerKey,這樣首尾就聯繫起來了。

public String brokerKey() {
    if (this.brokerKey == null) {
        int iterationId = this.config.getIterationId();
        this.brokerKey = this.getEnvironment().getJobID().toString() + '#' + iterationId + '#' + this.getEnvironment().getTaskInfo().getIndexOfThisSubtask();
    }

    return this.brokerKey;
}
如何把用戶返回的數值傳給Head

這是經過output.collect來完成的。

首先,在Tail初始化時候,會生成一個outputCollector,這個outputCollector會被設置爲本task的輸出outputCollector。這樣就保證了用戶函數的輸出都會轉流到outputCollector。

而outputCollector的輸出就是worksetBackChannel的輸出,這裏設置爲同一個instance。這樣用戶輸出就輸出到backChannel中。

@Override
	protected void initialize() throws Exception {
		super.initialize();
    
		// set the last output collector of this task to reflect the iteration tail state update:
		// a) workset update,
		// b) solution set update, or
		// c) merged workset and solution set update

		Collector<OT> outputCollector = null;
		if (isWorksetUpdate) {
      // 生成一個outputCollector
			outputCollector = createWorksetUpdateOutputCollector();

			// we need the WorksetUpdateOutputCollector separately to count the collected elements
			if (isWorksetIteration) {
				worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector;
			}
		}
    
    ......
    // 把outputCollector設置爲本task的輸出
		setLastOutputCollector(outputCollector);
	}

outputCollector的輸出就是worksetBackChannel的輸出buffer,這裏設置爲同一個instance。

protected Collector<OT> createWorksetUpdateOutputCollector(Collector<OT> delegate) {
		DataOutputView outputView = worksetBackChannel.getWriteEnd();
		TypeSerializer<OT> serializer = getOutputSerializer();
		return new WorksetUpdateOutputCollector<OT>(outputView, serializer, delegate);
	}

運行時候以下:

@Override
	public void run() throws Exception {

		SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());

		while (this.running && !terminationRequested()) {

      // 用戶在這裏輸出,最後會輸出到output.collect,也就是worksetBackChannel的輸出buffer。
			super.run();

      // 這時候以及輸出到channel完畢,只是通知head進行讀取。
			if (isWorksetUpdate) {
				// notify iteration head if responsible for workset update
				worksetBackChannel.notifyOfEndOfSuperstep();
			} else if (isSolutionSetUpdate) {
				// notify iteration head if responsible for solution set update
				solutionSetUpdateBarrier.notifySolutionSetUpdate();
			}

      ...
	}

IterationTailTask例子以下:

"Pipe (org.apache.flink.runtime.iterative.task.IterationTailTask)"

5.3.4 IterationSynchronizationSinkTask

IterationSynchronizationSinkTask做用是同步全部的iteration heads,IterationSynchronizationSinkTask被是實現成一個 output task。其只是用來協調,不處理任何數據。

在每一次superstep,IterationSynchronizationSinkTask只是等待直到它從每個head都收到一個WorkerDoneEvent。這表示下一次superstep能夠開始了。

這裏須要注意的是 SynchronizationSinkTask 如何等待各個並行度的headTask。好比Flink的並行度是5,那麼SynchronizationSinkTask怎麼作到等待這5個headTask。

在IterationSynchronizationSinkTask中,註冊了SyncEventHandler來等待head的WorkerDoneEvent。

this.eventHandler = new SyncEventHandler(numEventsTillEndOfSuperstep, this.aggregators, this.getEnvironment().getUserClassLoader());
this.headEventReader.registerTaskEventListener(this.eventHandler, WorkerDoneEvent.class);

在SyncEventHandler中,咱們能夠看到,在構建時候,numberOfEventsUntilEndOfSuperstep就被設置爲並行度,每次收到一個WorkerDoneEvent,workerDoneEventCounter就遞增,當等於numberOfEventsUntilEndOfSuperstep,即並行度時候,就說明本次superstep中,全部headtask都成功了。

private void onWorkerDoneEvent(WorkerDoneEvent workerDoneEvent) {
        if (this.endOfSuperstep) {
            throw new RuntimeException("Encountered WorderDoneEvent when still in End-of-Superstep status.");
        } else {
          // 每次遞增
            ++this.workerDoneEventCounter;
            String[] aggNames = workerDoneEvent.getAggregatorNames();
            Value[] aggregates = workerDoneEvent.getAggregates(this.userCodeClassLoader);
            if (aggNames.length != aggregates.length) {
                throw new RuntimeException("Inconsistent WorkerDoneEvent received!");
            } else {
                for(int i = 0; i < aggNames.length; ++i) {
                    Aggregator<Value> aggregator = (Aggregator)this.aggregators.get(aggNames[i]);
                    aggregator.aggregate(aggregates[i]);
                }

              // numberOfEventsUntilEndOfSuperstep就是並行度,等於並行度時候就說明全部head都成功了。
                if (this.workerDoneEventCounter % this.numberOfEventsUntilEndOfSuperstep == 0) {
                    this.endOfSuperstep = true;
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

IterationSynchronizationSinkTask的例子以下:

"Sync (BulkIteration (Bulk Iteration)) (org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask)"

5.4 superstep

綜上所述,咱們最終獲得superstep以下:

***** 文字描述以下 *****
  
每次迭代都是一個superstep
  每次迭代中有若干subtask在不一樣的partition上分別執行step
     每一個step有一個HeadTask,若干IntermediateTask,一個TailTask
  每一個superstep有一個SynchronizationSinkTask
  
***** 僞代碼大體以下 *****
  
for maxIter :
  begin superstep
      for maxSubTask :
         begin step
           IterationHeadTask
           IterationIntermediateTask
           IterationIntermediateTask
           ...
           IterationIntermediateTask
           IterationIntermediateTask
           IterationTailTask
         end step
    IterationSynchronizationSinkTask
  end superstep

0x06 結合KMeans代碼看superset

6.1 K-means算法概要

K-means算法的過程,爲了儘可能不用數學符號,因此描述的不是很嚴謹,大概就是這個意思,「物以類聚、人以羣分」:

  1. 首先輸入k的值,即咱們但願將數據集通過聚類獲得k個分組。
  2. 從數據集中隨機選擇k個數據點做爲初始大哥(質心,Centroid)
  3. 對集合中每個小弟,計算與每個大哥的距離(距離的含義後面會講),離哪一個大哥距離近,就跟定哪一個大哥。
  4. 這時每個大哥手下都彙集了一票小弟,這時候召開人民表明大會,每一羣選出新的大哥(實際上是經過算法選出新的質心)。
  5. 若是新大哥和老大哥之間的距離小於某一個設置的閾值(表示從新計算的質心的位置變化不大,趨於穩定,或者說收斂),能夠認爲咱們進行的聚類已經達到指望的結果,算法終止。
  6. 若是新大哥和老大哥距離變化很大,須要迭代3~5步驟。

6.2 KMeansPreallocateCentroid

KMeansPreallocateCentroid也是superstep一員,可是隻有context.getStepNo() == 1的時候,纔會進入實際業務邏輯,預分配Centroid。當superstep爲大於1的時候,本task會執行,但不會進入具體業務代碼。

public class KMeansPreallocateCentroid extends ComputeFunction {
    private static final Logger LOG = LoggerFactory.getLogger(KMeansPreallocateCentroid.class);

    @Override
    public void calc(ComContext context) {
        // 每次superstep都會進到這裏
        LOG.info("  KMeansPreallocateCentroid 我每次都會進的呀   ");
        if (context.getStepNo() == 1) {
          // 實際預分配業務只進入一次
        }
    }
}

6.3 KMeansAssignCluster 和 KMeansUpdateCentroids

KMeansAssignCluster 做用是爲每一個點(point)計算最近的聚類中心,爲每一個聚類中心的點座標的計數和求和。

KMeansUpdateCentroids 做用是基於計算出來的點計數和座標,計算新的聚類中心。

Alink在整個計算過程當中維護一個特殊節點來記住待求中心點當前的結果。

這就是爲啥迭代時候須要區分奇數次和偶數次的緣由了。奇數次就表示老大哥,偶數次就表示新大哥。每次superstep只會計算一批大哥,留下另一批大哥作距離比對。

另外要注意的一點是:普通的迭代計算,是經過Tail給Head回傳用戶數據,可是KMeans這裏的實現並無採用這個辦法,而是把計算出來的中心點都存在共享變量中,在各個intermediate之間互相交互。

public class KMeansAssignCluster extends ComputeFunction {
    public void calc(ComContext context) {
        ......
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        }
      /** 具體業務邏輯代碼
       * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
       */
    }
}

public class KMeansUpdateCentroids extends ComputeFunction {
    public void calc(ComContext context) {
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }
      /** 具體業務邏輯代碼
       * Update the centroids based on the sum of points and point number belonging to the same cluster.
       */
    }

6.4 KMeansOutputModel

這裏要特殊說明,由於KMeansOutputModel是最終輸出模型,而KMeans算法的實現是:全部subtask都擁有全部中心點,就是說全部subtask都會有相同的模型,就沒有必要所有輸出,因此這裏限定了第一個subtask才能輸出,其餘的都不輸出。

@Override
	public List <Row> calc(ComContext context) {
    // 只有第一個subtask才輸出模型數據。
		if (context.getTaskId() != 0) {
			return null;
		}

    ....
      
		modelData.params = new KMeansTrainModelData.ParamSummary();
		modelData.params.k = k;
		modelData.params.vectorColName = vectorColName;
		modelData.params.distanceType = distanceType;
		modelData.params.vectorSize = vectorSize;
		modelData.params.latitudeColName = latitudeColName;
		modelData.params.longtitudeColName = longtitudeColName;

		RowCollector collector = new RowCollector();
		new KMeansModelDataConverter().save(modelData, collector);
		return collector.getRows();
	}

0x07 參考

幾種並行計算模型的區別(BSP LogP PRAM)

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/iterations.html
聚類、K-Means、例子、細節

Flink-Gelly:Iterative Graph Processing

從BSP模型到Apache Hama

Flink DataSet迭代運算

幾種並行計算模型的區別(BSP LogP PRAM)

Flink架構,源碼及debug

Flink 之 Dataflow、Task、subTask、Operator Chains、Slot 介紹

Flink 任務和調度

Flink運行時之生成做業圖

相關文章
相關標籤/搜索