源代碼放在個人github上,想細緻瞭解的能夠訪問:TriangleCount on githubjava
圖的三角形計數問題是一個基本的圖計算問題,是不少複雜網絡分析(好比社交網絡分析)的基礎。目前圖的三角形計數問題已經成爲了 Spark 系統中 GraphX 圖計算庫所提供的一個算法級 API。本次實驗任務就是要在 Hadoop 系統上實現圖的三角形計數任務。git
一個社交網絡能夠看作是一張圖(離散數學中的圖)。社交網絡中的人對應於圖的頂點;社交網絡中的人際關係對應於圖中的邊。在本次實驗任務中,咱們只考慮一種關係——用戶之間的關注關係。假設「王五」在 Twitter/微博中關注了「李四」,則在社交網絡圖中,有一條對應的從「王五」指向「李四」的有向邊。圖 1 中展現了一個簡單的社交網絡圖,人之間的關注關係經過圖中的有向邊標識了出來。本次的實驗任務就是在給定的社交網絡圖中,統計圖中全部三角形的數量。在統計前,須要先進行有向邊到無向邊的轉換,依據以下邏輯轉換:github
「A→B」表示從頂點 A 到頂點 B 有一條有向邊。A-B 表示頂點 A 和頂點 B 之間有一條無向邊。一個示例見圖 1,圖 1 右側的圖就是左側的圖去除邊方向後對應的無向圖。
請在無向圖上統計三角形的個數。在圖 1 的例子中,一共有 3 個三角形。
本次實驗將提供一個 Twitter 局部關係圖做爲輸入數據(給出的圖是有向圖),請統計該圖對應的無向圖中的三角形個數。算法
圖 1 一個簡單的社交網絡示例。左側的是一個社交網絡圖,右側的圖是將左側圖中的有向邊轉換爲無向邊後的無向圖。數組
輸入數據僅一個文件。該文件由若干行組成,每一行由兩個以空格分隔的整數組成:性能優化
A,B 分別是兩個頂點的 ID。這一行記錄表示圖中具備一條由 A 到 B 的有向邊。整個圖的結構由該文件惟一肯定。
下面的框中是文件部份內容的示例:網絡
87982906 17975898 17809581 35664799 524620711 270231980 247583674 230498574 348281617 255810948 159294262 230766095 14927205 5380672
擴展一:挑戰更大的數據集!使用 Google+的社交關係網數據集做爲輸入數據集。app
擴展二:考慮將邏輯轉換由or改成and的三角形個數是多少,改變後的邏輯轉換以下:ide
job:OutDegreeStat的實現:函數
Map階段:(vertex1: Text, vertex2: Text) -> (vertex1: Text, 1: IntWritable) and (vertex2: Text, 1: IntWritable),實現代碼以下:
public static class OutDegreeStatMapper extends Mapper<Object, Text, Text, IntWritable> { private final IntWritable one = new IntWritable(1); @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); Text vertex1 = new Text(itr.nextToken()); Text vertex2 = new Text(itr.nextToken()); if (!vertex1.equals(vertex2)) { context.write(vertex1, one); context.write(vertex2, one); } } }
Reduce階段:(vertex: Text, degree: Iterable<IntWritable>) -> (vertex: Text, degreeSum: IntWritable),實現代碼以下:
public static class OutDegreeStatReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val: values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } }
Combiner階段:Combiner邏輯與Reduce邏輯一致,只是爲了減小數據量從而減小通訊開銷
job:EdgeConvert的實現:
Map階段:(vertex1: Text, vertex2: Text) -> (vertex1: Text, vertex2: Text) and (vertex2: Text, vertex1: Text),實現代碼以下:
public static class EdgeConvertMapper extends Mapper<Object, Text, Text, Text> { @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); Text vertex1 = new Text(itr.nextToken()); Text vertex2 = new Text(itr.nextToken()); if (!vertex1.equals(vertex2)) { context.write(vertex1, vertex2); context.write(vertex2, vertex1); } } }
Reduce階段:在setup函數中讀取存儲節點度的文件,在reduce函數中(vertex1: Text, vertex2List: iterable<Text>) -> (vertex1 with minimal degree: Text, vertex2 with maximal degree: Text),==在對鄰接表節點進行排序時,要重寫一個String Comparator,讓String按它所表示的數值大小進行比較,==實現代碼以下:
public static class EdgeConvertReducer extends Reducer<Text, Text, Text, Text> { private Map<String, Integer> degree; private Map<String, Boolean> edgeExisted; private URI[] cacheFiles; @Override public void setup(Context context) throws IOException, InterruptedException { degree = new HashMap<String, Integer>(); //讀取存儲節點度的文件 Configuration conf = context.getConfiguration(); cacheFiles = context.getCacheFiles(); for (int i = 0; i < cacheFiles.length; i++) { SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(cacheFiles[i]))); IntWritable key = new IntWritable(); Text value = new Text(); int cnt = 0; while (reader.next(key, value)) { degree.put(value.toString(), cnt); cnt++; } reader.close(); } } @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ Text vertex = new Text(); List<String> outvertex = new ArrayList<String>(); edgeExisted = new HashMap<String, Boolean>(); //記錄已處理邊以免重複統計 for (Text val: values) { if (!edgeExisted.containsKey(val.toString())) { edgeExisted.put(val.toString(), true); //比較邊兩個端點的度大小 if (degree.get(val.toString()) > degree.get(key.toString())) { outvertex.add(val.toString()); } } } //對鄰接節點從小到大進行排序,方便後續求交集 Collections.sort(outvertex, new ComparatorString()); for (String vt: outvertex) { vertex.set(vt); context.write(key, vertex); } } } //繼承String比較器按它所表示的數值大小進行比較 public static class ComparatorString implements Comparator<String> { public int compare(String a, String b) { if (a.length() > b.length()) { return 1; } else if (a.length() < b.length()){ return -1; } else { return a.compareTo(b); } } }
job:GraphTriangleCount的實現:
Map階段:以job:EdgeConvert的輸出文件做爲讀入,該文件不包含重複邊所以無需判斷轉換,直接按原樣映射(vertex1: Text, vertex2: Text) -> (vertex1: Text, vertex2: Text)便可,實現代碼以下:
public static class GraphTriangleCountMapper extends Mapper<Text, Text, Text, Text> { @Override public void map(Text key, Text value, Context context) throws IOException, InterruptedException{ context.write(key, value); } }
Reduce階段:在setup函數中讀取存儲小度指向大度的鄰接表文件,在reduce函數中(vertex1: Text, vertex2List: Iterable<Text>) -> ("TriangleNum": Text, triangleNum: LongWritable),在cleanup函數中寫當前這個reducer的三角形計數結果,實現代碼以下:
public static class GraphTriangleCountReducer extends Reducer<Text, Text, Text, LongWritable> { private final static String edgePath = TriangleCountDriver.HDFS_PATH + TriangleCountDriver.EdgeConvertPath; //鄰接表文件路徑 private Map<String, Integer> vexIndex; //存儲節點的鄰接表索引 private ArrayList<ArrayList<String>> vec = new ArrayList<ArrayList<String>>(); //存儲全局鄰接表 private long triangleNum = 0; @Override public void setup(Context context) throws IOException, InterruptedException { int cnt = 0; String lastVertex = ""; String sv, tv; ArrayList<String> outVertices = new ArrayList<String>(); vexIndex = new TreeMap<String, Integer>(); //獲取文件系統的接口 Configuration conf = context.getConfiguration(); FileSystem fs = FileSystem.get(conf); //讀取小度指向大度的邊鄰接表 for (FileStatus fst: fs.listStatus(new Path(edgePath))) { if (!fst.getPath().getName().startsWith("_")) { SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(fst.getPath())); Text key = new Text(); Text value = new Text(); while (reader.next(key, value)) { sv = key.toString(); tv = value.toString(); if (!sv.equals(lastVertex)) { if (cnt != 0) vec.add(outVertices); vexIndex.put(sv, cnt); cnt++; outVertices = new ArrayList<String>(); outVertices.add(tv); } else { outVertices.add(tv); } lastVertex = sv; } reader.close(); } } vec.add(outVertices); } @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ for (Text val: values) if (vexIndex.containsKey(val.toString())) //調用求交集函數獲取包含邊(key,val)的三角形個數 triangleNum += intersect(vec.get(vexIndex.get(key.toString())), vec.get(vexIndex.get(val.toString()))); } @Override public void cleanup(Context context) throws IOException, InterruptedException{ //將計數結果寫入文件 context.write(new Text("TriangleNum"), new LongWritable(triangleNum)); } //求有序集合的交集 private long intersect(ArrayList<String> avex, ArrayList<String> bvex) { long num = 0; int i = 0, j = 0; int cv; while (i != avex.size() && j != bvex.size()) { if (avex.get(i).length() > bvex.get(j).length()) { cv = 1; } else if (avex.get(i).length() < bvex.get(j).length()) { cv = -1; } else { cv = avex.get(i).compareTo(bvex.get(j)); } if (cv == 0) { i++; j++; num++; } else if (cv > 0) { j++; } else { i++; } } return num; } }
對於a->b and b->a then a-b的條件,只需改變job:EdgeConvert的實現便可,新建一個job:UndirectionalEdgeConvert:
Map階段:以原數據文件做爲輸入進行映射轉換:(vertex1: Text, vertex2: Text) -> ((vertex1, vertex2): Text, flag: Text) and ((vertex2, vertex1): Text, flag:Text),按vertex1進行分區,其中flag做爲節點序標記,用於幫助在Reducer中對雙向邊進行判斷,若是輸入的vertex1 > vertex2則flag置1,若是vertex1 < vertex2則flag置0,實現代碼以下:
public static class UndirectionalEdgeConvertMapper extends Mapper<Object, Text, Text, Text> { @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); String vertex1 = itr.nextToken(); String vertex2 = itr.nextToken(); String flag = "0"; //節點序標記,幫助Reducer判斷雙向邊 if (!vertex1.equals(vertex2)) { if (vertex1.compareTo(vertex2) > 0) flag = "1"; context.write(new Text(vertex1 + '\t' + vertex2), new Text(flag)); context.write(new Text(vertex2 + '\t' + vertex1), new Text(flag)); } } }
Reduce階段:((vertex1, vertex2): Text, flagList: Iterable<Text>) -> (vertex1 with minimal degree: Text, vertex2 with maximal degree: Text),若是flagList既包含0又包含1,則(vertex1, verex2)屬於雙向邊,而後判斷若是vertex2的度大於vertex1,則將vertex2加入到vertex1的鄰接表中,reduce函數部分的實現代碼以下:
@Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ String[] term = key.toString().split("\t"); //若是鄰接表出點變了則寫出lastkey爲出點的鄰接表 if (!lastKey.equals(term[0])) { if (outvertex.size() != 0) { Text vertex = new Text(); Text lastKeyText = new Text(lastKey); Collections.sort(outvertex, new EdgeConvert.ComparatorString()); for (String vt: outvertex) { vertex.set(vt); context.write(lastKeyText, vertex); } outvertex = new ArrayList<String>(); } } //判斷(term[0], term[1])是不是雙向邊 boolean flag0 = false, flag1 = false; for (Text val: values) { if (val.toString().equals("0")) flag0 = true; else flag1 = true; } if (flag0 && flag1) { if (degree.get(term[1]) > degree.get(term[0])) { outvertex.add(term[1]); } } lastKey = term[0]; }
2.0版本,在1.0的版本上進行了數據儲存和表示方面的優化,相同實驗環境(6個Reducer,每一個Reducer2G物理內存,Reducer中的java heapsize -Xmx2048m)跑Goolge+數據能快50s左右,具體優化細節以下:
實驗環境:CPU型號Intel(R) Xeon(R) CPU E5-2630 v2 @ 2.60GHz,雙物理CPU,單CPU6核12線程,因此一共24個虛擬核,程序設置6個reducer,每一個reducer配置2GB物理內存,reducer中的java heapsize配置-Xmx2048m
=="or"表示if a->b or b->a then a-b的狀況,"and"表示if a->b and b->a then a-b的狀況==
數據集 | 三角形個數 | Driver程序在集羣上的運行時間(秒) |
---|---|---|
Twitter(or) | 13082506 | 127s |
Google+(or) | 1073677742 | 278s |
Twitter(and) | 1818304 | 125s |
Goolge+(and) | 27018510 | 156s |
數據集 | 三角形個數 | Driver程序在集羣上的運行時間(秒) |
---|---|---|
Twitter(or) | 13082506 | 115s |
Google+(or) | 1073677742 | 230s |
Twitter(and) | 1818304 | 118s |
Goolge+(and) | 27018510 | 181s |
評估:2.0版本相對1.0版本在節點數據類型上做了優化,當數據量很大的時候,or狀況的性能有顯著的提高,Google+數據比1.0版本快了差很少50s左右,可是and狀況下2.0版本跑Google+數據性能卻降低了,我的猜想多是job:UnidirectionalEdgeConvert中的Mapper,Reducer,Partitioner,比較函數中涉及大量的位操做或者int與long之間的類型轉換,這個開銷比1.0版本的對字符串排序開銷更大。目前沒有很好的想法來避免頻繁的位操做與類型轉換,有idea的朋友能夠給我留言~