Hadoop實戰讀書筆記(6)

putmerge程序的大致流程是?java

1、根據用戶定義的參數設置本地目錄和HDFS的目錄文件程序員

2、提取本地輸入目錄中每一個文件的信息數據庫

3、建立一個輸出流寫入到HDF文件網絡

4、遍歷本地目錄中的每一個文件,打開一個輸入流來讀取該文件,剩下就是一個標準的Java文件複製過程了app

具體程序以下:框架

public static void main(String[] args) throws IOException {ide

       Configuration conf = new Configuration();函數

       FileSystem hdfs = FileSystem.get(conf);oop

       FileSystem local = FieSystem.getLocal(conf);spa

       // 設定輸入目錄與輸出文件

       Path inputDir = new Path(args[0]);

       Path hdfsFile = new Path(args[1]);

 

       try {

              // 獲得本地文件列表

              FileStatus[] inputFiles = local.listStatus(inputDir);

              // 生成HDFS輸出流

              FSDataOutputStream out = hdfs.create(hdfsFile);

 

              for (int i = 0; i < inputFiles.length; i++) {

                     System.out.println(inputFiles[i].getPath().getName());

                     // 打開本地輸入流

                     FSDataInputStream in = local.open(inputFiles[i].getPath());

                     byte buffer[] = new byte[256];

                     int bytesRead = 0;

                     while ( (bytesRead = in.read(buffer)) > 0) {

                            out.write(buffer, 0, bytesRead);

                     }

                     in.close();

              }

              out.close();

       } catch (IOException e) {

              e.printStackTrace();

       }

}

 

那麼如今有數據了,還要對它進行處理、分析以及作其餘的操做。

MapReduce程序經過操做鍵/值對來處理數據,通常形式爲

map: (K1, V1) -> list(K2, V2)

reduce:(K2, list(V2)) -> list(K3, V3)

 

Hadoop數據類型有哪些?

MapReduce框架並不容許它們是任意的類。

雖然咱們常常把某些鍵與值稱爲整數、字符串等,但它們實際上並非IntegerString等哪些標準的Java類。爲了讓鍵/值對能夠在集羣上移動,MapReduce框架提供了一種序列化鍵/值對的方法。所以,只有那些支持這種序列化的類可以在這個框架中充當鍵或者值。

 

更具體的Hadoop類型說明

實現Writable接口的類能夠是值

而實現WritableComparable<T>接口的類既能夠是鍵也能夠是值

注意WritableComparable<T>接口是Writablejava.lang.Comparable<T>接口的組合,對於鍵而言,咱們須要這個比較,由於它們將在Reduce階段進行排序,而值僅會被簡單地傳遞。

 

/值對常用的數據類型列表,這些類均實現WritableComparable接口

描述

BooleanWritable

標準布爾變量的封裝

ByteWritable

單字節數的封裝

DoubleWritable

雙字節數的封裝

FloatWritable

浮點數的封裝

IntWritable

整數的封裝

LongWritable

Long的封裝

Text

使用UTF-8格式的文本封裝

NullWritable

無鍵值時的站位符

 

如何自定義數據類型?

只要它實現了Writable(WritableComparable<T>)接口。

 

定義一個Edge類型用於表示一個網絡的邊界

public class Edge implements WritableComparable<Efge> {

       private String departureNode;

       private String arrivalNode;

 

       public String getDepartureNode() {

              return departureNode;

       }

       // 說明如何讀入數據

       @Override

       public void readFields(DataInput in) throws IOException {

              departureNode = in.readUTF();

              arrivalNode = in.readUTF();

       }

       // 說明如何寫出數據

       @Override

       public void write(DataOutput out) throws IOException {

              out.writeUTF(departureNode);

              out.writeUTF(arrivalNode);

       }

       // 定義數據排序

       @Override

       public int compareTo(Edge o) {

              return (departureNode.compareTo(o.departureNode) != 0)

                     ? departureNode.compareTo(o.departureNode)

                     : arrivalNode.compareTo(o.arrivalNode);

       }

}

 

Mapper類是什麼?

一個類要做爲mapper,需繼承MapReduceBase基類並實現Mapper接口。

mapperreducer的基類均爲MapReduceBase

其中包含一些函數或方法:

1void configure(JobConf job),該函數提取XML配置文件或者應用程序主類中的參數,在數據處理以前調用該函數。

2void close(),做爲map任務結束前的最後一個操做,該函數完成全部的結尾工做,如關閉數據庫鏈接、打開文件等。

Mapper接口負責數據處理階段,它採用Mapper<K1, V1, K2, V2>Java泛型,這裏鍵類和值類分別實現WritableComparableWritable接口。

Mapper類只有是一個方法-map,用於處理一個單獨的鍵/值對。

void map (K1 key,

              V1 value,

              OutputCollector<K2, V2> output,

              Reporter reporter

              ) throws IOException

 

上面這個map函數的參數都是什麼意思?

該函數處理一個給定的鍵/值對 (K1, V1),生成一個鍵/值對 (K2, V2) 的列表 (該列表頁可能爲空)

OutputCollector接收這個映射過程的輸出

