基於mapreduce實現圖的三角形計數

源代碼放在個人github上,想細緻瞭解的能夠訪問:TriangleCount on githubjava

1、實驗要求

1.1 實驗背景

        圖的三角形計數問題是一個基本的圖計算問題,是不少複雜網絡分析(好比社交網絡分析)的基礎。目前圖的三角形計數問題已經成爲了 Spark 系統中 GraphX 圖計算庫所提供的一個算法級 API。本次實驗任務就是要在 Hadoop 系統上實現圖的三角形計數任務。git

1.2 實驗任務

        一個社交網絡能夠看作是一張圖(離散數學中的圖)。社交網絡中的人對應於圖的頂點;社交網絡中的人際關係對應於圖中的邊。在本次實驗任務中,咱們只考慮一種關係——用戶之間的關注關係。假設「王五」在 Twitter/微博中關注了「李四」,則在社交網絡圖中,有一條對應的從「王五」指向「李四」的有向邊。圖 1 中展現了一個簡單的社交網絡圖,人之間的關注關係經過圖中的有向邊標識了出來。本次的實驗任務就是在給定的社交網絡圖中,統計圖中全部三角形的數量。在統計前,須要先進行有向邊到無向邊的轉換,依據以下邏輯轉換:github

IF ( A→B) OR (B→A) THEN A-B

        「A→B」表示從頂點 A 到頂點 B 有一條有向邊。A-B 表示頂點 A 和頂點 B 之間有一條無向邊。一個示例見圖 1,圖 1 右側的圖就是左側的圖去除邊方向後對應的無向圖。
        請在無向圖上統計三角形的個數。在圖 1 的例子中,一共有 3 個三角形。
        本次實驗將提供一個 Twitter 局部關係圖做爲輸入數據(給出的圖是有向圖),請統計該圖對應的無向圖中的三角形個數。算法

圖 1 一個簡單的社交網絡示例。左側的是一個社交網絡圖,右側的圖是將左側圖中的有向邊轉換爲無向邊後的無向圖。數組

1.3 輸入說明

        輸入數據僅一個文件。該文件由若干行組成,每一行由兩個以空格分隔的整數組成:性能優化

A B

A,B 分別是兩個頂點的 ID。這一行記錄表示圖中具備一條由 A 到 B 的有向邊。整個圖的結構由該文件惟一肯定。
下面的框中是文件部份內容的示例:網絡

87982906 17975898
17809581 35664799
524620711 270231980
247583674 230498574
348281617 255810948
159294262 230766095
14927205 5380672

1.4 擴展

  • 擴展一:挑戰更大的數據集!使用 Google+的社交關係網數據集做爲輸入數據集。app

  • 擴展二:考慮將邏輯轉換由or改成and的三角形個數是多少,改變後的邏輯轉換以下:ide

IF ( A→B) AND (B→A) THEN A-B

2、實驗設計與實現

2.1 算法設計

  • step1:統計圖中每個點的度,不關心是入度仍是出度,而後對統計到的全部點的度進行排序
  • step2:將圖中每一條單向邊轉換成雙向邊,對於圖中a->b and b->a的兩條邊,分別轉換後須要去重,在轉換後的圖中篩選出小度指向大度的邊來創建鄰接表,而後對每一個點的鄰接點按從小到大進行排序
  • step3:對原圖中的邊進行轉換,確保每條邊是由數值小的點指向數值大的點並去重,而後遍歷每一條邊:求邊的兩個端點對應的鄰接點集的交集大小即爲包含這條邊的三角形個數。對每條邊對應的三角形個數進行累加便可獲得全圖包含的三角形個數。

2.2 程序設計

  • 根據算法步驟將程序設計成4個job來實現:
    1. job:OutDegreeStat用於對每一個點的度進行統計,在類OutDegreeStat中實現
    2. job:SortedOutDegree用於對全部點的度進行排序,在類OutDegreeStat中實現,在job1以後運行
    3. job:EdgeConvert用於創建存儲小度指向大度的邊的鄰接表,在類EdgeConvert中實現
    4. job:GraphTriangleCount用於遍歷每條邊求端點對應鄰接點集的交集來對三角形進行計數,在類GraphTriangleCount中實現

