Author | 白松 (Bai Song)
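1. Add a class that writes the number of messages sent in each superstep into a Hadoop Counter. Create a new class, GiraphMessages, in the org.apache.giraph.counters package to count the messages.
The source code is as follows: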
package org.apache.giraph.counters;

import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.mapreduce.Mapper.Context;

import com.google.common.collect.Maps;

/**
 * Hadoop Counters in group "Giraph Messages" for counting the messages
 * sent in every superstep.
 */
public class GiraphMessages extends HadoopCountersBase {
  /** Counter group name for the Giraph messages */
  public static final String GROUP_NAME = "Giraph Messages";

  /** Singleton instance for everyone to use */
  private static GiraphMessages INSTANCE;

  /** Message-count counters, keyed by superstep number */
  private final Map<Long, GiraphHadoopCounter> superstepMessages;

  private GiraphMessages(Context context) {
    super(context, GROUP_NAME);
    superstepMessages = Maps.newHashMap();
  }

  /**
   * Instantiate with Hadoop Context.
   *
   * @param context Hadoop Context to use.
   */
  public static void init(Context context) {
    INSTANCE = new GiraphMessages(context);
  }

  /**
   * Get singleton instance.
   *
   * @return singleton GiraphMessages instance.
   */
  public static GiraphMessages getInstance() {
    return INSTANCE;
  }

  /**
   * Get the message counter for a superstep, creating it on first use.
   *
   * @param superstep superstep number
   * @return counter holding the number of messages sent in that superstep
   */
  public GiraphHadoopCounter getSuperstepMessages(long superstep) {
    GiraphHadoopCounter counter = superstepMessages.get(superstep);
    if (counter == null) {
      String counterPrefix = "Superstep- " + superstep + " ";
      counter = getCounter(counterPrefix);
      superstepMessages.put(superstep, counter);
    }
    return counter;
  }

  @Override
  public Iterator<GiraphHadoopCounter> iterator() {
    return superstepMessages.values().iterator();
  }
}
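To try the caching pattern in getSuperstepMessages without a Hadoop cluster, here is a minimal, self-contained sketch. SimpleCounter is a hypothetical stand-in for GiraphHadoopCounter and SuperstepCounters is a hypothetical stand-in for GiraphMessages; neither is part of Giraph. The first call for a superstep creates its counter, and later calls return the same instance, so increments accumulate.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical stand-in for GiraphHadoopCounter: a named long counter. */
class SimpleCounter {
  private final String name;
  private long value;

  SimpleCounter(String name) {
    this.name = name;
  }

  void increment(long amount) {
    value += amount;
  }

  String getName() {
    return name;
  }

  long getValue() {
    return value;
  }
}

/** Hypothetical stand-in for GiraphMessages: one lazily created counter per superstep. */
class SuperstepCounters implements Iterable<SimpleCounter> {
  private final Map<Long, SimpleCounter> superstepMessages = new HashMap<>();

  /** Creates the counter the first time a superstep is seen and reuses it afterwards. */
  SimpleCounter getSuperstepMessages(long superstep) {
    return superstepMessages.computeIfAbsent(
        superstep, s -> new SimpleCounter("Superstep- " + s + " "));
  }

  @Override
  public Iterator<SimpleCounter> iterator() {
    return superstepMessages.values().iterator();
  }

  public static void main(String[] args) {
    SuperstepCounters counters = new SuperstepCounters();
    counters.getSuperstepMessages(0).increment(120);
    counters.getSuperstepMessages(1).increment(80);
    counters.getSuperstepMessages(0).increment(5); // same counter as the first call
    for (SimpleCounter c : counters) {
      System.out.println(c.getName() + "= " + c.getValue());
    }
  }
}
```

Map.computeIfAbsent folds the explicit get / null check / put into one call, which behaves the same for a single-threaded caller. As with Maps.newHashMap(), a HashMap does not guarantee iteration order, so the printed order may vary.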
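2. Add the statistics logic to the BspServiceMaster class. At each synchronization, the master aggregates (sums) the message counts sent by all workers and stores the result in GlobalStats. So after each synchronization we only need to read the total message count from the GlobalStats object and write it into GiraphMessages, in the form <SuperStep-Number, TotalMessagesCount>. These pairs are actually stored in the Map<Long, GiraphHadoopCounter> superstepMessages field defined in the GiraphMessages class from the previous step. At the very end of the BspServiceMaster constructor, append one line that initializes GiraphMessages: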
GiraphMessages.init(context);
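The master-side aggregation described above can be sketched as a plain sum. WorkerReport and GlobalMessageStats are hypothetical stand-ins, not Giraph classes; the sketch only assumes, as the text states, that the global message count is the sum of the counts reported by each worker.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the statistics one worker reports after a superstep. */
class WorkerReport {
  final String workerId;
  final long messagesSent;

  WorkerReport(String workerId, long messagesSent) {
    this.workerId = workerId;
    this.messagesSent = messagesSent;
  }
}

/** Hypothetical stand-in for GlobalStats; only the message total is modeled. */
class GlobalMessageStats {
  private long messageCount;

  void addMessageCount(long count) {
    messageCount += count;
  }

  long getMessageCount() {
    return messageCount;
  }

  /** Master-side aggregation: sums every worker's count into one global total. */
  static GlobalMessageStats aggregate(List<WorkerReport> reports) {
    GlobalMessageStats stats = new GlobalMessageStats();
    for (WorkerReport r : reports) {
      stats.addMessageCount(r.messagesSent);
    }
    return stats;
  }

  public static void main(String[] args) {
    List<WorkerReport> reports = Arrays.asList(
        new WorkerReport("worker-0", 300),
        new WorkerReport("worker-1", 200),
        new WorkerReport("worker-2", 0));
    System.out.println(aggregate(reports).getMessageCount()); // 500
  }
}
```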
In the SuperstepState coordinateSuperstep() method of BspServiceMaster, add the recording logic. The code fragment is as follows:
……
// If the master is halted or all the vertices voted to halt and there
// are no more messages in the system, stop the computation
GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
LOG.info("D-globalStats: " + globalStats + "\n\n");
// Added: record the total message count of every superstep, starting from superstep 0.
if (getSuperstep() != INPUT_SUPERSTEP) {
  GiraphMessages.getInstance().getSuperstepMessages(getSuperstep())
      .increment(globalStats.getMessageCount());
}
……
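The guard around the recording can be exercised in isolation. This is a minimal sketch: SuperstepRecorder is a hypothetical class, and INPUT_SUPERSTEP is assumed to be -1, the value Giraph's BspService uses for the input-loading superstep. Only supersteps other than the input superstep are recorded, and, like a Hadoop counter's increment, repeated calls for the same superstep add up.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of the guard added to coordinateSuperstep(). */
class SuperstepRecorder {
  /** Assumed to mirror Giraph's BspService.INPUT_SUPERSTEP (-1). */
  static final long INPUT_SUPERSTEP = -1;

  // Superstep number -> global message count, ordered by superstep.
  private final Map<Long, Long> messagesBySuperstep = new TreeMap<>();

  /** Records the superstep's global message count unless it is the input superstep. */
  void onSuperstepFinished(long superstep, long globalMessageCount) {
    if (superstep != INPUT_SUPERSTEP) {
      // merge(..., Long::sum) behaves like incrementing a counter that starts at 0.
      messagesBySuperstep.merge(superstep, globalMessageCount, Long::sum);
    }
  }

  Map<Long, Long> recorded() {
    return messagesBySuperstep;
  }

  public static void main(String[] args) {
    SuperstepRecorder recorder = new SuperstepRecorder();
    recorder.onSuperstepFinished(INPUT_SUPERSTEP, 0); // input loading: ignored
    recorder.onSuperstepFinished(0, 500);
    recorder.onSuperstepFinished(1, 100);
    System.out.println(recorder.recorded()); // {0=500, 1=100}
  }
}
```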
3. The experimental results are as follows:
Done!