A directed graph is strongly connected if every vertex can reach every other vertex along the edges of the graph. A maximal strongly connected subgraph of a directed graph is called a strongly connected component (SCC). This example is based on the parallel coloring algorithm.
Each vertex maintains two fields, described below:
colorID: holds the color of vertex v during the forward traversal. When the computation ends, vertices with the same colorID belong to the same strongly connected component.
transposeNeighbors: holds the IDs of vertex v's neighbors in the transpose of the input graph.
The algorithm consists of four phases:
Transpose graph formation: takes two supersteps. In the first, each vertex sends its ID to the neighbors on its outgoing edges; in the second, these IDs are stored as transposeNeighbors.
Trimming: takes one superstep. Every vertex that has only incoming edges, only outgoing edges, or neither sets its colorID to its own ID and becomes inactive; messages subsequently sent to it are ignored.
Forward traversal: has two sub-phases (supersteps), start and rest. In the start phase, each vertex sets its colorID to its own ID and sends its ID to its outgoing neighbors. In the rest phase, each vertex updates its colorID with the minimum colorID it has received and propagates its colorID when it changes, until the colorIDs converge. Once they converge, the master sets the global phase to backward traversal.
Backward traversal: likewise has start and rest sub-phases. In the start phase, every vertex whose ID equals its colorID sends its ID to its transpose-graph neighbors and sets itself inactive; messages subsequently sent to it are ignored. In each rest superstep, every vertex that receives a message matching its colorID propagates its colorID in the transpose graph and then sets itself inactive. If any vertices are still active when this phase ends, the algorithm returns to the trimming phase.
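To make the traversal phases concrete, here is a minimal single-machine sketch (plain Java, no ODPS dependency; the 5-vertex graph is made up for illustration). It extracts the SCC of one pivot vertex as the intersection of the sets reachable in the graph and in its transpose, which is the net effect of one forward-plus-backward traversal round; the coloring algorithm below effectively runs all pivots at once and repeats after trimming:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FwBwSketch {
  // vertices reachable from src by a depth-first walk over g
  static Set<Long> reach(Map<Long, List<Long>> g, long src) {
    Set<Long> seen = new HashSet<>();
    Deque<Long> stack = new ArrayDeque<>(List.of(src));
    while (!stack.isEmpty()) {
      long v = stack.pop();
      if (seen.add(v))
        for (long n : g.getOrDefault(v, List.of())) stack.push(n);
    }
    return seen;
  }

  public static void main(String[] args) {
    // cycle 1 -> 2 -> 3 -> 1 plus cycle 4 <-> 5, bridged by edge 3 -> 4
    Map<Long, List<Long>> g = Map.of(
        1L, List.of(2L), 2L, List.of(3L), 3L, List.of(1L, 4L),
        4L, List.of(5L), 5L, List.of(4L));
    Map<Long, List<Long>> gt = new HashMap<>();   // transpose graph
    g.forEach((u, vs) -> vs.forEach(
        v -> gt.computeIfAbsent(v, k -> new ArrayList<>()).add(u)));
    long pivot = 1L;            // forward traversal colors what the pivot reaches
    Set<Long> scc = reach(g, pivot);
    scc.retainAll(reach(gt, pivot)); // backward traversal keeps the intersection
    System.out.println(scc);    // the SCC containing the pivot: {1, 2, 3}
  }
}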
The code for strongly connected components is shown below:
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import com.aliyun.odps.data.TableInfo; import com.aliyun.odps.graph.Aggregator; import com.aliyun.odps.graph.ComputeContext; import com.aliyun.odps.graph.GraphJob; import com.aliyun.odps.graph.GraphLoader; import com.aliyun.odps.graph.MutationContext; import com.aliyun.odps.graph.Vertex; import com.aliyun.odps.graph.WorkerContext; import com.aliyun.odps.io.BooleanWritable; import com.aliyun.odps.io.IntWritable; import com.aliyun.odps.io.LongWritable; import com.aliyun.odps.io.NullWritable; import com.aliyun.odps.io.Tuple; import com.aliyun.odps.io.Writable; import com.aliyun.odps.io.WritableRecord; /** * Definition from Wikipedia: * In the mathematical theory of directed graphs, a graph is said * to be strongly connected if every vertex is reachable from every * other vertex. The strongly connected components of an arbitrary * directed graph form a partition into subgraphs that are themselves * strongly connected. * * Algorithms with four phases as follows. * 1\. Transpose Graph Formation: Requires two supersteps. In the first * superstep, each vertex sends a message with its ID to all its outgoing * neighbors, which in the second superstep are stored in transposeNeighbors. * * 2\. Trimming: Takes one superstep. Every vertex with only in-coming or * only outgoing edges (or neither) sets its colorID to its own ID and * becomes inactive. Messages subsequently sent to the vertex are ignored. * * 3\. Forward-Traversal: There are two sub phases: Start and Rest. In the * Start phase, each vertex sets its colorID to its own ID and propagates * its ID to its outgoing neighbors. In the Rest phase, vertices update * their own colorIDs with the minimum colorID they have seen, and propagate * their colorIDs, if updated, until the colorIDs converge. * Set the phase to Backward-Traversal when the colorIDs converge. * * 4\. Backward-Traversal: We again break the phase into Start and Rest. * In Start, every vertex whose ID equals its colorID propagates its ID to * the vertices in transposeNeighbors and sets itself inactive. Messages * subsequently sent to the vertex are ignored. In each of the Rest phase supersteps, * each vertex receiving a message that matches its colorID: (1) propagates * its colorID in the transpose graph; (2) sets itself inactive. Messages * subsequently sent to the vertex are ignored. Set the phase back to Trimming * if not all vertex are inactive. * * http://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf */ public class StronglyConnectedComponents { public final static int STAGE_TRANSPOSE_1 = 0; public final static int STAGE_TRANSPOSE_2 = 1; public final static int STAGE_TRIMMING = 2; public final static int STAGE_FW_START = 3; public final static int STAGE_FW_REST = 4; public final static int STAGE_BW_START = 5; public final static int STAGE_BW_REST = 6; /** * The value is composed of component id, incoming neighbors, * active status and updated status. 
*/ public static class MyValue implements Writable { LongWritable sccID;// strongly connected component id Tuple inNeighbors; // transpose neighbors BooleanWritable active; // vertex is active or not BooleanWritable updated; // sccID is updated or not public MyValue() { this.sccID = new LongWritable(Long.MAX_VALUE); this.inNeighbors = new Tuple(); this.active = new BooleanWritable(true); this.updated = new BooleanWritable(false); } public void setSccID(LongWritable sccID) { this.sccID = sccID; } public LongWritable getSccID() { return this.sccID; } public void setInNeighbors(Tuple inNeighbors) { this.inNeighbors = inNeighbors; } public Tuple getInNeighbors() { return this.inNeighbors; } public void addInNeighbor(LongWritable neighbor) { this.inNeighbors.append(new LongWritable(neighbor.get())); } public boolean isActive() { return this.active.get(); } public void setActive(boolean status) { this.active.set(status); } public boolean isUpdated() { return this.updated.get(); } public void setUpdated(boolean update) { this.updated.set(update); } @Override public void write(DataOutput out) throws IOException { this.sccID.write(out); this.inNeighbors.write(out); this.active.write(out); this.updated.write(out); } @Override public void readFields(DataInput in) throws IOException { this.sccID.readFields(in); this.inNeighbors.readFields(in); this.active.readFields(in); this.updated.readFields(in); } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("sccID: " + sccID.get()); sb.append(" inNeighbores: " + inNeighbors.toDelimitedString(',')); sb.append(" active: " + active.get()); sb.append(" updated: " + updated.get()); return sb.toString(); } } public static class SCCVertex extends Vertex<LongWritable, MyValue, NullWritable, LongWritable> { public SCCVertex() { this.setValue(new MyValue()); } @Override public void compute( ComputeContext<LongWritable, MyValue, NullWritable, LongWritable> context, Iterable<LongWritable> msgs) throws IOException { // Messages sent to inactive vertex are ignored. 
if (!this.getValue().isActive()) { this.voteToHalt(); return; } int stage = ((SCCAggrValue)context.getLastAggregatedValue(0)).getStage(); switch (stage) { case STAGE_TRANSPOSE_1: context.sendMessageToNeighbors(this, this.getId()); break; case STAGE_TRANSPOSE_2: for (LongWritable msg: msgs) { this.getValue().addInNeighbor(msg); } case STAGE_TRIMMING: this.getValue().setSccID(getId()); if (this.getValue().getInNeighbors().size() == 0 || this.getNumEdges() == 0) { this.getValue().setActive(false); } break; case STAGE_FW_START: this.getValue().setSccID(getId()); context.sendMessageToNeighbors(this, this.getValue().getSccID()); break; case STAGE_FW_REST: long minSccID = Long.MAX_VALUE; for (LongWritable msg : msgs) { if (msg.get() < minSccID) { minSccID = msg.get(); } } if (minSccID < this.getValue().getSccID().get()) { this.getValue().setSccID(new LongWritable(minSccID)); context.sendMessageToNeighbors(this, this.getValue().getSccID()); this.getValue().setUpdated(true); } else { this.getValue().setUpdated(false); } break; case STAGE_BW_START: if (this.getId().equals(this.getValue().getSccID())) { for (Writable neighbor : this.getValue().getInNeighbors().getAll()) { context.sendMessage((LongWritable)neighbor, this.getValue().getSccID()); } this.getValue().setActive(false); } break; case STAGE_BW_REST: this.getValue().setUpdated(false); for (LongWritable msg : msgs) { if (msg.equals(this.getValue().getSccID())) { for (Writable neighbor : this.getValue().getInNeighbors().getAll()) { context.sendMessage((LongWritable)neighbor, this.getValue().getSccID()); } this.getValue().setActive(false); this.getValue().setUpdated(true); break; } } break; } context.aggregate(0, getValue()); } @Override public void cleanup( WorkerContext<LongWritable, MyValue, NullWritable, LongWritable> context) throws IOException { context.write(getId(), getValue().getSccID()); } } /** * The SCCAggrValue maintains global stage and graph updated and active status. * updated is true only if one vertex is updated. * active is true only if one vertex is active. */ public static class SCCAggrValue implements Writable { IntWritable stage = new IntWritable(STAGE_TRANSPOSE_1); BooleanWritable updated = new BooleanWritable(false); BooleanWritable active = new BooleanWritable(false); public void setStage(int stage) { this.stage.set(stage); } public int getStage() { return this.stage.get(); } public void setUpdated(boolean updated) { this.updated.set(updated); } public boolean getUpdated() { return this.updated.get(); } public void setActive(boolean active) { this.active.set(active); } public boolean getActive() { return this.active.get(); } @Override public void write(DataOutput out) throws IOException { this.stage.write(out); this.updated.write(out); this.active.write(out); } @Override public void readFields(DataInput in) throws IOException { this.stage.readFields(in); this.updated.readFields(in); this.active.readFields(in); } } /** * The job of SCCAggregator is to schedule global stage in every superstep. 
*/ public static class SCCAggregator extends Aggregator<SCCAggrValue> { @SuppressWarnings("rawtypes") @Override public SCCAggrValue createStartupValue(WorkerContext context) throws IOException { return new SCCAggrValue(); } @SuppressWarnings("rawtypes") @Override public SCCAggrValue createInitialValue(WorkerContext context) throws IOException { return (SCCAggrValue) context.getLastAggregatedValue(0); } @Override public void aggregate(SCCAggrValue value, Object item) throws IOException { MyValue v = (MyValue)item; if ((value.getStage() == STAGE_FW_REST || value.getStage() == STAGE_BW_REST) && v.isUpdated()) { value.setUpdated(true); } // only active vertex invoke aggregate() value.setActive(true); } @Override public void merge(SCCAggrValue value, SCCAggrValue partial) throws IOException { boolean updated = value.getUpdated() || partial.getUpdated(); value.setUpdated(updated); boolean active = value.getActive() || partial.getActive(); value.setActive(active); } @SuppressWarnings("rawtypes") @Override public boolean terminate(WorkerContext context, SCCAggrValue value) throws IOException { // If all vertices is inactive, job is over. if (!value.getActive()) { return true; } // state machine switch (value.getStage()) { case STAGE_TRANSPOSE_1: value.setStage(STAGE_TRANSPOSE_2); break; case STAGE_TRANSPOSE_2: value.setStage(STAGE_TRIMMING); break; case STAGE_TRIMMING: value.setStage(STAGE_FW_START); break; case STAGE_FW_START: value.setStage(STAGE_FW_REST); break; case STAGE_FW_REST: if (value.getUpdated()) { value.setStage(STAGE_FW_REST); } else { value.setStage(STAGE_BW_START); } break; case STAGE_BW_START: value.setStage(STAGE_BW_REST); break; case STAGE_BW_REST: if (value.getUpdated()) { value.setStage(STAGE_BW_REST); } else { value.setStage(STAGE_TRIMMING); } break; } value.setActive(false); value.setUpdated(false); return false; } } public static class SCCVertexReader extends GraphLoader<LongWritable, MyValue, NullWritable, LongWritable> { @Override public void load( LongWritable recordNum, WritableRecord record, MutationContext<LongWritable, MyValue, NullWritable, LongWritable> context) throws IOException { SCCVertex vertex = new SCCVertex(); vertex.setId((LongWritable) record.get(0)); String[] edges = record.get(1).toString().split(","); for (int i = 0; i < edges.length; i++) { try { long destID = Long.parseLong(edges[i]); vertex.addEdge(new LongWritable(destID), NullWritable.get()); } catch(NumberFormatException nfe) { System.err.println("Ignore " + nfe); } } context.addVertexRequest(vertex); } } public static void main(String[] args) throws IOException { if (args.length < 2) { System.out.println("Usage: <input> <output>"); System.exit(-1); } GraphJob job = new GraphJob(); job.setGraphLoaderClass(SCCVertexReader.class); job.setVertexClass(SCCVertex.class); job.setAggregatorClass(SCCAggregator.class); job.addInput(TableInfo.builder().tableName(args[0]).build()); job.addOutput(TableInfo.builder().tableName(args[1]).build()); long startTime = System.currentTimeMillis(); job.run(); System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); } }
Two vertices are connected if there is a path between them. An undirected graph G is connected if every pair of its vertices is connected; otherwise it is disconnected. A maximal connected subgraph is called a connected component.
This algorithm computes the connected component each vertex belongs to, and outputs for each vertex the smallest vertex ID in its component: the smallest vertex ID is propagated along the edges to all vertices of the component.
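The propagation is easy to trace on a small example. Here is a minimal single-machine sketch (plain Java, no ODPS dependency) of the same min-label propagation, using the 4-vertex sample graph described in the code comments below; each pass of the while loop plays the role of one superstep, and every vertex converges to label 1:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CCLocalSketch {
  public static void main(String[] args) {
    // undirected sample graph 1-2, 1-3, 2-4, 3-4, stored as out-edges both ways
    Map<Long, List<Long>> adj = Map.of(
        1L, List.of(2L, 3L), 2L, List.of(1L, 4L),
        3L, List.of(1L, 4L), 4L, List.of(2L, 3L));
    Map<Long, Long> label = new HashMap<>();
    for (long v : adj.keySet()) label.put(v, v); // superstep 0: value = own ID
    boolean changed = true;
    while (changed) {                            // one pass = one superstep
      changed = false;
      for (long v : adj.keySet())
        for (long n : adj.get(v))
          if (label.get(v) < label.get(n)) {     // propagate the smaller ID
            label.put(n, label.get(v));
            changed = true;
          }
    }
    label.forEach((v, min) -> System.out.println(v + " -> " + min)); // all -> 1
  }
}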
The code for connected components is shown below:
import java.io.IOException; import com.aliyun.odps.data.TableInfo; import com.aliyun.odps.graph.ComputeContext; import com.aliyun.odps.graph.GraphJob; import com.aliyun.odps.graph.GraphLoader; import com.aliyun.odps.graph.MutationContext; import com.aliyun.odps.graph.Vertex; import com.aliyun.odps.graph.WorkerContext; import com.aliyun.odps.graph.examples.SSSP.MinLongCombiner; import com.aliyun.odps.io.LongWritable; import com.aliyun.odps.io.NullWritable; import com.aliyun.odps.io.WritableRecord; /** * Compute the connected component membership of each vertex and output * each vertex which's value containing the smallest id in the connected * component containing that vertex. * * Algorithm: propagate the smallest vertex id along the edges to all * vertices of a connected component. * */ public class ConnectedComponents { public static class CCVertex extends Vertex<LongWritable, LongWritable, NullWritable, LongWritable> { @Override public void compute( ComputeContext<LongWritable, LongWritable, NullWritable, LongWritable> context, Iterable<LongWritable> msgs) throws IOException { if (context.getSuperstep() == 0L) { this.setValue(getId()); context.sendMessageToNeighbors(this, getValue()); return; } long minID = Long.MAX_VALUE; for (LongWritable id : msgs) { if (id.get() < minID) { minID = id.get(); } } if (minID < this.getValue().get()) { this.setValue(new LongWritable(minID)); context.sendMessageToNeighbors(this, getValue()); } else { this.voteToHalt(); } } /** * Output Table Description: * +-----------------+----------------------------------------+ * | Field | Type | Comment | * +-----------------+----------------------------------------+ * | v | bigint | vertex id | * | minID | bigint | smallest id in the connected component | * +-----------------+----------------------------------------+ */ @Override public void cleanup( WorkerContext<LongWritable, LongWritable, NullWritable, LongWritable> context) throws IOException { context.write(getId(), getValue()); } } /** * Input Table Description: * +-----------------+----------------------------------------------------+ * | Field | Type | Comment | * +-----------------+----------------------------------------------------+ * | v | bigint | vertex id | * | es | string | comma separated target vertex id of outgoing edges | * +-----------------+----------------------------------------------------+ * * Example: * For graph: * 1 ----- 2 * | | * 3 ----- 4 * Input table: * +-----------+ * | v | es | * +-----------+ * | 1 | 2,3 | * | 2 | 1,4 | * | 3 | 1,4 | * | 4 | 2,3 | * +-----------+ */ public static class CCVertexReader extends GraphLoader<LongWritable, LongWritable, NullWritable, LongWritable> { @Override public void load( LongWritable recordNum, WritableRecord record, MutationContext<LongWritable, LongWritable, NullWritable, LongWritable> context) throws IOException { CCVertex vertex = new CCVertex(); vertex.setId((LongWritable) record.get(0)); String[] edges = record.get(1).toString().split(","); for (int i = 0; i < edges.length; i++) { long destID = Long.parseLong(edges[i]); vertex.addEdge(new LongWritable(destID), NullWritable.get()); } context.addVertexRequest(vertex); } } public static void main(String[] args) throws IOException { if (args.length < 2) { System.out.println("Usage: <input> <output>"); System.exit(-1); } GraphJob job = new GraphJob(); job.setGraphLoaderClass(CCVertexReader.class); job.setVertexClass(CCVertex.class); job.setCombinerClass(MinLongCombiner.class); job.addInput(TableInfo.builder().tableName(args[0]).build()); 
job.addOutput(TableInfo.builder().tableName(args[1]).build()); long startTime = System.currentTimeMillis(); job.run(); System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); } }
A topological ordering of a directed graph is a sequence of its vertices in which, for every directed edge (u, v), u appears before v. The algorithm proceeds as follows:
Find a vertex with no incoming edges and output it.
Remove that vertex and all of its outgoing edges from the graph.
Repeat the steps above until every vertex has been output.
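These steps are Kahn's algorithm. Here is a minimal single-machine sketch (plain Java, no ODPS dependency) on the same sample DAG as the input table described in the code comments below; the distributed version replaces the queue with supersteps and a per-vertex message sum:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KahnSketch {
  public static void main(String[] args) {
    // sample DAG from the input table below: 0 -> 1,2; 1 -> 3; 2 -> 3
    Map<Long, List<Long>> g = Map.of(
        0L, List.of(1L, 2L), 1L, List.of(3L),
        2L, List.of(3L), 3L, List.of());
    Map<Long, Integer> indeg = new HashMap<>();
    for (long v : g.keySet()) indeg.put(v, 0);
    for (List<Long> outs : g.values())
      for (long v : outs) indeg.merge(v, 1, Integer::sum);
    Deque<Long> ready = new ArrayDeque<>();
    indeg.forEach((v, d) -> { if (d == 0) ready.add(v); });
    List<Long> order = new ArrayList<>();
    while (!ready.isEmpty()) {
      long v = ready.poll();       // step 1: a vertex with no incoming edges
      order.add(v);
      for (long n : g.get(v))      // step 2: delete it and its outgoing edges
        if (indeg.merge(n, -1, Integer::sum) == 0) ready.add(n);
    }
    // step 3 ran to exhaustion; fewer than |V| outputs would indicate a cycle
    System.out.println(order);     // [0, 1, 2, 3]
  }
}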
The code for topological sort is shown below:
import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.aliyun.odps.data.TableInfo; import com.aliyun.odps.graph.Aggregator; import com.aliyun.odps.graph.Combiner; import com.aliyun.odps.graph.ComputeContext; import com.aliyun.odps.graph.GraphJob; import com.aliyun.odps.graph.GraphLoader; import com.aliyun.odps.graph.MutationContext; import com.aliyun.odps.graph.Vertex; import com.aliyun.odps.graph.WorkerContext; import com.aliyun.odps.io.LongWritable; import com.aliyun.odps.io.NullWritable; import com.aliyun.odps.io.BooleanWritable; import com.aliyun.odps.io.WritableRecord; public class TopologySort { private final static Log LOG = LogFactory.getLog(TopologySort.class); public static class TopologySortVertex extends Vertex<LongWritable, LongWritable, NullWritable, LongWritable> { @Override public void compute( ComputeContext<LongWritable, LongWritable, NullWritable, LongWritable> context, Iterable<LongWritable> messages) throws IOException { // in superstep 0, each vertex sends message whose value is 1 to its // neighbors if (context.getSuperstep() == 0) { if (hasEdges()) { context.sendMessageToNeighbors(this, new LongWritable(1L)); } } else if (context.getSuperstep() >= 1) { // compute each vertex's indegree long indegree = getValue().get(); for (LongWritable msg : messages) { indegree += msg.get(); } setValue(new LongWritable(indegree)); if (indegree == 0) { voteToHalt(); if (hasEdges()) { context.sendMessageToNeighbors(this, new LongWritable(-1L)); } context.write(new LongWritable(context.getSuperstep()), getId()); LOG.info("vertex: " + getId()); } context.aggregate(new LongWritable(indegree)); } } } public static class TopologySortVertexReader extends GraphLoader<LongWritable, LongWritable, NullWritable, LongWritable> { @Override public void load( LongWritable recordNum, WritableRecord record, MutationContext<LongWritable, LongWritable, NullWritable, LongWritable> context) throws IOException { TopologySortVertex vertex = new TopologySortVertex(); vertex.setId((LongWritable) record.get(0)); vertex.setValue(new LongWritable(0)); String[] edges = record.get(1).toString().split(","); for (int i = 0; i < edges.length; i++) { long edge = Long.parseLong(edges[i]); if (edge >= 0) { vertex.addEdge(new LongWritable(Long.parseLong(edges[i])), NullWritable.get()); } } LOG.info(record.toString()); context.addVertexRequest(vertex); } } public static class LongSumCombiner extends Combiner<LongWritable, LongWritable> { @Override public void combine(LongWritable vertexId, LongWritable combinedMessage, LongWritable messageToCombine) throws IOException { combinedMessage.set(combinedMessage.get() + messageToCombine.get()); } } public static class TopologySortAggregator extends Aggregator<BooleanWritable> { @SuppressWarnings("rawtypes") @Override public BooleanWritable createInitialValue(WorkerContext context) throws IOException { return new BooleanWritable(true); } @Override public void aggregate(BooleanWritable value, Object item) throws IOException { boolean hasCycle = value.get(); boolean inDegreeNotZero = ((LongWritable) item).get() == 0 ? 
        false : true;
      value.set(hasCycle && inDegreeNotZero);
    }

    @Override
    public void merge(BooleanWritable value, BooleanWritable partial)
        throws IOException {
      value.set(value.get() && partial.get());
    }

    @SuppressWarnings("rawtypes")
    @Override
    public boolean terminate(WorkerContext context, BooleanWritable value)
        throws IOException {
      if (context.getSuperstep() == 0) {
        // the initial aggregator value is true and no aggregation happens in
        // superstep 0, so do not terminate here
        return false;
      }
      return value.get();
    }
  }

  public static void main(String[] args) throws IOException {
    if (args.length != 2) {
      System.out.println("Usage : <inputTable> <outputTable>");
      System.exit(-1);
    }
    // Input table layout:
    // 0 1,2
    // 1 3
    // 2 3
    // 3 -1
    // The first column is the vertex ID; the second lists the destination
    // vertex IDs of its outgoing edges, where -1 means no outgoing edges.
    //
    // Output table layout:
    // 0 0
    // 1 1
    // 1 2
    // 2 3
    // The first column is the superstep, which encodes the topological order;
    // the second column is the vertex ID.
    //
    // TopologySortAggregator decides whether the graph contains a cycle.
    // If the input graph is cyclic, iteration stops once every active vertex
    // has a non-zero indegree; comparing the record counts of the input and
    // output tables then tells whether the directed graph has a cycle.
    GraphJob job = new GraphJob();
    job.setGraphLoaderClass(TopologySortVertexReader.class);
    job.setVertexClass(TopologySortVertex.class);
    job.addInput(TableInfo.builder().tableName(args[0]).build());
    job.addOutput(TableInfo.builder().tableName(args[1]).build());
    job.setCombinerClass(LongSumCombiner.class);
    job.setAggregatorClass(TopologySortAggregator.class);
    long startTime = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
        + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  }
}
In statistics, linear regression is a method for modeling the interdependence between two or more variables. Unlike classification algorithms, which predict discrete labels,
regression predicts continuous values. Linear regression defines its loss function as the sum of squared errors over the sample set and finds the weight vector by minimizing this loss.
The usual solver is gradient descent, which proceeds as follows:
Initialize the weight vector; fix the learning rate and the number of iterations (or a convergence criterion).
For each sample, compute the squared error.
Sum the squared errors and update the weights according to the learning rate.
Repeat until convergence.
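Written out (a standard formulation that matches the code below), with hypothesis $h_\theta(x) = \theta_0 + \sum_{j \ge 1} \theta_j x_j$, $M$ samples $(x^{(i)}, y^{(i)})$, and learning rate $\alpha$, each iteration evaluates the loss

$$J(\theta) = \frac{1}{2M} \sum_{i=1}^{M} \bigl(h_\theta(x^{(i)}) - y^{(i)}\bigr)^2$$

and applies the update

$$\theta_j \leftarrow \theta_j - \frac{\alpha}{M} \sum_{i=1}^{M} \bigl(h_\theta(x^{(i)}) - y^{(i)}\bigr)\, x_j^{(i)}.$$

The code for linear regression is shown below: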
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import com.aliyun.odps.data.TableInfo; import com.aliyun.odps.graph.Aggregator; import com.aliyun.odps.graph.ComputeContext; import com.aliyun.odps.graph.GraphJob; import com.aliyun.odps.graph.MutationContext; import com.aliyun.odps.graph.WorkerContext; import com.aliyun.odps.graph.Vertex; import com.aliyun.odps.graph.GraphLoader; import com.aliyun.odps.io.DoubleWritable; import com.aliyun.odps.io.LongWritable; import com.aliyun.odps.io.NullWritable; import com.aliyun.odps.io.Tuple; import com.aliyun.odps.io.Writable; import com.aliyun.odps.io.WritableRecord; /** * LineRegression input: y,x1,x2,x3,...... **/ public class LinearRegression { public static class GradientWritable implements Writable { Tuple lastTheta; Tuple currentTheta; Tuple tmpGradient; LongWritable count; DoubleWritable lost; @Override public void readFields(DataInput in) throws IOException { lastTheta = new Tuple(); lastTheta.readFields(in); currentTheta = new Tuple(); currentTheta.readFields(in); tmpGradient = new Tuple(); tmpGradient.readFields(in); count = new LongWritable(); count.readFields(in); /* update 1: add a variable to store lost at every iteration */ lost = new DoubleWritable(); lost.readFields(in); } @Override public void write(DataOutput out) throws IOException { lastTheta.write(out); currentTheta.write(out); tmpGradient.write(out); count.write(out); lost.write(out); } } public static class LinearRegressionVertex extends Vertex<LongWritable, Tuple, NullWritable, NullWritable> { @Override public void compute( ComputeContext<LongWritable, Tuple, NullWritable, NullWritable> context, Iterable<NullWritable> messages) throws IOException { context.aggregate(getValue()); } } public static class LinearRegressionVertexReader extends GraphLoader<LongWritable, Tuple, NullWritable, NullWritable> { @Override public void load(LongWritable recordNum, WritableRecord record, MutationContext<LongWritable, Tuple, NullWritable, NullWritable> context) throws IOException { LinearRegressionVertex vertex = new LinearRegressionVertex(); vertex.setId(recordNum); vertex.setValue(new Tuple(record.getAll())); context.addVertexRequest(vertex); } } public static class LinearRegressionAggregator extends Aggregator<GradientWritable> { @SuppressWarnings("rawtypes") @Override public GradientWritable createInitialValue(WorkerContext context) throws IOException { if (context.getSuperstep() == 0) { /* set initial value, all 0 */ GradientWritable grad = new GradientWritable(); grad.lastTheta = new Tuple(); grad.currentTheta = new Tuple(); grad.tmpGradient = new Tuple(); grad.count = new LongWritable(1); grad.lost = new DoubleWritable(0.0); int n = (int) Long.parseLong(context.getConfiguration() .get("Dimension")); for (int i = 0; i < n; i++) { grad.lastTheta.append(new DoubleWritable(0)); grad.currentTheta.append(new DoubleWritable(0)); grad.tmpGradient.append(new DoubleWritable(0)); } return grad; } else return (GradientWritable) context.getLastAggregatedValue(0); } public static double vecMul(Tuple value, Tuple theta) { /* perform this partial computing: y(i)−hθ(x(i)) for each sample */ /* value denote a piece of sample and value(0) is y */ double sum = 0.0; for (int j = 1; j < value.size(); j++) sum += Double.parseDouble(value.get(j).toString()) * Double.parseDouble(theta.get(j).toString()); Double tmp = Double.parseDouble(theta.get(0).toString()) + sum - Double.parseDouble(value.get(0).toString()); return tmp; } @Override public void aggregate(GradientWritable 
gradient, Object value) throws IOException { /* * perform on each vertex--each sample i:set theta(j) for each sample i * for each dimension */ double tmpVar = vecMul((Tuple) value, gradient.currentTheta); /* * update 2:local worker aggregate(), perform like merge() below. This * means the variable gradient denotes the previous aggregated value */ gradient.tmpGradient.set(0, new DoubleWritable( ((DoubleWritable) gradient.tmpGradient.get(0)).get() + tmpVar)); gradient.lost.set(Math.pow(tmpVar, 2)); /* * calculate (y(i)−hθ(x(i))) x(i)(j) for each sample i for each * dimension j */ for (int j = 1; j < gradient.tmpGradient.size(); j++) gradient.tmpGradient.set(j, new DoubleWritable( ((DoubleWritable) gradient.tmpGradient.get(j)).get() + tmpVar * Double.parseDouble(((Tuple) value).get(j).toString()))); } @Override public void merge(GradientWritable gradient, GradientWritable partial) throws IOException { /* perform SumAll on each dimension for all samples. */ Tuple master = (Tuple) gradient.tmpGradient; Tuple part = (Tuple) partial.tmpGradient; for (int j = 0; j < gradient.tmpGradient.size(); j++) { DoubleWritable s = (DoubleWritable) master.get(j); s.set(s.get() + ((DoubleWritable) part.get(j)).get()); } gradient.lost.set(gradient.lost.get() + partial.lost.get()); } @SuppressWarnings("rawtypes") @Override public boolean terminate(WorkerContext context, GradientWritable gradient) throws IOException { /* * 1\. calculate new theta 2\. judge the diff between last step and this * step, if smaller than the threshold, stop iteration */ gradient.lost = new DoubleWritable(gradient.lost.get() / (2 * context.getTotalNumVertices())); /* * we can calculate lost in order to make sure the algorithm is running on * the right direction (for debug) */ System.out.println(gradient.count + " lost:" + gradient.lost); Tuple tmpGradient = gradient.tmpGradient; System.out.println("tmpGra" + tmpGradient); Tuple lastTheta = gradient.lastTheta; Tuple tmpCurrentTheta = new Tuple(gradient.currentTheta.size()); System.out.println(gradient.count + " terminate_start_last:" + lastTheta); double alpha = 0.07; // learning rate // alpha = // Double.parseDouble(context.getConfiguration().get("Alpha")); /* perform theta(j) = theta(j)-alpha*tmpGradient */ long M = context.getTotalNumVertices(); /* * update 3: add (/M) on the code. The original code forget this step */ for (int j = 0; j < lastTheta.size(); j++) { tmpCurrentTheta .set( j, new DoubleWritable(Double.parseDouble(lastTheta.get(j) .toString()) - alpha / M * Double.parseDouble(tmpGradient.get(j).toString()))); } System.out.println(gradient.count + " terminate_start_current:" + tmpCurrentTheta); // judge if convergence is happening. double diff = 0.00d; for (int j = 0; j < gradient.currentTheta.size(); j++) diff += Math.pow(((DoubleWritable) tmpCurrentTheta.get(j)).get() - ((DoubleWritable) lastTheta.get(j)).get(), 2); if (/* * Math.sqrt(diff) < 0.00000000005d || */Long.parseLong(context.getConfiguration().get("Max_Iter_Num")) == gradient.count .get()) { context.write(gradient.currentTheta.toArray()); return true; } gradient.lastTheta = tmpCurrentTheta; gradient.currentTheta = tmpCurrentTheta; gradient.count.set(gradient.count.get() + 1); int n = (int) Long.parseLong(context.getConfiguration().get("Dimension")); /* * update 4: Important!!! Remember this step. 
Graph won't reset the * initial value for global variables at the beginning of each iteration */ for (int i = 0; i < n; i++) { gradient.tmpGradient.set(i, new DoubleWritable(0)); } return false; } } public static void main(String[] args) throws IOException { GraphJob job = new GraphJob(); job.setGraphLoaderClass(LinearRegressionVertexReader.class); job.setRuntimePartitioning(false); job.setNumWorkers(3); job.setVertexClass(LinearRegressionVertex.class); job.setAggregatorClass(LinearRegressionAggregator.class); job.addInput(TableInfo.builder().tableName(args[0]).build()); job.addOutput(TableInfo.builder().tableName(args[1]).build()); job.setMaxIteration(Integer.parseInt(args[2])); // Numbers of Iteration job.setInt("Max_Iter_Num", Integer.parseInt(args[2])); job.setInt("Dimension", Integer.parseInt(args[3])); // Dimension job.setFloat("Alpha", Float.parseFloat(args[4])); // Learning rate long start = System.currentTimeMillis(); job.run(); System.out.println("Job Finished in " + (System.currentTimeMillis() - start) / 1000.0 + " seconds"); } }
The triangle counting algorithm computes the number of triangles passing through each vertex.
The implementation proceeds as follows:
Each vertex sends its ID to all of its outgoing neighbors.
Each vertex stores its incoming and outgoing neighbors and sends the combined set to its outgoing neighbors.
For each edge, compute the size of the intersection of the neighbor sets at its endpoints, sum these counts, and write the results to a table.
Sum the values in the output table and divide by three: each triangle is counted three times, so this yields the number of triangles.
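The per-edge intersection is easy to verify locally. Here is a minimal single-machine sketch (plain Java, no ODPS dependency; the 4-vertex undirected graph is made up for illustration) of the same counting idea:

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TriangleSketch {
  public static void main(String[] args) {
    // undirected graph: triangle 1-2-3 plus the extra edge 3-4
    Map<Long, Set<Long>> nbrs = Map.of(
        1L, Set.of(2L, 3L), 2L, Set.of(1L, 3L),
        3L, Set.of(1L, 2L, 4L), 4L, Set.of(3L));
    long sum = 0;
    for (long u : nbrs.keySet())
      for (long v : nbrs.get(u))
        if (u < v) {                      // visit each undirected edge once
          Set<Long> common = new HashSet<>(nbrs.get(u));
          common.retainAll(nbrs.get(v));  // common neighbors close a triangle
          sum += common.size();
        }
    System.out.println(sum / 3);          // 3 edge contributions / 3 = 1 triangle
  }
}

The edge (3, 4) contributes nothing, while each of the three triangle edges contributes one common neighbor, giving 3 / 3 = 1 triangle.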
The code for triangle counting is shown below:
import java.io.IOException; import com.aliyun.odps.data.TableInfo; import com.aliyun.odps.graph.ComputeContext; import com.aliyun.odps.graph.Edge; import com.aliyun.odps.graph.GraphJob; import com.aliyun.odps.graph.GraphLoader; import com.aliyun.odps.graph.MutationContext; import com.aliyun.odps.graph.Vertex; import com.aliyun.odps.graph.WorkerContext; import com.aliyun.odps.io.LongWritable; import com.aliyun.odps.io.NullWritable; import com.aliyun.odps.io.Tuple; import com.aliyun.odps.io.Writable; import com.aliyun.odps.io.WritableRecord; /** * Compute the number of triangles passing through each vertex. * * The algorithm can be computed in three supersteps: * I. Each vertex sends a message with its ID to all its outgoing * neighbors. * II. The incoming neighbors and outgoing neighbors are stored and * send to outgoing neighbors. * III. For each edge compute the intersection of the sets at destination * vertex and sum them, then output to table. * * The triangle count is the sum of output table and divide by three since * each triangle is counted three times. * **/ public class TriangleCount { public static class TCVertex extends Vertex<LongWritable, Tuple, NullWritable, Tuple> { @Override public void setup( WorkerContext<LongWritable, Tuple, NullWritable, Tuple> context) throws IOException { // collect the outgoing neighbors Tuple t = new Tuple(); if (this.hasEdges()) { for (Edge<LongWritable, NullWritable> edge : this.getEdges()) { t.append(edge.getDestVertexId()); } } this.setValue(t); } @Override public void compute( ComputeContext<LongWritable, Tuple, NullWritable, Tuple> context, Iterable<Tuple> msgs) throws IOException { if (context.getSuperstep() == 0L) { // sends a message with its ID to all its outgoing neighbors Tuple t = new Tuple(); t.append(getId()); context.sendMessageToNeighbors(this, t); } else if (context.getSuperstep() == 1L) { // store the incoming neighbors for (Tuple msg : msgs) { for (Writable item : msg.getAll()) { if (!this.getValue().getAll().contains((LongWritable)item)) { this.getValue().append((LongWritable)item); } } } // send both incoming and outgoing neighbors to all outgoing neighbors context.sendMessageToNeighbors(this, getValue()); } else if (context.getSuperstep() == 2L) { // count the sum of intersection at each edge long count = 0; for (Tuple msg : msgs) { for (Writable id : msg.getAll()) { if (getValue().getAll().contains(id)) { count ++; } } } // output to table context.write(getId(), new LongWritable(count)); this.voteToHalt(); } } } public static class TCVertexReader extends GraphLoader<LongWritable, Tuple, NullWritable, Tuple> { @Override public void load( LongWritable recordNum, WritableRecord record, MutationContext<LongWritable, Tuple, NullWritable, Tuple> context) throws IOException { TCVertex vertex = new TCVertex(); vertex.setId((LongWritable) record.get(0)); String[] edges = record.get(1).toString().split(","); for (int i = 0; i < edges.length; i++) { try { long destID = Long.parseLong(edges[i]); vertex.addEdge(new LongWritable(destID), NullWritable.get()); } catch(NumberFormatException nfe) { System.err.println("Ignore " + nfe); } } context.addVertexRequest(vertex); } } public static void main(String[] args) throws IOException { if (args.length < 2) { System.out.println("Usage: <input> <output>"); System.exit(-1); } GraphJob job = new GraphJob(); job.setGraphLoaderClass(TCVertexReader.class); job.setVertexClass(TCVertex.class); job.addInput(TableInfo.builder().tableName(args[0]).build()); 
job.addOutput(TableInfo.builder().tableName(args[1]).build()); long startTime = System.currentTimeMillis(); job.run(); System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); } }
The code for loading the vertex and edge input tables is shown below:
import java.io.IOException;
import com.aliyun.odps.conf.Configuration;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.VertexResolver;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.VertexChanges;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.WritableComparable;
import com.aliyun.odps.io.WritableRecord;

/**
 * This example shows how to write a graph job that loads input data of
 * different record layouts, using GraphLoader and VertexResolver together to
 * build the graph.
 *
 * ODPS Graph jobs take ODPS tables as input. Suppose the job has two input
 * tables, one storing vertices and one storing edges.
 *
 * Vertex table layout:
 * +------------------------+
 * | VertexID | VertexValue |
 * +------------------------+
 * | id0      | 9           |
 * +------------------------+
 * | id1      | 7           |
 * +------------------------+
 * | id2      | 8           |
 * +------------------------+
 *
 * Edge table layout:
 * +-------------------------------------+
 * | VertexID | DestVertexID | EdgeValue |
 * +-------------------------------------+
 * | id0      | id1          | 1         |
 * +-------------------------------------+
 * | id0      | id2          | 2         |
 * +-------------------------------------+
 * | id2      | id1          | 3         |
 * +-------------------------------------+
 *
 * Together the two tables say: id0 has two outgoing edges, to id1 and id2;
 * id2 has one outgoing edge, to id1; id1 has no outgoing edges.
 *
 * For this kind of data, in GraphLoader::load(LongWritable, Record,
 * MutationContext) use MutationContext#addVertexRequest(Vertex) to request
 * adding a vertex and MutationContext#addEdgeRequest(WritableComparable, Edge)
 * to request adding an edge; then, in
 * VertexResolver#resolve(WritableComparable, Vertex, VertexChanges, boolean),
 * merge the vertices and edges added in load into one Vertex object and return
 * it, so that it joins the graph that actually takes part in the computation.
 *
 **/
public class VertexInputFormat {
  private final static String EDGE_TABLE = "edge.table";

  /**
   * Interprets each Record as a Vertex or an Edge depending on which table it
   * comes from. Similar to com.aliyun.odps.mapreduce.Mapper#map: a Record goes
   * in and key/value pairs come out, the key being the vertex ID and the value
   * a Vertex or Edge, emitted through the context. These pairs are grouped by
   * vertex ID in LoadingVertexResolver.
   *
   * Note: vertices and edges added here are only requests derived from the
   * Record contents, not the vertices and edges of the final graph; only those
   * added in the subsequent VertexResolver take part in the computation.
   **/
  public static class VertexInputLoader extends
      GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
    private boolean isEdgeData;

    /**
     * Configures the VertexInputLoader.
     *
     * @param conf
     *          job configuration, set in main via GraphJob or in the console
     * @param workerId
     *          index of the current worker, starting from 0; can be used to
     *          construct unique vertex IDs
     * @param inputTableInfo
     *          info of the input table the current worker is loading; used to
     *          decide which record layout the current input has
     **/
    @Override
    public void setup(Configuration conf, int workerId, TableInfo inputTableInfo) {
      isEdgeData = conf.get(EDGE_TABLE).equals(inputTableInfo.getTableName());
    }

    /**
     * Parses each Record into the corresponding vertex or edge and requests
     * that it be added to the graph.
     *
     * @param recordNum
     *          record sequence number, starting from 1, counted per worker
     * @param record
     *          a record of the input table; for the edge table it has three
     *          columns: source vertex, destination vertex, and edge weight
     * @param context
     *          context through which the parsed data is added to the graph
     **/
    @Override
    public void load(
        LongWritable recordNum,
        WritableRecord record,
        MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      if (isEdgeData) {
        /**
         * The record comes from the edge table.
         *
         * 1. The first column is the source vertex ID.
         **/
        LongWritable sourceVertexID = (LongWritable) record.get(0);
        /**
         * 2. The second column is the destination vertex ID.
         **/
        LongWritable destinationVertexID = (LongWritable) record.get(1);
        /**
         * 3. The third column is the edge weight.
         **/
        LongWritable edgeValue = (LongWritable) record.get(2);
        /**
         * 4. Create the edge from the destination vertex ID and the weight.
         **/
        Edge<LongWritable, LongWritable> edge = new Edge<LongWritable, LongWritable>(
            destinationVertexID, edgeValue);
        /**
         * 5. Request adding the edge to the source vertex.
         **/
        context.addEdgeRequest(sourceVertexID, edge);
        /**
         * 6. If each Record represents a bidirectional edge, repeat 4 and 5:
         * Edge<LongWritable, LongWritable> edge2 = new
         * Edge<LongWritable, LongWritable>(sourceVertexID, edgeValue);
         * context.addEdgeRequest(destinationVertexID, edge2);
         **/
      } else {
        /**
         * The record comes from the vertex table.
         *
         * 1. The first column is the vertex ID.
         **/
        LongWritable vertexID = (LongWritable) record.get(0);
        /**
         * 2. The second column is the vertex value.
         **/
        LongWritable vertexValue = (LongWritable) record.get(1);
        /**
         * 3. Create the vertex from the vertex ID and the vertex value.
         **/
        MyVertex vertex = new MyVertex();
        /**
         * 4. Initialize the vertex.
         **/
        vertex.setId(vertexID);
        vertex.setValue(vertexValue);
        /**
         * 5. Request adding the vertex.
         **/
        context.addVertexRequest(vertex);
      }
    }
  }

  /**
   * Groups the key/value pairs produced by GraphLoader::load(LongWritable,
   * Record, MutationContext), similar to reduce in
   * com.aliyun.odps.mapreduce.Reducer. For each unique vertex ID, all requests
   * to add or remove vertices and edges on that ID are collected in
   * VertexChanges.
   *
   * Note: resolve is invoked for every ID requested in load, not only for IDs
   * with conflicting requests (conflicts meaning multiple Vertex objects with
   * the same ID, duplicate edges, and so on).
   **/
  public static class LoadingResolver extends
      VertexResolver<LongWritable, LongWritable, LongWritable, LongWritable> {

    /**
     * Handles the add/remove requests for vertices and edges on one ID.
     *
     * VertexChanges has four accessors, matching the four MutationContext
     * methods: VertexChanges::getAddedVertexList() matches
     * MutationContext::addVertexRequest(Vertex) and collects the Vertex
     * objects requested for this ID in load;
     * VertexChanges::getAddedEdgeList() matches
     * MutationContext::addEdgeRequest(WritableComparable, Edge) and collects
     * the Edge objects whose source vertex is this ID;
     * VertexChanges::getRemovedVertexCount() matches
     * MutationContext::removeVertexRequest(WritableComparable) and returns how
     * many removals were requested for this ID;
     * VertexChanges#getRemovedEdgeList() matches
     * MutationContext#removeEdgeRequest(WritableComparable, WritableComparable)
     * and collects the Edge objects whose source is this ID that were requested
     * for removal.
     *
     * By handling the changes for this ID, the return value declares whether
     * the ID takes part in the computation: a non-null Vertex joins the
     * subsequent computation, null does not.
     *
     * @param vertexId
     *          ID of the requested vertex, or source ID of a requested edge
     * @param vertex
     *          the existing Vertex object; always null during data loading
     * @param vertexChanges
     *          the add/remove requests for vertices and edges on this ID
     * @param hasMessages
     *          whether this ID has incoming messages; always false during
     *          data loading
     **/
    @Override
    public Vertex<LongWritable, LongWritable, LongWritable, LongWritable> resolve(
        LongWritable vertexId,
        Vertex<LongWritable, LongWritable, LongWritable, LongWritable> vertex,
        VertexChanges<LongWritable, LongWritable, LongWritable, LongWritable> vertexChanges,
        boolean hasMessages) throws IOException {
      /**
       * 1. Obtain the Vertex object that will take part in the computation.
       **/
      MyVertex computeVertex = null;
      if (vertexChanges.getAddedVertexList() == null
          || vertexChanges.getAddedVertexList().isEmpty()) {
        computeVertex = new MyVertex();
        computeVertex.setId(vertexId);
      } else {
        /**
         * This assumes each Record of the vertex table is a unique vertex.
         **/
        computeVertex = (MyVertex) vertexChanges.getAddedVertexList().get(0);
      }
      /**
       * 2. Attach the edges requested for this vertex to the Vertex object;
       * if duplicates are possible, deduplicate as the algorithm requires.
       **/
      if (vertexChanges.getAddedEdgeList() != null) {
        for (Edge<LongWritable, LongWritable> edge : vertexChanges
            .getAddedEdgeList()) {
          computeVertex.addEdge(edge.getDestVertexId(), edge.getValue());
        }
      }
      /**
       * 3. Return the Vertex object so that it joins the final graph.
       **/
      return computeVertex;
    }
  }

  /**
   * Defines the behavior of the vertices that take part in the computation.
   *
   **/
  public static class MyVertex extends
      Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {

    /**
     * Writes the vertex and its edges back to the result tables in the same
     * layout as the input tables, so input and output hold identical data.
     *
     * @param context
     *          runtime context
     * @param messages
     *          incoming messages
     **/
    @Override
    public void compute(
        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
        Iterable<LongWritable> messages) throws IOException {
      /**
       * Write the vertex ID and value to the vertex result table.
       **/
      context.write("vertex", getId(), getValue());
      /**
       * Write the edges to the edge result table.
       **/
      if (hasEdges()) {
        for (Edge<LongWritable, LongWritable> edge : getEdges()) {
          context.write("edge", getId(), edge.getDestVertexId(),
              edge.getValue());
        }
      }
      /**
       * Run a single iteration only.
       **/
      voteToHalt();
    }
  }

  /**
   * @param args
   * @throws IOException
   */
  public static void main(String[] args) throws IOException {
    if (args.length < 4) {
      throw new IOException(
          "Usage: VertexInputFormat <vertex input> <edge input> <vertex output> <edge output>");
    }
    /**
     * GraphJob configures the Graph job.
     */
    GraphJob job = new GraphJob();
    /**
     * 1. Specify the input graph data and mark which table stores the edges.
     */
    job.addInput(TableInfo.builder().tableName(args[0]).build());
    job.addInput(TableInfo.builder().tableName(args[1]).build());
    job.set(EDGE_TABLE, args[1]);
    /**
     * 2. Specify how records are loaded; like map, this turns each Record into
     * key/value pairs, the key being the vertex ID and the value an edge.
     */
    job.setGraphLoaderClass(VertexInputLoader.class);
    /**
     * 3. Specify the loading resolver; like reduce, this merges the edges
     * produced by load into the vertices that take part in the computation.
     */
    job.setLoadingVertexResolverClass(LoadingResolver.class);
    /**
     * 4. Specify the vertex behavior; vertex.compute runs in every iteration.
     */
    job.setVertexClass(MyVertex.class);
    /**
     * 5. Specify the output tables the results are written to.
     */
    job.addOutput(TableInfo.builder().tableName(args[2]).label("vertex").build());
    job.addOutput(TableInfo.builder().tableName(args[3]).label("edge").build());
    /**
     * 6. Submit the job.
     */
    job.run();
  }
}
Author: 雲花
This article is original content from the Yunqi Community (雲棲社區) and may not be reproduced without permission.