2.3 程序實現

  • job:OutDegreeStat的實現:函數

    1. Map階段:(vertex1: Text, vertex2: Text) -> (vertex1: Text, 1: IntWritable) and (vertex2: Text, 1: IntWritable),實現代碼以下:

      public static class OutDegreeStatMapper extends Mapper<Object, Text, Text, IntWritable> {
              private final IntWritable one = new IntWritable(1);
      
              @Override
              public void map(Object key, Text value, Context context)
                      throws IOException, InterruptedException {
                  String line = value.toString();
                  StringTokenizer itr = new StringTokenizer(line);
                  Text vertex1 = new Text(itr.nextToken());
                  Text vertex2 = new Text(itr.nextToken());
                  if (!vertex1.equals(vertex2)) {
                      context.write(vertex1, one);
                      context.write(vertex2, one);
                  }
              }
          }
    2. Reduce階段:(vertex: Text, degree: Iterable<IntWritable>) -> (vertex: Text, degreeSum: IntWritable),實現代碼以下:

      public static class OutDegreeStatReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
              @Override
              public void reduce(Text key, Iterable<IntWritable> values, Context context)
                      throws IOException, InterruptedException {
                  int sum = 0;
                  for (IntWritable val: values) {
                      sum += val.get();
                  }
                  context.write(key, new IntWritable(sum));
              }
          }
    3. Combiner階段:Combiner邏輯與Reduce邏輯一致,只是爲了減小數據量從而減小通訊開銷

  • job:SortedOutDegree的實現:
    1. Map階段:因爲mapreduce的reduce階段會按key進行排序,爲了按度進行排序,只需用hadoop自帶的InverseMapper類對鍵值對作逆映射(vertex: Text, degree: IntWritable) -> (degree: IntWritable, vertex: Text)便可。
    2. Reduce階段:無需設置Reducer類,hadoop的Reduce階段自動會對degree進行排序
  • job:EdgeConvert的實現:

    1. Map階段:(vertex1: Text, vertex2: Text) -> (vertex1: Text, vertex2: Text) and (vertex2: Text, vertex1: Text),實現代碼以下:

      public static class EdgeConvertMapper extends Mapper<Object, Text, Text, Text> {
              @Override
              public void map(Object key, Text value, Context context)
                      throws IOException, InterruptedException {
                  StringTokenizer itr = new StringTokenizer(value.toString());
                  Text vertex1 = new Text(itr.nextToken());
                  Text vertex2 = new Text(itr.nextToken());
                  if (!vertex1.equals(vertex2)) {
                      context.write(vertex1, vertex2);
                      context.write(vertex2, vertex1);
                  }
              }
          }
    2. Reduce階段:在setup函數中讀取存儲節點度的文件,在reduce函數中(vertex1: Text, vertex2List: iterable<Text>) -> (vertex1 with minimal degree: Text, vertex2 with maximal degree: Text),==在對鄰接表節點進行排序時,要重寫一個String Comparator,讓String按它所表示的數值大小進行比較,==實現代碼以下:

      public static class EdgeConvertReducer extends Reducer<Text, Text, Text, Text> {
              private Map<String, Integer> degree;
              private Map<String, Boolean> edgeExisted;
              private URI[] cacheFiles;
      
              @Override
              public void setup(Context context)
                      throws IOException, InterruptedException {
                  degree = new HashMap<String, Integer>();
      
                  //讀取存儲節點度的文件
                  Configuration conf = context.getConfiguration();
                  cacheFiles = context.getCacheFiles();
                  for (int i = 0; i < cacheFiles.length; i++) {
                      SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(cacheFiles[i])));
                      IntWritable key = new IntWritable();
                      Text value = new Text();
                      int cnt = 0;
                      while (reader.next(key, value)) {
                          degree.put(value.toString(), cnt);
                          cnt++;
                      }
                      reader.close();
                  }
              }
      
              @Override
              public void reduce(Text key, Iterable<Text> values, Context context)
                      throws IOException, InterruptedException{
                  Text vertex = new Text();
                  List<String> outvertex = new ArrayList<String>();
                  edgeExisted = new HashMap<String, Boolean>();  //記錄已處理邊以免重複統計
                  for (Text val: values) {
                      if (!edgeExisted.containsKey(val.toString())) {
                          edgeExisted.put(val.toString(), true);
                          //比較邊兩個端點的度大小
                          if (degree.get(val.toString()) > degree.get(key.toString())) {
                              outvertex.add(val.toString());
                          }
                      }
                  }
                  //對鄰接節點從小到大進行排序,方便後續求交集
                  Collections.sort(outvertex, new ComparatorString());
                  for (String vt: outvertex) {
                      vertex.set(vt);
                      context.write(key, vertex);
                  }
              }
          }
      
          //繼承String比較器按它所表示的數值大小進行比較
          public static class ComparatorString implements Comparator<String> {
              public int compare(String a, String b) {
                  if (a.length() > b.length()) {
                      return 1;
                  } else if (a.length() < b.length()){
                      return -1;
                  } else {
                      return a.compareTo(b);
                  }
              }
          }
  • job:GraphTriangleCount的實現:

    1. Map階段:以job:EdgeConvert的輸出文件做爲讀入,該文件不包含重複邊所以無需判斷轉換,直接按原樣映射(vertex1: Text, vertex2: Text) -> (vertex1: Text, vertex2: Text)便可,實現代碼以下:

      public static class GraphTriangleCountMapper extends Mapper<Text, Text, Text, Text> {
              @Override
              public void map(Text key, Text value, Context context)
                      throws IOException, InterruptedException{
                  context.write(key, value);
              }
          }
    2. Reduce階段:在setup函數中讀取存儲小度指向大度的鄰接表文件,在reduce函數中(vertex1: Text, vertex2List: Iterable<Text>) -> ("TriangleNum": Text, triangleNum: LongWritable),在cleanup函數中寫當前這個reducer的三角形計數結果,實現代碼以下:

      public static class GraphTriangleCountReducer extends Reducer<Text, Text, Text, LongWritable> {
              private final static String edgePath = TriangleCountDriver.HDFS_PATH + TriangleCountDriver.EdgeConvertPath;  //鄰接表文件路徑
              private Map<String, Integer> vexIndex;  //存儲節點的鄰接表索引
              private ArrayList<ArrayList<String>> vec = new ArrayList<ArrayList<String>>();  //存儲全局鄰接表
              private long triangleNum = 0;
      
              @Override
              public void setup(Context context)
                      throws IOException, InterruptedException {
                  int cnt = 0;
                  String lastVertex = "";
                  String sv, tv;
                  ArrayList<String> outVertices = new ArrayList<String>();
                  vexIndex = new TreeMap<String, Integer>();
                  //獲取文件系統的接口
                  Configuration conf = context.getConfiguration();
                  FileSystem fs = FileSystem.get(conf);
                  //讀取小度指向大度的邊鄰接表
                  for (FileStatus fst: fs.listStatus(new Path(edgePath))) {
                      if (!fst.getPath().getName().startsWith("_")) {
                          SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(fst.getPath()));
                          Text key = new Text();
                          Text value = new Text();
                          while (reader.next(key, value)) {
                              sv = key.toString();
                              tv = value.toString();
                              if (!sv.equals(lastVertex)) {
                                  if (cnt != 0) vec.add(outVertices);
                                  vexIndex.put(sv, cnt);
                                  cnt++;
                                  outVertices = new ArrayList<String>();
                                  outVertices.add(tv);
                              } else {
                                  outVertices.add(tv);
                              }
                              lastVertex = sv;
                          }
                          reader.close();
                      }
                  }
                  vec.add(outVertices);
              }
      
      
              @Override
              public void reduce(Text key, Iterable<Text> values, Context context)
                      throws IOException, InterruptedException{
                  for (Text val: values)
                      if (vexIndex.containsKey(val.toString()))
                          //調用求交集函數獲取包含邊(key,val)的三角形個數
                          triangleNum += intersect(vec.get(vexIndex.get(key.toString())), vec.get(vexIndex.get(val.toString())));
              }
      
              @Override
              public void cleanup(Context context) throws IOException, InterruptedException{
                  //將計數結果寫入文件
                  context.write(new Text("TriangleNum"), new LongWritable(triangleNum));
              }
      
              //求有序集合的交集
              private long intersect(ArrayList<String> avex, ArrayList<String> bvex) {
                  long num = 0;
                  int i = 0, j = 0;
                  int cv;
                  while (i != avex.size() && j != bvex.size()) {
                      if (avex.get(i).length() > bvex.get(j).length()) {
                          cv = 1;
                      } else if (avex.get(i).length() < bvex.get(j).length()) {
                          cv = -1;
                      } else {
                          cv = avex.get(i).compareTo(bvex.get(j));
                      }
                      if (cv == 0) {
                          i++;
                          j++;
                          num++;
                      } else if (cv > 0) {
                          j++;
                      } else {
                          i++;
                      }
                  }
                  return num;
              }
          }

