MaxCompute 圖計算用戶手冊(上)

概要

ODPS GRAPH是一套面向迭代的圖計算處理框架。圖計算做業使用圖進行建模,圖由點(Vertex)和邊(Edge)組成,點和邊包含權值(Value),ODPS GRAPH支持下述圖編輯操做:html

  • 修改點或邊的權值;
  • 增長/刪除點;
  • 增長/刪除邊;

備註:java

  • 編輯點和邊時,點與邊的關係須要用戶維護。

經過迭代對圖進行編輯、演化,最終求解出結果,典型應用:PageRank單源最短距離算法 ,K-均值聚類算法 等等。用戶可使用 ODPS GRAPH 提供的接口Java SDK編寫圖計算程序。node

[]()Graph數據結構

ODPS GRAPH可以處理的圖必須是是一個由點(Vertex)和邊(Edge)組成的有向圖。因爲ODPS僅提供二維表的存儲結構,所以須要用戶自行將圖數據分解爲二維表格式存儲在ODPS中,在進行圖計算分析時,使用自定義的GraphLoader將二維表數據轉換爲ODPS Graph引擎中的點和邊。至於如何將圖數據分解爲二維表格式,用戶能夠根據各自的業務場景作決定。在 示例程序 中,咱們給出的示例分別使用不一樣的表格式來表達圖的數據結構,僅供你們參考。
點的結構能夠簡單表示爲 < ID, Value, Halted, Edges >,分別表示點標識符(ID),權值(Value),狀態(Halted, 表示是否要中止迭代),出邊集合(Edges,以該點爲起始點的全部邊列表)。邊的結構能夠簡單表示爲,分別表示目標點(DestVertexID)和權值(Value)。linux


例如,上圖由下面的點組成:git

Vertex  
v0 <0, 0, false, [ <1, 5 >, <2, 10 > ] >
v1 <1, 5, false, [ <2, 3>, <3, 2>, <5, 9>]>
v2 <2, 8, false, [<1, 2>, <5, 1 >]>
v3 <3, Long.MAX_VALUE, false, [<0, 7>, <5, 6>]>
v5 <5, Long.MAX_VALUE, false, [<3, 4 > ]>

 

[]()Graph 程序邏輯

 

[]()1. 加載圖:

圖加載:框架調用用戶自定義的GraphLoader將輸入表的記錄解析爲點或邊;分佈式化:框架調用用戶自定義的Partitioner對點進行分片(默認分片邏輯:點ID哈希值而後對Worker數取模),分配到相應的Worker;github


例如,上圖假設Worker數是2,那麼v0, v2會被分配到Worker0,由於ID對2取模結果爲0,而v1, v3, v5將被分配到Worker1,ID對2取模結果爲1;算法

[]()2. 迭代計算:

  • 一次迭代爲一個」超步」(SuperStep),遍歷全部非結束狀態(Halted值爲false)的點或者收到消息的點(處於結束狀態的點收到信息會被自動喚醒),並調用其compute(ComputeContext context, Iterable messages)方法;
  • 在用戶實現的compute(ComputeContext context, Iterable messages)方法中:apache

    • 處理上一個超步發給當前點的消息(Messages);
    • 根據須要對圖進行編輯:1). 修改點/邊的取值;2). 發送消息給某些點;3). 增長/刪除點或邊;
    • 經過Aggregator彙總信息到全局信息;
    • 設置當前點狀態,結束或非結束狀態;
    • 迭代進行過程當中,框架會將消息以異步的方式發送到對應Worker並在下一個超步進行處理,用戶無需關心;

[]()3. 迭代終止(知足如下任意一條):

  • 全部點處於結束狀態(Halted值爲true)且沒有新消息產生;
  • 達到最大迭代次數;
  • 某個Aggregator的terminate方法返回true;

僞代碼描述以下:json

// 1. load
for each record in input_table {
  GraphLoader.load();
}
// 2. setup
WorkerComputer.setup();
for each aggr in aggregators {
  aggr.createStartupValue();
}
for each v in vertices {
  v.setup();
}
// 3. superstep
for (step = 0; step < max; step ++) {
  for each aggr in aggregators {
    aggr.createInitialValue();
  }
  for each v in vertices {
     v.compute();
   }
}
// 4. cleanup
for each v in vertices {
  v.cleanup();
}
WorkerComputer.cleanup();

 

 

Aggregator

Aggregator是ODPS-GRAPH做業中經常使用的feature之一,特別是解決機器學習問題時。ODPS-GRAPH中Aggregator用於彙總並處理全局信息。本文將詳細介紹的Aggregator的執行機制、相關API,並以Kmeans Clustering爲例子說明Aggregator的具體用法。windows

Aggregator機制

Aggregator的邏輯分兩部分,一部分在全部Worker上執行,即分佈式執行,另外一部分只在AggregatorOwner所在Worker上執行,即單點。其中在全部Worker上執行的操做包括建立初始值及局部聚合,而後將局部聚合結果發送給AggregatorOwner所在Worker上。AggregatorOwner所在Worker上聚合普通Worker發送過來的局部聚合對象,獲得全局聚合結果,而後判斷迭代是否結束。全局聚合的結果會在下一輪超步分發給全部Worker,供下一輪迭代使用。 以下圖所示 :

Aggregator的API

Aggregator共提供了五個API供用戶實現。下面逐個介紹5個API的調用時機及常規用途。

  1. createStartupValue(context)
    該API在全部Worker上執行一次,調用時機是全部超步開始以前,一般用以初始化AggregatorValue。在第0輪超步中,調用WorkerContext.getLastAggregatedValue() 或ComputeContext.getLastAggregatedValue()能夠獲取該API初始化的AggregatorValue對象。
  2. createInitialValue(context)
    該API在全部Worker上每輪超步開始時調用一次,用以初始化本輪迭代所用的AggregatorValue。一般操做是經過WorkerContext.getLastAggregatedValue() 獲得上一輪迭代的結果,而後執行部分初始化操做。
  3. aggregate(value, item)
    該API一樣在全部Worker上執行,與上述API不一樣的是,該API由用戶顯示調用ComputeContext#aggregate(item)來觸發,而上述兩個API,則由框架自動調用。該API用以執行局部聚合操做,其中第一個參數value是本Worker在該輪超步已經聚合的結果(初始值是createInitialValue返回的對象),第二個參數是用戶代碼調用ComputeContext#aggregate(item)傳入的參數。該API中一般用item來更新value實現聚合。全部aggregate執行完後,獲得的value就是該Worker的局部聚合結果,而後由框架發送給AggregatorOwner所在的Worker。
  4. merge(value, partial)
    該API執行於AggregatorOwner所在Worker,用以合併各Worker局部聚合的結果,達到全局聚合對象。與aggregate相似,value是已經聚合的結果,而partial待聚合的對象,一樣用partial更新value。
    假定有3個worker,分別是w0、w一、w2,其局部聚合結果是p0、p一、p2。假定發送到AggregatorOwner所在Worker的順序爲p一、p0、p2。那麼merge執行次序爲,首先執行merge(p1, p0),這樣p1和p0就聚合爲p1',而後執行merge(p1', p2),p1'和p2聚合爲p1'',而p1''即爲本輪超步全局聚合的結果。
    從上述示例能夠看出,當只有一個worker時,不須要執行merge方法,也就是說merge()不會被調用。
  5. terminate(context, value)
    當AggregatorOwner所在Worker執行完merge()後,框架會調用terminate(context, value)執行最後的處理。其中第二個參數value,即爲merge()最後獲得全局聚合,在該方法中能夠對全局聚合繼續修改。執行完terminate()後,框架會將全局聚合對象分發給全部Worker,供下一輪超步使用。
    terminate()方法的一個特殊之處在於,若是返回true,則整個做業就結束迭代,不然繼續執行。在機器學習場景中,一般判斷收斂後返回true以結束做業。
     

