[hadoop in Action] 第3章 Hadoop組件

  • 管理HDFS中的文件
  • 分析MapReduce框架中的組件
  • 讀寫輸入輸出數據
 
一、HDFS文件操做
 
[命令行方式]
 
Hadoop的文件命令採起的形式爲:
hadoop fs -cmd <args>
其中,cmd是具體的文件命令,而<args>是一組數目可變的參數。
 
(1)添加文件和目錄
     HDFS有一個默認的工做目錄/user/$USER,其中$USER是你的登陸用戶名。不過這個目錄不會自動創建,讓咱們用mkdir命令建立它。Hadoop的mkdir命令會自動建立父目錄,相似於UNIX中使用-p選項的mkdir命令。
     hadoop fs -mkdir /user/chuck
     
     若是想看到全部的子目錄,則可使用hadoop的lsr命令,相似於UNIX中打開-r選項的ls:
     hadoop fs -lsr /
     [輸出結果顯示出屬性信息,好比權限、全部者、組、文件大小以及最後修改日期,全部這些都相似於UNIX的概念。顯示「1」的列給出文件的複製因子。由於複製因子不適用於目錄,故屆時該列僅會顯示一個破折號(-)]
 
     在本地文件系統中建立一個名爲examle.txt的文本文件,用hadoop的put命令將它從本地文件系統複製到HDFS中:
     hadoop fs -put example.txt ./
 
 
(2)獲取文件
     從HDFS中複製文件到本地文件系統:
     hadoop fs -get example.txt ./
 
     顯示HDFS中文件的內容:
     hadoop fs -cat example.txt
     [能夠在hadoop的文件命令中使用UNIX的管道,將其結果發送給其餘的UNIX命令作進一步處理]
 
     查看最後一千字節:
     hadoop fs -tail example.txt
 
(3)刪除文件
     刪除HDFS中的文件:
     hadoop fs -rm example.txt
    [ rm命令還能夠用於刪除空目錄]
 
     刪除目錄(目錄不爲空):
     hadoop fs -rmr /user/chuck
 
(4)查閱幫助
     hadoop fs -help <cmd>
 
[編程方式]
 
hadoop命令行工具中有一個getmerge命令,用於把一組HDFS文件在複製到本地計算機之前進行合併,下面開發的是實現把本地計算機文件複製到HDFS之前進行合併:
 
 

代碼清單 PutMerge程序
 
 1 import java.io.IOException;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.FSDataInputStream;
 5 import org.apache.hadoop.fs.FSDataOutputStream;
 6 import org.apache.hadoop.fs.FileStatus;
 7 import org.apache.hadoop.fs.FileSystem;
 8 import org.apache.hadoop.fs.Path;
 9 
10 public class PutMerge {
11 
12     public static void main(String[] args) throws IOException {
13 
14         Configuration conf = new Configuration();
15         FileSystem hdfs  = FileSystem.get(conf);
16         FileSystem local = FileSystem.getLocal(conf);
17 
18         Path inputDir = new Path(args[0]);   //(1)設定輸入目錄和輸出文件
19         Path hdfsFile = new Path(args[1]); 
20 
21         try {
22             FileStatus[] inputFiles = local.listStatus(inputDir);    //(2)獲得本地文件列表
23             FSDataOutputStream out = hdfs.create(hdfsFile);    //(3)生成HDFS輸出流
24 
25             for (int i=0; i<inputFiles.length; i++) {
26                 System.out.println(inputFiles[i].getPath().getName());
27                 FSDataInputStream in = local.open(inputFiles[i].getPath());    //(4)打開本地輸入流
28                 byte buffer[] = new byte[256];
29                 int bytesRead = 0;
30                 while( (bytesRead = in.read(buffer)) > 0) {
31                     out.write(buffer, 0, bytesRead);
32                 }
33                 in.close();
34             }
35             out.close();
36         } catch (IOException e) {
37             e.printStackTrace();
38         }
39     }
40 }
 