2.4 擴展2的設計與實現

  • 對於a->b and b->a then a-b的條件,只需改變job:EdgeConvert的實現便可,新建一個job:UndirectionalEdgeConvert:

    1. Map階段:以原數據文件做爲輸入進行映射轉換:(vertex1: Text, vertex2: Text) -> ((vertex1, vertex2): Text, flag: Text) and ((vertex2, vertex1): Text, flag:Text),按vertex1進行分區,其中flag做爲節點序標記,用於幫助在Reducer中對雙向邊進行判斷,若是輸入的vertex1 > vertex2則flag置1,若是vertex1 < vertex2則flag置0,實現代碼以下:

      public static class UndirectionalEdgeConvertMapper extends Mapper<Object, Text, Text, Text> {
              @Override
              public void map(Object key, Text value, Context context)
                      throws IOException, InterruptedException {
                  StringTokenizer itr = new StringTokenizer(value.toString());
                  String vertex1 = itr.nextToken();
                  String vertex2 = itr.nextToken();
                  String flag = "0";  //節點序標記,幫助Reducer判斷雙向邊
                  if (!vertex1.equals(vertex2)) {
                      if (vertex1.compareTo(vertex2) > 0) flag = "1";
                      context.write(new Text(vertex1 + '\t' + vertex2), new Text(flag));
                      context.write(new Text(vertex2 + '\t' + vertex1), new Text(flag));
                  }
              }
          }
    2. Reduce階段:((vertex1, vertex2): Text, flagList: Iterable<Text>) -> (vertex1 with minimal degree: Text, vertex2 with maximal degree: Text),若是flagList既包含0又包含1,則(vertex1, verex2)屬於雙向邊,而後判斷若是vertex2的度大於vertex1,則將vertex2加入到vertex1的鄰接表中,reduce函數部分的實現代碼以下:

      @Override
      public void reduce(Text key, Iterable<Text> values, Context context)
          throws IOException, InterruptedException{
          String[] term = key.toString().split("\t");
          //若是鄰接表出點變了則寫出lastkey爲出點的鄰接表
          if (!lastKey.equals(term[0])) {
              if (outvertex.size() != 0) {
                  Text vertex = new Text();
                  Text lastKeyText = new Text(lastKey);
                  Collections.sort(outvertex, new EdgeConvert.ComparatorString());
                  for (String vt: outvertex) {
                      vertex.set(vt);
                      context.write(lastKeyText, vertex);
                  }
                  outvertex = new ArrayList<String>();
              }
          }
      
          //判斷(term[0], term[1])是不是雙向邊
          boolean flag0 = false, flag1 = false;
          for (Text val: values) {
              if (val.toString().equals("0")) flag0 = true;
              else flag1 = true;
          }
          if (flag0 && flag1) {
              if (degree.get(term[1]) > degree.get(term[0])) {
                  outvertex.add(term[1]);
              }
          }
          lastKey = term[0];
      }

