putmerge程序的大致流程是?java
1、根據用戶定義的參數設置本地目錄和HDFS的目錄文件程序員
2、提取本地輸入目錄中每一個文件的信息數據庫
3、建立一個輸出流寫入到HDF文件網絡
4、遍歷本地目錄中的每一個文件,打開一個輸入流來讀取該文件,剩下就是一個標準的Java文件複製過程了app
具體程序以下:框架
public static void main(String[] args) throws IOException {ide
Configuration conf = new Configuration();函數
FileSystem hdfs = FileSystem.get(conf);oop
FileSystem local = FieSystem.getLocal(conf);spa
// 設定輸入目錄與輸出文件
Path inputDir = new Path(args[0]);
Path hdfsFile = new Path(args[1]);
try {
// 獲得本地文件列表
FileStatus[] inputFiles = local.listStatus(inputDir);
// 生成HDFS輸出流
FSDataOutputStream out = hdfs.create(hdfsFile);
for (int i = 0; i < inputFiles.length; i++) {
System.out.println(inputFiles[i].getPath().getName());
// 打開本地輸入流
FSDataInputStream in = local.open(inputFiles[i].getPath());
byte buffer[] = new byte[256];
int bytesRead = 0;
while ( (bytesRead = in.read(buffer)) > 0) {
out.write(buffer, 0, bytesRead);
}
in.close();
}
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
那麼如今有數據了,還要對它進行處理、分析以及作其餘的操做。
MapReduce程序經過操做鍵/值對來處理數據,通常形式爲
map: (K1, V1) -> list(K2, V2)
reduce:(K2, list(V2)) -> list(K3, V3)
Hadoop數據類型有哪些?
MapReduce框架並不容許它們是任意的類。
雖然咱們常常把某些鍵與值稱爲整數、字符串等,但它們實際上並非Integer、String等哪些標準的Java類。爲了讓鍵/值對能夠在集羣上移動,MapReduce框架提供了一種序列化鍵/值對的方法。所以,只有那些支持這種序列化的類可以在這個框架中充當鍵或者值。
更具體的Hadoop類型說明
實現Writable接口的類能夠是值
而實現WritableComparable<T>接口的類既能夠是鍵也能夠是值
注意WritableComparable<T>接口是Writable和java.lang.Comparable<T>接口的組合,對於鍵而言,咱們須要這個比較,由於它們將在Reduce階段進行排序,而值僅會被簡單地傳遞。
鍵/值對常用的數據類型列表,這些類均實現WritableComparable接口
類 |
描述 |
BooleanWritable |
標準布爾變量的封裝 |
ByteWritable |
單字節數的封裝 |
DoubleWritable |
雙字節數的封裝 |
FloatWritable |
浮點數的封裝 |
IntWritable |
整數的封裝 |
LongWritable |
Long的封裝 |
Text |
使用UTF-8格式的文本封裝 |
NullWritable |
無鍵值時的站位符 |
如何自定義數據類型?
只要它實現了Writable(或WritableComparable<T>)接口。
定義一個Edge類型用於表示一個網絡的邊界
public class Edge implements WritableComparable<Efge> {
private String departureNode;
private String arrivalNode;
public String getDepartureNode() {
return departureNode;
}
// 說明如何讀入數據
@Override
public void readFields(DataInput in) throws IOException {
departureNode = in.readUTF();
arrivalNode = in.readUTF();
}
// 說明如何寫出數據
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(departureNode);
out.writeUTF(arrivalNode);
}
// 定義數據排序
@Override
public int compareTo(Edge o) {
return (departureNode.compareTo(o.departureNode) != 0)
? departureNode.compareTo(o.departureNode)
: arrivalNode.compareTo(o.arrivalNode);
}
}
Mapper類是什麼?
一個類要做爲mapper,需繼承MapReduceBase基類並實現Mapper接口。
mapper和reducer的基類均爲MapReduceBase類
其中包含一些函數或方法:
1、void configure(JobConf job),該函數提取XML配置文件或者應用程序主類中的參數,在數據處理以前調用該函數。
2、void close(),做爲map任務結束前的最後一個操做,該函數完成全部的結尾工做,如關閉數據庫鏈接、打開文件等。
Mapper接口負責數據處理階段,它採用Mapper<K1, V1, K2, V2>Java泛型,這裏鍵類和值類分別實現WritableComparable和Writable接口。
Mapper類只有是一個方法-map,用於處理一個單獨的鍵/值對。
void map (K1 key,
V1 value,
OutputCollector<K2, V2> output,
Reporter reporter
) throws IOException
上面這個map函數的參數都是什麼意思?
該函數處理一個給定的鍵/值對 (K1, V1),生成一個鍵/值對 (K2, V2) 的列表 (該列表頁可能爲空)
OutputCollector接收這個映射過程的輸出
Reporter可提供對mapper相關附加信息的記錄
Hadoop提供了一些有用的mapper實現,這些實現是?
類 |
描述 |
IdentityMapper<K, V> |
實現Mapper<K, V, K, V>, 將輸入直接映射到輸出 |
InverseMapper<K, V> |
實現Mapper<K, V, V, K>, 反轉鍵/值對 |
RegexMapper<K> |
實現Mapper<K, Text, Text, LongWritable>, 爲每一個常規表達式的匹配項生成一個 (match, 1) 對 |
TokenCountMapper<K> |
實現Mapper<K, Text, Text, LongWritable>, 當輸入的值爲分詞時, 生成一個(token, 1) 對 |
Reducer是什麼?
一個類要做爲reducer,需繼承MapReduceBase基類並實現Reducer接口。
以便於容許配置和清理。
此外,它還必須實現Reducer接口使其具備以下的單一方法:
void reduce (K2 key,
Iterator<V2> values,
OutputCollector<K3, V3> output,
Reporter reporter
) throws IOException
當reducer任務接收來自各個mapper的輸出時,它按照鍵/值對中的鍵對輸入數據進行排序,並將相同鍵的值歸併。而後調用reduce()函數,並經過迭代處理哪些與指定鍵相關聯的值,生成一個 (可能爲空的) 列表 (K3, V3)
OutputCollector接收reduce階段的輸出,並寫入輸出文件
Reporter可提供對reducer相關附加信息的記錄,造成任務進度
一些很是有用的由Hadoop預約義的Reducer實現
類 |
描述 |
IdentityReducer<K, V> |
實現Reducer<K, V, K, V>, 將輸入直接映射到輸出 |
LongSumReducer<K> |
實現<K, LongWritable, K, LongWritable>, 計算與給定鍵相對應的全部值的和 |
注:雖然咱們將Hadoop程序稱爲MapReduce應用,可是在map和reduce兩個階段之間還有一個極其重要的步驟:將mapper的結果輸出給不一樣的reducer。這就是partitioner的工做。
初次使用MapReduce的程序員一般有一個誤解?
僅須要一個reducer? 採用單一的reducer能夠在處理以前對全部的數據進行排序。
No,採用單一的reducer忽略了並行計算的好處。
那麼就應該使用多個reducer是麼?但須要解決一個問題,如何肯定mapper應該把鍵/值對輸出給誰。
默認的做法是對鍵進行散列來肯定reducer。Hadoop經過HashPartitioner類強制執行這個策略。但有時HashPartitioner會出錯。
HashPartitioner會出什麼錯?
假如你使用Edge類來分析航班信息來決定從各個機場離港的乘客數目,這些數據多是:
(San Francisco, Los Angeles) Chuck Lam
(San Francisco, Dallas) James Warren
若是你使用HashPartitioner,這兩行能夠被送到不一樣的reducer, 離港的乘客數目被處理兩次而且兩次都是錯誤的
如何爲你的應用量身定製partitioner呢?
上面的狀況,我但願具備相同離港地的全部edge被送往相同的reducer,怎麼作呢?只要對Edge類的departureNode成員進行散列就能夠了:
public class EdgePartitioner implements Partitioner<Edge, Writable> {
@Override
public int getPartition (Edge key, Writable value, int numPartitions) {
return key.getDepartureNode().hashCode() % numPartitions;
}
@Override
public void configure(JobConf conf) { }
}
一個定製的partitioner只須要實現configure()和getPartition()兩個函數,前者將Hadoop對做業的配置應用在patitioner上,然後者返回一個介於0和reduce任務數之間的整數,指向鍵/值對將要發送的reducer
Combiner:本地reduce
在許多MapReduce應用場景中,咱們不妨在分發mapper結果以前作一下 "本地Reduce"。再考慮一下WordCount的例子,若是做業處理的文件中單詞 "the" 出現了574次,存儲並洗牌一次 ("the", 574) 鍵/值對比許屢次 ("the", 1) 更爲高效。這種處理步驟被稱爲合併。
預約義mapper和Reducer類的單詞計數
public class WordCount {
public static void main (String[] args) {
JobClient client = new JobClient();
JobConf conf = new JobConf(WordCount.class);
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(LongWritable.class);
conf.setMapperClass(TokenCountMapper.class); // Hadoop本身的TokenCountMapper
conf.setCombinerClass(LongSumReducer.class);
conf.setReducerClass(LongSumReduver.class); // Hadoop本身的LongSumReducer
client.setConf(conf);
try {
JobClient.runJob(conf);
} catch (Exception e) {
e.printStackTrace();
}
}
}
使用Hadoop預約義的類TokenCountMapper和LongSumReducer,編寫MapReduce分廠的容易,Hadoop也支持生成更復雜的程序,這裏只是強調Hadoop容許你經過最小的代碼量快速生成實用的程序。