(1)根據用戶定義的參數設置本地目錄和HDFS的目標文件;
(2)提取本地輸入目錄中每一個文件的信息;
(3)建立一個輸出流寫入到HDFS文件;
(4)遍歷本地目錄中的每一個文件,打開一個輸入流來讀取該文件
 

 
FileSystem類還有些方法用於其餘標準文件操做,如delete()、exists()、mkdirs()和rename()。
 
二、剖析MapReduce程序
 
 
MapReduce程序經過操做鍵/值對來處理數據,通常形式爲:
map:(k1, v1) ——> list(k2, v2)
reduce:(k2, list(v2)) ——> list(k3,v3)
 
  1. 輸入數據;
  2. 輸入數據被分佈在節點上;
  3. 每一個map任務處理一個數據分片;
  4. Mapper輸出中間數據;
  5. 節點間的數據交換在「洗牌」階段完成;
  6. 相同key的中間數據進入相同的reducer;
  7. 存儲Reducer的輸出。
 
     雖然咱們能夠而且的確常常把某些鍵與值稱爲整數、字符串等,但它們實際上並非Integer、String等那些標準的Java類。這是由於爲了讓鍵/值對能夠在集羣上移動,MapReduce框架提供了一種序列化鍵/值對的方法。所以,只有那些支持這種序列化的類可以在這個框架中充當鍵或者值。
 
     更具體而言,實現Writable接口的類能夠是值,而實現WritableComparable<T>接口的類既能夠是鍵也能夠是值。注意WritableComparable<T>接口是Writable和java.lang.Comparable<T>接口的組合。對於鍵而言,咱們須要這個比較,由於它們將在Reduce階段進行排序,而值僅會被簡單地傳遞。
 
     Hadoop帶有一些預約義的類用於實現WritableComparable,包括面向全部基本數據類型的封裝類,以下表:
 
描述
BooleanWritable
標準布爾變量的封裝
ByteWritable
單字節數的封裝
DoubleWritable
雙字節數的封裝
FloatWritable
浮點數的封裝
IntWritable
整數的封裝
LongWritable
長整數的封裝
Text
使用UTF8格式的文本封裝
NullWritable
無鍵值的佔位符
 
     鍵和值所採用的數據類型能夠超過Hadoop自身所支持的基本類型,能夠自定義數據類型,只要它實現了Writable(或WritableComparable<T>)接口。
 

代碼清單 示例實現WritableComparable接口的類
 
 1 import java.io.DataInput;
 2 import java.io.DataOutput;
 3 import java.io.IOException;
 4  
 5 import org.apache.hadoop.io.WritableComparable;
 6  
 7 public class Edge implements WritableComparable<Edge> {
 8  
 9     private String departureNode;
10     private String arrivalNode;
11  
12     public String getDepartureNode() { return departureNode;}
13  
14     @Override
15     public void readFields(DataInput in) throws IOException {    //(1)說明如何讀入數據
16         departureNode = in.readUTF();
17         arrivalNode = in.readUTF();     
18     }
19  
20     @Override
21     public void write(DataOutput out) throws IOException {    //(2)說明如何寫入數據
22         out.writeUTF(departureNode);
23         out.writeUTF(arrivalNode); 
24     }
25  
26     @Override
27     public int compareTo(Edge o) {    //(3)定義數據排序
28      return (departureNode.compareTo(o.departureNode) != 0)
29          ? departureNode.compareTo(o.departureNode)
30          : arrivalNode.compareTo(o.arrivalNode);
31     }
32 }
 