3、性能分析與優化

3.1 性能分析

  • 該算法的性能瓶頸在遍歷每一條邊而後邊兩個端節點對應鄰接表的交集,而後對每一個頂點出發的鄰接表進行排序也比較耗時,算法總體的時間複雜度是O(E^1.5^),E爲邊的數目
  • 目前這個1.0版本的實現魯棒性比較好,節點編號用Text存儲,因此不管節點編號多大均可以存儲以及比較,輸入的圖能夠容許重複邊的出現,不會影響結果的正確性。但因爲mapreduce涉及大量的排序過程,用Text存儲節點也就意味着使用字符串排序,字符串之間的比較固然比整型比較開銷大,從而會影響程序的總體性能。除此以外,hadoop須要對數據進行序列化以後才能在網絡上傳輸,數據以文本文件輸入致使大量的數據序列化轉換也會下降程序性能。

3.2 性能優化

2.0版本,在1.0的版本上進行了數據儲存和表示方面的優化,相同實驗環境(6個Reducer,每一個Reducer2G物理內存,Reducer中的java heapsize -Xmx2048m)跑Goolge+數據能快50s左右,具體優化細節以下:

  • 將離散化稀疏的節點轉換成順序化的,這樣就能夠用IntWritabel表示節點(前提是節點數未超過INT_MAX)而不用Text來表示節點編號,這樣就能夠避免字符串排序,減小map和reduce階段的排序開銷
  • 將原始Text輸入文件轉換成Sequence,由於hadoop傳輸在網絡上的數據是序列化的,這樣能夠避免數據的序列化轉換開銷。可是因爲數據是串行轉換的,影響總體性能,可是能夠在第一次運行事後存起來,之後運行直接加載sequence的數據文件便可。這一步是和第一步順序化節點一塊兒完成的,轉換後的sequence文件存儲的是順序化的節點表示的邊。
  • 在a->b and b->a then a-b的條件下,在獲取小度指向大度的邊集任務中,mapper須要將一條邊的點對合併爲key以在reducer中判斷是不是雙向邊,看似只能用Text來存儲了,實則這裏有一個trick,在對節點順序化以後的節點數一般不會超過INT_MAX,所以可使用考慮將兩個int型表示的節點轉換成long,key存儲在高32位,value存儲在低32位,經過簡單的位操做便可實現,這樣mapper輸出的key就是long而非Text,從而避免了字符串的比較排序,因爲mapreduce涉及大量排序過程,所以在涉及程序的時候儘可能用一些trick避免用Text表示key.

