PageRank在Hadoop和spark下的實現以及對比

關於PageRank的地位,沒必要多說。
主要思想:對於每一個網頁,用戶都有可能點擊網頁上的某個連接,例如
A:B,C,D
B:A,D
C:A
D:B,C
由這個咱們能夠獲得網頁的轉移矩陣
     A    B    C    D
A  0    1/2  1    0
B 1/3   0    0    0
C 1/3  1/2  0    0
D 1/3  0     0    1/2
 
Aij表示網頁j到網頁i的轉移機率。假設起始狀態每一個用戶對ABCD四個網站的點擊機率相同都是0.25,那麼各個網站第一次被訪問的機率爲(0.25,0.25,0.25,0.25),第二次訪問考慮到在頁面跳轉,利用轉移矩陣對於網站A的機率爲(0,1/2,1,0)*(0.25,0.25,0.25,0.25)T,一次類推,通過若干次迭代會收斂到某個值。可是考慮到有些連接是單鏈即沒有別的連接只想他,他也不指向別的連接,以及有些連接是本身指向本身,那麼上述的方式將沒法收斂。因此後面加了一個阻尼係數通常取0.85,至於爲何是這樣,挺複雜的證實。
最後的公式爲alaph=factor*matrix*(alaph)T+(1-facotr)/n*
詳細的介紹能夠參考: http://blog.jobbole.com/71431/
接下來即是對比Hadoop和spark了。這裏只是單純的討論兩個環境下編程的效率,不討論性能。
Hadoop:
輸入的文件:
A 0.25:B,C,D
B 0.25:A,D
C 0.25:A
D 0.25:B,C
這裏得先說一句,之因此加了0.25是由於初始的機率爲1/n,而n爲網站數,這裏統計網站數又得須要一個MapReduce來實現,因此做罷,權當n是手工輸入的。
因爲每次迭代後的結果只能放在文件中,因此這裏花了不少時間在規範如何輸出,以及map和reduce之間如何傳值的問題。
在map中,咱們要作的是從輸入文件中獲取alaph和每一個網站的轉移機率。例如
A 0.25:B,C,D
B的轉移機率爲1/3並且是從A轉向B的,因此輸出的是<"B","link:A 0.333">link表示這是個轉移機率,A表示是從A出發的
alaph的表示:<"B","alaph: A 0.25">這裏的A表示這個alaph值對應這A。
因爲咱們這裏迭代後的輸入文件都是從輸出文件中獲取,因此咱們須要將輸出文件搞的和一開始輸入文件同樣,因此在map階段須要輸出<"A","content:B,C,D">方便reduce輸出和輸入文件同樣格式的輸出。
在reduce階段,此時對於鍵值B而言,會收到以下
<"B","link:A 0.333">
<"B","link:D 0.5">
<"B","alaph: A 0.25">
<"B","alaph: D 0.25">
<"B","content:A,D">
咱們根據不一樣的單詞,將value整合。這的alaph=0.333*0.25+0.5*0.25,接着再加上阻尼係數等,獲得最後的alaph值。而後利用content對應的value,最後輸出<"B:0.375","A,D">
這樣迭代若干次。
附上代碼:
 
  1 package org.apache.hadoop.PageRank;
  2 
  3 import java.util.ArrayList;
  4 
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.FileSystem;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.mapreduce.Job;
 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 12 
 13 public class PageRank {
 14 
 15     public static void run(){
 16         
 17     }
 18     
 19     public static void main(String[] args) throws Exception {
 20         double factor=0;
 21         if(args.length>1){
 22             factor=Double.parseDouble(args[0]);
 23         }else{
 24             factor=0.85;
 25         }
 26         String input="hdfs://10.107.8.110:9000/PageRank_input";
 27         String output="hdfs://10.107.8.110:9000/PageRank/output";
 28         ArrayList<String> pathList=new ArrayList<String>();        
 29         for(int i=0;i<20;i++){
 30             Configuration conf = new Configuration();
 31             conf.set("num","4");
 32             conf.set("factor",String.valueOf(factor));
 33             Job job = Job.getInstance(conf, "PageRank");
 34             job.setJarByClass(org.apache.hadoop.PageRank.PageRank.class);
 35             job.setMapperClass(MyMapper.class);
 36             job.setReducerClass(MyReducer.class);
 37             job.setOutputKeyClass(Text.class);
 38             job.setOutputValueClass(Text.class);
 39             FileInputFormat.setInputPaths(job, new Path(input));
 40             FileOutputFormat.setOutputPath(job, new Path(output));
 41             input=output;
 42             pathList.add(output);
 43             output=output+1;
 44             
 45             System.out.println("the "+i+"th iterator is finished");
 46             job.waitForCompletion(true);
 47         }
 48         for(int i=0;i<pathList.size()-1;i++){
 49             Configuration conf=new Configuration();
 50             Path path=new Path(pathList.get(i));
 51             FileSystem fs=path.getFileSystem(conf);
 52             fs.delete(path,true);
 53         }
 54     }
 55 
 56 }
 57 
 58 
 59 
 60 package org.apache.hadoop.PageRank;
 61 
 62 import java.io.IOException;
 63 import java.util.HashMap;
 64 import java.util.Map;
 65 
 66 
 67 import org.apache.hadoop.io.LongWritable;
 68 import org.apache.hadoop.io.Text;
 69 import org.apache.hadoop.mapreduce.Mapper;
 70 
 71 public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
 72 
 73     
 74     public void map(LongWritable ikey, Text ivalue, Context context)
 75             throws IOException, InterruptedException {
 76         String[] line=ivalue.toString().split(":");
 77         String content=line[1];
 78         int num=content.split(",").length;
 79         String word=line[0].split("    ")[0];
 80         String alaph=line[0].split("    ")[1];
 81         context.write(new Text(word),new Text("content:"+content));
 82         for(String w:content.split(",")){
 83             context.write(new Text(w),new Text("link:"+word+" "+String.valueOf(1.0/num)));
 84             context.write(new Text(w),new Text("alaph:"+word+" "+alaph));
 85         }
 86     }
 87 
 88 }
 89 
 90 
 91 
 92 package org.apache.hadoop.PageRank;
 93 
 94 import java.io.IOException;
 95 import java.util.HashMap;
 96 import java.util.Map;
 97 
 98 import org.apache.hadoop.conf.Configuration;
 99 import org.apache.hadoop.io.Text;
