Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平臺,是業界首個同時支持批式算法、流式算法的機器學習平臺。本文將帶領你們來分析Alink中通信模型AllReduce的實現。AllReduce在Alink中應用較多,好比KMeans,LDA,Word2Vec,GD,lbfgs,Newton method,owlqn,SGD,Gbdt, random forest都用到了這個通信模型。java
由於Alink的公開資料太少,因此如下均爲自行揣測,確定會有疏漏錯誤,但願你們指出,我會隨時更新。git
MPI(Message-Passing Interface)是一個跨語言的通信協議,用於編寫並行計算,支持點對點和廣播。算法
MPI的目標是高性能、大規模性和可移植性。MPI在今天仍爲高性能計算的主要模型。apache
其特色是編程
A partitioned address space 每一個線程只能經過調用api去讀取非本地數據。全部的交互(Non-local Memory)都須要協同進行(握手)。api
Supports only explicit parallelization 只支持顯性的並行化,用戶必須明確的規定消息傳遞的方式。數組
AllReduce是MPI提供的一個基本原語,咱們須要先了解reduce才能更好理解AllReduce。sass
MPI_Allreduce和MPI_Reduce的一個區別就是,MPI_Reduce函數將最後的結果只傳給了指定的dest_process 號進程,而MPI_Allreduce函數能夠將結果傳遞給全部的進程,所以全部的進程都能接收到結果。MPI_Allreduce函數的原型也所以不須要指定目標進程號。網絡
AllReduce在Alink中應用較多,好比KMeans,LDA,Word2Vec,GD,lbfgs,Newton method,owlqn,SGD,Gbdt, random forest都用到了這個通信模型。session
AllReduce在算法實現中起到了承上啓下的關鍵做用,即把原來串行跑的並行task強制打斷,把計算結果進行彙總再分發,讓串行繼續執行。有一點相似你們熟悉的併發中的Barrier。
對比Flink原生KMeans算法,咱們能看到AllReduce對應的是 groupBy(0).reduce
。只有全部數據都產生以後,才能作groupBy操做。
DataSet<Centroid> newCentroids = points // compute closest centroid for each point .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids") // count and sum point coordinates for each centroid .map(new CountAppender()) // 這裏若是是Alink,就對應了AllReduce .groupBy(0).reduce(new CentroidAccumulator()) // compute new centroids from point counts and coordinate sums .map(new CentroidAverager());
從AllReduce的註解中咱們能夠清晰的看出Alink實現MPI的思想。
* An implement of {@link CommunicateFunction} that do the AllReduce. * * AllReduce is a communication primitive widely used in MPI. In our implementation, all workers do reduce on a partition of the whole data and they all get the final reduce result. * * There're mainly three stages: * 1. All workers send the there partial data to other workers for reduce. * 2. All workers do reduce on all data it received and then send partial results to others. * 3. All workers merge partial results into final result and put it into session context with pre-defined object name. */
翻譯以下:
全部的workers都在部分數據上作reduce操做,全部的workers均可以獲取到reduce最終結果 主要有三個階段: 1. 全部workers給其餘workers發送須要reduce的部分數據 2. 全部workers在它收到的數據上作reduce,而後把這個部分reduce的結果發送給其餘workers 3. 全部workers把部分reduce的結果合併成爲最終結果,而後放入預約義的session 上下文變量中
"紙上得來終覺淺,絕知此事要躬行。"
Alink爲了實現AllReduce,在背後作了大量的工做,下面咱們一一剖析。
共享是實現AllReduce的第一要務,由於在歸併/廣播過程當中須要元數據和輸入輸出,若是有共享變量就能夠極大簡化實現。咱們下面就看看Alink如何經過task manager實現共享。
Flink 中的程序本質上是並行的。在執行期間,每個算子(Transformation)都有一個或多個算子subTask(Operator SubTask),每一個算子的 subTask 之間都是彼此獨立,並在不一樣的線程中執行,而且可能在不一樣的機器或容器上執行。
同一個application,多個不一樣 task的 subTask,能夠運行在同一個 slot 資源槽中。同一個 task 中的多個的 subTask,不能運行在一個 slot 資源槽中,他們能夠分散到其餘的資源槽中。對應到後面就是:AllReduceSend的多個並行度實例都不能運行在同一個slot中。
Flink 中每個 TaskManager 都是一個JVM進程,它可能會在獨立的線程上執行一個或多個 subtask。TaskManager 至關於整個集羣的 Slave 節點,負責具體的任務執行和對應任務在每一個節點上的資源申請和管理。
TaskManager爲了對資源進行隔離和增長容許的task數,引入了slot的概念,這個slot對資源的隔離僅僅是對內存進行隔離,策略是均分。一個 TaskManager 至少有一個 slot。若是一個TM有N個Slot,則每一個Slot分配到的Memory大小爲整個TM Memory的1/N,同一個TM內的Slots只有Memory隔離,CPU是共享的。
客戶端經過將編寫好的 Flink 應用編譯打包,提交到 JobManager,而後 JobManager 會根據已註冊在 JobManager 中 TaskManager 的資源狀況,將任務分配給有資源的 TaskManager節點,而後啓動並運行任務。
TaskManager 從 JobManager 接收須要部署的任務,而後使用 Slot 資源啓動 Task,創建數據接入的網絡鏈接,接收數據並開始數據處理。同時 TaskManager 之間的數據交互都是經過數據流的方式進行的。
Flink 的任務運行實際上是採用多線程的方式,一個TaskManager(TM)在多線程中併發執行多個task。這和 MapReduce 多 JVM 進行的方式有很大的區別,Flink 可以極大提升 CPU 使用效率,在多個任務和 Task 之間經過 TaskSlot 方式共享系統資源,每一個 TaskManager 中經過管理多個 TaskSlot 資源池進行對資源進行有效管理。
對應到後面就是:在一個TaskManager中間運行的多個並行的AllReduceSend實例都會共享這個TaskManager中全部靜態變量。
Alink就是利用task manager的靜態變量實現了變量共享。其中有幾個主要類和概念比較複雜。咱們從上到下進行講解,能看到隨着從上到下,須要的標示和狀態逐漸增長。
從上往下調用層次以下:
用戶代碼調用 : context.getObj(bufferName); 這樣對用戶是最理想的,由於對於用戶來講知道變量名字就能夠通過上下文來存取。
可是ComContext則須要知道更多,好比還須要知道 本身對應的sessioin和taskID,具體下面會說明。
ComContext如此向下調用 : SessionSharedObjs.put(objName, sessionId, taskId, obj);
IterativeComQueue 是一個框架概念。以Kmeans爲例,就是Kmeans算法對應了若干IterativeComQueue。
IterativeComQueue上擁有衆多compute/communicate function,每一個function都應該知道本身屬於哪個IterativeComQueue,如何和本Queue上其餘function進行通訊,不能和其餘Queue上搞混了。這樣就須要有一個概念來表標示這個Queue。因而就有了下面Session概念。
爲了區分每一個IterativeComQueue,就產生了session這個概念。這樣IterativeComQueue上全部compute/communicate function都會綁定同一個session id,同一個IterativeComQueue上的全部function之間能夠通訊。
一個 IterativeComQueue 對應一個session,因此<"變量名" + sessionId>就對應了這個 session 能訪問的某個變量。
SessionSharedObjs 包含靜態成員變量 :
正常來講 "某個名字的變量" 對應 "某個變量handle" 便可。即一個session中某個變量名 對應某個變量handle。可是Flink中,會有多個subtask並行操做的狀態,這樣就須要有一個新的概念來標示subtask對應的變量,這個變量應該和taskId有所關聯。因而就有了下面的state概念。
SessionSharedObjs向下調用 : IterTaskObjKeeper.put(handle, taskId, obj);
這裏就是用靜態變量來實現共享。是task manager中全部的 tasks (threads)均可以訪問的共享變量實例。
IterTaskObjKeeper 包含靜態成員變量 :
在Flink中,一個算法會被多個subtask並行操做。若是隻有一個handle,那麼多個subtask共同訪問,就會有你們都熟知的各類多線程操做問題。因此Alink這裏將handle拆分爲多個state。從subtask角度看,每一個state用<handle, taskId>來惟一標示。
總結一下,就是對於一樣一個變量名字,每一個subtask對應的共享state其實都是獨立的,你們互不干擾。共享其實就是在這個subtask上跑的各個operator之間共享。
從實際執行的變量中,咱們能夠有一個更加清楚的認識。
// 能看出來 session 0 中,centroidAllReduce這個變量 對應的handle是 7 SessionSharedObjs.key2Handle = {HashMap@10480} size = 9 {Tuple2@10492} "(initCentroid,0)" -> {Long@10493} 1 {Tuple2@10494} "(statistics,0)" -> {Long@10495} 2 {Tuple2@10496} "(362158a2-588b-429f-b848-c901a1e15e17,0)" -> {Long@10497} 8 {Tuple2@10498} "(k,0)" -> {Long@10499} 6 {Tuple2@10500} "(centroidAllReduce,0)" -> {Long@10501} 7 // 這裏就是所說的 {Tuple2@10502} "(trainData,0)" -> {Long@10503} 0 {Tuple2@10504} "(vectorSize,0)" -> {Long@10505} 3 {Tuple2@10506} "(centroid2,0)" -> {Long@10507} 5 {Tuple2@10508} "(centroid1,0)" -> {Long@10509} 4 // 下面能看出來,handle 7 這一種變量,由於有 4 個subtask,因此細分爲4個state。 com.alibaba.alink.common.comqueue.IterTaskObjKeeper.states = {HashMap@10520} size = 36 {Tuple2@10571} "(7,0)" -> {double[15]@10572} {Tuple2@10573} "(7,1)" -> {double[15]@10574} {Tuple2@10577} "(7,2)" -> {double[15]@10578} {Tuple2@10581} "(7,3)" -> {double[15]@10582} {Tuple2@10575} "(5,0)" -> {Tuple2@10576} "(10,com.alibaba.alink.operator.common.distance.FastDistanceMatrixData@29a72fbb)" {Tuple2@10579} "(5,1)" -> {Tuple2@10580} "(10,com.alibaba.alink.operator.common.distance.FastDistanceMatrixData@26c52354)" {Tuple2@10585} "(5,2)" -> {Tuple2@10586} "(10,com.alibaba.alink.operator.common.distance.FastDistanceMatrixData@7c6ed779)" {Tuple2@10588} "(5,3)" -> {Tuple2@10589} "(10,com.alibaba.alink.operator.common.distance.FastDistanceMatrixData@154b8a4d)"
下面讓咱們結合代碼,一一解析涉及的類。
ComContext 是最上層類,用來獲取runtime信息和共享變量。IterativeComQueue(BaseComQueue )上全部的compute/communicate function都經過 ComContext 來訪問共享變量。好比:
public class BaseComQueue<Q extends BaseComQueue<Q>> implements Serializable { // 每個BaseComQueue都會獲得惟一一個sessionId。 private final int sessionId = SessionSharedObjs.getNewSessionId(); int taskId = getRuntimeContext().getIndexOfThisSubtask(); public void mapPartition(Iterable<byte[]> values, Collector<byte[]> out) { // 獲取到了一個ComContext ComContext context = new ComContext(sessionId, getIterationRuntimeContext()); if (getIterationRuntimeContext().getSuperstepNumber() == maxIter || criterion) { // 利用ComContext繼續訪問共享變量 List<Row> model = completeResult.calc(context); } } } // 用戶相似這麼調用 double[] sendBuf = context.getObj(bufferName);
能夠看出來,ComContext 就是用戶應該看到的最頂層上下文概念。 taskId, sessionId 是使用關鍵。
/** * Encapsulates task-specific information: name, index of subtask, parallelism and attempt number. */ @Internal public class TaskInfo { /** * Gets the number of this parallel subtask. The numbering starts from 0 and goes up to parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}). * * @return The index of the parallel subtask. */ public int getIndexOfThisSubtask() { return this.indexOfSubtask; // 這裏獲取taskId } }
ComContext 具體類定義以下
/** * Context used in BaseComQueue to access basic runtime information and shared objects. */ public class ComContext { private final int taskId; private final int numTask; private final int stepNo; private final int sessionId; public ComContext(int sessionId, IterationRuntimeContext runtimeContext) { this.sessionId = sessionId; this.numTask = runtimeContext.getNumberOfParallelSubtasks(); this.taskId = runtimeContext.getIndexOfThisSubtask(); this.stepNo = runtimeContext.getSuperstepNumber(); } /** * Put an object into shared objects for access of other QueueItem of the same taskId. * * @param objName object name * @param obj object itself. */ public void putObj(String objName, Object obj) { SessionSharedObjs.put(objName, sessionId, taskId, obj); } } // 好比具體舉例以下 this = {ComContext@10578} taskId = 4 numTask = 8 stepNo = 1 sessionId = 0
SessionSharedObjs是再下一層的類,維護shared session objects, 這個session 共享是經過 sessionId 作到的。
SessionSharedObjs 維護了一個靜態類變量 sessionId,由此區分各個Session。
SessionSharedObjs核心是 HashMap<Tuple2<String, Integer>, Long> key2Handle
。即 <"變量名" + sessionId> ---> <真實變量 handle> 的一個映射。
一個 IterativeComQueue 對應一個session,因此<"變量名" + sessionId>就對應了這個 IterativeComQueue 能訪問的某個變量,正常來講有一個變量handle便可。
可是由於一個 IterativeComQueue會被若干subtask並行執行,因此爲了互斥和區分,因此每一個handle又細分爲若干state,每一個state用<handle, taskId>來惟一標示。在下面會提到。
/** * An static class that manage shared objects for {@link BaseComQueue}s. */ class SessionSharedObjs implements Serializable { private static HashMap<Tuple2<String, Integer>, Long> key2Handle = new HashMap<>(); private static int sessionId = 0; private static ReadWriteLock rwlock = new ReentrantReadWriteLock(); /** * Get a new session id. * All access operation should bind with a session id. This id is usually shared among compute/communicate function of an {@link IterativeComQueue}. * * @return new session id. */ synchronized static int getNewSessionId() { return sessionId++; } static void put(String objName, int session, int taskId, Object obj) { rwlock.writeLock().lock(); try { Long handle = key2Handle.get(Tuple2.of(objName, session)); if (handle == null) { handle = IterTaskObjKeeper.getNewHandle(); key2Handle.put(Tuple2.of(objName, session), handle); } // 這裏進行調用。taskId也是辨識關鍵。 IterTaskObjKeeper.put(handle, taskId, obj); } finally { rwlock.writeLock().unlock(); } } }
這是最底層的共享類,是在task manager進程的堆內存上的一個靜態實例。task manager的全部task (threads) 均可以分享。
看源碼可知,IterTaskObjKeeper 是經過一個靜態變量states實現了在整個JVM內共享。而具體內容是由 'handle' and 'taskId' 來共同決定。
IterTaskObjKeeper維持了 handle 遞增來做爲 「變量state」 的惟一種類標識。
用<handle, taskId>來做爲「變量state」的惟一標識。這個就是在 task manager process 堆內存中被你們共享的變量。
即handle表明哪種變量state,<handle, taskId>表示這種變量中,對應哪個task的哪個變量。 這是針對task的一種細分。
/** * A 'state' is an object in the heap memory of task manager process, * shared across all tasks (threads) in the task manager. * Note that the 'state' is shared by all tasks on the same task manager, * users should guarantee that no two tasks modify a 'state' at the same time. * A 'state' is identified by 'handle' and 'taskId'. */ public class IterTaskObjKeeper implements Serializable { private static Map <Tuple2 <Long, Integer>, Object> states; /** * A 'handle' is a unique identifier of a state. */ private static long handle = 0L; private static ReadWriteLock rwlock = new ReentrantReadWriteLock(); static { states = new HashMap <>(); } /** * @note Should get a new handle on the client side and pass it to transformers. */ synchronized public static long getNewHandle() { return handle++; } public static void put(long handle, int taskId, Object state) { rwlock.writeLock().lock(); try { states.put(Tuple2.of(handle, taskId), state); } finally { rwlock.writeLock().unlock(); } } }
咱們示例代碼依然以下。
static DataSet <Row> iterateICQ(...省略...) { return new IterativeComQueue() .initWithPartitionedData(TRAIN_DATA, data) .initWithBroadcastData(INIT_CENTROID, initCentroid) .initWithBroadcastData(KMEANS_STATISTICS, statistics) .add(new KMeansPreallocateCentroid()) .add(new KMeansAssignCluster(distance)) .add(new AllReduce(CENTROID_ALL_REDUCE)) .add(new KMeansUpdateCentroids(distance)) .setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol)) .closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName)) .setMaxIter(maxIter) .exec(); }
Alink的AllReduce主要代碼摘取以下:
public static <T> DataSet <T> allReduce( return input .mapPartition(new AllReduceSend <T>(bufferName, lengthName, transferBufferName, sessionId)) .withBroadcastSet(input, "barrier") .returns( new TupleTypeInfo <>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO)) .name("AllReduceSend") .partitionCustom(new Partitioner <Integer>() { @Override public int partition(Integer key, int numPartitions) { return key; } }, 0) .name("AllReduceBroadcastRaw") .mapPartition(new AllReduceSum(bufferName, lengthName, sessionId, op)) .returns( new TupleTypeInfo <>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO)) .name("AllReduceSum") .partitionCustom(new Partitioner <Integer>() { @Override public int partition(Integer key, int numPartitions) { return key; } }, 0) .name("AllReduceBroadcastSum") .mapPartition(new AllReduceRecv <T>(bufferName, lengthName, sessionId)) .returns(input.getType()) .name("AllReduceRecv"); }
結合上面具體代碼,咱們先總結AllReduce使用流程以下
KMeansAssignCluster :Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster。而後把本身計算出來的cluster 寫入到本身 task manager 的 CENTROID_ALL_REDUCE。
每一個AllReduceSend 從本身task manager的CENTROID_ALL_REDUCE中取出以前存入的 cluster(每一個AllReduceSend獲取的cluster都是隻有本身能看到的),而後發送給下游task。發送時根據 "下游task index 和 數據量" 來決定往哪些task發送。這裏要注意的是:具體給哪個task發送變量的哪一部分,是依據那個task 的 task index 和數據量 來計算出來的。這個計算機制(如何計算在代碼中,也有部分做爲元信息隨着數據一塊兒發送)被後面的AllReduceRecv複用。
每一個 AllReduceSum 接收到 AllReduceSend 發送過來的 cluster,計算求和,而後把計算結果再發送出去。每個AllReduceSum 都是把本身計算求和出來的數據統一發給每個下游task。
每一個 AllReduceRecv 都接收到 全部 AllReduceSum 發送過來的(求和以後的)cluster。存入到共享變量CENTROID_ALL_REDUCE。具體如何存就複用AllReduceSend計算機制,這樣存到共享變量的什麼地方就互相不會衝突。能夠理解爲merge操做:好比有5個AllReduce,每一個AllReduce的數據都發給了5個AllReduceRecv,每一個AllReduceRecv接到這5份數據以後,會根據本身的subtask index寫到本身對應的state中,可是這5份數據分別寫在state什麼地方都是在數據元信息中指定的,彼此不會有寫的衝突,這樣每一個AllReduceRecv就擁有了所有5份數據。
KMeansUpdateCentroids :取出CENTROID_ALL_REDUCE變量,而後Update the centroids based on the sum of points and point number belonging to the same cluster
該類的做用是:爲每一個點(point)計算最近的聚類中心,爲每一個聚類中心的點座標的計數和求和。
咱們能夠看出,KMeansAssignCluster 經過ComContext存儲了CENTROID_ALL_REDUCE,爲後續AllReduce使用。假若有5個KMeansAssignCluster,則他們計算出來的結果通常來講各不相同。雖然存儲同一個變量名CENTROID_ALL_REDUCE,可是其state各不相同。
由於這5個KMeansAssignCluster勢必對應了5個subtask,則其在共享變量中的<handle, taskId>必不相同,則對應不一樣的state,因此分開存儲。
// Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster. public class KMeansAssignCluster extends ComputeFunction { // 存取共享變量 double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE); if (sumMatrixData == null) { sumMatrixData = new double[k * (vectorSize + 1)]; context.putObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE, sumMatrixData); } for (FastDistanceVectorData sample : trainData) { // Find the closest centroid from centroids for sample, and add the sample to sumMatrix. KMeansUtil.updateSumMatrix(sample, 1, stepNumCentroids.f1, vectorSize, sumMatrixData, k, fastDistance, distanceMatrix); } } // 程序中各個變量以下 sample = {FastDistanceVectorData@13274} vector = {DenseVector@13281} "6.3 2.5 4.9 1.5" label = {DenseVector@13282} "72.2" rows = {Row[1]@13283} // 這個就是共享變量。4維向量 + 1 weight ---> 都是"sample和"。 sumMatrixData = {double[15]@10574} 0 = 23.6 1 = 14.9 2 = 8.7 3 = 1.7000000000000002 4 = 5.0 5 = 52.400000000000006 6 = 25.1 7 = 39.699999999999996 8 = 13.299999999999999 9 = 9.0 10 = 33.0 11 = 16.9 12 = 28.900000000000002 13 = 11.4 14 = 5.0 trainData = {ArrayList@10580} size = 19 0 = {FastDistanceVectorData@10590} vector = {DenseVector@10595} "7.7 3.8 6.7 2.2" data = {double[4]@10601} 0 = 7.7 1 = 3.8 2 = 6.7 3 = 2.2 label = {DenseVector@10596} "123.46000000000001" rows = {Row[1]@10597} 1 = {FastDistanceVectorData@10603} vector = {DenseVector@10623} "5.7 2.8 4.1 1.3" label = {DenseVector@10624} "58.83" rows = {Row[1]@10625} 2 = {FastDistanceVectorData@10604} 3 = {FastDistanceVectorData@10605} ...... 17 = {FastDistanceVectorData@10619} 18 = {FastDistanceVectorData@10620} vector = {DenseVector@10654} "6.5 3.0 5.2 2.0" label = {DenseVector@10655} "82.29" rows = {Row[1]@10656}
這裏須要再把代碼摘錄一遍,主要是由於有withBroadcastSet。其做用是:
return input .mapPartition(new AllReduceSend <T>(bufferName, lengthName, transferBufferName, sessionId)) .withBroadcastSet(input, "barrier")
KMeansAssignCluster 會往上下文的變量centroidAllReduce中添加數據。因此 AllReduce 其實就是在等待這個變量。
AllReduce的第一步就是從上下文中取出共享變量,而後發送。這部分代碼由AllReduceSend完成。
對於AllReduceSend的每一個task來講,bufferName都是 centroidAllReduce。
由於每一個AllReduceSend也對應不一樣的task,因此每一個AllReduceSend讀取的centroidAllReduce必然不同,因此每一個task獲取的sendBuf都不同。他們分別把本身<handle, taskId>對應的 "centroidAllReduce" state取出,發送給下游。
AllReduceSend 發給其下游時候,是以subtask的序號爲基準發送給每個task,即本task中獲取的共享變量會發送給每個task,可是具體給哪個task發送變量的那一部分,是依據那個task 的 task index 和數據量 來計算出來的。若是數據量少,可能只給某一個或者幾個task發送。
後續中的 taskId ,都是subtask id。
其中,如何計算給哪一個task發送多少,是在DefaultDistributedInfo完成的。這裏須要結合 pieces 函數進行分析。須要注意的是:AllReduceSend這麼發送,AllReduceRecv後面也按照這個套路接受。這樣AllReduceRecv就能夠merge了。
AllReduceSend這麼發送,AllReduceRecv後面也按照這個套路接受 int pieces = pieces(sendLen);//表示本人此次send的數據分紅幾片,好比分紅50片。每片大小是TRANSFER_BUFFER_SIZE // 將要發給 8 個 subtask for (int i = 0; i < numOfSubTasks; ++i) { // 假如第5個subtask,那麼它發送的起始位置就是50/8 * 4 int startPos = (int) distributedInfo.startPos(i, numOfSubTasks, pieces); // 給第5個subtask發送多少片 int cnt = (int) distributedInfo.localRowCnt(i, numOfSubTasks, pieces);
具體代碼以下:
private static int pieces(int len) { int div = len / TRANSFER_BUFFER_SIZE; //本人此次send的數據分紅幾片,每片大小是TRANSFER_BUFFER_SIZE int mod = len % TRANSFER_BUFFER_SIZE; return mod == 0 ? div : div + 1; } public class DefaultDistributedInfo implements DistributedInfo { public long startPos(long taskId, long parallelism, long globalRowCnt) { long div = globalRowCnt / parallelism; long mod = globalRowCnt % parallelism; if (mod == 0) { return div * taskId; } else if (taskId >= mod) { return div * taskId + mod; } else { return div * taskId + taskId; } } public long localRowCnt(long taskId, long parallelism, long globalRowCnt) { long div = globalRowCnt / parallelism; long mod = globalRowCnt % parallelism; if (mod == 0) { return div; } else if (taskId >= mod) { return div; } else { return div + 1; } } }
具體AllReduceSend代碼以下,註解中有詳細說明。
// 這裏是變量名字定義。 public static final String CENTROID_ALL_REDUCE = "centroidAllReduce"; private static class AllReduceSend<T> extends RichMapPartitionFunction <T, Tuple3 <Integer, Integer, double[]>> { int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks(); // 與並行度相關,每一個task都會執行相同操做 // bufferName都是 centroidAllReduce,每一個task獲取的sendBuf都不同 // 計算怎麼發送所須要的數據結構 int pieces = pieces(sendLen); DistributedInfo distributedInfo = new DefaultDistributedInfo(); // 從上下文中獲取須要傳送的數據 double[] sendBuf = context.getObj(bufferName); int agg = 0; // 能夠看出來,是把須要傳送的數據給每一個task都發送。固然這個發送是根據發送數據的大小來肯定的,若是數據量小,可能就只給一個或者幾個task發送。 for (int i = 0; i < numOfSubTasks; ++i) { // startPos : 具體發送變量的那一部分,是依據task index來決定的。 // cnt : 具體哪個下游 task i 發送多少數據由此決定,若是是0,就不給task i發送數據。 int startPos = (int) distributedInfo.startPos(i, numOfSubTasks, pieces); int cnt = (int) distributedInfo.localRowCnt(i, numOfSubTasks, pieces); for (int j = 0; j < cnt; ++j) { // 發送哪個部分 int bufStart = (startPos + j) * TRANSFER_BUFFER_SIZE; // the last if (startPos + j == pieces - 1) { System.arraycopy(sendBuf, bufStart, transBuf, 0, lastLen(sendLen)); } else { System.arraycopy(sendBuf, bufStart, transBuf, 0, TRANSFER_BUFFER_SIZE); } agg++; // i 是subTasks的index,startPos + j是buffer內的位置,後續分區實際就是按照這個 i 來分區的。本AllReduceSend就是發送到numOfSubTasks這些task中。 out.collect(Tuple3.of(i, startPos + j, transBuf)); } } } private static int pieces(int len) { int div = len / TRANSFER_BUFFER_SIZE; // 4096 int mod = len % TRANSFER_BUFFER_SIZE; return mod == 0 ? div : div + 1; } sendBuf = {double[15]@10602} 0 = 40.3 1 = 18.200000000000003 2 = 33.6 3 = 12.5 4 = 6.0 5 = 45.3 6 = 30.599999999999998 7 = 12.4 8 = 2.0 9 = 9.0 10 = 24.0 11 = 10.4 12 = 17.1 13 = 5.199999999999999 14 = 4.0 this = {AllReduce$AllReduceSend@10598} bufferName = "centroidAllReduce" lengthName = null transferBufferName = "3dfb2aae-683d-4497-91fc-30b8d6853bce" sessionId = 0 runtimeContext = {AbstractIterativeTask$IterativeRuntimeUdfContext@10606}
AllReduceSend發送變量給下游時候,使用了自定義的partition(partitionCustom )。其是用 index of subtask 來做爲key分區。這樣就和AllReduceSend那個out.collect對應了。
.partitionCustom(new Partitioner <Integer>() { @Override public int partition(Integer key, int numPartitions) { return key; } }, 0) .name("AllReduceBroadcastRaw") // 調用到這個partition函數的調用棧 partition:102, AllReduce$2 (com.alibaba.alink.common.comqueue.communication) partition:99, AllReduce$2 (com.alibaba.alink.common.comqueue.communication) customPartition:235, OutputEmitter (org.apache.flink.runtime.operators.shipping) selectChannel:149, OutputEmitter (org.apache.flink.runtime.operators.shipping) selectChannel:36, OutputEmitter (org.apache.flink.runtime.operators.shipping) emit:120, RecordWriter (org.apache.flink.runtime.io.network.api.writer) collect:65, OutputCollector (org.apache.flink.runtime.operators.shipping) collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics) mapPartition:257, AllReduce$AllReduceSend (com.alibaba.alink.common.comqueue.communication) run:103, MapPartitionDriver (org.apache.flink.runtime.operators) run:504, BatchTask (org.apache.flink.runtime.operators) run:157, AbstractIterativeTask (org.apache.flink.runtime.iterative.task) run:107, IterationIntermediateTask (org.apache.flink.runtime.iterative.task) invoke:369, BatchTask (org.apache.flink.runtime.operators) doRun:705, Task (org.apache.flink.runtime.taskmanager) run:530, Task (org.apache.flink.runtime.taskmanager) run:745, Thread (java.lang) // @AllReduceSend.mapPartition 這裏開始調用 for (int i = 0; i < numOfSubTasks; ++i) { // i 是subTasks的index,後續分區實際就是按照這個 i 來分區的。本AllReduceSend就是發送到numOfSubTasks這些task中。 out.collect(Tuple3.of(i, startPos + j, transBuf)); } // 從後續調用序列能夠看出來,最終是用 index of subtask 來做爲key分區。 // 這裏發送record public class CountingCollector<OUT> implements Collector<OUT> { public void collect(OUT record) { this.numRecordsOut.inc(); this.collector.collect(record); } } record = {Tuple3@10586} "(0,0,[40.50000000000001, 18.7, 33.300000000000004, 12.8, 6.0, 29.7, 21.0, 8.4, 1.7, 6.0, 48.1, 22.199999999999996, 36.0, 12.200000000000001, 8.0, 0.0," f0 = {Integer@10583} 0 f1 = {Integer@10583} 0 f2 = {double[4096]@10598} // 這裏開始分區 public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T>> { private int customPartition(T record, int numberOfChannels) { if (extractedKeys == null) { extractedKeys = new Object[1]; } if (comparator.extractKeys(record, extractedKeys, 0) == 1) { // 因此 key 是 0 final Object key = extractedKeys[0]; return partitioner.partition(key, numberOfChannels); } } } public final class TupleComparator<T extends Tuple> extends TupleComparatorBase<T> { public int extractKeys(Object record, Object[] target, int index) { int localIndex = index; for(int i = 0; i < comparators.length; i++) { localIndex += comparators[i].extractKeys(((Tuple) record).getField(keyPositions[i]), target, localIndex); } return localIndex - index; } } // 就是取出第一個field的數值 key = {Integer@10583} 0 value = 0 extractedKeys = {Object[1]@10587} 0 = {Integer@10583} 0 value = 0
全部workers在它收到的數據上作reduce,而後把這個部分reduce的結果(partial results)發送給其餘workers。
partial results是由於每一個task接受的數據不一樣,是上游根據task index計算位置而且發送過來的。
可是AllReduceSum的計算結果會給每個下游 task index 發送。
private static class AllReduceSum extends RichMapPartitionFunction <Tuple3 <Integer, Integer, double[]>, Tuple3 <Integer, Integer, double[]>> { public void mapPartition(Iterable <Tuple3 <Integer, Integer, double[]>> values,Collector <Tuple3 <Integer, Integer, double[]>> out) { // 這時候雖然也用到了context取出了sendBuf,可是隻是用來獲取其長度而已。 int taskId = getRuntimeContext().getIndexOfThisSubtask(); int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks(); double[] sendBuf = context.getObj(bufferName); int sendLen = lengthName != null ? context.getObj(lengthName) : sendBuf.length; int pieces = pieces(sendLen); DistributedInfo distributedInfo = new DefaultDistributedInfo(); // startPos : 本task接受的數據,startPos 是應該從原始數據的哪一個位置開始。是依據task index來決定的。 // cnt : 具體哪個下游 task i 發送多少數據由此決定。 int startPos = (int) distributedInfo.startPos(taskId, numOfSubTasks, pieces); int cnt = (int) distributedInfo.localRowCnt(taskId, numOfSubTasks, pieces); // 這裏進行了reduce SUM工做 double[][] sum = new double[cnt][]; double[] agg = new double[cnt]; do { Tuple3 <Integer, Integer, double[]> val = it.next(); int localPos = val.f1 - startPos; if (sum[localPos] == null) { sum[localPos] = val.f2; agg[localPos]++; } else { op.accept(sum[localPos], val.f2); } } while (it.hasNext()); // 依然發送給下游,依然是用subtask index來做爲partition key。 // 注意,這裏是把結果發送給全部的下游task。 for (int i = 0; i < numOfSubTasks; ++i) { for (int j = 0; j < cnt; ++j) { // startPos是本task發送的數據應該從原始數據的哪一個位置開始。 // 可是給每個 task i 發的都是一樣的數據。可是 startPos + j 很重要,下游task i 會根據這個知道它應該把接收到的數據存儲在預約義變量的什麼地方。 out.collect(Tuple3.of(i, startPos + j, sum[j])); } } } } sum = {double[1][]@10605} 0 = {double[4096]@10613} 0 = 118.50000000000001 1 = 77.7 2 = 37.2 3 = 5.9 4 = 25.0 5 = 621.1000000000001 6 = 284.7 7 = 487.59999999999997 8 = 166.5 9 = 99.0 10 = 136.9 11 = 95.7 12 = 39.0 13 = 7.4 14 = 26.0
AllReduceSum 發送變量給下游時候,使用了自定義的partition(partitionCustom )。其是用 index of subtask 來做爲key分區。
其意義和以前的 partitionCustom 相同。
All workers merge partial results into final result and put it into session context with pre-defined object name.
每個下游 AllReduceRecv 都接收到 每個上游 AllReduceSum 發送過來的 cluster(求和以後的),而後把每份數據存入到本身task manager對應的預約義變量state的不一樣部分(這個不一樣部分是根據接受到的數據val.f1計算出來的)。
結合前面可知,AllReduceSend發送和AllReduceRecv接受,都是按照一樣的套路計算在共享變量中的數據位置。這樣AllReduceRecv就能夠merge了。
這樣就完成了全部workers把部分reduce sum的結果合併成爲最終結果,而後放入預約義的上下文變量中。
private static class AllReduceRecv<T> extends RichMapPartitionFunction <Tuple3 <Integer, Integer, double[]>, T> { private final String bufferName; private final String lengthName; private final int sessionId; @Override public void mapPartition(Iterable <Tuple3 <Integer, Integer, double[]>> values, Collector <T> out) throws Exception { ComContext context = new ComContext(sessionId, getIterationRuntimeContext()); Iterator <Tuple3 <Integer, Integer, double[]>> it = values.iterator(); if (!it.hasNext()) { return; } double[] recvBuf = context.getObj(bufferName); int recvLen = lengthName != null ? context.getObj(lengthName) : recvBuf.length; int pieces = pieces(recvLen); // 和以前AllReduceSend同樣的套路計算應該存儲在共享變量什麼位置。 do { Tuple3 <Integer, Integer, double[]> val = it.next(); if (val.f1 == pieces - 1) { System.arraycopy(val.f2, 0, recvBuf, val.f1 * TRANSFER_BUFFER_SIZE, lastLen(recvLen)); } else { // 拷貝到共享變量的相應部位。val.f1 是上游發送過來的。做爲merge功能的起始位置。 System.arraycopy(val.f2, 0, recvBuf, val.f1 * TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE); } } while (it.hasNext()); } } val = {Tuple3@10672} "(3,0,[335.3, 150.89999999999998, 277.5, 99.79999999999998, 50.0, 290.9, 136.3, 213.1, 67.8, 50.0, 250.3, 170.89999999999998, 73.2, 12.2, 50.0, 0.0....." f0 = {Integer@10682} 3 value = 3 f1 = {Integer@10638} 0 value = 0 f2 = {double[4096]@10674} 0 = 335.3 1 = 150.89999999999998 2 = 277.5 3 = 99.79999999999998 4 = 50.0 5 = 290.9 6 = 136.3 7 = 213.1 8 = 67.8 9 = 50.0 10 = 250.3 11 = 170.89999999999998 12 = 73.2 13 = 12.2 14 = 50.0 15 = 0.0 ...... // 每一個task都收到了reduce sum結果。 recvBuf = {double[15]@10666} 0 = 404.3 1 = 183.1 2 = 329.3 3 = 117.2 4 = 61.0 5 = 250.3 6 = 170.89999999999998 7 = 73.20000000000002 8 = 12.2 9 = 50.0 10 = 221.89999999999998 11 = 104.1 12 = 161.29999999999998 13 = 50.4 14 = 39.0
基於點計數和座標,計算新的聚類中心。這裏就是從task manager中取出了AllReduce存儲的共享變量CENTROID_ALL_REDUCE。
/** * Update the centroids based on the sum of points and point number belonging to the same cluster. */ public class KMeansUpdateCentroids extends ComputeFunction { public void calc(ComContext context) { Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE); Integer k = context.getObj(KMeansTrainBatchOp.K); // 這裏取出AllReduce存儲的共享變量 double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE); Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids; if (context.getStepNo() % 2 == 0) { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2); } else { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1); } stepNumCentroids.f0 = context.getStepNo(); context.putObj(KMeansTrainBatchOp.K, updateCentroids(stepNumCentroids.f1, k, vectorSize, sumMatrixData, distance)); } }
個人並行計算之路(四)MPI集合通訊之Reduce和Allreduce
Message Passing Interface(MPI)