3、程序運行結果及時耗

實驗環境:CPU型號Intel(R) Xeon(R) CPU E5-2630 v2 @ 2.60GHz,雙物理CPU,單CPU6核12線程,因此一共24個虛擬核,程序設置6個reducer,每一個reducer配置2GB物理內存,reducer中的java heapsize配置-Xmx2048m

=="or"表示if a->b or b->a then a-b的狀況,"and"表示if a->b and b->a then a-b的狀況==

  • 1.0版本的測試結果:
數據集 三角形個數 Driver程序在集羣上的運行時間(秒)
Twitter(or) 13082506 127s
Google+(or) 1073677742 278s
Twitter(and) 1818304 125s
Goolge+(and) 27018510 156s
  • 2.0版本的測試結果(不包含輸入文件轉換的時間):
數據集 三角形個數 Driver程序在集羣上的運行時間(秒)
Twitter(or) 13082506 115s
Google+(or) 1073677742 230s
Twitter(and) 1818304 118s
Goolge+(and) 27018510 181s

評估:2.0版本相對1.0版本在節點數據類型上做了優化,當數據量很大的時候,or狀況的性能有顯著的提高,Google+數據比1.0版本快了差很少50s左右,可是and狀況下2.0版本跑Google+數據性能卻降低了,我的猜想多是job:UnidirectionalEdgeConvert中的Mapper,Reducer,Partitioner,比較函數中涉及大量的位操做或者int與long之間的類型轉換,這個開銷比1.0版本的對字符串排序開銷更大。目前沒有很好的想法來避免頻繁的位操做與類型轉換,有idea的朋友能夠給我留言~

相關文章
相關標籤/搜索