100 import org.apache.hadoop.mapreduce.Reducer;
101 
102 public class MyReducer extends Reducer<Text, Text, Text, Text> {
103 
104     public void reduce(Text _key, Iterable<Text> values, Context context)
105             throws IOException, InterruptedException {
106         // process values
107         Configuration conf=context.getConfiguration();
108         double factor=Double.parseDouble(conf.get("factor"));
109         int num=Integer.parseInt(conf.get("num"));
110         
111         Map<String,Double> alaph=new HashMap<String,Double>();
112         Map<String,Double> link=new HashMap<String,Double>();
113         
114         String content="";
115         for (Text val : values) {
116             String[] line=val.toString().split(":");
117             if(line[0].compareTo("content")==0){
118                 content=line[1];
119             }else {
120                 String[] s=line[1].split(" ");
121                 double d=Double.parseDouble(s[1]);
122                 if(line[0].compareTo("alaph")==0){
123                     alaph.put(s[0],d);
124                 }else if(line[0].compareTo("link")==0){
125                     link.put(s[0],d);
126                 }
127             }
128         }
129         double sum=0;
130         for(Map.Entry<String,Double> entry:alaph.entrySet()){
131             sum+=link.get(entry.getKey())*entry.getValue();
132         }
133         
134         System.out.println("    ");
135         System.out.println("sum is "+sum);
136         System.out.println("    ");
137         double result=factor*sum+(1-factor)/num;
138         context.write(_key,new Text(String.valueOf(result)+":"+content));
139         
140     }
141 
142 }

 

 
 
 
 
咱們能夠看出,其實在MapReduce中咱們將大把的精力花在了map的輸出上,而之因此這樣是由於咱們不能直接利用他的結果,而且爲了能迭代,咱們又只能格式化輸出,若是數據不少的,那麼在map階段將有不少的資源須要傳遞。總而言之,Hadoop讓咱們將大部分精力花在不應花的地方。
 
接下來看spark 。我這裏用的是python,在pyspark下運行。輸入文件:
A:B,C,D
B:A,D
C:A
D:B,C
先看代碼
def f(x):
    links=x[1][0]
    rank=x[1][1]
    n=len(links.split(","))
    result=[]
    for s in links.split(","):
        result.append((s,rank*1.0/n))
    return result

file="hdfs://10.107.8.110:9000/spark_test/pagerank.txt"

data=sc.textFile(file)
link=data.map(lambda x:(x.split(":")[0],x.split(":")[1]))
n=data.count()
rank=link.mapValues(lambda x:1.0/n)

for i in range(10):
    rank=link.join(rank).flatMap(f).reduceByKey(lambda x,y:x+y).mapValues(lambda x:0.15/n+0.85*x)

 



直接分析,data=sc.textFile(file)從hdfs中獲取text文件。
經過data.collect()能夠發現內容爲
 
咱們須要將其轉換爲鍵值對,那麼這裏就須要map函數
此時lambda x的x值爲字符串,因此經過:將其分割
 
接着經過n=data.count()咱們能夠直接得到網站數,而沒必要手動輸入
 
 
接着經過link.join(rank),讓link和rank根據key而join進來
link.join(rank).flatMap(f)用於提取鍵值,因爲輸入的是(page,(links,rank)),因此這裏定義了一個函數f用於分割links,讓links分割成若干個link,並加上rank輸出。
最後只需將其按照key值進行reduce便可
link.join(rank).flatMap(f).reduceByKey(lambda x,y:x+y),這樣就會將相同key的機率相加,獲得alaph,接着再加上阻尼係數便可
 
link.join(rank).flatMap(f).reduceByKey(lambda x,y:x+y).mapValues(lambda  x:0.15/n+0.85*x)這樣就是一個完整的計算
經過迭代若干次就能夠了。
從代碼量上說(雖然python比java簡明)spark的確是比Hadoop好不少。緣由也說了,1每次迭代沒必要將結果存放在文件中 2提供了更多的範式
相關文章
相關標籤/搜索