這個Edge類實現了Writable接口的readFields()及write()方法。它們與Java中的DataInput和DataOutput類一塊兒用於類中內容的串行化。而Comparable接口中的實現是compareTo()方法。若是被調用的Edge小於、等於或者大於給定的Edge,這個方法會分別返回-1,0,1。

 
[Mapper]
 
     一個類要做爲mapper,需繼承MapReducebase基類並實現Mapper接口。並不奇怪,mapper和reducer的基類均爲MapReduceBase類。它包含類的構造與解構方法。
  • void configure(JobConfjob):該函數提取XML配置文件或者應用程序主類中的參數,在數據處理以前調用該函數。
  • void close():做爲map任務結束前的最後一個操做,該函數完成全部的結尾工做,如關閉數據庫鏈接、打開文件等。
 
     Mapper接口負責數據處理階段。它採用的形式爲Mapper<k1,v1,k2,v2>Java泛型,這裏鍵類和值類分別實現WritableComparable和Writable接口。Mapper只有一個方法——Map,用於處理一個單獨的鍵/值對。
     void map (k1 key, v1 value, OutputCollector<k2,v2> output, Reporter reporter) throws IOException
 
     該函數處理一個給定的鍵/值對 (k1,v1),生成一個鍵/值對(k2,v2)的列表(該列表也可能爲空)。OutputCollector接收這個映射過程的輸出,Reporter能夠提供對mapper相關附加信息的記錄,造成任務進度。
 
     Hadoop提供了一些有用的mapper實現,以下表:
 
描述
IdentityMapper<k,v>
實現Mapper<k,v,k,v>將輸入直接映射到輸出
InverseMapper<k,v>
實現Mapper<k,v,v,k>反轉鍵/值對
RegexMapper<k>
實現Mapper<k,text,text,LongWritable>,爲每一個常規表達式的匹配項生成一個(match,1)對
TokenCountMapper<k>
實現Mapper<k,text,text,LongWritable>,當輸入的值爲分詞時,生成一個(token,1)對
 
 
[Reducer]
 
     reducer的實現和mapper同樣必須首先在MapReduce基類上擴展,容許配置和清理。此外,它還必須實現Reducer接口使其具備以下的單一方法:
     void reduce(k2 key, Iterator<v2> values, OutputCollector<k3,v3> output, Reporter reporter) throws IOException
 
     當reducer任務接收來自各個mapper的輸出時,它按照鍵/值對中的鍵對輸入數據進行排序,並將相同鍵的值歸併。而後調用reduce()函數,並經過迭代處理那些與指定鍵相關聯的值,生成一個(可能爲空的)列表(k3,v3)。OutputCollector接收reduce階段的輸出,並寫入輸出文件。Reporter可提供對reducer相關附加信息的記錄,造成任務進度。
 
     Hadoop提供了一些基本的reducer實現,以下表:
 
描述
IdentityReudcer<k,v>
實現Reducer<k,v,k,v>將輸入直接映射到輸出
LongSumReducer<k>
實現<k,LongWritable,k,LongWritable>, 計算與給定鍵相對應的全部值的和
 
 
[Partitioner:重定向Mapper輸出]
 
     當使用多個reducer時,咱們就須要採起一些辦法來肯定mapper應該把鍵/值對輸出給誰。默認的做法是對鍵進行散列來肯定reducer。hadoop經過HashPartitioner類強制執行這個策略。但有時HashPartitioner會讓你出錯。
 
 1 public class EdgePartitioner implements Partitioner<Edge, Writable>
 2 {
 3      @verride
 4      public int getPartition(Edge key, Writable value, int numPartitions)
 5      {
 6           return key.getDepartureNode().hashCode() % numPartitions;
 7      }
 8  
 9      @verride
10      public void configure(JobConf conf) { }
11 }
 
      一個定製的partitioner只須要實現configure()和getPartition()兩個函數。前者將hadoop對做業的配置應用在patittioner上,然後者返回一個介於0和reducer任務數之間的整數,指向鍵/值對將要發送的reducer。
 
     在map和reduce階段之間,一個MapReduce應用必然從mapper任務獲得輸出結果,並把這些結果發佈給reduce任務。該過程一般被稱爲洗牌。
 
 
[Combiner:本地reduce]
 
在許多MapReduce應用場景中,咱們不妨在分發mapper結果以前作一下「本地Reduce」。
 
 
[預約義的mapper和reducer類的單詞計數]
 

代碼清單 修改的WordCount例程
 
 1 import org.apache.hadoop.fs.Path;
 2 import org.apache.hadoop.io.Text;
 3 import org.apache.hadoop.io.LongWritable;
 4 import org.apache.hadoop.mapred.FileInputFormat;
 5 import org.apache.hadoop.mapred.FileOutputFormat;
 6 import org.apache.hadoop.mapred.JobClient;
 7 import org.apache.hadoop.mapred.JobConf;
 8 import org.apache.hadoop.mapred.lib.TokenCountMapper;
 9 import org.apache.hadoop.mapred.lib.LongSumReducer;