Kmeans Clustering示例

下面以典型的KmeansClustering做爲示例,來看下Aggregator具體用法。附件有完整代碼,這裏咱們逐個部分解析代碼。

  1. GraphLoader部分
    GraphLoader部分用以加載輸入表,並轉換爲圖的點或邊。這裏咱們輸入表的每行數據爲一個樣本,一個樣本構造一個點,並用Vertex的value來存放樣本。
    咱們首先定義一個Writable類KmeansValue做爲Vertex的value類型:
    `java
  2. static class KmeansValue implements Writable {

DenseVector sample;
public KmeansValue() { 
}
public KmeansValue(DenseVector v) {
this.sample = v;
}
@Override
public void write(DataOutput out) throws IOException {
wirteForDenseVector(out, sample);
}
@Override
public void readFields(DataInput in) throws IOException {
sample = readFieldsForDenseVector(in);
}
}

KmeansValue中封裝一個DenseVector對象來存放一個樣本,這裏DenseVector類型來自[matrix-toolkits-java](https://github.com/fommil/matrix-toolkits-java/),而wirteForDenseVector()及readFieldsForDenseVector()用以實現序列化及反序列化,可參見附件中的完整代碼。<br />咱們自定義的KmeansReader代碼以下:<br />```java
public static class KmeansReader extends 
 GraphLoader<LongWritable, KmeansValue, NullWritable, NullWritable> {
 @Override
 public void load(
     LongWritable recordNum,
     WritableRecord record,
     MutationContext<LongWritable, KmeansValue, NullWritable, NullWritable> context)
     throws IOException {
   KmeansVertex v = new KmeansVertex();
   v.setId(recordNum);
   int n = record.size();
   DenseVector dv = new DenseVector(n);
   for (int i = 0; i < n; i++) {
     dv.set(i, ((DoubleWritable)record.get(i)).get());
   }
   v.setValue(new KmeansValue(dv));
   context.addVertexRequest(v);
 }
}

KmeansReader中,每讀入一行數據(一個Record)建立一個點,這裏用recordNum做爲點的ID,將record內容轉換成DenseVector對象並封裝進VertexValue中。

  1. Vertex部分
    自定義的KmeansVertex代碼以下。邏輯很是簡單,每輪迭代要作的事情就是將本身維護的樣本執行局部聚合。具體邏輯參見下面Aggregator的實現。
    `java
  2. static class KmeansVertex extends

Vertex {
@Override
public void compute(

ComputeContext<LongWritable, KmeansValue, NullWritable, NullWritable> context,
 Iterable<NullWritable> messages) throws IOException {

context.aggregate(getValue());
}
}

1. Aggregator部分<br />整個Kmeans的主要邏輯集中在Aggregator中。首先是自定義的KmeansAggrValue,用以維護要聚合及分發的內容。<br />```java
public static class KmeansAggrValue implements Writable {
 DenseMatrix centroids;
 DenseMatrix sums; // used to recalculate new centroids
 DenseVector counts; // used to recalculate new centroids
 @Override
 public void write(DataOutput out) throws IOException {
   wirteForDenseDenseMatrix(out, centroids);
   wirteForDenseDenseMatrix(out, sums);
   wirteForDenseVector(out, counts);
 }
 @Override
 public void readFields(DataInput in) throws IOException {
   centroids = readFieldsForDenseMatrix(in);
   sums = readFieldsForDenseMatrix(in);
   counts = readFieldsForDenseVector(in);
 }
}

KmeansAggrValue中維護了三個對象,其中centroids是當前的K箇中心點,假定樣本是m維的話,centroids就是一個K*m的矩陣。sums是和centroids大小同樣的矩陣,每一個元素記錄了到特定中心點最近的樣本特定維之和,例如sums(i,j)是到第i箇中心點最近的樣本的第j維度之和。
counts是個K維的向量,記錄到每一箇中心點距離最短的樣本個數。sums和counts一塊兒用以計算新的中心點,也是要聚合的主要內容。 接下來是自定義的Aggregator實現類KmeansAggregator,咱們按照上述API的順序逐個看其實現。
首先是createStartupValue()。
`java
public static class KmeansAggregator extends Aggregator {
public KmeansAggrValue createStartupValue(WorkerContext context) throws IOException {
KmeansAggrValue av = new KmeansAggrValue();
byte[] centers = context.readCacheFile("centers");
String lines[] = new String(centers).split("n");
int rows = lines.length;
int cols = lines[0].split(",").length; // assumption rows >= 1 
av.centroids = new DenseMatrix(rows, cols);
av.sums = new DenseMatrix(rows, cols);
av.sums.zero();
av.counts = new DenseVector(rows);
av.counts.zero();
for (int i = 0; i < lines.length; i++) {

String[] ss = lines[i].split(",");
 for (int j = 0; j < ss.length; j++) {
   av.centroids.set(i, j, Double.valueOf(ss[j]));
 }

}
return av;
}

咱們在該方法中初始化一個KmeansAggrValue對象,而後從資源文件centers中讀取初始中心點,並賦值給centroids。而sums和counts初始化爲0。<br />接來下是createInitialValue()的實現:<br />```java
@Override
 public void aggregate(KmeansAggrValue value, Object item)
     throws IOException {
   DenseVector sample = ((KmeansValue)item).sample;
   // find the nearest centroid
   int min = findNearestCentroid(value.centroids, sample);
   // update sum and count
   for (int i = 0; i < sample.size(); i ++) {
     value.sums.add(min, i, sample.get(i));
   }
   value.counts.add(min, 1.0d);
 }

該方法中調用findNearestCentroid()(實現見附件)找到樣本item歐拉距離最近的中心點索引,而後將其各個維度加到sums上,最後counts計數加1。
以上三個方法執行於全部worker上,實現局部聚合。接下來看下在AggregatorOwner所在Worker執行的全局聚合相關操做。
首先是merge的實現:
`java
@Override
public void merge(KmeansAggrValue value, KmeansAggrValue partial)

throws IOException {

value.sums.add(partial.sums);
value.counts.add(partial.counts);
}

merge的實現邏輯很簡單,就是把各個worker聚合出的sums和counts相加便可。<br />最後是terminate()的實現:<br />```java
@Override
 public boolean terminate(WorkerContext context, KmeansAggrValue value)
     throws IOException {
   // Calculate the new means to be the centroids (original sums)
   DenseMatrix newCentriods = calculateNewCentroids(value.sums, value.counts, value.centroids);
   // print old centroids and new centroids for debugging
   System.out.println("\nsuperstep: " + context.getSuperstep() + 
       "\nold centriod:\n" + value.centroids + " new centriod:\n" + newCentriods);
   boolean converged = isConverged(newCentriods, value.centroids, 0.05d);
   System.out.println("superstep: " + context.getSuperstep() + "/" 
       + (context.getMaxIteration() - 1) + " converged: " + converged);
   if (converged || context.getSuperstep() == context.getMaxIteration() - 1) {
     // converged or reach max iteration, output centriods
     for (int i = 0; i < newCentriods.numRows(); i++) {
       Writable[] centriod = new Writable[newCentriods.numColumns()];
       for (int j = 0; j < newCentriods.numColumns(); j++) {
         centriod[j] = new DoubleWritable(newCentriods.get(i, j));
       }
       context.write(centriod);
     }
     // true means to terminate iteration
     return true;
   }
   // update centriods
   value.centroids.set(newCentriods);
   // false means to continue iteration
   return false;
 }

