一塊兒學Hadoop——使用IDEA編寫第一個MapReduce程序(Java和Python)

上一篇咱們學習了MapReduce的原理,今天咱們使用代碼來加深對MapReduce原理的理解。java

wordcount是Hadoop入門的經典例子,咱們也不能免俗,也使用這個例子做爲學習Hadoop的第一個程序。本文將介紹使用java和python編寫第一個MapReduce程序。python

本文使用Idea2018開發工具開發第一個Hadoop程序。使用的編程語言是Java。shell

打開idea,新建一個工程,以下圖所示:apache

在彈出新建工程的界面選擇Java,接着選擇SDK,通常默認便可,點擊「Next」按鈕,以下圖:編程

 

在彈出的選擇建立項目的模板頁面,不作任何操做,直接點擊「Next」按鈕。數組

輸入項目名稱,點擊Finish,就完成了建立新項目的工做,咱們的項目名稱爲:WordCount。以下圖所示:app

 

添加依賴jar包,和Eclipse同樣,要給項目添加相關依賴包,不然會出錯。編程語言

點擊Idea的File菜單,而後點擊「Project Structure」菜單,以下圖所示:ide

 

依次點擊Modules和Dependencies,而後選擇「+」的符號,以下圖所示:函數

 

 

選擇hadoop的包,我用得是hadoop2.6.1。把下面的依賴包都加入到工程中,不然會出現某個類找不到的錯誤。

(1)」/usr/local/hadoop/share/hadoop/common」目錄下的hadoop-common-2.6.1.jar和haoop-nfs-2.6.1.jar;

(2)/usr/local/hadoop/share/hadoop/common/lib」目錄下的全部JAR包;

(3)「/usr/local/hadoop/share/hadoop/hdfs」目錄下的haoop-hdfs-2.6.1.jar和haoop-hdfs-nfs-2.7.1.jar;

(4)「/usr/local/hadoop/share/hadoop/hdfs/lib」目錄下的全部JAR包。

 

工程已經建立好,咱們開始編寫Map類、Reduce類和運行MapReduce的入口類:

 JAVA編寫MarReduce代碼

Map類以下:

 1 import org.apache.hadoop.io.IntWritable;
 2 
 3 import org.apache.hadoop.io.LongWritable;
 4 
 5 import org.apache.hadoop.io.Text;
 6 
 7 import org.apache.hadoop.mapreduce.Mapper;
 8 
 9 import java.io.IOException;
10 
11 
12 public class WordcountMap extends Mapper<LongWritable,Text,Text,IntWritable> {
13         public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{
14 
15         String line = value.toString();//讀取一行數據
16 
17         String str[] = line.split("");//由於英文字母是以「 」爲間隔的,所以使用「 」分隔符將一行數據切成多個單詞並存在數組中
18 
19         for(String s :str){//循環迭代字符串,將一個單詞變成<key,value>形式,及<"hello",1>
20             context.write(new Text(s),new IntWritable(1));
21         }
22     }
23 }                

 

Reudce類:

 1 import org.apache.hadoop.io.IntWritable;
 2 import org.apache.hadoop.mapreduce.Reducer;
 3 import org.apache.hadoop.io.Text;
 4 import java.io.IOException;
 5 
 6 public class WordcountReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
 7         public void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
 8         int count = 0;
 9         for(IntWritable value: values) {
10             count++;
11         }
12         context.write(key,new IntWritable(count));
13         }
14 }    

 

 入口類 :

 1 import org.apache.hadoop.conf.Configuration;
 2 import org.apache.hadoop.fs.Path;
 3 import org.apache.hadoop.mapreduce.Job;
 4 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 5 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 6 import org.apache.hadoop.util.GenericOptionsParser;
 7 import org.apache.hadoop.io.IntWritable;
 8 import org.apache.hadoop.io.Text;
 9 
10 public class WordCount {
11 
12     public static void main(String[] args)throws Exception{
13         Configuration conf = new Configuration();
14         //獲取運行時輸入的參數,通常是經過shell腳本文件傳進來。
15         String [] otherArgs = new         GenericOptionsParser(conf,args).getRemainingArgs();
16         if(otherArgs.length < 2){
17             System.err.println("必須輸入讀取文件路徑和輸出路徑");
18             System.exit(2);
19         }
20         Job job = new Job();
21         job.setJarByClass(WordCount.class);
22         job.setJobName("wordcount app");
23     
24         //設置讀取文件的路徑,都是從HDFS中讀取。讀取文件路徑從腳本文件中傳進來
25         FileInputFormat.addInputPath(job,new Path(args[0]));
26         //設置mapreduce程序的輸出路徑,MapReduce的結果都是輸入到文件中
27         FileOutputFormat.setOutputPath(job,new Path(args[1]));
28 
29          //設置實現了map函數的類
30         job.setMapperClass(WordcountMap.class);
31         //設置實現了reduce函數的類
32         job.setReducerClass(WordcountReduce.class);
33 
34          //設置reduce函數的key值
35         job.setOutputKeyClass(Text.class);
36         //設置reduce函數的value值
37         job.setOutputValueClass(IntWritable.class);
38         System.exit(job.waitForCompletion(true) ? 0 :1);
39     }
40 }

 

