輸入數據概要html
輸入數據一般駐留在較大的文件中,一般幾十或者數百GB,甚至更大。MapReduce處理的基本原則之一是將輸入數據分割成塊。這些塊能夠在多臺計算機上並行處理,在Hadoop的術語中這些塊被稱爲輸入分片(Input Split)。每一個分片應該足夠小以實現更細粒度的並行。(若是全部的輸入數據都在一個分片中,那就沒有並行了。) 另外一方面,每一個分片也不能過小,不然啓動與中止各個分片處理所需的開銷將佔去很大一部分執行時間。java
因此說:數據庫
1、單個文件要足夠的大,這樣才能被分片,纔會有並行。apache
2、分片大小不能過小,不然分片太多影響效率。api
並行處理切分輸入數據的原則app
輸入數據一般爲單一的大文件,揭示了其背後Hadoop通用文件系統 (尤爲是HDFS) 的一些設計策略。ide
例如,Hadoop的文件系統提供了FSDataInputStream類用於讀取文件,而未採用Java中的java.io.DataInputStream FSDataInputStream擴展了DataInputStream以支持隨機讀,MapReduce須要這樣特性,由於一臺機器可能被指派從輸入文件的中間開始處理一個分片。oop
若是沒有隨機訪問,而須要從頭開始一直讀取到分片的位置,效率就會很是低。你還能夠看到HDFS爲了存儲MapReduce並行切分和處理的數據所作的設計。優化
HDFS按塊存儲文件並分佈在多臺機器上。籠統而言,每一個文件塊爲一個分片。因爲不一樣的機器會存儲不一樣的塊,若是每一個分片/塊都由它所駐留的機器進行處理,就自動實現了並行。此外,因爲HDFS在多個節點上覆制數據塊以實現可靠性,MapReducer能夠選擇任意一個包含分片/數據塊副本的節點。this
1、必須支持隨機訪問。
2、每一個分片由它所駐留的機器進行處理,這樣就實現了並行。
3、因爲存在副本的緣由,MapReducer能夠選擇任意包含該分片的節點。
Hadoop默認的讀入模式
MapReduce操做是基於鍵/值對的,Hadoop默認地將輸入文件中的每一行視爲一個記錄,而鍵/值對分別爲該行的字節偏移(key)和內容(value)。
你也許不會把全部的數據都這樣記錄,Hadoop也支持一些其餘的數據格式,並容許自定義格式。
輸入分片與記錄邊界
請注意,輸入分片是一種記錄的邏輯劃分,而HDFS數據塊是對輸入數據的物理分割。當它們一致時,效率會很是高,但在實際應用中從未達到徹底一致。記錄可能會跨過數據塊的邊界。Hadoop確保所有記錄都被處理。處理特定分片的計算節點會從一個數據塊中獲取記錄的一個片斷,該數據塊可能不是該記錄的"主"數據塊,而會存放在遠端。爲獲取一個記錄片斷所需的通訊成本是微不足道的,由於它相對而言不多發生。
InputFormat接口
Hadoop分割與讀取輸入文件的方式被定義在InputFormat接口中
TextInputFormat是InputFormat的默認實現
從TextInputFormat返回的鍵爲每行的字節偏移量,好像是不多用
InputFormat的其餘經常使用實現
InputFormat |
描述 |
TextInputFormat |
在文本文件中的每一行均爲一個記錄。鍵 (key) 爲一行的字節偏移,而值 (value)爲一行的內容 Key: LongWritable Value: Text |
KeyValueTextInputFormat |
在文本文件中的每一行均爲一個記錄,以每行的第一個分隔符爲界,分隔符以前的鍵(key),以後的是值(value)。分離器在屬性key.value.separator.in.input.line中設定,默認爲製表符(\t) Key: Text Value: Text |
SequenceFileInputFormat<K, V> |
用於讀取序列文件的InputFormat。鍵和值由用戶定義。序列文件爲Hadoop專用的壓縮二進制文件格式。它專用於一個MapReduce做業和其餘MapReduce做業之間傳送數據 Key: K (用戶定義) Value: V (用戶定義) |
NLineInputFormat |
與TextInputFormat相同,但每一個分片必定有N行。N在屬性mapred.line.input.format.linespermap中設定,默認爲1 Key:LongWritable Value: Text |
KeyValueTextInputFormat在更結構化的輸入文件中使用,由一個預約義的字符,一般爲製表符(\t),將每行 (記錄) 的鍵與值分開。例如,一個以製表符分割的,由時間戳和URL組成的數據文件也須要是這樣的:
17:16:18 http://hadoop.apache.org/core/docs/r0.19.0/api/index.html
17:16:19 http://hadoop.apache.org/core/docs/r0.19.0/mapred_tutorial.html
...
你能夠設置JobConf對象使用KeyValueTextInputFormat類讀取這個文件:
conf.setInputFormat(KeyValueTextInputFormat.class);
由前面的示例文件可知,mapper讀取的第一個記錄將包含一個鍵 "17:16:18" 和一個值
"http://hadoop.apache.org/core/docs/r0.19.0/api/index.html"。而mapper讀到的第二個記錄將包含鍵"17:16:19"和值http://hadoop.apache.org/core/docs/r0.19.0/mapred_tutorial.html... 如此遞歸。
以前,咱們在mapper中曾使用LongWritable和Text分別做爲鍵(key)和值(value)的類型。在TextInputFormat中,由於值爲用數字表示的偏移量,因此LongWritable是一個合理的鍵類型。而當使用KeyValueTextInputFormat時,不管是鍵和值都爲Text類型,你必須改變Mapper的實現以及map()方法來適應這個新的鍵 (key) 類型
輸入到MapReduce做業的數據未必都是些外部數據,實際上,一個MapReduce做業的輸入經常是其餘一些MapReduce的輸出。
你能夠自定義輸出格式
默認的輸出格式與KeyValueTextInputFormat可以讀取的數據格式保存一致 (即記錄中的每行均爲一個由製表符分割的鍵和值)。不過, Hadoop提供了更加有效的二進制壓縮文件格式,稱爲序列文件。這個序列文件爲Hadoop處理作了優化,當連接多個MapReduce做業時,它是首選格式。讀取序列文件的InputFormat類爲SequenceFileInputFormat
生成一個定製的InputFormat——InputSplit和RecordReader
InputFormat是一個僅包含兩個方法的接口。
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
}
這兩個方法總結了InputStream需執行的兩個功能:
1、肯定全部用於輸入數據的文件,並將之分割爲輸入分片。每一個map任務分配一個分片
2、提供一個對象 (RecordReader),循環提取給定分片中的記錄,並解析每一個記錄爲預約義類型的鍵與值
那麼,誰去考慮如何將文件劃分爲分片呢?
1、FileInputFormat類主要負責分片
2、InputFormat類都是FileInputFormat類的子類
3、FileInputFormat實現了getSplits()方法,不過保留了getRecordReader()抽象讓子類實現
4、FileInputFormat中實現的getSplits()把輸入數據粗略地劃分爲一組分片,分片數目在numSplits中限定,且每一個分片得大小必須大於mapred.min.split.size個字節,但小於文件系統的塊
5、在實際狀況中,一個分片最終老是以一個塊未大小,在HDFS中默認爲64MB
FileInputFormat說明
1、FileInputFormat有必定數量的protected方法,子類能夠經過覆蓋改變其行爲
2、其中一個就是isSplitable(FileSystem fs, Path fileName)方法,它檢查你是否能夠將給定文件分片,默認實現老是返回true,所以全部大於一個分塊的文件都要分片。
3、有時你可能不想該文件分片,這時你就能夠覆蓋isSplittable()來返回false,例如一些文件壓縮方案並不支持分割,一些數據處理操做,如文件轉換,須要把每一個文件視爲一個原子記錄,也不能將之分片
RecordReader接口說明
這個接口負責把一個輸入分片解析爲記錄,再把每一個記錄解析爲一個鍵/值對
public interface RecordReader<K, V> {
boolean next(K key, V value) throws IOException;
K createKey();
V createValue();
long getPos() throws IOException;
public void close() throws IOException;
float getProgress() throws IOException;
}
咱們不用本身寫RecordReader,仍是利用Hadoop所提供的類。
1、LineRecordReader實現RecordReader<LongWritable, Text>。它在TextInputFormat中被用於每次讀取一行,以字節偏移做爲鍵,以行的內容做爲值。
2、而KeyValueLineRecordReader則被用在KeyValueTextInputFormat中
3、在大多數狀況下,自定義RecordReader是基於現有實現的封裝,並把大多數操做放在next()方法中。
自定義的TimeUrlTextInputFormat相似於KeyValueTextInputFormat,之前的鍵/值是Text/Text,如今是Text/URLWritable,URLWritable是自定義的
public class TimeUrlTextInputFormat extends FileInputFormat<Text, URLWritable> {
@Override
public RecordReader<Text, URLWritable> getRecordReader(
InputSplit input, JobConf job, Reporter reporter) throws IOException {
return new TimeUrlLineRecordReader(job, (FileSplit) input);
}
}
// 咱們的URLWritable類很是簡單:
public class URLWritable implements Writable {
protected URL url;
public URLWritable() {}
public URLWritable(URL url) {
this.url = url;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(url.toString());
}
public void readFields(DataInput in) throw IOException {
url = new URL(in.readUTF());
}
public void set(String s) throws MalformedURLException {
url = new URL(s);
}
}
TimeUrlLineRecordReader實現
class TimeUrlLineRecordReader implements RecordReader<Text, URLWritable> {
private KeyValueLineRecordReader lineReader;
private Text lineKey, lineValue;
public TimeUrlLineRecordReader(JonConf job, FileSplit split) throws IOException {
lineReader = new KeyValueLineRecordReader(job, split);
lineKey = lineReader.createKey();
lineValue = lineReader.createValue();
}
public boolean next(Text key, URLWritable value) throws IOException {
if (!lineReader.next(lineKey, lineValue)) {
return false;
}
key.set(lineKey);
value.set(lineValue.toString());
return true;
}
public Text createKey() {
return new Text("");
}
public URLWritable createValue() {
return new URLWritable();
}
public long getPos() throws IOException {
return lineReader.getPos();
}
public float getProgress() throws IOException {
return lineReader.getProgress();
}
public void close() throws IOException {
lineReader.close();
}
}
TImeUrlLineRecordReader類生成一個KeyValueRecordReader對象,並直接把getPos()、getProgress()以及close()方法調用傳遞給它。而next()方法將lineValueText對象轉換爲URLWritable類型
OurputFormat類是什麼?
當MapReduce輸出數據到文件時,使用的時OutputFormat類,它與InputFormat類類似,由於每一個reducer僅需將它的輸出寫入本身的文件中,輸出無需分片。輸出文件放在一個公用目錄中,一般命名爲part-nnnnn,這裏nnnnn是reducer的分區ID。RecordWriter對象將輸出結果進行格式化,而RecordReader對輸入格式進行解析。
幾乎咱們處理的全部OutputFormat類都是從FileOutputFormat抽象類繼承來的
你能夠經過調用JobConf對象中的setOutputFormat()定製OutputFormat
注意:你可能會奇怪,爲何要將OutputFormat (InputFormat) 和 FileOutputFormat (FileInputFormat) 區分開,彷佛全部OutputFormat (InputFormat) 類都擴展了FileOutputFormat (FielInputFormat) 是否有不處理文件的OutFormat (InputFormat)類?沒錯,NullOutputFormat簡單地實現了OutputFomat,並不須要繼承FileOutputFormat。更重要的是,OutputFormat (InputFormat) 類處理的是數據庫,並不是文件,並且在類的層次關係中OutputFormat (InputFormat) 類是區別於FileoutputFormat (FileInputFormat) 的一個獨立分支。這些類有專門的應用,有興趣的讀者能夠在網上搜尋在線Java文檔進一步瞭解DBInputFormat和DBOutputFormat
主要的OutputFormat類,默認爲TextOutputFormat
OutputFormat |
描述 |
TextOutputFormat<K, V> |
將每一個記錄寫爲一行文本。鍵和值以字符串的形式寫入,並以製表符(\t)分隔。這個分隔符能夠在屬性mapred.textoutputformat.separator中修改 |
SequenceFileOutputFormat<K, V> |
以Hadoop專有序列文件格式寫入鍵/值對。與SequenceFileInputFormat配合使用 |
NullOutputFormat<K, V> |
無輸出 |
默認的OutputFormat是TextOutputFormat,將每一個記錄寫爲一行文本。每一個記錄的鍵和值經過toString()被轉換爲字符串(string),並以製表符 (\t) 分隔。分隔符能夠在mapred.textoutputformat.separator屬性中修改。
1、TextOutputFormat採用可被KeyValueInputFormat識別的格式輸出數據。
2、若是把鍵的類型設爲NullWritable,它也能夠採用可被TextInputFormat識別的輸出格式,在這種狀況下,在鍵/值對中沒有鍵,也沒有分隔符。
3、若是想徹底禁止輸出,應該使用NullOutputFormat
4、若是讓reducer採用本身的方式輸出,而且不須要Hadoop寫任何附加的文件,能夠限制Hadoop的輸出。
5、SequenceFileOutputFormat以序列文件格式輸出數據,使其能夠經過SequenceFileInputFormat來讀取。它有助於經過中間數據結果將MapReduce做業串接起來
MapReduce程序的核心是Map和Reduce操做,還有其餘的操做?
1、data spliting(數據分割)
2、shuffling(洗牌)
3、Partitining(分組)
4、Combining(合併)
5、Hadoop還支持採用不一樣的格式讀入和輸出數據