teminate()中首先根據sums和counts調用calculateNewCentroids()求平均計算出新的中心點。而後調用isConverged()根據新老中心點歐拉距離判斷是否已經收斂。若是收斂或迭代次數達到最大數,則將新的中心點輸出並返回true,以結束迭代。不然更新中心點並返回false以繼續迭代。其中calculateNewCentroids()和isConverged()的實現見附件。

  1. main方法
    main方法用以構造GraphJob,而後設置相應配置,並提交做業。代碼以下:
    `java
  2. static void main(String[] args) throws IOException {

if (args.length < 2)
printUsage();
GraphJob job = new GraphJob();
job.setGraphLoaderClass(KmeansReader.class);
job.setRuntimePartitioning(false);
job.setVertexClass(KmeansVertex.class);
job.setAggregatorClass(KmeansAggregator.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
// default max iteration is 30
job.setMaxIteration(30);
if (args.length >= 3)
job.setMaxIteration(Integer.parseInt(args[2]));
long start = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "

+ (System.currentTimeMillis() - start) / 1000.0 + " seconds");

}

這裏須要注意的是job.setRuntimePartitioning(false),設置爲false後,各個worker加載的數據再也不根據Partitioner從新分區,即誰加載的數據誰維護。

<a name="a7d80080"></a>
# 功能介紹
<a name="1922c3a4"></a>
## 運行做業
MaxCompute 客戶端提供一個Jar命令用於運行 MaxCompute GRAPH做業,其使用方式與 [MapReduce](http://help.aliyun-inc.com/internaldoc/detail/27875.html)中的[Jar命令](http://help.aliyun-inc.com/internaldoc/detail/27878.html) 相同,這裏僅做簡要介紹:

Usage: jar [] [ARGS]

-conf <configuration_file>         Specify an application configuration file
-classpath <local_file_list>       classpaths used to run mainClass
-D <name>=<value>                  Property value pair, which will be used to run mainClass
-local                             Run job in local mode
-resources <resource_name_list>    file/table resources used in graph, seperate by comma
其中 < GENERIC_OPTIONS>包括(均爲可選參數):
* -conf <configuration file > :指定JobConf配置文件;
* -classpath <local_file_list > : 本地執行時的classpath,主要用於指定main函數所在的jar包。大多數狀況下,用戶更習慣於將main函數與Graph做業編寫在一個包中,例如:單源最短距離算法 ,所以,在執行示例程序時,-resources及-classpath的參數中都出現了用戶的jar包,但兩者意義不一樣,-resources引用的是Graph做業,運行於分佈式環境中,而-classpath引用的是main函數,運行於本地,指定的jar包路徑也是本地文件路徑。包名之間使用系統默認的文件分割符做分割(一般狀況下,windows系統是分號」;」,linux系統是冒號」:」);
* -D <prop_name > = < prop_value > : 本地執行時,<mainClass > 的java屬性,能夠定義多個;
* -local:以本地模式執行Graph做業,主要用於程序調試;
* -resources <resource_name_list > : Graph做業運行時使用的資源聲明。通常狀況下,resource_name_list中須要指定Graph做業所在的資源名稱。若是用戶在Graph做業中讀取了其餘ODPS資源,那麼,這些資源名稱也須要被添加到resource_name_list中。資源之間使用逗號分隔,使用跨項目空間使用資源時,須要前面加上:PROJECT_NAME/resources/,示例:-resources otherproject/resources/resfile;

同時,用戶也能夠直接運行GRAPH做業的main函數直接將做業提交到 MaxCompute ,而不是經過 MaxCompute 客戶端提交做業。以[PageRank算法](http://help.aliyun-inc.com/internaldoc/detail/27908.html) 爲例:

public static void main(String[] args) throws IOException {
if (args.length < 2)

printUsage();

GraphJob job = new GraphJob();
job.setGraphLoaderClass(PageRankVertexReader.class);
job.setVertexClass(PageRankVertex.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
// 將做業中使用的資源添加到cache resource,對應於jar命令中 -resources 和 -libjars 中指定的資源
job.addCacheResource("mapreduce-examples.jar");
// 將使用的jar及其餘文件添加到class cache resource,對應於jar命令中 -libjars 中指定的資源
job.addCacheResourceToClassPath("mapreduce-examples.jar");
// 設置console中,odps_config.ini對應的配置項,使用時替換爲本身的配置
OdpsConf.getInstance().setProjName("project_name");
OdpsConf.getInstance().setEndpoint("end_point");
OdpsConf.getInstance().setAccessId("access_id");
OdpsConf.getInstance().setAccessKey("access_key");
// default max iteration is 30
job.setMaxIteration(30);
if (args.length >= 3)

job.setMaxIteration(Integer.parseInt(args[2]));

long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "

+ (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");

}

<a name="6354d6d6"></a>
## []()輸入輸出
MaxCompute GRAPH做業的輸入輸出限制爲表,不容許用戶自定義輸入輸出格式。<br />定義做業輸入,支持多路輸入:

GraphJob job = new GraphJob();
job.addInput(TableInfo.builder().tableName(「tblname」).build()); //表做爲輸入
job.addInput(TableInfo.builder().tableName(「tblname」).partSpec("pt1=a/pt2=b").build()); //分區做爲輸入
//只讀取輸入表的 col2 和 col0 列,在 GraphLoader 的 load 方法中,record.get(0) 獲得的是col2列,順序一致
job.addInput(TableInfo.builder().tableName(「tblname」).partSpec("pt1=a/pt2=b").build(), new String[]{"col2", "col0"});
備註:

關於做業輸入定義,更多的信息參見GraphJob的addInput相關方法說明,框架讀取輸入表的記錄傳給用戶自定義的GraphLoader載入圖數據;
限制: 暫時不支持分區過濾條件。更多應用限制請參考 應用限制;
定義做業輸出,支持多路輸出,經過label標識每路輸出:

GraphJob job = new GraphJob();
//輸出表爲分區表時須要給到最末一級分區
job.addOutput(TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").build());
// 下面的參數 true 表示覆蓋tableinfo指定的分區,即INSERT OVERWRITE語義,false表示INSERT INTO語義
job.addOutput(TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").lable("output1").build(), true);

> 備註:
> * 關於做業輸出定義,更多的信息參見GraphJob的addOutput 相關方法說明;
* Graph做業在運行時能夠經過WorkerContext的write方法寫出記錄到輸出表,多路輸出須要指定標識,如上面的 「output1」;
* 更多應用限制請參考 [應用限制](http://help.aliyun-inc.com/internaldoc/detail/27905.html);

<a name="d41d8cd9"></a>
# 
<a name="bbfcdb67"></a>
## 讀取資源
<a name="24cb2794"></a>
### []()GRAPH程序中添加資源
除了經過jar命令指定GRAPH讀取的資源外,還能夠經過GraphJob的下面兩個方法指定:

void addCacheResources(String resourceNames)
void addCacheResourcesToClassPath(String resourceNames)

<a name="90d49894"></a>
### []()GRAPH程序中使用資源
在 GRAPH 程序中能夠經過相應的上下文對象WorkerContext的下述方法讀取資源:

public byte[] readCacheFile(String resourceName) throws IOException;
public Iterable readCacheArchive(String resourceName) throws IOException;
public Iterable readCacheArchive(String resourceName, String relativePath)throws IOException;
public Iterable readResourceTable(String resourceName);
public BufferedInputStream readCacheFileAsStream(String resourceName) throws IOException;
public Iterable readCacheArchiveAsStream(String resourceName) throws IOException;
public Iterable readCacheArchiveAsStream(String resourceName, String relativePath) throws IOException;

> 備註:
> * 一般在WorkerComputer的setup方法裏讀取資源,而後保存在Worker Value中,以後經過getWorkerValue方法取得;
* 建議用上面的流接口,邊讀邊處理,內存耗費少;
* 更多應用限制請參考 [應用限制](http://help.aliyun-inc.com/internaldoc/detail/27905.html);


<a name="5f839cd3"></a>
# SDK介紹
Graph SDK maven 配置:


com.aliyun.odps
odps-sdk-graph
0.20.7
sources

完整Java Doc文檔,請點擊 [這裏](http://odps.alibaba-inc.com/doc/prddoc/odps_sdk_v2/apidocs/index.html)

| 主要接口 | 說明 |
| :--- | :--- |
| GraphJob | GraphJob繼承自JobConf,用於定義、提交和管理一個 ODPS Graph 做業。 |
| Vertex | Vertex是圖的點的抽象,包含屬性:id,value,halted,edges,經過GraphJob的setVertexClass接口提供 Vertex 實現。 |
| Edge | Edge是圖的邊的抽象,包含屬性:destVertexId, value,圖數據結構採用鄰接表,點的出邊保存在點的 edges 中。 |
| GraphLoader | GraphLoader用於載入圖,經過 GraphJob 的 setGraphLoaderClass 接口提供 GraphLoader 實現。 |
| VertexResolver | VertexResolver用於自定義圖拓撲修改時的衝突處理邏輯,經過GraphJob的 setLoadingVertexResolverClass 和 setComputingVertexResolverClass 接口提供圖加載和迭代計算過程當中的圖拓撲修改的衝突處理邏輯。 |
| Partitioner | Partitioner 用於對圖進行劃分使得計算能夠分片進行,經過GraphJob的 setPartitionerClass 接口提供 Partitioner 實現,默認採用 HashPartitioner,即對點 ID 求哈希值而後對 Worker 數目取模。 |
| WorkerComputer | WorkerComputer容許在 Worker 開始和退出時執行用戶自定義的邏輯,經過GraphJob的 setWorkerComputerClass 接口提供WorkerComputer 實現。 |
| Aggregator | Aggregator 的 setAggregatorClass(Class ...) 定義一個或多個 Aggregator |
| Combiner | Combiner 的 setCombinerClass 設置 Combiner |
| Counters | 計數器,在做業運行邏輯中,能夠經過 WorkerContext 接口取得計數器並進行計數,框架會自動進行彙總 |
| WorkerContext | 上下文對象,封裝了框架的提供的功能,如修改圖拓撲結構,發送消息,寫結果,讀取資源等等 |

<a name="8e705b7e"></a>
# 開發和調試
ODPS沒有爲用戶提供Graph開發插件,但用戶仍然能夠基於Eclipse開發ODPS Graph程序,建議的開發流程是:
* 編寫Graph代碼,使用本地調試進行基本的測試;
* 進行集羣調試,驗證結果;
<a name="9973bec7"></a>
## 開發示例
本節以[SSSP](http://help.aliyun-inc.com/internaldoc/detail/27907.html) 算法爲例講述如何用Eclipse開發和調試Graph程序。<br />下面是開發SSSP的步驟:
1. 建立Java工程,例如:graph_examples;<br />
1. 將ODPS客戶端lib目錄下的jar包加到Eclipse工程的Build Path裏。一個配置好的Eclipse工程以下圖所示。<br />
1. 開發ODPS Graph程序,實際開發過程當中,經常會先拷貝一個例子(例如[SSSP](http://help.aliyun-inc.com/internaldoc/detail/27907.html)),而後再作修改。在本示例中,咱們僅修改了package路徑爲:package com.aliyun.odps.graph.example。
1. 編譯打包,在Eclipse環境中,右鍵點擊源代碼目錄(圖中的src目錄),Export -> Java -> JAR file 生成JAR包,選擇目標jar包的保存路徑,例如:D:\odps\clt\odps-graph-example-sssp.jar;
1. 使用ODPS客戶端運行SSSP,相關操做參考[快速開始之運行Graph](http://help.aliyun-inc.com/internaldoc/detail/27813.html)。

Eclipse 配置截圖: ![](https://intranetproxy.alipay.com/skylark/lark/0/2019/png/23934/1548150832059-15fe7b48-5b7f-45b9-b9fd-5d8dd2014732.png#align=left&display=inline&height=448&originHeight=770&originWidth=1281&size=0&width=746)
> 注意:
> * 相關的開發步驟請參考[Graph開發插件介紹](http://help.aliyun-inc.com/internaldoc/detail/27985.html).

<a name="8f6be038"></a>
## 本地調試
ODPS GRAPH支持本地調試模式,可使用Eclipse進行斷點調試。<br />斷點調試步驟以下:
* 下載一個odps-graph-local的maven包。
* 選擇Eclipse工程,右鍵點擊GRAPH做業主程序(包含main函數)文件,配置其運行參數(Run As -> Run Configurations…),以下圖。
* 在Arguments tab頁中,設置Program arguments 參數爲「1 sssp_in sssp_out」,做爲主程序的輸入參數;
* 在Arguments tab頁中,設置VM arguments參數爲:<br />-Dodps.runner.mode=local -Dodps.project.name=<project.name> -Dodps.end.point=<end.point> -Dodps.access.id=<access.id> -Dodps.access.key=<access.key>

![](https://intranetproxy.alipay.com/skylark/lark/0/2019/png/23934/1548150832059-b6f956c0-ecdc-4de9-9338-3ff9e9305261.png#align=left&display=inline&height=597&originHeight=640&originWidth=800&size=0&width=746)
* 對於本地模式(即odps.end.point參數不指定),須要在warehouse建立sssp_in,sssp_out表,爲輸入表 sssp_in 添加數據,輸入數據以下。關於warehouse的介紹請參考[MapReduce本地運行](http://help.aliyun-inc.com/internaldoc/detail/27882.html) 部分;

1,"2:2,3:1,4:4"
2,"1:2,3:2,4:1"
3,"1:1,2:2,5:1"
4,"1:4,2:1,5:1"
5,"3:1,4:1"

* 點擊Run按鈕便可本地跑SSSP;

其中:參數設置可參考ODPS客戶端中conf/odps_config.ini的設置,上述是幾個經常使用參數,其餘參數也說明以下:
* odps.runner.mode:取值爲local,本地調試功能必須指定;
* odps.project.name:指定當前project,必須指定;
* odps.end.point:指定當前odps服務的地址,能夠不指定,若是不指定,只從warehouse讀取表或資源的meta和數據,不存在則拋異常,若是指定,會先從warehouse讀取,不存在時會遠程鏈接odps讀取;
* odps.access.id:鏈接odps服務的id,只在指定odps.end.point時有效;
* odps.access.key:鏈接odps服務的key,只在指定odps.end.point時有效;
* odps.cache.resources:指定使用的資源列表,效果與jar命令的「-resources」相同;
* odps.local.warehouse: 本地warehouse路徑,不指定時默認爲./warehouse;

在 Eclipse 中本地跑 SSSP的調試輸出信息以下:

Counters: 3

com.aliyun.odps.graph.local.COUNTER
             TASK_INPUT_BYTE=211
             TASK_INPUT_RECORD=5
             TASK_OUTPUT_BYTE=161
             TASK_OUTPUT_RECORD=5

graph task finish

> 注意:在上面的示例中,須要本地warehouse下有sssp_in及sssp_out表。sssp_in及sssp_out的詳細信息請參考[快速開始之運行Graph](http://help.aliyun-inc.com/internaldoc/detail/27813.html)中的介紹。

<a name="035a3e86"></a>
## 本地做業臨時目錄
每運行一次本地調試,都會在 Eclipse 工程目錄下新建一個臨時目錄,見下圖:<br />![](https://intranetproxy.alipay.com/skylark/lark/0/2019/png/23934/1548150832063-a27eee32-76f2-496b-9e7c-4aa622ebf33d.png#align=left&display=inline&height=199&originHeight=199&originWidth=272&size=0&width=272)<br />一個本地運行的GRAPH做業臨時目錄包括如下幾個目錄和文件:
* counters - 存放做業運行的一些計數信息;
* inputs - 存放做業的輸入數據,優先取自本地的 warehouse,若是本地沒有,會經過 ODPS SDK 從服務端讀取(若是設置了 odps.end.point),默認一個 input 只讀10 條數據,能夠經過 -Dodps.mapred.local.record.limit 參數進行修改,可是也不能超過1萬條記錄;
* outputs - 存放做業的輸出數據,若是本地warehouse中存在輸出表,outputs裏的結果數據在做業執行完後會覆蓋本地warehouse中對應的表;
* resources - 存放做業使用的資源,與輸入相似,優先取自本地的warehouse,若是本地沒有,會經過ODPS SDK從服務端讀取(若是設置了 odps.end.point);
* job.xml - 做業配置
* superstep - 存放每一輪迭代的消息持久化信息。> 注意:
> * 若是須要本地調試時輸出詳細日誌,須要在 src 目錄下放一個 log4j 的配置文件:log4j.properties_odps_graph_cluster_debug。


<a name="f949cc7a"></a>
## 集羣調試
在經過本地的調試以後,能夠提交做業到集羣進行測試,一般步驟:
1. 配置ODPS客戶端;
1. 使用「add jar /path/work.jar -f;」命令更新jar包;
1. 使用jar命令運行做業,查看運行日誌和結果數據,以下所示;
> 注意:
> * 集羣運行Graph的詳細介紹能夠參考[快速開始之運行Graph](http://help.aliyun-inc.com/internaldoc/detail/27813.html)。

<a name="5865fdf5"></a>
## 性能調優
下面主要從 ODPS Graph 框架角度介紹常見性能優化的幾個方面:
<a name="5c70b720"></a>
## 做業參數配置
對性能有所影響的 GraphJob 配置項包括:
* setSplitSize(long) // 輸入表切分大小,單位MB,大於0,默認64;
* setNumWorkers(int) // 設置做業worker數量,範圍:[1, 1000], 默認值-1, worker數由做業輸入字節數和split size決定;
* setWorkerCPU(int) // Map CPU資源,100爲1cpu核,[50,800]之間,默認200;
* setWorkerMemory(int) // Map 內存資源,單位MB,[256M,12G]之間,默認4096M;
* setMaxIteration(int) // 設置最大迭代次數,默認 -1,小於或等於 0 時表示最大迭代次數不做爲做業終止條件;
* setJobPriority(int) // 設置做業優先級,範圍:[0, 9],默認9,數值越大優先級越小。

一般狀況下:
1. 能夠考慮使用setNumWorkers方法增長 worker 數目;
1. 能夠考慮使用setSplitSize方法減小切分大小,提升做業載入數據速度;
1. 加大 worker 的 cpu 或內存;
1. 設置最大迭代次數,有些應用若是結果精度要求不高,能夠考慮減小迭代次數,儘快結束;

接口 setNumWorkers 與 setSplitSize 配合使用,能夠提升數據的載入速度。假設 setNumWorkers 爲 workerNum, setSplitSize 爲 splitSize, 總輸入字節數爲 inputSize, 則輸入被切分後的塊數 splitNum = inputSize / splitSize,workerNum 和 splitNum 之間的關係:
1. 若 splitNum == workerNum,每一個 worker 負責載入一個 split;
1. 若 splitNum > workerNum,每一個 worker 負責載入一個或多個 split;
1. 若 splitNum < workerNum, 每一個 worker 負責載入零個或一個 split。

所以,應調節 workerNum 和 splitSize,在知足前兩種狀況時,數據載入比較快。迭代階段只調節 workerNum 便可。 若是設置 runtime partitioning 爲 false,則建議直接使用 setSplitSize 控制 worker 數量,或者保證知足前兩種狀況,在出現第三種狀況時,部分 worker 上點數會爲0. 能夠在 jar 命令前使用set odps.graph.split.size=<m>; set odps.graph.worker.num=<n>; 與 setNumWorkers 和 setSplitSize 等效。<br />另一種常見的性能問題:數據傾斜,反應到 Counters 就是某些 worker 處理的點或邊數量遠遠超過其餘 worker。<br />數據傾斜的緣由一般是某些 key 對應的點、邊,或者消息的數量遠遠超出其餘 key,這些 key 被分到少許的 worker 處理,從而致使這些 worker 相對於其餘運行時間長不少,解決方法:
* 能夠試試 Combiner,把這些 key 對應點的消息進行本地聚合,減小消息發生;
* 改進業務邏輯。
<a name="6bb8613c"></a>
## 運用Combiner
開發人員可定義 Combiner 來減小存儲消息的內存和網絡數據流量,縮短做業的執行時間。細節見 SDK中Combiner的介紹。
<a name="654376db"></a>
## 減小數據輸入量
數據量大時,讀取磁盤中的數據可能耗費一部分處理時間,所以,減小須要讀取的數據字節數能夠提升整體的吞吐量,從而提升做業性能。可供選擇的方法有以下幾種:
* 減小輸入數據量:對某些決策性質的應用,處理數據採樣後子集所獲得的結果只可能影響結果的精度,而並不會影響總體的準確性,所以能夠考慮先對數據進行特定採樣後再導入輸入表中進行處理
* 避免讀取用不到的字段:ODPS Graph 框架的 TableInfo 類支持讀取指定的列(以列名數組方式傳入),而非整個表或表分區,這樣也能夠減小輸入的數據量,提升做業性能
<a name="e3f29de3"></a>
## 內置jar包
下面這些 jar 包會默認加載到運行 GRAPH 程序的 JVM 中,用戶能夠沒必要上傳這些資源,也沒必要在命令行的 -libjars 帶上這些 jar 包:
* commons-codec-1.3.jar
* commons-io-2.0.1.jar
* commons-lang-2.5.jar
* commons-logging-1.0.4.jar
* commons-logging-api-1.0.4.jar
* guava-14.0.jar
* json.jar
* log4j-1.2.15.jar
* slf4j-api-1.4.3.jar
* slf4j-log4j12-1.4.3.jar
* xmlenc-0.52.jar
> 注意:
> * 在起 JVM 的CLASSPATH 裏,上述內置 jar 包會放在用戶 jar 包的前面,因此可能產生版本衝突,例如:用戶的程序中使用了 commons-codec-1.5.jar 某個類的函數,可是這個函數不在 commons-codec-1.3.jar 中,這時只能看 1.3 版本里是否有知足你需求的實現,或者等待ODPS升級新版本。


<a name="babfb10e"></a>
# 應用限制
* 單個job引用的resource數量不超過256個,table、archive按照一個單位計算;
* 單個job引用的resource總計字節數大小不超過512M;
* 單個job的輸入路數不能超過1024(輸入表的個數不能超過64),單個job的輸出路數不能超過256;
* 多路輸出中指定的label不能爲null或者爲空字符串,長度不能超過256,只能包括A-Z,a-z,0-9,_,#,.,-等;
* 單個job中自定義counter的數量不能超過64,counter的group name和counter name中不能帶有#,二者長度和不能超過100;
* 單個job的worker數由框架計算得出,最大爲 1000, 超過拋異常;
* 單個worker佔用cpu默認爲200,範圍[50, 800];
* 單個worker佔用memory默認爲4096,範圍[256M, 12G];
* 單個worker重複讀一個resource次數限制不大於64次;
* plit size默認爲64M,用戶可設置,範圍:0 < split_size <= (9223372036854775807 >> 20);
* ODPS Graph程序中的GraphLoader/Vertex/Aggregator等在集羣運行時,受到Java沙箱的限制(Graph做業的主程序則不受此限制),具體限制如 [Java沙箱](http://help.aliyun-inc.com/internaldoc/detail/34631.html) 所示。

<a name="3e49d1b2"></a>
# 示例程序
<a name="f56cb8a8"></a>
## 單源最短距離
Dijkstra 算法是求解有向圖中單源最短距離(Single Source Shortest Path,簡稱爲 SSSP)的經典算法。<br />最短距離:對一個有權重的有向圖 G=(V,E),從一個源點 s 到匯點 v 有不少路徑,其中邊權和最小的路徑,稱從 s 到 v 的最短距離。<br />算法基本原理,以下所示:
* 初始化:源點 s 到 s 自身的距離(d[s]=0),其餘點 u 到 s 的距離爲無窮(d[u]=∞)。<br />
* 迭代:若存在一條從 u 到 v 的邊,那麼從 s 到 v 的最短距離更新爲:d[v]=min(d[v], d[u]+weight(u, v)),直到全部的點到 s 的距離再也不發生變化時,迭代結束。<br />

由算法基本原理能夠看出,此算法很是適合使用 MaxCompute Graph 程序進行求解:每一個點維護到源點的當前最短距離值,當這個值變化時,將新值加上邊的權值發送消息通知其鄰接點,下一輪迭代時,鄰接點根據收到的消息更新其當前最短距離,當全部點當前最短距離再也不變化時,迭代結束。
<a name="b5ea48ff"></a>
### []()代碼示例
單源最短距離的代碼,以下所示:

import java.io.IOException;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.Combiner;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.data.TableInfo;
public class SSSP {
public static final String START_VERTEX = "sssp.start.vertex.id";
public static class SSSPVertex extends

Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
private static long startVertexId = -1;
public SSSPVertex() {
  this.setValue(new LongWritable(Long.MAX_VALUE));
}
public boolean isStartVertex(
    ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
  if (startVertexId == -1) {
    String s = context.getConfiguration().get(START_VERTEX);
    startVertexId = Long.parseLong(s);
  }
  return getId().get() == startVertexId;
}
@Override
public void compute(
    ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
    Iterable<LongWritable> messages) throws IOException {
  long minDist = isStartVertex(context) ? 0 : Integer.MAX_VALUE;
  for (LongWritable msg : messages) {
    if (msg.get() < minDist) {
      minDist = msg.get();
    }
  }
  if (minDist < this.getValue().get()) {
    this.setValue(new LongWritable(minDist));
    if (hasEdges()) {
      for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
        context.sendMessage(e.getDestVertexId(), new LongWritable(minDist
            + e.getValue().get()));
      }
    }
  } else {
    voteToHalt();
  }
}
@Override
public void cleanup(
    WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
    throws IOException {
  context.write(getId(), getValue());
}

}
public static class MinLongCombiner extends

Combiner<LongWritable, LongWritable> {
@Override
public void combine(LongWritable vertexId, LongWritable combinedMessage,
    LongWritable messageToCombine) throws IOException {
  if (combinedMessage.get() > messageToCombine.get()) {
    combinedMessage.set(messageToCombine.get());
  }
}

}
public static class SSSPVertexReader extends

GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
@Override
public void load(
    LongWritable recordNum,
    WritableRecord record,
    MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
    throws IOException {
  SSSPVertex vertex = new SSSPVertex();
  vertex.setId((LongWritable) record.get(0));
  String[] edges = record.get(1).toString().split(",");
  for (int i = 0; i < edges.length; i++) {
    String[] ss = edges[i].split(":");
    vertex.addEdge(new LongWritable(Long.parseLong(ss[0])),
        new LongWritable(Long.parseLong(ss[1])));
  }
  context.addVertexRequest(vertex);
}

}
public static void main(String[] args) throws IOException {

if (args.length < 2) {
  System.out.println("Usage: <startnode> <input> <output>");
  System.exit(-1);
}
GraphJob job = new GraphJob();
job.setGraphLoaderClass(SSSPVertexReader.class);
job.setVertexClass(SSSPVertex.class);
job.setCombinerClass(MinLongCombiner.class);
job.set(START_VERTEX, args[0]);
job.addInput(TableInfo.builder().tableName(args[1]).build());
job.addOutput(TableInfo.builder().tableName(args[2]).build());
long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
    + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");

}
}

上述代碼,說明以下:
* 第 19 行:定義 SSSPVertex ,其中:
  * 點值表示該點到源點 startVertexId 的當前最短距離。<br />
  * compute() 方法使用迭代公式:d[v]=min(d[v], d[u]+weight(u, v)) 更新點值。<br />
  * cleanup() 方法把點及其到源點的最短距離寫到結果表中。<br />
* 第 58 行:當點值沒發生變化時,調用 voteToHalt() 告訴框架該點進入 halt 狀態,當全部點都進入 halt 狀態時,計算結束。<br />
* 第 70 行:定義 MinLongCombiner,對發送給同一個點的消息進行合併,優化性能,減小內存佔用。<br />
* 第 83 行:定義 SSSPVertexReader 類,加載圖,將表中每一條記錄解析爲一個點,記錄的第一列是點標識,第二列存儲該點起始的全部的邊集,內容如:2:2,3:1,4:4。<br />
* 第 106 行:主程序(main 函數),定義 GraphJob,指定 Vertex/GraphLoader/Combiner 等的實現,指定輸入輸出表。
<a name="PageRank"></a>
## PageRank
PageRank 算法是計算網頁排名的經典算法:輸入是一個有向圖 G,其中頂點表示網頁,若是存在網頁 A 到網頁 B 的連接,那麼存在鏈接 A 到 B 的邊。<br />算法基本原理,以下所示:
* 初始化:點值表示 PageRank 的 rank 值(double 類型),初始時,全部點取值爲 1/TotalNumVertices。<br />
* 迭代公式:PageRank(i)=0.15/TotalNumVertices+0.85*sum,其中 sum 爲全部指向 i 點的點(設爲 j) PageRank(j)/out_degree(j) 的累加值。<br />

由算法基本原理能夠看出,此算法很是適合使用 MaxCompute Graph 程序進行求解:每一個點 j 維護其 PageRank 值,每一輪迭代都將 PageRank(j)/out_degree(j) 發給其鄰接點(向其投票),下一輪迭代時,每一個點根據迭代公式從新計算 PageRank 取值。
<a name="b5ea48ff"></a>
### []()代碼示例

import java.io.IOException;
import org.apache.log4j.Logger;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
public class PageRank {
private final static Logger LOG = Logger.getLogger(PageRank.class);
public static class PageRankVertex extends

Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> {
@Override
public void compute(
    ComputeContext<Text, DoubleWritable, NullWritable, DoubleWritable> context,
    Iterable<DoubleWritable> messages) throws IOException {
  if (context.getSuperstep() == 0) {
    setValue(new DoubleWritable(1.0 / context.getTotalNumVertices()));
  } else if (context.getSuperstep() >= 1) {
    double sum = 0;
    for (DoubleWritable msg : messages) {
      sum += msg.get();
    }
    DoubleWritable vertexValue = new DoubleWritable(
        (0.15f / context.getTotalNumVertices()) + 0.85f * sum);
    setValue(vertexValue);
  }
  if (hasEdges()) {
    context.sendMessageToNeighbors(this, new DoubleWritable(getValue()
        .get() / getEdges().size()));
  }
}
@Override
public void cleanup(
    WorkerContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
    throws IOException {
  context.write(getId(), getValue());
}

}
public static class PageRankVertexReader extends

GraphLoader<Text, DoubleWritable, NullWritable, DoubleWritable> {
@Override
public void load(
    LongWritable recordNum,
    WritableRecord record,
    MutationContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
    throws IOException {
  PageRankVertex vertex = new PageRankVertex();
  vertex.setValue(new DoubleWritable(0));
  vertex.setId((Text) record.get(0));
  System.out.println(record.get(0));
  for (int i = 1; i < record.size(); i++) {
    Writable edge = record.get(i);
    System.out.println(edge.toString());
    if (!(edge.equals(NullWritable.get()))) {
      vertex.addEdge(new Text(edge.toString()), NullWritable.get());
    }
  }
  LOG.info("vertex edgs size: "
      + (vertex.hasEdges() ? vertex.getEdges().size() : 0));
  context.addVertexRequest(vertex);
}

}
private static void printUsage() {

System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
System.exit(-1);

}
public static void main(String[] args) throws IOException {

if (args.length < 2)
  printUsage();
GraphJob job = new GraphJob();
job.setGraphLoaderClass(PageRankVertexReader.class);
job.setVertexClass(PageRankVertex.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
// default max iteration is 30
job.setMaxIteration(30);
if (args.length >= 3)
  job.setMaxIteration(Integer.parseInt(args[2]));
long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
    + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");

}
}

上述代碼,說明以下:
* 第 23 行:定義 PageRankVertex ,其中:
  * 點值表示該點(網頁)的當前 PageRank 取值。<br />
  * compute() 方法使用迭代公式:`PageRank(i)=0.15/TotalNumVertices+0.85*sum`更新點值。<br />
  * cleanup() 方法把點及其 PageRank 取值寫到結果表中。<br />
* 第 55 行:定義 PageRankVertexReader 類,加載圖,將表中每一條記錄解析爲一個點,記錄的第一列是起點,其餘列爲終點。<br />
* 第 88 行:主程序(main 函數),定義 GraphJob,指定 Vertex/GraphLoader 等的實現,以及最大迭代次數(默認 30),並指定輸入輸出表。
<a name="5732cb27"></a>
## K-均值聚類
k-均值聚類(Kmeans) 算法是很是基礎並大量使用的聚類算法。<br />算法基本原理:以空間中 k 個點爲中心進行聚類,對最靠近它們的點進行歸類。經過迭代的方法,逐次更新各聚類中心的值,直至獲得最好的聚類結果。<br />假設要把樣本集分爲 k 個類別,算法描述以下:
1. 適當選擇 k 個類的初始中心。<br />
1. 在第 i 次迭代中,對任意一個樣本,求其到 k 箇中心的距離,將該樣本歸到距離最短的中心所在的類。<br />
1. 利用均值等方法更新該類的中心值。<br />
1. 對於全部的 k 個聚類中心,若是利用上兩步的迭代法更新後,值保持不變或者小於某個閾值,則迭代結束,不然繼續迭代。<br />
<a name="b5ea48ff"></a>
### []()代碼示例
K-均值聚類算法的代碼,以下所示:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.log4j.Logger;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.Aggregator;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Tuple;
import com.aliyun.odps.io.Writable;
public class Kmeans {
private final static Logger LOG = Logger.getLogger(Kmeans.class);
public static class KmeansVertex extends

Vertex<Text, Tuple, NullWritable, NullWritable> {
@Override
public void compute(
    ComputeContext<Text, Tuple, NullWritable, NullWritable> context,
    Iterable<NullWritable> messages) throws IOException {
  context.aggregate(getValue());
}

}
public static class KmeansVertexReader extends

GraphLoader<Text, Tuple, NullWritable, NullWritable> {
@Override
public void load(LongWritable recordNum, WritableRecord record,
    MutationContext<Text, Tuple, NullWritable, NullWritable> context)
    throws IOException {
  KmeansVertex vertex = new KmeansVertex();
  vertex.setId(new Text(String.valueOf(recordNum.get())));
  vertex.setValue(new Tuple(record.getAll()));
  context.addVertexRequest(vertex);
}

}
public static class KmeansAggrValue implements Writable {

Tuple centers = new Tuple();
Tuple sums = new Tuple();
Tuple counts = new Tuple();
@Override
public void write(DataOutput out) throws IOException {
  centers.write(out);
  sums.write(out);
  counts.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
  centers = new Tuple();
  centers.readFields(in);
  sums = new Tuple();
  sums.readFields(in);
  counts = new Tuple();
  counts.readFields(in);
}
@Override
public String toString() {
  return "centers " + centers.toString() + ", sums " + sums.toString()
      + ", counts " + counts.toString();
}

}
public static class KmeansAggregator extends Aggregator {

@SuppressWarnings("rawtypes")
@Override
public KmeansAggrValue createInitialValue(WorkerContext context)
    throws IOException {
  KmeansAggrValue aggrVal = null;
  if (context.getSuperstep() == 0) {
    aggrVal = new KmeansAggrValue();
    aggrVal.centers = new Tuple();
    aggrVal.sums = new Tuple();
    aggrVal.counts = new Tuple();
    byte[] centers = context.readCacheFile("centers");
    String lines[] = new String(centers).split("\n");
    for (int i = 0; i < lines.length; i++) {
      String[] ss = lines[i].split(",");
      Tuple center = new Tuple();
      Tuple sum = new Tuple();
      for (int j = 0; j < ss.length; ++j) {
        center.append(new DoubleWritable(Double.valueOf(ss[j].trim())));
        sum.append(new DoubleWritable(0.0));
      }
      LongWritable count = new LongWritable(0);
      aggrVal.sums.append(sum);
      aggrVal.counts.append(count);
      aggrVal.centers.append(center);
    }
  } else {
    aggrVal = (KmeansAggrValue) context.getLastAggregatedValue(0);
  }
  return aggrVal;
}
@Override
public void aggregate(KmeansAggrValue value, Object item) {
  int min = 0;
  double mindist = Double.MAX_VALUE;
  Tuple point = (Tuple) item;
  for (int i = 0; i < value.centers.size(); i++) {
    Tuple center = (Tuple) value.centers.get(i);
    // use Euclidean Distance, no need to calculate sqrt
    double dist = 0.0d;
    for (int j = 0; j < center.size(); j++) {
      double v = ((DoubleWritable) point.get(j)).get()
          - ((DoubleWritable) center.get(j)).get();
      dist += v * v;
    }
    if (dist < mindist) {
      mindist = dist;
      min = i;
    }
  }
  // update sum and count
  Tuple sum = (Tuple) value.sums.get(min);
  for (int i = 0; i < point.size(); i++) {
    DoubleWritable s = (DoubleWritable) sum.get(i);
    s.set(s.get() + ((DoubleWritable) point.get(i)).get());
  }
  LongWritable count = (LongWritable) value.counts.get(min);
  count.set(count.get() + 1);
}
@Override
public void merge(KmeansAggrValue value, KmeansAggrValue partial) {
  for (int i = 0; i < value.sums.size(); i++) {
    Tuple sum = (Tuple) value.sums.get(i);
    Tuple that = (Tuple) partial.sums.get(i);
    for (int j = 0; j < sum.size(); j++) {
      DoubleWritable s = (DoubleWritable) sum.get(j);
      s.set(s.get() + ((DoubleWritable) that.get(j)).get());
    }
  }
  for (int i = 0; i < value.counts.size(); i++) {
    LongWritable count = (LongWritable) value.counts.get(i);
    count.set(count.get() + ((LongWritable) partial.counts.get(i)).get());
  }
}
@SuppressWarnings("rawtypes")
@Override
public boolean terminate(WorkerContext context, KmeansAggrValue value)
    throws IOException {
  // compute new centers
  Tuple newCenters = new Tuple(value.sums.size());
  for (int i = 0; i < value.sums.size(); i++) {
    Tuple sum = (Tuple) value.sums.get(i);
    Tuple newCenter = new Tuple(sum.size());
    LongWritable c = (LongWritable) value.counts.get(i);
    for (int j = 0; j < sum.size(); j++) {
      DoubleWritable s = (DoubleWritable) sum.get(j);
      double val = s.get() / c.get();
      newCenter.set(j, new DoubleWritable(val));
      // reset sum for next iteration
      s.set(0.0d);
    }
    // reset count for next iteration
    c.set(0);
    newCenters.set(i, newCenter);
  }
  // update centers
  Tuple oldCenters = value.centers;
  value.centers = newCenters;
  LOG.info("old centers: " + oldCenters + ", new centers: " + newCenters);
  // compare new/old centers
  boolean converged = true;
  for (int i = 0; i < value.centers.size() && converged; i++) {
    Tuple oldCenter = (Tuple) oldCenters.get(i);
    Tuple newCenter = (Tuple) newCenters.get(i);
    double sum = 0.0d;
    for (int j = 0; j < newCenter.size(); j++) {
      double v = ((DoubleWritable) newCenter.get(j)).get()
          - ((DoubleWritable) oldCenter.get(j)).get();
      sum += v * v;
    }
    double dist = Math.sqrt(sum);
    LOG.info("old center: " + oldCenter + ", new center: " + newCenter
        + ", dist: " + dist);
    // converge threshold for each center: 0.05
    converged = dist < 0.05d;
  }
  if (converged || context.getSuperstep() == context.getMaxIteration() - 1) {
    // converged or reach max iteration, output centers
    for (int i = 0; i < value.centers.size(); i++) {
      context.write(((Tuple) value.centers.get(i)).toArray());
    }
    // true means to terminate iteration
    return true;
  }
  // false means to continue iteration
  return false;
}

}
private static void printUsage() {

System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
System.exit(-1);

}
public static void main(String[] args) throws IOException {

if (args.length < 2)
  printUsage();
GraphJob job = new GraphJob();
job.setGraphLoaderClass(KmeansVertexReader.class);
job.setRuntimePartitioning(false);
job.setVertexClass(KmeansVertex.class);
job.setAggregatorClass(KmeansAggregator.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
// default max iteration is 30
job.setMaxIteration(30);
if (args.length >= 3)
  job.setMaxIteration(Integer.parseInt(args[2]));
long start = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
    + (System.currentTimeMillis() - start) / 1000.0 + " seconds");

}
}

<br />上述代碼,說明以下:
* 第 26 行:定義 KmeansVertex,compute() 方法很是簡單,只是調用上下文對象的 aggregate 方法,傳入當前點的取值(Tuple 類型,向量表示)。<br />
* 第 38 行:定義 KmeansVertexReader 類,加載圖,將表中每一條記錄解析爲一個點,點標識可有可無,這裏取傳入的 recordNum 序號做爲標識,點值爲記錄的全部列組成的 Tuple。<br />
* 第 83 行:定義 KmeansAggregator,這個類封裝了 Kmeans 算法的主要邏輯,其中:
  * createInitialValue 爲每一輪迭代建立初始值(k 類中心點),如果第一輪迭代(superstep=0),該取值爲初始中心點,不然取值爲上一輪結束時的新中心點。<br />
  * aggregate 方法爲每一個點計算其到各個類中心的距離,並歸爲距離最短的類,並更新該類的 sum 和 count。<br />
  * merge 方法合併來自各個 worker 收集的 sum 和 count。<br />
  * terminate 方法根據各個類的 sum 和 count 計算新的中心點,若新中心點與以前的中心點距離小於某個閾值或者迭代次數到達最大迭代次數設置,則終止迭代(返回 false),寫最終的中心點到結果表。<br />
* 第 236 行:主程序(main 函數),定義 GraphJob,指定 Vertex/GraphLoader/Aggregator 等的實現,以及最大迭代次數(默認 30),並指定輸入輸出表。<br />
* 第 243 行:job.setRuntimePartitioning(false),對於 Kmeans 算法,加載圖是不須要進行點的分發,設置 RuntimePartitioning 爲 false,以提高加載圖時的性能。
<a name="BiPartiteMatchiing"></a>
## BiPartiteMatchiing
二分圖是指圖的全部頂點可分爲兩個集合,每條邊對應的兩個頂點分別屬於這兩個集合。對於一個二分圖 G,M 是它的一個子圖,若是 M 的邊集中任意兩條邊都不依附於同一個頂點,則稱 M 爲一個匹配。二分圖匹配經常使用於有明確供需關係場景(如交友網站等)下的信息匹配行爲。<br />算法描述,以下所示:
* 從左邊第 1 個頂點開始,挑選未匹配點進行搜索,尋找增廣路。<br />
* 若是通過一個未匹配點,說明尋找成功。<br />
* 更新路徑信息,匹配邊數 +1,中止搜索。<br />
* 若是一直沒有找到增廣路,則再也不從這個點開始搜索。<br />
<a name="b5ea48ff"></a>
### []()代碼示例
BiPartiteMatchiing 算法的代碼,以下所示:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Random;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.io.WritableRecord;
public class BipartiteMatching {
private static final Text UNMATCHED = new Text("UNMATCHED");
public static class TextPair implements Writable {

public Text first;
public Text second;
public TextPair() {
  first = new Text();
  second = new Text();
}
public TextPair(Text first, Text second) {
  this.first = new Text(first);
  this.second = new Text(second);
}
@Override
public void write(DataOutput out) throws IOException {
  first.write(out);
  second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
  first = new Text();
  first.readFields(in);
  second = new Text();
  second.readFields(in);
}
@Override
public String toString() {
  return first + ": " + second;
}

}
public static class BipartiteMatchingVertexReader extends

GraphLoader<Text, TextPair, NullWritable, Text> {
@Override
public void load(LongWritable recordNum, WritableRecord record,
    MutationContext<Text, TextPair, NullWritable, Text> context)
    throws IOException {
  BipartiteMatchingVertex vertex = new BipartiteMatchingVertex();
  vertex.setId((Text) record.get(0));
  vertex.setValue(new TextPair(UNMATCHED, (Text) record.get(1)));
  String[] adjs = record.get(2).toString().split(",");
  for (String adj : adjs) {
    vertex.addEdge(new Text(adj), null);
  }
  context.addVertexRequest(vertex);
}

}
public static class BipartiteMatchingVertex extends

Vertex<Text, TextPair, NullWritable, Text> {
private static final Text LEFT = new Text("LEFT");
private static final Text RIGHT = new Text("RIGHT");
private static Random rand = new Random();
@Override
public void compute(
    ComputeContext<Text, TextPair, NullWritable, Text> context,
    Iterable<Text> messages) throws IOException {
  if (isMatched()) {
    voteToHalt();
    return;
  }
  switch ((int) context.getSuperstep() % 4) {
  case 0:
    if (isLeft()) {
      context.sendMessageToNeighbors(this, getId());
    }
    break;
  case 1:
    if (isRight()) {
      Text luckyLeft = null;
      for (Text message : messages) {
        if (luckyLeft == null) {
          luckyLeft = new Text(message);
        } else {
          if (rand.nextInt(1) == 0) {
            luckyLeft.set(message);
          }
        }
      }
      if (luckyLeft != null) {
        context.sendMessage(luckyLeft, getId());
      }
    }
    break;
  case 2:
    if (isLeft()) {
      Text luckyRight = null;
      for (Text msg : messages) {
        if (luckyRight == null) {
          luckyRight = new Text(msg);
        } else {
          if (rand.nextInt(1) == 0) {
            luckyRight.set(msg);
          }
        }
      }
      if (luckyRight != null) {
        setMatchVertex(luckyRight);
        context.sendMessage(luckyRight, getId());
      }
    }
    break;
  case 3:
    if (isRight()) {
      for (Text msg : messages) {
        setMatchVertex(msg);
      }
    }
    break;
  }
}
@Override
public void cleanup(
    WorkerContext<Text, TextPair, NullWritable, Text> context)
    throws IOException {
  context.write(getId(), getValue().first);
}
private boolean isMatched() {
  return !getValue().first.equals(UNMATCHED);
}
private boolean isLeft() {
  return getValue().second.equals(LEFT);
}
private boolean isRight() {
  return getValue().second.equals(RIGHT);
}
private void setMatchVertex(Text matchVertex) {
  getValue().first.set(matchVertex);
}

}
private static void printUsage() {

System.err.println("BipartiteMatching <input> <output> [maxIteration]");

}
public static void main(String[] args) throws IOException {

if (args.length < 2) {
  printUsage();
}
GraphJob job = new GraphJob();
job.setGraphLoaderClass(BipartiteMatchingVertexReader.class);
job.setVertexClass(BipartiteMatchingVertex.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
int maxIteration = 30;
if (args.length > 2) {
  maxIteration = Integer.parseInt(args[2]);
}
job.setMaxIteration(maxIteration);
job.run();

}
}

<a name="d41d8cd9"></a>
##


原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索