代碼寫好以後,開始jar包,按照下圖打包。點擊「File」,而後點擊「Project Structure」,彈出以下的界面,

依次點擊"Artifacts" -> "+" -> "JAR" -> "From modules with dependencies",而後彈出一個選擇入口類的界面,選擇剛剛寫好的WordCount類,以下圖:

 

按照上面設置好以後,就開始打jar包,以下圖:

點擊上圖的「Build」以後就會生成一個jar包。jar的位置看下圖,依次點擊File->Project Structure->Artifacts就會看到以下的界面:

將打好包的wordcount.jar文件上傳到裝有hadoop集羣的機器中,而後建立shell文件,shell文件內容以下,/usr/local/src/hadoop-2.6.1是hadoop集羣中hadoop的安裝位置,

1 /usr/local/src/hadoop-2.6.1/bin/hadoop jar wordcount.jar \ #執行jar文件的命令以及jar文件名,
2 
3 hdfs://hadoop-master:8020/data/english.txt \ #輸入路徑
4 
5 hdfs://hadoop-master:8020/wordcount_output #輸出路徑

 

執行shell文件以後,會看到以下的信息,

 

上圖中數字1表示輸入分片split的數量,數字2表示map和reduce的進度,數字3表示mapreduce執行成功,數字4表示啓動多少個map任務,數字5表示啓動多少個reduce任務。

自行成功後在hadoop集羣中的hdfs文件系統中會看到一個wordcount_output的文件夾。使用「hadoop fs -ls /」命令查看:

 

在wordcount_output文件夾中有兩個文件,分別是_SUCCESS和part-r-00000,part-r-00000記錄着mapreduce的執行結果,使用hadoop fs -cat /wordcount_output/part-r-00000查看part-r-00000的內容:

 

能夠每一個英文單詞出現的次數。

至此,藉助idea 2018工具開發第一個使用java語言編寫的mapreduce程序已經成功執行。下面介紹使用python語言編寫的第一個mapreduce程序,相對於java,python編寫mapreduce會簡單不少,由於hadoop提供streaming,streaming是使用Unix標準流做爲Hadoop和應用程序之間的接口,因此可使用任何語言經過標準輸入輸出來寫MapReduce程序。

Python編寫MapReduce程序

看代碼:

實現了map函數的python程序,命名爲map.py:

 1 #!/usr/local/bin/python
 2 
 3 import sys #導入sys包
 4 
 5 for line in sys.stdin: #從標準輸入中讀取數據
 6     ss = line.strip().split(' ')#讀取每一行數據,strip()函數過濾掉空格換行的字符,split(' ')分隔出每一個額單詞並存放在數組ss中
 7 
 8     for s in ss: #讀取數組ss中的每一個單詞
 9         if s.strip() != "":
10             print "%s\t%s" % (s, 1)#構造以單詞爲key,1爲value的鍵值對,並寫入到標準輸出中。

 

 實現了reduce函數的python程序,命名爲reduce.py:

 

 1 import sys
 2 cur_word = None
 3 sum = 0
 4 for line in sys.stdin:
 5         ss = line.strip().split('\t')#從標準輸入中讀取數據。
 6         if len(ss) != 2:
 7                 continue
 8         word,cnt = ss
 9         if cur_word == None:
10                 cur_word = word
11         #由於從map流轉到reduce的數據時按照key排好序的,cur_word記錄的是上一個單詞,word記    #錄的是當前讀取的單詞,若是兩個單詞一致,則將sum+1,不然將word和sum值組成一個鍵值對,##寫入到標準輸出,同時sum賦值爲0,而且將word賦值給cur_word變量。
12         if cur_word != word:
13                 print '\t'.join([cur_word,str(sum)])
14                 cur_word = word
15                 sum = 0
16         sum += int(cnt)
17 print '\t'.join([cur_word,str(sum)])

 

map和reduce程序已經編寫完畢,下面編寫shell腳本文件:

 1 HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
 2 STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar "
 3 
 4 INPUT_FILE_PATH_1="/data/english.txt"#輸入路徑
 5 OUTPUT_PATH="/wordcount_output"#輸出路徑
 6 $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH#每次執行時都刪除輸出路徑,不然會出錯
 7 
 8 $HADOOP_CMD jar $STREAM_JAR_PATH \
 9                 -input $INPUT_FILE_PATH_1 \#指定輸入路徑
10                 -output $OUTPUT_PATH \#指定輸出路徑
11                 -mapper "python map.py" \#指定要執行的map程序
12                 -reducer "python reduce.py" \#指定要執行reduce程序
13                 -file ./map.py \#指定map程序所在的位置
14                 -file ./reduce.py#指定reduce程序所在的位置

到此Java和Python編寫第一個MapReduce程序已經完成。

相關文章
相關標籤/搜索