寫好源代碼以後，首先要編譯（javac 的 -d 目標目錄必須已存在，可先執行 mkdir org）：
javac -classpath /usr/local/hadoop/hadoop-core-1.2.1.jar:/usr/local/hadoop/lib/commons-cli-1.2.jar count.java -d org
在org目錄下生成三個class文件：count.class、count$Map.class、count$Reduce.class
然後將三個class文件打包：
jar -cvf count.jar -C org/ .
之後會在hadoop根目錄下生成count.jar文件。
建立分佈式文件夾，並把要分析的數據放入其中：
bin/hadoop fs -mkdir input
bin/hadoop fs -put ~/Downloads/Gowalla_totalCheckins.txt input
（~/Downloads/Gowalla_totalCheckins.txt 爲我的文件所在位置）
通過 localhost:50070 可以查看：可以看到txt中的數據已經拷貝到了input下。
接下來運行程序：
bin/hadoop jar count.jar count input output
運行完之後會發現：生成了一個output文件夾，其下有三個文件，輸出的信息保存在part-r-00000中。
原始數據文件 Gowalla_totalCheckins.txt 的內容（示例）：
196514 2010-07-24T13:45:06Z 53.3648119 -2.2723465833 145064
196514 2010-07-24T13:44:58Z 53.360511233 -2.276369017 1275991
196514 2010-07-24T13:44:46Z 53.3653895945 -2.2754087046 376497
196514 2010-07-24T13:44:38Z 53.3663709833 -2.2700764333 98503
196514 2010-07-24T13:44:26Z 53.3674087524 -2.2783813477 1043431
196514 2010-07-24T13:44:08Z 53.3675663377 -2.278631763 881734
196514 2010-07-24T13:43:18Z 53.3679640626 -2.2792943689 207763
196514 2010-07-24T13:41:10Z 53.364905 -2.270824 1042822
其中第一列爲用戶id，第二列爲簽到時間（UTC），第三列爲緯度，第四列爲經度，第五列爲地點id。
數據來源：http://snap.stanford.edu/data/loc-gowalla.html
本次程序是分析用戶的簽到時間，並按時間段（每4小時爲一段，輸出的key 0～5 分別對應 0–4 點、4–8 點……20–24 點）進行統計。
源代碼：
<!-- lang: java -->
import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * MapReduce job that counts check-in records per time-of-day bucket.
 *
 * <p>Each input line is whitespace-separated; the second field is a timestamp of the form
 * {@code 2010-07-24T13:45:06Z}. The time of day (as written in the timestamp, i.e. UTC for a
 * trailing {@code Z}) is mapped to a bucket index, and the reducer sums the records per bucket.
 * With the default 4-hour width the output keys are 0..5.
 *
 * <p>Usage: {@code hadoop jar count.jar count [-D count.bucket.seconds=N] <in> <out>}
 */
public class count {

  /** Configuration key for the bucket width in seconds (set with {@code -D}). */
  public static final String BUCKET_SECONDS_KEY = "count.bucket.seconds";

  /** Default bucket width: 4 hours. */
  public static final int DEFAULT_BUCKET_SECONDS = 4 * 60 * 60;

  /** Emits (bucketIndex, 1) for every record whose second field is a parsable timestamp. */
  public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {

    /** Counters for records that could not be assigned to a bucket. */
    public enum Records { MALFORMED }

    private static final IntWritable ONE = new IntWritable(1);

    // Reused for every output record; context.write serializes it immediately.
    private final IntWritable bucket = new IntWritable();

    private int bucketSeconds = DEFAULT_BUCKET_SECONDS;

    /** Reads the bucket width from the job configuration and rejects non-positive values. */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      bucketSeconds =
          context.getConfiguration().getInt(BUCKET_SECONDS_KEY, DEFAULT_BUCKET_SECONDS);
      if (bucketSeconds <= 0) {
        throw new IllegalArgumentException(
            BUCKET_SECONDS_KEY + " must be positive, got " + bucketSeconds);
      }
    }

    /**
     * Maps one input line to its time-of-day bucket.
     *
     * @param key byte offset of the line (unused)
     * @param value the raw input line
     * @param context Hadoop context used for output and counters
     */
    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer fields = new StringTokenizer(value.toString());
      // Only the second field (the timestamp) is needed, so stop tokenizing once it is read.
      String timestamp = null;
      if (fields.hasMoreTokens()) {
        fields.nextToken();
        if (fields.hasMoreTokens()) {
          timestamp = fields.nextToken();
        }
      }

      int secondOfDay = (timestamp == null) ? -1 : parseSecondOfDay(timestamp);
      if (secondOfDay < 0) {
        // Skip blank/short/garbled lines instead of counting them as 00:00:00
        // or letting a parse exception fail the whole task.
        context.getCounter(Records.MALFORMED).increment(1);
        return;
      }

      bucket.set(secondOfDay / bucketSeconds);
      context.write(bucket, ONE);
    }

    /**
     * Extracts the time of day from a timestamp shaped like {@code yyyy-MM-ddTHH:mm:ssZ}.
     *
     * @param timestamp the raw timestamp token
     * @return seconds since midnight in [0, 86399], or -1 if the token does not have that shape
     */
    static int parseSecondOfDay(String timestamp) {
      int t = timestamp.indexOf('T');
      if (t < 0) {
        return -1;
      }
      int z = timestamp.indexOf('Z', t + 1);
      if (z < 0) {
        return -1;
      }
      String[] hms = timestamp.substring(t + 1, z).split(":");
      if (hms.length != 3) {
        return -1;
      }
      try {
        int hour = Integer.parseInt(hms[0], 10);
        int minute = Integer.parseInt(hms[1], 10);
        int second = Integer.parseInt(hms[2], 10);
        if (hour < 0 || hour > 23 || minute < 0 || minute > 59 || second < 0 || second > 60) {
          return -1;
        }
        // Clamp a leap second (23:59:60) into the last second of the day so it
        // cannot create an extra bucket past midnight.
        return Math.min(hour * 3600 + minute * 60 + second, 24 * 3600 - 1);
      } catch (NumberFormatException e) {
        return -1;
      }
    }
  }

  /**
   * Sums the counts for each bucket. Addition is associative and commutative, so this class is
   * also registered as the combiner.
   */
  public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    // Reused for every output record; context.write serializes it immediately.
    private final IntWritable total = new IntWritable();

    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      total.set(sum);
      context.write(key, total);
    }
  }

  /**
   * Configures and runs the job.
   *
   * @param args generic Hadoop options (e.g. {@code -D count.bucket.seconds=3600}) followed by the
   *     input path and output path
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: count [-D " + BUCKET_SECONDS_KEY + "=N] <in> <out>");
      System.exit(2);
    }

    Job job = new Job(conf, "count");
    job.setJarByClass(count.class);

    // Map and reduce classes; the reducer doubles as the combiner.
    job.setMapperClass(Map.class);
    job.setCombinerClass(Reduce.class);
    job.setReducerClass(Reduce.class);

    // Output types (identical to the map output types, so no separate map-output setting).
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}