Here we implement the PageRank algorithm in Java on the Hadoop platform.
The input file format is as follows:
1 1.0 2 3 4 5 6 7 8
2 2.0 3 4 5 6 7 8
3 3.0 4 5 6 7 8
4 4.0 5 6 7 8
5 5.0 6 7 8
6 6.0 7 8
7 7.0 8
8 8.0 1 2 3 4 5 6 7
Take the first line as an example: 1 is the page id, followed by a tab character (it must be a tab, not spaces); 1.0 is the initial PageRank value assigned to that page; 2 3 4 5 6 7 8 are the pages that page 1 links to.
The remaining lines follow the same pattern.
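As a small illustration of this format (a standalone sketch, not part of the MapReduce job below; the class name is made up), this is how the first line is split and what share of page 1's PageRank each linked page receives:

// Standalone sketch: how one input line is interpreted.
// Page 1 has PageRank 1.0 and 7 outlinks, so pages 2..8 each receive 1.0/7.
public class InputLineDemo {
    public static void main(String[] args) {
        String line = "1\t1.0 2 3 4 5 6 7 8";
        String[] parts = line.split("\t");        // parts[0] = "1" (page id)
        String[] vals  = parts[1].split(" ");     // ["1.0", "2", "3", ..., "8"]
        double pr = Double.parseDouble(vals[0]);  // initial PageRank of page 1
        int num = vals.length - 1;                // number of outlinks = 7
        for (int i = 1; i < vals.length; i++) {
            // contribution passed from page 1 to each linked page
            System.out.println(vals[i] + "\t" + pr / num);
        }
    }
}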
The code is as follows:
package com.apache.hadoop.io;

import java.io.IOException;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.StringTokenizer;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PageRank {

    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        private Text id = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // only process lines that start with a digit, i.e. valid data lines
            if (line.substring(0, 1).matches("[0-9]{1}")) {
                boolean flag = false;
                // a "_" marks output of a previous iteration; strip it and remember
                if (line.contains("_")) {
                    line = line.replace("_", "");
                    flag = true;
                }
                // split the line into the page id and the rest
                String[] values = line.split("\t");
                Text t = new Text(values[0]);
                String[] vals = values[1].split(" ");
                // keep the outlink list, it is needed again in the next iteration
                String url = "_";
                double pr = 0;
                int i = 0;
                int num = 0;
                if (flag) {
                    i = 2;
                    pr = Double.valueOf(vals[1]);
                    num = vals.length - 2;
                } else {
                    i = 1;
                    pr = Double.valueOf(vals[0]);
                    num = vals.length - 1;
                }
                for (; i < vals.length; i++) {
                    url = url + vals[i] + " ";
                    id.set(vals[i]);
                    Text prt = new Text(String.valueOf(pr / num));
                    context.write(id, prt);
                }
                context.write(t, new Text(url));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();
        private Double pr = new Double(0);

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            String url = "";
            for (Text val : values) {
                // a value containing the "_" marker is the outlink list;
                // anything else is an incoming PageRank contribution and joins the sum
                if (!val.toString().contains("_")) {
                    sum = sum + Double.valueOf(val.toString());
                } else {
                    url = val.toString();
                }
            }
            pr = 0.15 + 0.85 * sum;
            String str = String.format("%.3f", pr);
            result.set(new Text(str + " " + url));
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        // String paths = "/user/root/input11/file.txt";
        // change this path to the location of your own input file
        String paths = "hdfs://localhost:9000/user/root/input11";
        String path1 = paths;
        String path2 = "";
        // here we run 20 iterations
        for (int i = 1; i <= 20; i++) {
            System.out.println("This is the " + i + "th job!");
            System.out.println("path1:" + path1);
            System.out.println("path2:" + path2);
            Configuration conf = new Configuration();
            Job job = new Job(conf, "PageRank");
            path2 = paths + i;
            job.setJarByClass(PageRank.class);
            job.setMapperClass(MyMapper.class);
            job.setCombinerClass(MyReducer.class);
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(path1));
            FileOutputFormat.setOutputPath(job, new Path(path2));
            path1 = path2;
            job.waitForCompletion(true);
            System.out.println(i + "th end!");
        }
    }
}
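The main method hard-codes the input path and chains 20 jobs, each one reading the previous iteration's output directory and writing a new one (input111, input112, ..., input1120). One way to run it, assuming the class has been packaged into a jar called pagerank.jar (the jar name is only an example), is:

bin/hadoop jar pagerank.jar com.apache.hadoop.io.PageRank

The output (each line: page id, the PageRank values, then the "_"-prefixed outlink list) looks like this: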
1 0.150 0.501 _2 3 4 5 6 7 8
2 0.150 0.562 _3 4 5 6 7 8
3 0.150 0.644 _4 5 6 7 8
4 0.150 0.755 _5 6 7 8
5 0.150 0.919 _6 7 8
6 0.150 1.184 _7 8
7 0.150 1.698 _8
8 0.150 2.822 _1 2 3 4 5 6 7
It can be seen that the sum of the PageRank values at this point is still larger than 8 (with 8 pages and the update pr = 0.15 + 0.85 * sum, the total should converge to 8), so the expected result has not been reached yet; you can adjust the number of iterations yourself.
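A quick way to check how close the values are to convergence is to sum the PageRank column of an output file. Below is a minimal standalone sketch (the class name is made up; it reads output lines from standard input and sums the third field, the value carried into the next iteration):

import java.io.BufferedReader;
import java.io.InputStreamReader;

// Standalone sketch: sum the PageRank column of output lines read from stdin.
// For 8 pages and pr = 0.15 + 0.85 * sum, the total should approach 8.0 as the
// iteration converges; for the output above it is still about 9.08.
public class PrSum {
    public static void main(String[] args) throws Exception {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        double total = 0;
        String line;
        while ((line = in.readLine()) != null) {
            String[] f = line.split("\\s+");
            if (f.length > 2) {
                total += Double.parseDouble(f[2]);  // e.g. 0.501 in the first line above
            }
        }
        System.out.println("total PageRank = " + total);
    }
}

After compiling the class, it can be fed with, for example, bin/hadoop fs -cat /user/root/input1120/part-r-00000 | java PrSum (input1120 being the directory written by the 20th job).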
The experiments produce quite a lot of intermediate output; it can be deleted from HDFS with the following command (on newer Hadoop versions use bin/hadoop fs -rm -r instead of -rmr):

bin/hadoop fs -rmr /user/root/input11*