Author | Bai Song
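Purpose: in research work, we need to analyze how many vertices take part in computation in each iteration so that the system can be optimized further. For example, the last line of the SSSP compute() method always calls voteToHalt() on the current vertex, which puts it into the inactive state. As a result, every vertex is inactive once an iteration finishes. After the global synchronization barrier, vertices that received messages are reactivated, and their compute() method is then invoked. The goal of this article is to count the number of vertices that take part in computation in each iteration. The SSSP compute() method is shown below: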
    @Override
    public void compute(Iterable<DoubleWritable> messages) {
      if (getSuperstep() == 0) {
        setValue(new DoubleWritable(Double.MAX_VALUE));
      }
      double minDist = isSource() ? 0d : Double.MAX_VALUE;
      for (DoubleWritable message : messages) {
        minDist = Math.min(minDist, message.get());
      }
      if (minDist < getValue().get()) {
        setValue(new DoubleWritable(minDist));
        for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
          double distance = minDist + edge.getValue().get();
          sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
        }
      }
      // Put the vertex into the inactive state
      voteToHalt();
    }
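Note: in Giraph, an algorithm terminates when there are no active vertices and no messages are being passed between workers.
In hama-0.6.0, the termination condition only checks whether any active vertices remain. That is not the real Pregel model; it is a half-finished implementation.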
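The difference between the two termination rules can be sketched in a few lines. This is only an illustration: the class HaltCondition and its methods giraphStyle and activeOnly are hypothetical names made up here, not code from Giraph or Hama.

```java
/** Hypothetical illustration of the two termination rules; not Giraph or Hama code. */
final class HaltCondition {
    /** Pregel-style rule: stop only when no vertex is active and no message was sent. */
    static boolean giraphStyle(long activeVertices, long messagesSent) {
        return activeVertices == 0 && messagesSent == 0;
    }

    /** Rule that looks only at active vertices, as described above for hama-0.6.0. */
    static boolean activeOnly(long activeVertices) {
        return activeVertices == 0;
    }

    public static void main(String[] args) {
        // Every vertex has voted to halt, but 3 messages still await delivery.
        System.out.println(giraphStyle(0, 3)); // false: the messages will reactivate vertices
        System.out.println(activeOnly(0));     // true: this rule would stop too early
    }
}
```

The case with zero active vertices and pending messages is exactly where the two rules disagree, which is why SSSP, where every vertex votes to halt in every superstep, depends on the message check.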
The modifications are as follows:
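In the PartitionStats class, add a variable and methods that count how many vertices take part in computation in each Partition during each superstep. The added variable and methods are: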
    /** computed vertices in this partition */
    private long computedVertexCount = 0;

    /**
     * Increment the computed vertex count by one.
     */
    public void incrComputedVertexCount() {
      ++computedVertexCount;
    }

    /**
     * @return the computedVertexCount
     */
    public long getComputedVertexCount() {
      return computedVertexCount;
    }
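Modify the readFields() and write() methods by appending one statement to the end of each. When a Partition finishes computing, it sends its computedVertexCount to the Master, which reads the values and aggregates them.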
    @Override
    public void readFields(DataInput input) throws IOException {
      partitionId = input.readInt();
      vertexCount = input.readLong();
      finishedVertexCount = input.readLong();
      edgeCount = input.readLong();
      messagesSentCount = input.readLong();
      // Add the following statement
      computedVertexCount = input.readLong();
    }

    @Override
    public void write(DataOutput output) throws IOException {
      output.writeInt(partitionId);
      output.writeLong(vertexCount);
      output.writeLong(finishedVertexCount);
      output.writeLong(edgeCount);
      output.writeLong(messagesSentCount);
      // Add the following statement
      output.writeLong(computedVertexCount);
    }
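The reason the new statement goes at the same position (last) in both methods is that the fields are written as a plain byte sequence with no names, so readFields() must consume them in exactly the order write() produced them. Here is a minimal round-trip sketch using only java.io. MiniStats and roundTrip are hypothetical stand-ins invented for illustration and carry only two of the fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for a Writable-style stats object; uses only java.io. */
final class MiniStats {
    long messagesSentCount;
    long computedVertexCount;

    void write(DataOutput output) throws IOException {
        output.writeLong(messagesSentCount);
        output.writeLong(computedVertexCount); // new field is written last
    }

    void readFields(DataInput input) throws IOException {
        messagesSentCount = input.readLong();
        computedVertexCount = input.readLong(); // and read last, mirroring write()
    }

    /** Serializes the object to bytes and reads it back into a fresh instance. */
    static MiniStats roundTrip(MiniStats original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            MiniStats copy = new MiniStats();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MiniStats stats = new MiniStats();
        stats.messagesSentCount = 12;
        stats.computedVertexCount = 4;
        MiniStats copy = roundTrip(stats);
        System.out.println(copy.messagesSentCount + " " + copy.computedVertexCount); // 12 4
    }
}
```

If the two readLong() calls were swapped, the copy would silently come back with the two values exchanged, since nothing in the byte stream identifies which long is which.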
The org.apache.giraph.graph.GlobalStats class
Add a variable and a method that hold the total number of vertices taking part in computation in each superstep, covering all Partitions on every Worker.
    /**
     * computed vertices across all partitions
     * Added by BaiSong
     */
    private long computedVertexCount = 0;

    /**
     * @return the computedVertexCount
     */
    public long getComputedVertexCount() {
      return computedVertexCount;
    }
Modify the addPartitionStats(PartitionStats partitionStats) method so that it also accumulates computedVertexCount.
    /**
     * Add the stats of a partition to the global stats.
     *
     * @param partitionStats Partition stats to be added.
     */
    public void addPartitionStats(PartitionStats partitionStats) {
      this.vertexCount += partitionStats.getVertexCount();
      this.finishedVertexCount += partitionStats.getFinishedVertexCount();
      this.edgeCount += partitionStats.getEdgeCount();
      // Added by BaiSong: add the following statement
      this.computedVertexCount += partitionStats.getComputedVertexCount();
    }
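The roll-up that addPartitionStats() performs is a plain sum over every partition's count for the superstep. As a rough sketch, with the hypothetical class ComputedCountRollup and made-up counts standing in for real partition stats:

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of summing per-partition counts; not Giraph code. */
final class ComputedCountRollup {
    /** Sums the computed-vertex counts reported by all partitions for one superstep. */
    static long total(List<Long> perPartitionCounts) {
        long sum = 0;
        for (long count : perPartitionCounts) {
            sum += count;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Two workers with two partitions each; the numbers are invented for the example.
        List<Long> counts = Arrays.asList(3L, 0L, 1L, 5L);
        System.out.println(total(counts)); // 9
    }
}
```

A partition in which no vertex received a message simply contributes 0 to the total.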
To make debugging easier, you can also modify this class's toString() method (optional). The modified version is:
    public String toString() {
      return "(vtx=" + vertexCount
          + ", computedVertexCount=" + computedVertexCount
          + ",finVtx=" + finishedVertexCount
          + ",edges=" + edgeCount
          + ",msgCount=" + messageCount
          + ",haltComputation=" + haltComputation + ")";
    }
Add the counting itself. In the computePartition() method, add the statement marked below.
    if (!vertex.isHalted()) {
      context.progress();
      TimerContext computeOneTimerContext = computeOneTimer.time();
      try {
        vertex.compute(messages);
        // Add the following statement: once the vertex has run compute(),
        // increment this Partition's computedVertexCount by 1
        partitionStats.incrComputedVertexCount();
      } finally {
        computeOneTimerContext.stop();
      }
      ……
A new class, GiraphComputedVertex, in the org.apache.giraph.counters package keeps one Hadoop counter per superstep:

    package org.apache.giraph.counters;

    import java.util.Iterator;
    import java.util.Map;

    import org.apache.hadoop.mapreduce.Mapper.Context;

    import com.google.common.collect.Maps;

    /**
     * Hadoop Counters in group "Giraph Computed Vertex" for counting the
     * computed vertices of every superstep.
     */
    public class GiraphComputedVertex extends HadoopCountersBase {
      /** Counter group name for the Giraph computed vertices */
      public static final String GROUP_NAME = "Giraph Computed Vertex";

      /** Singleton instance for everyone to use */
      private static GiraphComputedVertex INSTANCE;

      /** Computed vertex counter for each superstep */
      private final Map<Long, GiraphHadoopCounter> superstepVertexCount;

      private GiraphComputedVertex(Context context) {
        super(context, GROUP_NAME);
        superstepVertexCount = Maps.newHashMap();
      }

      /**
       * Instantiate with Hadoop Context.
       *
       * @param context Hadoop Context to use.
       */
      public static void init(Context context) {
        INSTANCE = new GiraphComputedVertex(context);
      }

      /**
       * Get singleton instance.
       *
       * @return singleton GiraphComputedVertex instance.
       */
      public static GiraphComputedVertex getInstance() {
        return INSTANCE;
      }

      /**
       * Get the computed vertex counter for a superstep, creating it on first use.
       *
       * @param superstep superstep number
       * @return counter for that superstep
       */
      public GiraphHadoopCounter getSuperstepVertexCount(long superstep) {
        GiraphHadoopCounter counter = superstepVertexCount.get(superstep);
        if (counter == null) {
          String counterPrefix = "Superstep: " + superstep + " ";
          counter = getCounter(counterPrefix);
          superstepVertexCount.put(superstep, counter);
        }
        return counter;
      }

      @Override
      public Iterator<GiraphHadoopCounter> iterator() {
        return superstepVertexCount.values().iterator();
      }
    }
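In the test shown in the figure above, there were 6 iterations. The red box shows the number of vertices that took part in computation in each iteration: 9, 4, 4, 3, 4, 0.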
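Explanation: in superstep 0 every vertex is active, so all 9 vertices take part in computation. In superstep 5, 0 vertices take part in computation, so no messages are sent; since every vertex is also inactive, the algorithm terminates.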
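The same pattern, a full count in superstep 0 followed by smaller counts and a final 0, can be reproduced with a toy single-process simulation. The sketch below is not Giraph code: the class ComputedVertexDemo, the 4-vertex graph, and the rule of stopping once a superstep computes no vertex are all assumptions made for illustration. It reuses the SSSP logic shown earlier, with vertex 0 as the source.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy single-process superstep loop; hypothetical, not Giraph code. */
final class ComputedVertexDemo {
    /**
     * Runs SSSP from vertex 0 and returns the computed-vertex count of each superstep.
     * Each edge is {source, target, weight}.
     */
    static List<Integer> run(int vertexCount, double[][] edges) {
        double[] value = new double[vertexCount];
        boolean[] halted = new boolean[vertexCount];
        Map<Integer, List<Double>> inbox = new HashMap<>();
        List<Integer> computedPerSuperstep = new ArrayList<>();

        for (long superstep = 0; ; superstep++) {
            Map<Integer, List<Double>> outbox = new HashMap<>();
            int computed = 0;
            for (int v = 0; v < vertexCount; v++) {
                List<Double> messages = inbox.get(v);
                if (messages != null) {
                    halted[v] = false; // an incoming message reactivates the vertex
                } else {
                    messages = Collections.emptyList();
                }
                if (halted[v]) {
                    continue; // inactive vertices are skipped and not counted
                }
                // Same logic as the SSSP compute() method
                if (superstep == 0) {
                    value[v] = Double.MAX_VALUE;
                }
                double minDist = (v == 0) ? 0d : Double.MAX_VALUE;
                for (double message : messages) {
                    minDist = Math.min(minDist, message);
                }
                if (minDist < value[v]) {
                    value[v] = minDist;
                    for (double[] edge : edges) {
                        if ((int) edge[0] == v) {
                            outbox.computeIfAbsent((int) edge[1], k -> new ArrayList<>())
                                  .add(minDist + edge[2]);
                        }
                    }
                }
                halted[v] = true; // voteToHalt()
                computed++;       // counted right after compute(), as in computePartition()
            }
            computedPerSuperstep.add(computed);
            if (computed == 0) {
                break; // nothing ran, so nothing was sent and every vertex is inactive
            }
            inbox = outbox;
        }
        return computedPerSuperstep;
    }

    public static void main(String[] args) {
        // Made-up graph: 0->1 (1), 0->2 (4), 1->2 (1), 2->3 (1)
        double[][] edges = {{0, 1, 1}, {0, 2, 4}, {1, 2, 1}, {2, 3, 1}};
        System.out.println(run(4, edges)); // [4, 2, 2, 1, 0]
    }
}
```

On this graph, vertex 2 is computed twice (first with distance 4 via the direct edge, then with distance 2 via vertex 1), which is the kind of repeated work the per-superstep counts make visible.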