由前文知道每一個BSPServiceWorker有一個WorkerServer對象,WorkerServer對象裏面又有ServerData對象,做爲數據實。ServerData中包含該Worker的partitionStore、edgeStore、incomingMessageStore、currentMessageStore、彙集值等。其中incomingMessageStore對象爲MessageStoreByPartition(接口)類型,也就是說消息時按照分區來存儲的。MessageStoreByPartition接口的關係圖以下:apache
在SimpleMessageStore抽象類中,有一個ConcurrentMap<Integer,ConcurrentMap<I,T>>類型的變量map,用來存儲消息。第一層是pairtitionID到發送到該partition消息的映射;第二層是VertexID 到發送給該Vertex的消息隊列。數組
《Giraph通訊模塊分析》:http://my.oschina.net/skyaugust/blog/95182緩存
每一個頂點的消息列表具體爲ExtendedDataOutput類型,它繼承DataOutput接口,增長了幾個方法而已。每一個消息是以字節形式寫入到ExtendedDataOutput對象中的。網絡
發送消息時,採用異步式通訊。併發
圖頂點的計算處理與消息通訊併發執行,在計算過程當中就能夠發送消息,將大規模消息發送分散在不一樣的時間段,避免瞬時網絡通訊阻塞,可是接受端須要額外的空間,存儲臨時接收到的消息,至關於空間換時間。而集中式通訊,圖頂點的計算處理與消息通訊串行進行,在計算完畢後,統一發送消息,控制和實現方式簡單,可在發送端對消息進行最大程度優化,但容易形成瞬時間的網絡通訊阻塞以及增長髮送端的消息存儲開銷。異步
不一樣Worker間的消息通訊使用RPC方式,具體爲Netty。同一Worker內,連續兩次迭代的消息直接經過內存操做,把要發送的消息直接複製到Worker的incomingMessageStore中。下面詳述消息的存儲格式和發送機制。ide
Giraph使用Cache來緩存消息,當消息達到必定閾值後,一次性發送。源碼分析
既按照bulk模式進行,不會一條一條信息發送。向某個頂點發送的消息是按照<destVertexId,Message> pair存儲在ByteArrayVertexIdData<I,T>中(實際爲ByteArrayVertexIdMessages<I,M>類型)。介紹以下: org.apache.giraph.utils.ByteArrayVertexIdData<I,T>優化
功能:把<頂點ID,data> Pair 存儲在一個 byte數組中。裏面有 ExtendedDataOutput對象用來存儲數據。.net
該類中還有一個內部類:VertexIdDataIterator,該內部類繼承 VertexIdIterator類。
org.apache.giraph.comm.SendCache用來緩存發送的信息,而後以「Bulk」模式發送。在Giraph中,每一個Worker上能夠對應多個分區。消息緩存的閾值是以Worker爲單位計算,而不是Partition。
SendCache中有ByteArrayVertexIdData<I,T>[ ] dataCache數組用來存儲發送給每一個Partition的消息;有int[ ] dataSizes數組用於記錄向每一個Worker發送的消息大小,若大於MAX_MSG_REQUEST_SIZE(默認爲512KB)就把此Worker上的全部Partition緩存的消息發送到給該Worker,同一Worker內消息也是如此緩存;有int[ ] initBufferSizes數組用於記錄每一個Worker上的每一個Partition的初始化ByteArrayVertexIdData中ExtendedDataOutput對象的大小,同一Worker上的全部Partition初始值相同,該值爲平均值。記MAX_MSG_REQUEST_SIZE(message request size)值爲M, 該Worker上有P個 partitions,ADDTITIONNAL_MSG_REQUEST_SIZE(比平均值大的因子)默認爲0.2f,記爲A。則每一個Partition的初始大小爲:M*(1+A) / P .
由前文知道,每一個Worker都有一個NettyWorkerClientRequestProcessor<I,V,E,M>用來發送消息。該類中有SendMessageCache對象用來緩存向外發送的信息。NettyWorkerClientRequestProcessor類中的sendMessageRequest(I,M)
方法以下,用於向某個頂點destVertexId發送消息message。
方法解釋:首先根據destVertexId獲得對應的partitionId和WorkerInfo,而後把消息add到SendMessageCache中,並返回向該頂點所屬Worker發送的消息大小workerMessageSize。若該值大於默認值512KB,則把此Worker對應的全部Partition消息從SendMessageCache中刪除,把刪除的消息賦值給workerMessages,其類型爲PairList<Integer,ByteArrayVertexIdMessages<I,M>> ,key爲partitionId,value爲發送給該partition的消息列表,最後調用doRequest()方法發送信息。doRequest()方法以下:
能夠看到在發送消息時,先判斷是否在同一Worker上。若是是的話,調用SendWorkerMessagesRequest<T,M>的doRequest發送消息;不然使用WorkerClient(底層使用Netty)進行消息發送。下面着重討論同一Worker內的機制。
org.apache.giraph.comm.requests.SendWorkerMessagesRequest類中的doRequest方法以下:
參數爲該Worker的ServerData,代碼中的partitionVertexData實際爲PairList<Integer,ByteArrayVertexIdMessages<I,M>>workerMessages。遍歷<partitionID,對應的消息列表>來添加到ServerData中的incomingMessageStore中。
ByteArrayMessagesPerVertexStore類中的addPartitionMessages()方法以下:
當用戶使用了Combiner,incomingMessageStore對應的類型則爲OneMessagePerVertexStore,該類爲每一個頂點只存儲一個消息,而非消息隊列。 結構以下圖:
當添加一條消息時,會把頂點已對應的消息和要添加的消息調用combine()方法進行合併,而後存儲在上述結構圖中。addPartitionMessages()方法以下:
在ComputeCallable中的call()方法調用computePartition(Partition)計算完全部Partition上的頂點後,調用WorkerClientRequestProcessor.flush()方法把全部剩餘的消息發送出去。