Description:
Test file:
echo -e "aa\tbb \tcc\nbb\tcc\tdd" > 3.txt
hadoop fs -put 3.txt /tmp/3.txt
All the examples in this article use this file as the test case: counting how many times each word occurs (WordCount).
1. The native approach: compile the Java source, package it into a jar, and let the hadoop script schedule and run it. For example:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        /**
         * LongWritable, IntWritable and Text are Hadoop classes that wrap the corresponding
         * Java types. They implement the WritableComparable interface, so they can be
         * serialized for data exchange in a distributed environment; think of them as
         * replacements for long, int and String.
         */
        // IntWritable one is the equivalent of the Java int literal 1
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // map() is called once per input record; here each line is tokenized into words
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                // emit each word with a count of 1, e.g. <word1,1> <word2,1> <word1,1>
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // key-value pairs with the same key are routed to the same reduce task,
            // e.g. <word1,<1,1>> is handled by one reducer while <word2,<1>> may go to another
            int sum = 0;
            // sum accumulates the occurrences of the same key (word)
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            // once a key has been processed, the reducer emits the key (word) and its count
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // on a multi-queue Hadoop cluster, set the queue to submit to
        conf.set("mapred.job.queue.name", "regular");
        // GenericOptionsParser strips out the generic cluster options passed at run time
        // (such as the queue parameter), leaving only the user's own arguments, e.g. the paths
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        for (String argsStr : otherArgs) {
            System.out.println("-->> " + argsStr);
        }
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        // mapper, combiner and reducer classes, plus the output key/value types
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input and output paths (the remaining user arguments are 0-based)
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        // in a driver that chains several jobs, waitForCompletion() makes the sub-jobs run serially
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
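Point 1 mentions compiling and packaging into a jar, but the commands are not shown above. A minimal sketch, assuming the Hadoop core jar shipped with the release is used on the classpath (its exact name varies by version, e.g. hadoop-core-0.20.203.0.jar under $HADOOP_HOME), producing the /tmp/wordcount.jar used in the run command below:

mkdir -p wordcount_classes
javac -classpath $HADOOP_HOME/hadoop-core-0.20.203.0.jar -d wordcount_classes WordCount.java
jar -cvf /tmp/wordcount.jar -C wordcount_classes/ .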
Run:
bin/hadoop jar /tmp/wordcount.jar WordCount /tmp/3.txt /tmp/5
Result:
hadoop fs -cat /tmp/5/*
aa    1
bb    2
cc    2
dd    1
References:
Hadoop - Map/Reduce: understanding the new Hadoop API through the changes in the WordCount example
http://blog.csdn.net/derekjiang/article/details/6836209
Running and explaining the Hadoop example program WordCount
http://samuschen.iteye.com/blog/763940
The official WordCount v1.0 example
http://hadoop.apache.org/docs/r1.1.1/mapred_tutorial.html#Example%3A+WordCount+v1.0
2. The Pig approach: the same word count written in Pig Latin:
A1 = load '/data/3.txt';
A = stream A1 through `sed "s/\t/ /g"`;
B = foreach A generate flatten(TOKENIZE((chararray)$0)) as word;
C = filter B by word matches '\\w+';
D = group C by word;
E = foreach D generate COUNT(C), group;
dump E;
Note: be aware of how a different field delimiter affects load and the $0 reference that follows it.
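These statements can be typed into the grunt shell line by line, or saved to a script file and submitted in one go. A sketch, assuming the hypothetical file name wordcount.pig:

pig wordcount.pig            # run against the cluster (mapreduce mode)
pig -x local wordcount.pig   # or run in local mode, where /data/3.txt is read from the local filesystem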
3. The Hive approach:
create table textlines(text string);
load data inpath '/data/3.txt' overwrite into table textlines;
SELECT wordColumn, count(1)
FROM textlines
LATERAL VIEW explode(split(text, '\t+')) wordTable AS wordColumn
GROUP BY wordColumn;
For details, see:
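The Hive statements above can be pasted into the interactive hive CLI, or saved to a file and run in batch. A sketch, assuming the hypothetical file name wordcount.hql:

hive -f wordcount.hql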
4. The Hadoop Streaming approach, using Python scripts.
map:
#!/usr/bin/python
import os, re, sys

# mapper: split each tab-delimited input line and emit one field (word) per output line
for line in sys.stdin:
    for i in line.strip().split("\t"):
        print i
reduce:
#!/usr/bin/python
import os, re, sys

# reducer: count the occurrences of each word arriving on stdin
arr = {}
for words in sys.stdin:
    word = words.strip()
    if word not in arr:
        arr[word] = 1
    else:
        arr[word] += 1
for k, v in arr.items():
    print str(k) + ": " + str(v)
Finally, run it from the shell:
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.203.0.jar -file map.py -file reduce.py -mapper map.py -reducer reduce.py -input /data/3.txt -output /data/py
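The same pair of scripts can also be smoke-tested locally without the cluster, since streaming only talks to them over stdin/stdout. A sketch, assuming the local 3.txt created at the top is still in the working directory and that python is Python 2 (which the print statements above require):

cat 3.txt | python map.py | sort | python reduce.py

The sort in the middle stands in for the sort/shuffle the framework performs between map and reduce; the dict-based reduce.py happens not to depend on it, but keeping it makes the local run match what streaming actually feeds the reducer.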
Note: each script must explicitly name its interpreter on the first line (the shebang) and must be given execute permission.
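For example, using the file names from the command above:

chmod +x map.py reduce.py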
5. The Hadoop Streaming approach, using shell scripts.
map:
#!/bin/bash
# mapper: turn each tab into a newline so every word ends up on its own line
tr '\t' '\n'
reduce:
#!/bin/bash
# reducer: sort the incoming words, then count adjacent duplicates
sort | uniq -c
Finally, run it from the shell:
june@deepin:~/hadoop/hadoop-0.20.203.0/tmp> hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.203.0.jar -file map.py -file reduce.py -mapper map.py -reducer reduce.py -input /data/3.txt -output /data/py
packageJobJar: [map.py, reduce.py, /home/june/data_hadoop/tmp/hadoop-unjar2676221286002400849/] [] /tmp/streamjob8722854685251202950.jar tmpDir=null
12/10/14 21:57:00 INFO mapred.FileInputFormat: Total input paths to process : 1
12/10/14 21:57:00 INFO streaming.StreamJob: getLocalDirs(): [/home/june/data_hadoop/tmp/mapred/local]
12/10/14 21:57:00 INFO streaming.StreamJob: Running job: job_201210141552_0041
12/10/14 21:57:00 INFO streaming.StreamJob: To kill this job, run:
12/10/14 21:57:00 INFO streaming.StreamJob: /home/june/hadoop/hadoop-0.20.203.0/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_201210141552_0041
12/10/14 21:57:00 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201210141552_0041
12/10/14 21:57:01 INFO streaming.StreamJob:  map 0%  reduce 0%
12/10/14 21:57:13 INFO streaming.StreamJob:  map 67%  reduce 0%
12/10/14 21:57:19 INFO streaming.StreamJob:  map 100%  reduce 0%
12/10/14 21:57:22 INFO streaming.StreamJob:  map 100%  reduce 22%
12/10/14 21:57:31 INFO streaming.StreamJob:  map 100%  reduce 100%
12/10/14 21:57:37 INFO streaming.StreamJob: Job complete: job_201210141552_0041
12/10/14 21:57:37 INFO streaming.StreamJob: Output: /data/py
june@deepin:~/hadoop/hadoop-0.20.203.0/tmp> hadoop fs -cat /data/py/part-00000
      1 aa
      1 bb
      1 bb
      2 cc
      1 dd
june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>
A special note: some of the methods above ignore the trailing space after a field while others count it; check the results carefully.
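For instance, the first line of 3.txt contains "bb " with a trailing space: tr in the shell version keeps it as a token distinct from "bb" (hence the two separate "1 bb" lines in the output above), while StringTokenizer in the Java version and strip() in reduce.py discard it. A quick way to make the invisible space visible (cat -A marks the end of each line with $):

echo -e "aa\tbb \tcc\nbb\tcc\tdd" | tr '\t' '\n' | cat -A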