Reporter可提供對mapper相關附加信息的記錄

 

Hadoop提供了一些有用的mapper實現,這些實現是?

描述

IdentityMapper<K, V>

實現Mapper<K,   V, K, V>, 將輸入直接映射到輸出

InverseMapper<K, V>

實現Mapper<K,   V, V, K> 反轉鍵/值對

RegexMapper<K>

實現Mapper<K,   Text, Text, LongWritable>, 爲每一個常規表達式的匹配項生成一個   match 1  

TokenCountMapper<K>

實現Mapper<K,   Text, Text, LongWritable>, 當輸入的值爲分詞時,   生成一個(token 1  

 

Reducer是什麼?

一個類要做爲reducer,需繼承MapReduceBase基類並實現Reducer接口。

以便於容許配置和清理。

此外,它還必須實現Reducer接口使其具備以下的單一方法:

void reduce (K2 key,

                     Iterator<V2> values,

                     OutputCollector<K3, V3> output,

                     Reporter reporter

                     ) throws IOException

reducer任務接收來自各個mapper的輸出時,它按照鍵/值對中的鍵對輸入數據進行排序,並將相同鍵的值歸併。而後調用reduce()函數,並經過迭代處理哪些與指定鍵相關聯的值,生成一個 (可能爲空的) 列表 K3, V3

OutputCollector接收reduce階段的輸出,並寫入輸出文件

Reporter可提供對reducer相關附加信息的記錄,造成任務進度

 

一些很是有用的由Hadoop預約義的Reducer實現

描述

IdentityReducer<K, V>

實現Reducer<K,   V, K, V>, 將輸入直接映射到輸出

LongSumReducer<K>

實現<K,   LongWritable, K, LongWritable>   計算與給定鍵相對應的全部值的和

 

注:雖然咱們將Hadoop程序稱爲MapReduce應用,可是在mapreduce兩個階段之間還有一個極其重要的步驟:將mapper的結果輸出給不一樣的reducer。這就是partitioner的工做。

 

初次使用MapReduce的程序員一般有一個誤解?

僅須要一個reducer 採用單一的reducer能夠在處理以前對全部的數據進行排序。

No,採用單一的reducer忽略了並行計算的好處。

那麼就應該使用多個reducer是麼?但須要解決一個問題,如何肯定mapper應該把鍵/值對輸出給誰。

默認的做法是對鍵進行散列來肯定reducerHadoop經過HashPartitioner類強制執行這個策略。但有時HashPartitioner會出錯。

 

HashPartitioner會出什麼錯?

假如你使用Edge類來分析航班信息來決定從各個機場離港的乘客數目,這些數據多是:

(San Francisco, Los Angeles) Chuck Lam

(San Francisco, Dallas) James Warren

若是你使用HashPartitioner,這兩行能夠被送到不一樣的reducer 離港的乘客數目被處理兩次而且兩次都是錯誤的

 

如何爲你的應用量身定製partitioner呢?

上面的狀況,我但願具備相同離港地的全部edge被送往相同的reducer,怎麼作呢?只要對Edge類的departureNode成員進行散列就能夠了:

public class EdgePartitioner implements Partitioner<Edge, Writable> {

       @Override

       public int getPartition (Edge key, Writable value, int numPartitions) {

              return key.getDepartureNode().hashCode() % numPartitions;

       }

       @Override

       public void configure(JobConf conf) { }

}

一個定製的partitioner只須要實現configure()getPartition()兩個函數,前者將Hadoop對做業的配置應用在patitioner上,然後者返回一個介於0reduce任務數之間的整數,指向鍵/值對將要發送的reducer

 

Combiner:本地reduce

在許多MapReduce應用場景中,咱們不妨在分發mapper結果以前作一下 "本地Reduce"。再考慮一下WordCount的例子,若是做業處理的文件中單詞 "the" 出現了574次,存儲並洗牌一次 ("the", 574) /值對比許屢次 ("the", 1) 更爲高效。這種處理步驟被稱爲合併。

 

預約義mapperReducer類的單詞計數

public class WordCount {

       public static void main (String[] args) {

              JobClient client = new JobClient();

              JobConf conf = new JobConf(WordCount.class);

 

              FileInputFormat.addInputPath(conf, new Path(args[0]));

              FileOutputFormat.setOutputPath(conf, new Path(args[1]));

      

              conf.setOutputKeyClass(Text.class);

              conf.setOutputValueClass(LongWritable.class);

              conf.setMapperClass(TokenCountMapper.class); // Hadoop本身的TokenCountMapper

              conf.setCombinerClass(LongSumReducer.class);

              conf.setReducerClass(LongSumReduver.class); // Hadoop本身的LongSumReducer

              client.setConf(conf);

              try {

                     JobClient.runJob(conf);

              } catch (Exception e) {

                     e.printStackTrace();

              }

       }

}

使用Hadoop預約義的類TokenCountMapperLongSumReducer,編寫MapReduce分廠的容易,Hadoop也支持生成更復雜的程序,這裏只是強調Hadoop容許你經過最小的代碼量快速生成實用的程序。

相關文章
相關標籤/搜索