MaxCompute 圖計算開發指南

快速入門step by step

MaxCompute Studio

建立完成 MaxCompute Java Module後,便可以開始開發Graph了。html

代碼示例

在examples目錄下有graph的一些代碼示例,可參考示例熟悉Graph程序的結構。java

編寫Graph

  1. 在module的源碼目錄即src>main >javanewMaxCompute Javanode

  2. 選擇GraphLoader/Vertex等類型,NameOK**,模板會自動填充框架代碼,可在此基礎上繼續修改。算法

本地調試Graph

Graph開發好後,下一步就是要測試本身的代碼,看是否符合預期。咱們支持本地運行Graph,具體的:sql

  1. 運行Graph: 在驅動類(有main函數且調用GraphJob.run方法)上右鍵,點擊運行run configuration**對話框,配置Graph須要在哪一個MaxCompute Project上運行便可。

  1. 點擊OK,若是指定MaxCompute project的表數據未被下載到warehouse中,則首先下載數據;若是採用mock項目或已被下載則跳過。接下來,graph local run框架會讀取warehouse中指定表的數據做爲輸入,開始本地運行Graph,用戶能夠在控制檯看到日誌輸出。每運行一次本地調試,都會在Intellij工程目錄下新建一個臨時目錄,見下圖:

**說明** 關於warehouse的詳細介紹請參考[開發UDF](https://help.aliyun.com/document_detail/50902.html#warehous)中本地warehouse目錄部分。

生產運行Graph

本地調試經過後,接下來就能夠把Graph發佈到服務端,在MaxCompute分佈式環境下運行了:編程

  1. 首先,將本身的Graph程序打成jar包,併發布到服務端。如何打包發佈?api

  2. 經過Studio無縫集成的MaxCompute Console(在Project ExplorerOpen in Console**),在Console命令行中輸入相似以下的 jar命令試用數組

  3. -libjars xxx.jar -classpath /Users/home/xxx.jar com.aliyun.odps.graph.examples.PageRank pagerank_in pagerank_out;網絡

更詳細的Graph開發介紹請參見[編寫Graph](https://help.aliyun.com/document_detail/27813.html#concept-gzg-1c2-vdb)。
<a name="Eclipse"></a>
## Eclipse
建立MaxCompute項目後,用戶能夠編寫本身的Graph程序,參照下文步驟操做完成本地調試。<br />在此示例中,咱們選用插件提供的 PageRank.java來完成本地調試工做。選中 **examples**下的 PageRank.java文件,以下圖。 <br />

[![(https://intranetproxy.alipay.com/skylark/lark/0/2019/png/23934/1548230797155-ae6cada9-4f2c-468e-af69-5eb9408189ee.png#align=left&display=inline&height=292&originHeight=444&originWidth=1136&size=0&status=done&width=746)](http://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/assets/img/12154/15450228643218_zh-CN.png)<br /><br />右鍵單擊,選擇 ****Debug As** >ODPS MapReduce|Graph****,以下圖。 <br />[![](https://intranetproxy.alipay.com/skylark/lark/0/2019/png/23934/1548230797213-b9d3485d-946a-4677-9d77-491eac02f672.png#align=left&display=inline&height=447&originHeight=862&originWidth=1440&size=0&status=done&width=746)](http://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/assets/img/12154/15450228643220_zh-CN.png)<br /><br />單擊後出現對話框,做以下配置。 <br />[![](https://intranetproxy.alipay.com/skylark/lark/0/2019/png/23934/1548230797159-0fbb12c2-6831-426e-8708-75177c738d02.png#align=left&display=inline&height=617&originHeight=617&originWidth=526&size=0&status=done&width=526)](http://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/assets/img/12154/15450228643221_zh-CN.png) <br /><br />查看做業運行結果,以下圖。 <br />[![](https://intranetproxy.alipay.com/skylark/lark/0/2019/png/23934/1548230797170-91a5ad24-952b-451d-af3a-eae27cac38ad.png#align=left&display=inline&height=331&originHeight=351&originWidth=792&size=0&status=done&width=746)](http://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/assets/img/12154/15450228643222_zh-CN.png)<br /><br />能夠查看在本地的計算結果,以下圖。 <br />[![](https://intranetproxy.alipay.com/skylark/lark/0/2019/png/23934/1548230797179-6170f594-a88a-48fe-9d2d-b9d891a97ca8.png#align=left&display=inline&height=539&originHeight=836&originWidth=1157&size=0&status=done&width=746)](http://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/assets/img/12154/15450228643223_zh-CN.png)<br /><br />調試經過後,用戶能夠將程序打包,並以Jar資源的形式上傳到MaxCompute,並提交Graph做業。

<a name="b54d1384"></a>
# MaxCompute Graph的最佳實踐
<a name="cc20fa1e"></a>
## 基於MaxCompute Graph實現用戶聚類
<a name="95da745d"></a>
### 場景說明
在商品品牌預測中,提供了一份用戶行爲數據,以下:

| 字段 | 字段說明 | 提取說明 |
| --- | --- | --- |
|  user_id | 用戶標識 | 抽樣&字段加密 |
| brand_id | 品牌ID | 抽了樣&字段加密 |
| type | 用戶對品牌的行爲類型 | 點擊:0<br />購買:1<br />收藏:2<br />加入購物車:3 |
| visit_datetime | 行爲時間 | 格式某月某日,如7月6日, 隱藏年份 |

假設需求是但願基於用戶的購買行爲對用戶聚類。當用戶瀏覽時,能夠給TA推薦同一個聚類(興趣度相近)的其餘用戶購買了什麼。
<a name="094c47ac"></a>
### [](https://www.atatech.org/articles/32335#1)問題分析
在推薦領域,該問題屬於基於用戶的協同過濾範疇,它主要包括兩個步驟:一是找到和目標用戶興趣度相近的用戶集合;二是給目標用戶推薦該集合中其餘用戶感興趣(而目標用戶沒聽過)的item。<br />對用戶聚類即構建興趣度相近的用戶集合,常見的一種方式是經過Kmeans算法來實現。假定要把樣本劃分爲k個類別,Kmeans算法的計算過程以下:
* 選擇k個初始中心節點;
* 在每次迭代中,對每一個樣本,計算其到中心節點的距離;
* 更新中心節點
* 若是中心節點不變(或小於閾值),迭代結束;不然繼續步驟2)、3)迭代

Kmeans算法的優點在於簡潔快速,其關鍵在於初始中心節點的選擇和距離公式。<br />在這個示例中,首先應該對數據進行預處理,構造用戶的特徵向量。出於簡單,這裏選擇10個最hot的品牌(構造次數最多),基於用戶對這10個品牌的購買次數,構造特徵以下:<br />user_id, cnt1, …, cnt10,其中cnt表示對應品牌的購買次數。<br />而後經過Graph編程框架,經過KMeans算法實現聚類。
<a name="9195cc17"></a>
### [](https://www.atatech.org/articles/32335#2)數據準備
原始數據表爲tmall_user_brand,數據準備主要包括生成特徵和選擇初始節點。
<a name="1c72fb00"></a>
### [](https://www.atatech.org/articles/32335#3)生成特徵
生成特徵包括以下步驟:
1\. 選擇top 10 brands,生成表b
1\. 統計用戶購買每一個品牌的次數,生成表t
1\. 對錶b和t進行聯接,統計用戶購買top 10品牌的次數,生成表ub

假設ub表數據以下:

user_id brand_id count rank
a b1 5 1
a b3 2 3
a b4 3 4
b b3 1 3
b b7 9 7併發

<br />須要生成的特徵表以下<br /><br />

user_id, cnt1, … , cnt10
a 5 0 2 3 0 0 0 0 0 0
b 0 0 1 0 0 0 9 0 0 0

<br />這裏爲了代碼簡短,經過SQL來「補」數據,經過sum(case when…)方式實現。<br />完整的SQL語句以下:<br />

create table t_user_feature as
select

user_id,
sum(case when rank=1 then cnt else 0 end) as cnt1,
sum(case when rank=2 then cnt else 0 end) as cnt2,
sum(case when rank=3 then cnt else 0 end) as cnt3,
sum(case when rank=4 then cnt else 0 end) as cnt4,
sum(case when rank=5 then cnt else 0 end) as cnt5,
sum(case when rank=6 then cnt else 0 end) as cnt6,
sum(case when rank=7 then cnt else 0 end) as cnt7,
sum(case when rank=8 then cnt else 0 end) as cnt8,
sum(case when rank=9 then cnt else 0 end) as cnt9,
sum(case when rank=10 then cnt else 0 end) as cnt10

from(

select /*+ MAPJOIN(b) */
    t.user_id, t.brand_id, t.cnt, b.rank
from(
    select user_id, brand_id, count(*) as cnt 
    from tmall_user_brand
    where type='1'
    group by user_id, brand_id
)t  
join(
    select brand_id, rank
    from(
        select brand_id,
            row_number() over (partition by 1 order by buy_cnt desc) as rank
        from(
            select brand_id, count(*) as buy_cnt
            from tmall_user_brand
            where type='1'
            group by brand_id
        )t1 
    )t2 
    where t2.rank <=10
)b  
on t.brand_id = b.brand_id

)ub
group by user_id;
alter table t_user_feature set lifecycle 7;

<a name="b28d718c"></a>
### 選擇初始節點
對於Kmeans算法,初始節點的選取對聚類結果很重要,有不少paper研究如何選擇初始節點。這裏出於簡單,直接隨機選取3個節點,SQL以下:

drop table if exists t_kmeans_seed;
create table t_kmeans_seed as
select user_id,

cnt1,cnt2,cnt3,cnt4,cnt5,cnt6,cnt7,cnt8,cnt9,cnt10

from(

select
    user_id,
    cnt1,cnt2,cnt3,cnt4,cnt5,cnt6,cnt7,cnt8,cnt9,cnt10,
    cluster_sample(3) over (partition by 1) as flag
from t_user_feature

)t1
where flag = true;
alter table t_kmeans_seed set lifecycle 7;

<a name="cecbfece"></a>
### [實現Kmeans聚類](https://www.atatech.org/articles/32335#5)
這裏咱們基於在線手冊Graph示例程序的「k-均值聚類算法」來實現。代碼以下:

package example.demo;
public class KmeansDemo {
private final static Logger LOG = Logger.getLogger(KmeansDemo.class);
private static String RESOURCE_TABLE;
public static class KmeansVertex extends

Vertex<Text, Tuple, NullWritable, NullWritable> {
@Override
public void compute(
    ComputeContext<Text, Tuple, NullWritable, NullWritable> context,
    Iterable<NullWritable> messages) throws IOException {
  context.aggregate(this.getValue());
}

}
public static class KmeansVertexReader extends

GraphLoader<Text, Tuple, NullWritable, NullWritable> {
@Override
public void load(LongWritable recordNum, WritableRecord record,
    MutationContext<Text, Tuple, NullWritable, NullWritable> context)
    throws IOException {
  Tuple val = new Tuple();
  for(int i=1; i<record.size(); ++i) {
    val.append(record.get(i));
  }
  KmeansVertex vertex = new KmeansVertex();
  vertex.setId(new Text(String.valueOf(record.get(0))));
  vertex.setValue(val);
  context.addVertexRequest(vertex);
}

}
public static class KmeansAggrValue implements Writable {

Tuple centers = new Tuple();
Tuple sums = new Tuple();
Tuple counts = new Tuple();
@Override
public void write(DataOutput out) throws IOException {
  centers.write(out);
  sums.write(out);
  counts.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
  centers = new Tuple();
  centers.readFields(in);
  sums = new Tuple();
  sums.readFields(in);
  counts = new Tuple();
  counts.readFields(in);
}
@Override
public String toString() {
  return "centers " + centers.toString() + ", sums " + sums.toString()
      + ", counts " + counts.toString();
}

}
public static class KmeansAggregator extends Aggregator {

@Override
public KmeansAggrValue createStartupValue(WorkerContext context)
        throws IOException {
  KmeansAggrValue aggrVal = null;
  aggrVal = new KmeansAggrValue();
  aggrVal.centers = new Tuple();
  aggrVal.sums = new Tuple();
  aggrVal.counts = new Tuple();
  RESOURCE_TABLE = context.getConfiguration().get("RESOURCE_TABLE");
  Iterable<WritableRecord> iter = context.readResourceTable(RESOURCE_TABLE);
  for(WritableRecord record : iter) {
    Tuple center = new Tuple();
    Tuple sum = new Tuple();
    for (int i = 1; i < record.size(); ++i) {
      center.append(record.get(i));
      sum.append(new LongWritable(0L));
    }
    LongWritable count = new LongWritable(0L);
    aggrVal.sums.append(sum);
    aggrVal.counts.append(count);
    aggrVal.centers.append(center);
  }
  return aggrVal;
}
@Override
public KmeansAggrValue createInitialValue(WorkerContext context)
    throws IOException {
  return (KmeansAggrValue) context.getLastAggregatedValue(0);
}
@Override
public void aggregate(KmeansAggrValue value, Object item) {
  int min = 0;
  long mindist = Long.MAX_VALUE;
  Tuple point = (Tuple) item;
  for (int i = 0; i < value.centers.size(); i++) {
    Tuple center = (Tuple) value.centers.get(i);
    // use Euclidean Distance, no need to calculate sqrt
    long dist = 0L;
    for (int j = 0; j < center.size(); j++) {
      long v = ((LongWritable) point.get(j)).get()
          - ((LongWritable) center.get(j)).get();
      dist += v * v;
    }
    if (dist < mindist) {
      mindist = dist;
      min = i;
    }
  }
  // update sum and count
  Tuple sum = (Tuple) value.sums.get(min);
  for (int i = 0; i < point.size(); i++) {
    LongWritable s = (LongWritable) sum.get(i);
    s.set(s.get() + ((LongWritable) point.get(i)).get());
  }
  LongWritable count = (LongWritable) value.counts.get(min);
  count.set(count.get() + 1L);
}
@Override
public void merge(KmeansAggrValue value, KmeansAggrValue partial) {
  for (int i = 0; i < value.sums.size(); i++) {
    Tuple sum = (Tuple) value.sums.get(i);
    Tuple that = (Tuple) partial.sums.get(i);
    for (int j = 0; j < sum.size(); j++) {
      LongWritable s = (LongWritable) sum.get(j);
      s.set(s.get() + ((LongWritable) that.get(j)).get());
    }
  }
  for (int i = 0; i < value.counts.size(); i++) {
    LongWritable count = (LongWritable) value.counts.get(i);
    count.set(count.get() + ((LongWritable) partial.counts.get(i)).get());
  }
}
@SuppressWarnings("rawtypes")
@Override
public boolean terminate(WorkerContext context, KmeansAggrValue value)
    throws IOException {
  // compute new centers
  Tuple newCenters = new Tuple(value.sums.size());
  for (int i = 0; i < value.sums.size(); i++) {
    Tuple sum = (Tuple) value.sums.get(i);
    Tuple newCenter = new Tuple(sum.size());
    LongWritable c = (LongWritable) value.counts.get(i);
    if(c.equals(0L)) {
      continue;
    }
    for (int j = 0; j < sum.size(); j++) {
      LongWritable s = (LongWritable) sum.get(j);
      newCenter.set(j, new LongWritable(new Double((double)s.get()/ c.get()+0.5).longValue()));
      // reset sum for next iteration
      s.set(0L);
    }
    // reset count for next iteration
    c.set(0L);
    newCenters.set(i, newCenter);
  }
  // update centers
  Tuple oldCenters = value.centers;
  value.centers = newCenters;
  LOG.info("old centers: " + oldCenters + ", new centers: " + newCenters);
  // compare new/old centers
  boolean converged = true;
  for (int i = 0; i < value.centers.size() && converged; i++) {
    Tuple oldCenter = (Tuple) oldCenters.get(i);
    Tuple newCenter = (Tuple) newCenters.get(i);
    long sum = 0L;
    for (int j = 0; j < newCenter.size(); j++) {
      long v = ((LongWritable) newCenter.get(j)).get()
          - ((LongWritable) oldCenter.get(j)).get();
      sum += v * v;
    }
    double dist = Math.sqrt(sum);
    LOG.info("old center: " + oldCenter + ", new center: " + newCenter
        + ", dist: " + dist);
    // converge threshold for each center: 0.05
    converged = dist < 0.05d;
  }
  if (converged || context.getSuperstep() == context.getMaxIteration() - 1) {
    // converged or reach max iteration, output centers
    for (int i = 0; i < value.centers.size(); i++) {
      context.write(((Tuple) value.centers.get(i)).toArray());
    }
    // true means to terminate iteration
    return true;
  }
  // false means to continue iteration
  return false;
}

}
private static void printUsage() {

System.out.println("Usage: <in> <out> <resource> [Max iterations (default 30)]");
System.exit(-1);

}
public static void main(String[] args) throws IOException {

if (args.length < 3)
  printUsage();
GraphJob job = new GraphJob();
job.setGraphLoaderClass(KmeansVertexReader.class);
job.setRuntimePartitioning(false);
job.setVertexClass(KmeansVertex.class);
job.setAggregatorClass(KmeansAggregator.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
job.set("RESOURCE_TABLE", args[2]);
// default max iteration is 30
job.setMaxIteration(30);
if (args.length >= 4)
  job.setMaxIteration(Integer.parseInt(args[3]));
long start = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
    + (System.currentTimeMillis() - start) / 1000.0 + " seconds");

}
}

<br />和MapReduce編程框架相似,在main函數,先實例化一個GraphJob,對job設置後,經過job.run()提交。<br />KmeansVertexReader類實現加載圖,定義圖節點。因爲kmeans算法是計算節點距離,所以不須要定義邊;此外它須要對迭代結果進行彙總,因此經過KmeansAggregator繼承Aggregator,實現每一步迭代計算。<br /><br />
<a name="f1e68d34"></a>
### [](https://www.atatech.org/articles/32335#6)運行和輸出
準備結果表SQL以下:

create table t_kmeans_result(

cnt1 bigint,
cnt2 bigint,
cnt3 bigint,
cnt4 bigint,
cnt5 bigint,
cnt6 bigint,
cnt7 bigint,
cnt8 bigint,
cnt9 bigint,
cnt10 bigint) lifecycle 7;
<br />在console中執行以下命令:<br /><br />

add jar /home/admin/duckrun/dev/open_graph_example/target/open_graph_example-0.1.jar -f;
add table t_kmeans_seed -f;
jar -resources open_graph_example-0.1.jar,t_kmeans_seed -classpath /home/admin/duckrun/dev/open_graph_examp

<a name="d41d8cd9"></a>
## 
<a name="388ac3a4"></a>
## 基於MaxCompute Graph實現並行化層次聚類
<a name="8e1b944f"></a>
### 背景
圖聚類是常見的一種聚類場景。和基於向量的聚類不一樣,圖的每一個節點只和有限個節點有距離,沒法定義任意兩點之間的距離。所以,像k-means這類常規方法就不適合圖聚類。本文要介紹的是用層次聚類(hierarchical clustering)的方法作圖聚類,其中爲簡單起見,圖是無向的。
<a name="90179ba4"></a>
### [](https://www.atatech.org/articles/25067#1)聚類過程
標準的自底向上的層次聚類過程是這樣的:每次選取距離最小的兩個點merge,直到最後只剩一個點(包含全部的原始點)爲止。聚類過程涉及到點和點,以及簇和簇之間距離計算的不一樣方法;具體的能夠參考[維基百科的解釋](http://en.wikipedia.org/wiki/Hierarchical_clustering)。<br />基於無向圖的層次聚類和標準層次聚類是相似的,用邊的權值來度量節點之間的距離,同時更新合併節點的鄰居節點之間的邊。用僞代碼描述過程以下:

圖加載;
While(不知足聚類中止條件) {
選取距離最小的邊edgeAB;
生產新的節點AB;
生產新的邊,AB和A,B的全部鄰居之間;
刪除A和A鄰居之間的邊,刪除B和B鄰居之間的邊;
刪除A,刪除B;
}

<a name="85901219"></a>
### [](https://www.atatech.org/articles/25067#2)MaxCompute Graph實現細節
層次聚類實現的核心是經過Vertex的compute來實現的。定義Vertex的執行狀態,分別包括:選舉狀態(minedge_electing);等待選舉結果狀態(waiting_election);中止狀態(waiting_delete)。

Vertex.compute() {
switch(current_state) {
case minedge_electing:
if(存在鄰居節點)

選取和鄰居節點之間最小的邊,發送給aggregator;

else

voteToHalt(); //沒有鄰居,中止計算退出;

break;
case waiting_election:

從aggregator獲取全局選取的最小邊minEdge;
if(minEdge的距離值>閥值距離) 
  voteToHalt();  //沒有能夠再作聚合的簇了,中止計算並退出;
else if(minEdge不是本節點和某個鄰居節點之間的邊)
  轉換狀態到minedge_electing,準備下一輪選舉迭代;

else {
//假設本節點爲A, minEdge對應的鄰居爲B
addVertexRequest(AB); //mergeA和B新生產節點
for(Vertex neighbor: A’s neighbors) {

removeEdgeRequest(A->neighbor);
removeEdgeRequest(neighbor->A);
if(neighbor不是B) {
  addEdgeRequest(AB->neighbor);
  addEdgeRequest(neighbor->AB);

}
}
removeVertexRequest(A);
轉換狀態到waiting_delete;
}
break;
case waiting_delete:
voteToHalt();
break;
}
}

全局Aggregator定義:兩兩比較邊的距離值,選取最小的那個;<br />節點衝突Resolver定義:當A節點發現minEdge是edge(A B)的同時,B也一樣發現,其處理流程和A是對稱相等的,所以會出現衝突(重複增長新節點AB,重複增長和刪除邊edge(AB, C),當C和A,B都有鏈接的時候)。以下圖所示:C節點是A,B的共同鄰居,所以A,B合併爲新的節點AB後,針對C節點就須要特別處理衝突的狀況;而D,E的處理就相對簡單。<br />![image.png](https://intranetproxy.alipay.com/skylark/lark/0/2019/png/23934/1548231591291-bbd4dc18-782b-4c32-bd27-cdabe809260d.png#align=left&display=inline&height=267&name=image.png&originHeight=267&originWidth=496&size=38044&status=done&width=496)
<a name="dd76d17c"></a>
### [](https://www.atatech.org/articles/25067#3)並行近似優化
上述的聚類流程中,真正並行化執行只是在選舉最短距離的過程(單機版須要掃描全部的邊,graph分佈式由節點把相鄰的最短距離report to aggregator),而merge僅僅只有兩個節點參與。因爲graph框架自己的耗費,實際測試發現程序執行速度並不理想。<br />既然在節點merge的過程沒有並行化,那麼就思考是否在這塊能夠作並行化處理,答案是確定的。例以下圖中,邊edgeAB能夠merge的同時,是否能夠考慮把edgeGH也merge。<br />![image.png](https://intranetproxy.alipay.com/skylark/lark/0/2019/png/23934/1548231613961-623689b7-4a89-4a8b-8465-b9230daac081.png#align=left&display=inline&height=331&name=image.png&originHeight=331&originWidth=548&size=56037&status=done&width=548)<br />從圖上看出edgeAB和edgeGH之間路徑相對比較遠,同步merge G,H對全局結果的影響不大,按照標準的全局選舉流程,最終也會選擇G,H來merge。固然,理論上來講,有可能因爲A,B合併了之後,致使和周圍節點邊更新,從而影響了後續的全局選舉結果。所以,並行化的merge節點最終是一個近似的結果。爲了保證近似結果的可靠性,第一在於同時可merge節點之間的路徑要足夠的遠,相互影響的可能性就小。考慮一個極端的狀況,就是路徑無窮大,實質是不連通的狀況,那麼同時merge就徹底沒有風險了。第二,必須保證節點merge之後,生成新邊的權重要合理,以保證並行化merge順序和非並行化merge順序近似一致,有關這一點後續會細說。<br />修改選舉最短距離邊的實現,不用全局選舉的結果,而是在必定路徑範圍內選舉出最短距離邊,而後merge,這樣就同時會選舉出多個局部最短距離邊。可同步merge的邊必須知足一個最短路徑閥值,以下圖所示:edgeAB和edgeDE是能夠同步merge的,不會起衝突,由於對節點C而言,分別增長了兩個新的節點;若是edgeAB和edgeCD同步merge,那就會起衝突,由於兩個新生產的節點之間也須要產生鄰居關係。所以,必須保證同步merge的邊之間至少存在一個不變化的節點,這樣就避免了新節點之間的鄰居關係生成。<br />![image.png](https://intranetproxy.alipay.com/skylark/lark/0/2019/png/23934/1548231646757-ea9d8f3f-61bc-418a-8766-f1b47f876c7c.png#align=left&display=inline&height=158&name=image.png&originHeight=158&originWidth=551&size=41850&status=done&width=551)<br />在局部選舉的過程當中,依然採用的是節點report本身所知道的最短距離,只是將report給aggregator,改成report給鄰居,而且經過屢次迭代實現傳播功能。局部選舉的僞代碼以下:

Step1:
選取和鄰居節點之間距離最小的邊,發送給全部鄰居節點以及本節點;進入step2;
Step2:
從接受的消息中選取距離最小的邊(包括了在step1中鄰居以及本節點選取的結果),發送給全部鄰居節點以及本節點;進入step3;
Step3:
從接受的消息中選取距離最小的邊(包括了在step2中鄰居以及本節點選取的結果),發送給全部鄰居節點以及本節點;進入step4;
……….
StepN:
從接受的消息中選取距離最小的邊(包括了在stepN-1中鄰居以及本節點選取的結果),若是minEdge是本節點的一條邊,那麼就進行merge,不然進入step1;

事實上,每一step就是不斷地選舉局部最短距離邊,而且把這個信息逐層擴散,這樣就確保了在必定的路徑範圍內永遠只選舉一個最短距離邊。N的設置能夠配置,顯然,N越小,並行化程度就越高。固然,必須避免衝突,所以N的最小取值爲3。
<a name="ce23477e"></a>
### [](https://www.atatech.org/articles/25067#4)邊權重更新
層次聚類過程當中,簇和簇之間距離的計算能夠參考[維基百科](http://en.wikipedia.org/wiki/Hierarchical_clustering) 提到的各類方法。本文參考的是[Ward方法](http://en.wikipedia.org/wiki/Ward%27s_method) 來計算節點merge之後和鄰居節點之間的邊權重。另外要說明的是有關邊距離的度量,因爲本文提出的方法是針對淘寶商品[interest entity node](http://dthink.alibaba-inc.com/articles/commonalg/interestgraph.htm)聚類的實現,而輸入是node和node之間協同類似度(看了又看,買了又買);所以節點之間的距離度量是和類似度成反比的。類似度越大,等同於距離就越小。爲簡單起見,就直接用類似度做爲距離的度量。每次選舉局部距離最小的節點對,便是選舉類似度最大的節點對。<br />基於Ward的思想,把要merge的兩個簇的節點數量做爲衡量的標準,同時考慮到下降邊權重減弱的速度,最終用如下的方法作更新:<br />假設要merge的兩個節點分別爲A和B,節點nA,nB分別是A和B的鄰居;<br />nA,nB和新節點AB的類似度計算:

sim(nA, AB)=sim(A, nA) * alphaA;
sim(nB, AB)=sim(B, nB) * alphaB;
當size(A) + size(B)=2的時候,alphaA=alphaB=0.9;
不然alphaA=sqrt(sizeA) / sqrt(sizeA + sizeB), alphaB=sqrt(sizeB) / sqrt(sizeA + sizeB)。
當nA和nB爲同一個節點的時候,也即A,B共同鄰居,和新節點AB的類似度最終合併爲:(sim(A, nA) + sim(B, nB)) * 0.618。

<a name="d41d8cd9-1"></a>
### [](https://www.atatech.org/articles/25067#5)

<a name="992bf7f5"></a>
## 基於MaxCompute Graph實現大規模網絡的關係擴散
關係數據相關的實體有天然人、企業、媒介、帳號等,如何對由億級別的節點和邊組成的大規模網絡進行有效的圖計算是一個剛性需求。
<a name="2fe57705"></a>
### [](https://www.atatech.org/articles/104874#0)問題抽象
若是有一個億級別的大規模有向網絡(就假設爲微博的用戶關注關係網絡好了,便於理解),如何進行關係擴散找到用戶可能想關注的其餘用戶呢?打個比方,A用戶關注了B,B又關注了C,那麼可能C就是A想要關注的潛在用戶,如今咱們要作的事就是把全部的C找出來推薦給A,最好還要把A到C的關注鏈路也一併輸出,便於其餘深刻的分析。咱們的目標定爲四度關係擴散,A—>B—>C—>D—>E,找到E。
<a name="01ea6c5e"></a>
### [](https://www.atatech.org/articles/104874#1)暴力解法
最直接的想法就是對已有的一度關係表進行一次join獲得兩度關係,進行兩次join獲得三度關係,依次類推。假設網絡是均勻分佈的,每一個人關注的人數量級差很少,利用MaxCompute強大的計算能力,這種方法還有可能會計算出結果。然而現實的網絡結構每每會存在小部分的出邊和入邊遠大於平均水平的超點(微博大V),這些點在join的過程當中極易形成數據傾斜,一次join還能勉勉強強接受,但兩次三次join最後99.9%會以計算失敗了結。<br />那麼利用MaxCompute Graph的sendMessage機制可否解決這個問題呢?在每一迭代步裏,每一個節點都將自身的節點值添加到上游節點傳來的路徑後面,再將路徑當作message傳遞給下游節點,以下圖所示,計算過程以下:<br />[![](https://intranetproxy.alipay.com/skylark/lark/0/2019/png/23934/1548231788722-320626e5-6f77-45d9-861c-a62c2dc3c52d.png#align=left&display=inline&height=181&originHeight=181&originWidth=319&size=0&status=done&width=319)](http://ata2-img.cn-hangzhou.img-pub.aliyun-inc.com/98ed95e8939df0408db2fee4eb23c0b9.png)<br />第一步:每一個節點的value設置爲自身的id,並將value發送全部出邊的終點;<br />第二步:每一個節點將收到的全部消息存儲爲一個list,將自身id添加到list裏面的每一個元素後面,再將這個list發送給下游節點;<br />第三~五步:重複第二步。第五步輸出的長度爲5的路徑便是咱們想要的結果。<br />[![](https://intranetproxy.alipay.com/skylark/lark/0/2019/png/23934/1548231788729-ac475dbe-49a2-4801-846b-d25ca9edd198.png#align=left&display=inline&height=295&originHeight=596&originWidth=1508&size=0&status=done&width=746)](http://ata2-img.cn-hangzhou.img-pub.aliyun-inc.com/644dbbfd4438f4ada3e51e1a1b91d147.png)<br />但最終實踐證實,這種方法在到第二步之後就會內存不夠報錯,儘管已經將各項參數調到最大,仍是不行。主要緣由是發送消息採用數組的形式太佔內存,每一步都將自身節點添加到全部路徑後面也會有重複存儲的問題,看來還有不少優化的空間。
<a name="ac500c12"></a>
### [](https://www.atatech.org/articles/104874#2)兩度關係
咱們先從最簡單的兩度關係入手,因爲MaxCompute Graph是以點爲粒度進行輸出的,所以咱們只需找到每一個節點的頭和尾,至關於把兩度路徑的中間節點固定住,遍歷頭部節點和尾部節點,就能夠輸出全部的兩度路徑了。實現很簡單,首先定義一個MyValue的class存儲全部的上游節點值和下游節點值以及自身節點值:

public static class MyValue implements Writable {

private Tuple downVertex;   //下游節點
private Text selfId;        //自身節點
private Tuple upVertex;     //上游節點
public MyValue() {
    downVertex = new Tuple();
    selfId = new Text();
    upVertex = new Tuple();
}
public MyValue(Text id) {
    downVertex = new Tuple();
    selfId = new Text(id);
    upVertex = new Tuple();
}
public void setSelfId(Text id) {
    selfId = id;
}
public void setDownVertex(Tuple value) {
    downVertex = value;
}
public void setUpVertex(Tuple value) {
    upVertex = value;
}
public Tuple getDownVertex() {
    return downVertex;
}
public Text getSelfId() {
    return selfId;
}
public Tuple getUpVertex() {
    return upVertex;
}
public void addDownVertex(Writable value) {
    downVertex.append(value);
}
public void addUpVertex(Writable value) {
    upVertex.append(value);
}
@Override
public void write(DataOutput out) throws IOException {
    upVertex.write(out);
    selfId.write(out);
    downVertex.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
    upVertex.readFields(in);
    selfId.readFields(in);
    downVertex.readFields(in);
}

}

而後進行簡單的5步迭代便可獲得結果:

switch ((int) context.getSuperstep()) {

case 0:   //設置自身節點值
    getValue().setSelfId(getId());
    break;
case 1:   //發送自身id給下游節點
    if (hasEdges()) {
        context.sendMessageToNeighbors(this, new MyValue(getId()));
    }
    break;
case 2:   //存儲收到的消息,存儲爲上游節點列表 
    for (MyValue msg : messages) {
        getValue().addUpVertex(msg.getSelfId());
    }
    break;
case 3:   //發送自身id給上游節點
    for (Writable id : getValue().getUpVertex().getAll()) {
        context.sendMessage((Text) id, new MyValue(getId()));
    }
    break;
case 4:   //存儲收到的消息,存儲爲下游節點列表
    for (MyValue msg : messages) {
        getValue().addDownVertex(msg.getSelfId());
    }
    break;

}

最後將結果輸出便可:

@Override
public void cleanup(WorkerContext context)

throws IOException {
context.write(new Text(getValue().getUpVertex().toDelimitedString(',')),
        getId(),
        new Text(getValue().getDownVertex().toDelimitedString(',')));

}

輸出結果的第一列和第三列均爲數組,第二列爲當前的節點,利用trans_array函數便可將數組轉換爲多行。這裏有個坑須要注意,sql不能寫成下面的形式:

select trans_array(2, ',', node1, node2, node3) as (node1, node2, node3)
from
(

select trans_array(2, ',', node2, node3, node1) as (node2, node3, node1)
from result_table

)t1

由於這樣會把那些出邊和入邊很是多的節點同時解析兩列trans_array的工做量分配到一個mapper上,形成嚴重的數據傾斜,寫成下面的形式便可進行兩次的資源分配,極大地下降數據傾斜的程度。

drop table if exists result_table_left;
create table result_table_left lifecycle 7 as
select trans_array(2, ',', node2, node3, node1) as (node2, node3, node1)
from result_table;
drop table if exists result_table_right;
create table result_table_right lifecycle 7 as
select trans_array(2, ',', node1, node2, node3) as (node1, node2, node3)
from result_table_left;

<a name="b1b13f88"></a>
### [](https://www.atatech.org/articles/104874#3)三度關係
既然兩度關係能夠利用MaxCompute Graph的特性固定住中間的節點,那麼天然地,咱們能夠想到,三度關係能夠固定住中間的兩個節點,變成以關係對的粒度(也就是邊的粒度)進行路徑頭和尾的遍歷。可是Graph的輸出是以點爲粒度,想要實現邊的粒度還須要再多發送一次消息,以下所示:

switch ((int) context.getSuperstep()) {

case 0:   //設置自身節點值
    getValue().setSelfId(getId());
    break;
case 1:   //發送自身id給下游節點
    if (hasEdges()) {
        context.sendMessageToNeighbors(this, new MyValue(getId()));
    }
    break;
case 2:   //存儲收到的消息,存儲爲上游節點列表 
    for (MyValue msg : messages) {
        getValue().addUpVertex(msg.getSelfId());
    }
    break;
case 3:   //發送自身id給上游節點
    for (Writable id : getValue().getUpVertex().getAll()) {
        context.sendMessage((Text) id, new MyValue(getId()));
    }
    break;
case 4:   //存儲收到的消息,存儲爲下游節點列表
    for (MyValue msg : messages) {
        getValue().addDownVertex(msg.getSelfId());
    }
    break;
case 5:   //再將下游節點的值發送給上游
    for (Writable id : getValue().getUpVertex().getAll()) {
        context.sendMessage((Text) id, getValue());
    }
    break;
case 6:  //結果輸出 [上游節點列表,本節點,當前消息所屬的下游節點,下游節點的下游節點列表]
    for (MyValue msg : messages) {
        context.write(new Text(getValue().getUpVertex().toDelimitedString(',')),
                getId(),
                msg.getSelfId(),
                new Text(msg.getDownVertex().toDelimitedString(',')));
    }
    break;

}

最後再像二度關係裏面用兩次trans_array解析便可獲得全部的三度關係路徑了。
<a name="3ad2b9ec"></a>
### [](https://www.atatech.org/articles/104874#4)四度關係
一樣按照以前的思路,四度關係至關於固定住中間的三個節點再進行頭部節點和尾部節點的遍歷。那麼問題來了,固定住一個節點能夠看作以點爲粒度進行遍歷,固定住兩個節點能夠看作是以邊爲粒度進行遍歷,那麼固定住三個節點至關於什麼呢?問題好像不可解了。可是咱們能夠換個思路來看,若是咱們把固定住三個節點轉換爲固定住兩個節點呢?以下圖所示,咱們已經經過兩度關係的輸出獲得全部的三個節點的路徑,如A和C,那麼咱們在A和C上新加一條邊,將邊的值設置爲中間節點B的節點值,這樣就能夠變成兩個節點了!而原來的邊還保留,只是邊的值爲空。<br /><br />[![](https://intranetproxy.alipay.com/skylark/lark/0/2019/png/23934/1548231788739-2cb97ed7-188f-4b2e-9305-d3aeba2d977a.png#align=left&display=inline&height=82&originHeight=82&originWidth=174&size=0&status=done&width=174)](http://ata2-img.cn-hangzhou.img-pub.aliyun-inc.com/10e374d629e8d1fb718255226111abf8.png)<br />所以,咱們從新用一度關係邊和兩度關係的虛擬邊構建一個新的網絡,再在新的網絡上運用三度關係的迭代方法。注意,添加虛擬邊後會讓節點的上下游節點列表變大,所以,前四步構建上下游節點列表時需加一條判斷邊的值爲空的條件,而後第五步和第六步輸出路徑時須要判斷邊不爲空。

switch ((int) context.getSuperstep()) {

case 0:   //設置自身節點值
    getValue().setSelfId(getId());
    break;
case 1:   //發送自身id給下游節點
    if (hasEdges()) {
        for (Edge<Text, Text> e : getEdges()) {
            if (e.getValue().equals(new Text(""))) {
                context.sendMessage(e.getDestVertexId(), new MyValue(getId()));
            }
        }
    }
    break;
case 2:   //存儲收到的消息,存儲爲上游節點列表 
    for (MyValue msg : messages) {
        getValue().addUpVertex(msg.getSelfId());
    }
    break;
case 3:   //發送自身id給上游節點
    for (Writable id : getValue().getUpVertex().getAll()) {
        context.sendMessage((Text) id, new MyValue(getId()));
    }
    break;
case 4:   //存儲收到的消息,存儲爲下游節點列表
    for (MyValue msg : messages) {
        getValue().addDownVertex(msg.getSelfId());
    }
    break;
case 5:   //再將本節點的值和邊值發送給下游
    if (hasEdges()) {
        MyValue msg = new MyValue();
        msg.setDownVertex(getValue().getDownVertex());
        msg.setUpVertex(getValue().getUpVertex());
        for (Edge<Text, Text> e : getEdges()) {
            if (!e.getValue().equals(new Text(""))) {
                String id = getId().toString();
                String edge = e.getValue().toString();
                msg.setSelfId(new Text(id+"+"+edge);                     
                context.sendMessage(e.getDestVertexId(), msg);
            }
        }
    }
    break;
case 6:  //結果輸出 [上游的上游節點列表,上游節點+中間節點,本節點,本節點的下游節點列表]
    for (MyValue msg : messages) {
        context.write(new Text(msg.getUpVertex().toDelimitedString(','))
                msg.getSelfId(),
                getId(),
                new Text(getValue().getDownVertex().toDelimitedString(',')));
    }
    break;

}

最後再用兩次trans_array和split解析便可獲得全部的四度關係路徑了。
<a name="5a3530b9"></a>
### [](https://www.atatech.org/articles/104874#5)環路截斷
前面的討論沒有考慮環路的狀況,實際中環路是很常見的,好比兩我的互相關注。有環路時,輸出的路徑須要截斷。<br />兩度關係輸出爲3個節點,只需判斷頭尾不相同便可,頭尾相同將頭節點置爲空,退化爲一度關係。好比A—>B—>A截斷爲B—>A;<br />三度關係輸出爲4個節點,中間兩個節點確定不相同,判斷第一個節點是否和第3、第四個節點相同,相同將第一個節點截斷,再判斷第二個節點和第四個節點是否相同,相同的話在第二個節點處截斷,即同時將第一個和第二個節點置爲空。如A—>B—>C—>A截斷爲B—>C—>A,B—>A—>C—>A截斷爲C—>A;<br />四度關係同理,再也不贅述。
<a name="433531fd"></a>
### [](https://www.atatech.org/articles/104874#6)結語
本例實踐了一張有1億節點,2億邊的有向圖,對其進行了關係擴散,最終的結果兩度關係有221億,三度關係有2180億,四度關係已經上萬億了,計算耗時兩度關係40分鐘,三度關係90分鐘左右,四度及以上整個過程的瓶頸已經不在計算了,而在MaxCompute Graph輸出上,輸出的耗時基本以小時爲單位計算。

 



本文做者:雲花

原文連接

本文爲雲棲社區原創內容,未經容許不得轉載

相關文章
相關標籤/搜索