10  
11 public class WordCount2 {
12     public static void main(String[] args) {
13         JobClient client = new JobClient();
14         JobConf conf = new JobConf(WordCount2.class);
15  
16         FileInputFormat.addInputPath(conf, new Path(args[0]));
17         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
18  
19         conf.setOutputKeyClass(Text.class);
20         conf.setOutputValueClass(LongWritable.class);
21         conf.setMapperClass(TokenCountMapper.class);
22         conf.setCombinerClass(LongSumReducer.class);
23         conf.setReducerClass(LongSumReducer.class);
24  
25         client.setConf(conf);
26         try {
27             JobClient.runJob(conf);
28         } catch (Exception e) {
29             e.printStackTrace();
30         }
31     }
32 }
 

 
三、讀和寫
 
[InputFormat]
 
     hadoop分割與讀取輸入文件的方式被定義在InputFormat接口的一個實現中。TextInputFormat是InputFormat的默認實現,當你想要一次獲取一行內容而輸入數據又沒有肯定的鍵值時,這種數據格式一般會很是有用。
 
經常使用的InputFormat類,以下表:
 
InputFormat
描述
TextInputFormat
在文本文件中每一行均爲一個記錄。鍵(key)爲一行的字節偏移,而值(value)爲一行的內容
key: LongWritable
value: Text
KeyValueTextInputFormat
在文本文件中的每一行均爲一個記錄。以每行的第一個分隔符爲界,分隔符以前的是鍵(key),以後的是值(value)。分離器在屬性key.value.separator.in.input.line中設定,默認爲製表符(\t)。
key: Text
Value: Text
SequenceFileInputFormat<k,v>
用於讀取序列文件的InputFormat。鍵和值由用戶定義。序列文件爲hadoop專用的壓縮二進制文件格式。它專用於一個MapReduce做業和其餘MapReduce做業之間傳送數據。
key: K(用戶定義)
value: V(用戶定義)
NLineInputFormat
與TextInputFormat相同,但每一個分片必定有N行。N在屬性mapred.line.input.format.linespermap中設定,默認爲1.
key: LongWritable
value: Text
 
能夠設置JobConf對象使用KeyValueTextInputFormat類讀取這個文件:
     conf.setInputFormat(KeyValueTextInputFormat.class);
 
回想一下,咱們以前在mapper中曾使用LongWritable和Text分別做爲鍵(key)和值(value)的類型。在TextInputFormat中,由於值爲用數字表示的偏移量,因此LongWritable是一個合理的鍵類型。而當使用KeyvalueTextInputFormat時,不管是鍵和值都爲Text類型,你必須改變mapper的實現以及map()方法來適應這個新的鍵(key)類型。
 
生成一個定製的InputFormat:略
 
 
[OutputFormat]
 
當MapReduce輸出數據到文件時,使用的是OutputForamt類,它與inputForamt類類似。由於每一個reducer僅需將它的輸出寫入本身的文件中,輸出無需分片。輸出文件放在一個公用目錄中,一般命名爲part-nnnnn,這裏nnnnn是reducer的分區ID。RecordWriter對象將輸出結果進行格式化,而RecordReader對輸入格式進行解析。
 
經常使用的OutputFormat類,以下表:
 
OutputFormat
描述
TextOutputFormat<k,v>
將每一個記錄寫爲一行文本。鍵和值以字符串的形式寫入,並以製表符(\t)分隔。這個分隔符能夠在屬性mapred.textoutputformat.separator中修改
SequenceFileOutputFormat<k,v>
以hadoop專有序列文件格式寫入鍵/值對。與SequenceFileInputForamt配合使用
NullOutputFormat<k,v>
無輸出
 
 
  [轉載請註明] http://www.cnblogs.com/zhengrunjian/
相關文章
相關標籤/搜索