Hadoop平臺下面實現PageRank算法

這裏我們在 Hadoop 平臺上實現 PageRank 算法。


輸入文件格式如下:

1    1.0 2 3 4 5 6 7 8
2    2.0 3 4 5 6 7 8
3    3.0 4 5 6 7 8
4    4.0 5 6 7 8
5    5.0 6 7 8
6    6.0 7 8
7    7.0 8
8    8.0 1 2 3 4 5 6 7
apache


拿第一行進行說明:1 表示網址,然後用 tab 鍵隔開(記住一定要是 tab 鍵),1.0 爲給予的初始 PR 值,2、3、4、5、6、7、8 爲從網址 1 指向的網址。
下面幾行都是如此。


代碼如下:

package com.apache.hadoop.io;

import java.io.IOException;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.Iterator;
import java.util.Locale;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PageRank {

  public static class MyMapper   extends Mapper<Object, Text, Text, Text>
  {
	  	private Text id = new Text();
  	 	public void map(Object key, Text value, Context context ) throws IOException, InterruptedException
  	 	{
  	 		String line = value.toString();
  	 		//判斷是否爲輸入文件
  	 		if(line.substring(0,1).matches("[0-9]{1}"))
  	 		{
				  boolean flag = false;
				  if(line.contains("_"))
				  {
						line = line.replace("_","");
						flag = true;
				  }
				  //對輸入文件進行處理
				  String[] values = line.split("\t");
				  Text t = new Text(values[0]);
				  String[] vals = values[1].split(" ");
				  //將url保存下來,下次計算也要用到
				  String url="_";
				  
				  double pr = 0;
				  int i = 0;
				  int num = 0;
				  
				  if(flag)
				  {
					  i=2;
					  pr=Double.valueOf(vals[1]);
					  num=vals.length-2;
				  }
				  else
				  {
					  i=1;
					  pr=Double.valueOf(vals[0]);
					  num=vals.length-1;
				  }
				  
				  for(;i<vals.length;i++)
				  {
					  url=url+vals[i]+" ";
					  id.set(vals[i]);
					  Text prt = new Text(String.valueOf(pr/num));
					  context.write(id,prt);
				  }
				  context.write(t,new Text(url));
			  }
		  }
  }

  public static class MyReducer  extends Reducer<Text,Text,Text,Text>
  {
			  private Text result = new Text();
			  private Double pr = new Double(0);
			  
		 public void reduce(Text key, Iterable<Text> values,  Context context  ) throws IOException, InterruptedException
		 {
			  double sum=0;
			  String url="";
			  
			  for(Text val:values)
			  {
					  //發現_標記則代表是url,不然是外鏈pr,要參與計算
				  if(!val.toString().contains("_"))
				  {
					  sum=sum+Double.valueOf(val.toString());
				  }
				  else
				 {
					  url=val.toString();
				  }
	    	  }
			  pr=0.15+0.85*sum;
			  String str=String.format("%.3f",pr);
			  result.set(new Text(str+" "+url));
			  context.write(key,result);
		  }
 }

	public static void main(String[] args) throws Exception
	{
			 // String paths="/user/root/input11/file.txt";
		//這裏的路徑爲用戶本身的輸入文件的位置
		     String paths="hdfs://localhost:9000/user/root/input11";
			  String path1=paths;
			  String path2="";

			  //這裏咱們迭代20次
			  for(int i=1;i<=20;i++)
			  {
			       System.out.println("This is the "+i+"th job!");
			        System.out.println("path1:"+path1);
				 System.out.println("path2:"+path2);
						  
				  Configuration conf = new Configuration();
				Job job = new Job(conf, "PageRank");
						  
				 path2=paths+i;
						  
				 job.setJarByClass(PageRank.class);
			         job.setMapperClass(MyMapper.class);
				  job.setCombinerClass(MyReducer.class);
				job.setReducerClass(MyReducer.class);
				job.setOutputKeyClass(Text.class);
				job.setOutputValueClass(Text.class);
			       FileInputFormat.addInputPath(job, new Path(path1));
			      FileOutputFormat.setOutputPath(job, new Path(path2));
						  
			  path1=path2;
						  
			 job.waitForCompletion(true);
			System.out.println(i+"th end!");
		}
	  }	
 }

結果如下:

1    0.150 0.501 _2 3 4 5 6 7 8
2    0.150 0.562 _3 4 5 6 7 8
3    0.150 0.644 _4 5 6 7 8
4    0.150 0.755 _5 6 7 8
5    0.150 0.919 _6 7 8
6    0.150 1.184 _7 8
7    0.150 1.698 _8
8    0.150 2.822 _1 2 3 4 5 6 7 
url


可以看出此時的結果比 8 要大,還沒有達到預期結果,我們可以自己設置迭代次數。

實驗過程中可能會產生不少中間結果,我們可以在 Hadoop 的 HDFS 中將其刪除,相應的命令爲:


bin/hadoop fs -rmr /user/root/input11*

相關文章
相關標籤/搜索