一種分佈式的計算方式指定一個Map(映#x5C04;)函數,用來把一組鍵值對映射成一組新的鍵值對,指定併發的Reduce(歸約)函數,用來保證全部映射的鍵值對中的每個共享相同的鍵組數據庫
map: (K1, V1) → list(K2, V2) combine: (K2, list(V2)) → list(K2, V2) reduce: (K2, list(V2)) → list(K3, V3)編程
Map輸出格式和Reduce輸入格式必定是相同的設計模式
MapReduce主要是先讀取文件數據,而後進行Map處理,接着Reduce處理,最後把處理結果寫到文件中緩存
記錄閱讀器會翻譯由輸入格式生成的記錄,記錄閱讀器用於將數據解析給記錄,並不分析記錄自身。記錄讀取器的目的是將數據解析成記錄,但不分析記錄自己。它將數據以鍵值對的形式傳輸給mapper。一般鍵是位置信息,值是構成記錄的數據存儲塊.自定義記錄不在本文討論範圍以內.markdown
在映射器中用戶提供的代碼稱爲中間對。對於鍵值的具體定義是慎重的,由於定義對於分佈式任務的完成具備重要意義.鍵決定了數據分類的依據,而值決定了處理器中的分析信息.本書的設計模式將會展現大量細節來解釋特定鍵值如何選擇.網絡
ruduce任務以隨機和排序步驟開始。此步驟寫入輸出文件並下載到本地計算機。這些數據採用鍵進行排序以把等價密鑰組合到一塊兒。併發
reducer採用分組數據做爲輸入。該功能傳遞鍵和此鍵相關值的迭代器。能夠採用多種方式來彙總、過濾或者合併數據。當ruduce功能完成,就會發送0個或多個鍵值對。app
輸出格式會轉換最終的鍵值對並寫入文件。默認狀況下鍵和值以tab分割,各記錄以換行符分割。所以能夠自定義更多輸出格式,最終數據會寫入HDFS。相似記錄讀取,自定義輸出格式不在本書範圍。分佈式
經過InputFormat決定讀取的數據的類型,而後拆分紅一個個InputSplit,每一個InputSplit對應一個Map處理,RecordReader讀取InputSplit的內容給Mapide
決定讀取數據的格式,能夠是文件或數據庫等
List getSplits(): 獲取由輸入文件計算出輸入分片(InputSplit),解決數據或文件分割成片問題
RecordReader <k,v>createRecordReader():</k,v> 建立#x5EFA;RecordReader,從InputSplit中讀取數據,解決讀取分片中數據問題
TextInputFormat: 輸入文件中的每一行就是一個記錄,Key是這一行的byte offset,而value是這一行的內容
KeyValueTextInputFormat: 輸入文件中每一行就是一個記錄,第一個分隔符字符切分每行。在分隔符字符以前的內容爲Key,在以後的爲Value。分隔符變量經過key.value.separator.in.input.line變量設置,默認爲(\t)字符。
NLineInputFormat: 與TextInputFormat同樣,但每一個數據塊必須保證有且只有N行,mapred.line.input.format.linespermap屬性,默認爲1
SequenceFileInputFormat: 一個用來讀取字符流數據的InputFormat,<key,value>爲用戶自定義的。字符流數據是Hadoop自定義的壓縮的二進制數據格式。它用來優化從一個MapReduce任務的輸出到另外一個MapReduce任務的輸入之間的數據傳輸過程。</key,value>
表明一個個邏輯分片,並無真正存儲數據,只是提供了一個如何將數據分片的方法
Split內有Location信息,利於數據局部化
一個InputSplit給一個單獨的Map處理
public abstract class InputSplit { /** * 獲取Split的大小,支持根據size對InputSplit排序. */ public abstract long getLength() throws IOException, InterruptedException; /** * 獲取存儲該分片的數據所在的節點位置. */ public abstract String[] getLocations() throws IOException, InterruptedException; }
將InputSplit拆分紅一個個<key,value>對給Map處理,也是實際的文件讀取分隔對象</key,value>
CombineFileInputFormat能夠將若干個Split打包成一個,目的是避免過多的Map任務(由於Split的數目決定了Map的數目,大量的Mapper Task建立銷燬開銷將是巨大的)
一般一個split就是一個block(FileInputFormat僅僅拆分比block大的文件),這樣作的好處是使得Map能夠在存儲有當前數據的節點上運行本地的任務,而不須要經過網絡進行跨節點的任務調度
經過mapred.min.split.size, mapred.max.split.size, block.size來控制拆分的大小
若是mapred.min.split.size大於block size,則會將兩個block合成到一個split,這樣有部分block數據須要經過網絡讀取
若是mapred.max.split.size小於block size,則會將一個block拆成多個split,增長了Map任務數(Map對split進行計算ק#x5E76;且上報結果,關閉當前計算打開新的split均須要耗費資源)
先獲取文件在HDFS上的路徑和Block信息,而後根據splitSize對文件進行切分( splitSize = computeSplitSize(blockSize, minSize, maxSize) ),默認splitSize 就等於blockSize的默認值(64m)
public List<InputSplit> getSplits(JobContext job) throws IOException { // 首先計算分片的最大和最小值。這兩個值將會用來計算分片的大小 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus> files = listStatus(job); for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); if (length != 0) { FileSystem fs = path.getFileSystem(job.getConfiguration()); // 獲取該文件全部的block信息列表[hostname, offset, length] BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); // 判斷文件是否可分割,一般是可分割的,但若是文件是壓縮的,將不可分割 if (isSplitable(job, path)) { long blockSize = file.getBlockSize(); // 計算分片大小 // 即 Math.max(minSize, Math.min(maxSize, blockSize)); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; // 循環分片。 // 當剩餘數據與分片大小比值大於Split_Slop時,繼續分片, 小於等於時,中止分片 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } // 處理餘下的數據 if (bytesRemaining != 0) { splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts())); } } else { // 不可split,整塊返回 splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts())); } } else { // 對於長度爲0的文件,建立空Hosts列表,返回 splits.add(makeSplit(path, 0, length, new String[0])); } } // 設置輸入文件數量 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); LOG.debug("Total # of splits: " + splits.size()); return splits; }
split是根據文件大小分割的,而通常處理是根據分隔符進行分割的,這樣勢必存在一條記錄橫跨兩個split
解決辦法是隻要不是第一個split,都會遠程讀取一條記錄。不是第一個split的都忽略到第一條記錄
public class LineRecordReader extends RecordReader<LongWritable, Text> { private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private LineReader in; private int maxLineLength; private LongWritable key = null; private Text value = null; // initialize函數即對LineRecordReader的一個初始化 // 主要是計算分片的始末位置,打開輸入流以供讀取K-V對,處理分片通過壓縮的狀況等 public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); // 打開文件,並定位到分片讀取的起始位置 FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); boolean skipFirstLine = false; if (codec != null) { // 文件是壓縮文件的話,直接打開文件 in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { // 只要不是第一個split,則忽略本split的第一行數據 if (start != 0) { skipFirstLine = true; --start; // 定位到偏移位置,下&#x#x6B21;讀取就會從偏移位置開始 fileIn.seek(start); } in = new LineReader(fileIn, job); } if (skipFirstLine) { // 忽略第一行數據,從新定位start start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start)); } this.pos = start; } public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos);// key即爲偏移量 if (value == null) { value = new Text(); } int newSize = 0; while (pos < end) { newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); // 讀取的數據長度爲0,則說明已讀完 if (newSize == 0) { break; } pos += newSize; // 讀取的數據長度小於最大行長度,也說明已讀取完畢 if (newSize < maxLineLength) { break; } // 執行到此處,說明該行數據沒讀完,繼續讀入 } if (newSize == 0) { key = null; value = null; return false; } else { return true; } } }
主要是讀取InputSplit的每個Key,Value對並進行處理
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { /** * 預處理,僅在map task啓動時運行一次 */ protected void setup(Context context) throws IOException, InterruptedException { } /** * 對於InputSplit中的每一對<key, value>都會運行一次 */ @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } /** * 掃尾工做,好比關閉流等 */ protected void cleanup(Context context) throws IOException, InterruptedException { } /** * map task的驅動器 */ public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } cleanup(context); } } public class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { private RecordReader<KEYIN, VALUEIN> reader; private InputSplit split; /** * Get the input split for this map. */ public InputSplit getInputSplit() { return split; } @Override public KEYIN getCurrentKey() throws IOException, InterruptedException { return reader.getCurrentKey(); } @Override public VALUEIN getCurrentValue() throws IOException, InterruptedException { return reader.getCurrentValue(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { return reader.nextKeyValue(); } }
對Map的結果進行排序並傳輸到Reduce進行處理 Map的結果並不#x662F;直接存放到硬盤,而是利用緩存作一些預排序處理 Map會調用Combiner,壓縮,按key進行分區、排序等,儘可能減小結果的大小 每一個Map完成後都會通知Task,而後Reduce就能夠進行處理
當Map程序開始產生結果的時候,並非直接寫到文件的,而是利用緩存作一些排序方面的預處理操做
每一個Map任務都有一個循環內存緩衝區(默認100MB),當緩存的內容達到80%時,後臺線程開始將內容寫到文件,此時Map任務能夠&#x#x7EE7;續輸出結果,但若是緩衝區滿了,Map任務則須要等待
寫文件使用round-robin方式。在寫入文件以前,先將數據按照Reduce進行分區。對於每個分區,都會在內存中根據key進行排序,若是配置了Combiner,則排序後執行Combiner(Combine以後能夠減小寫入文件和傳輸的數據)
每次結果達到緩衝區的閥值時,都會建立一個文件,在Map結束時,可能會產生大量的文件。在Map完成前,會將這些文件進行合併和排序。若是文件的數量超過3個,則&##x5408;並後會再次運行Combiner(一、2個文件就沒有必要了)
若是配置了壓縮,則最終寫入的文件會先進行壓縮,這樣能夠減小寫入和傳輸的數據
一旦Map完成,則通知任務管理器,此時Reduce就能夠開始複製結果數據
Map的結果文件都存放到運行Map任務的機器的本地硬盤中
若是Map的結果不多,則直接放到內存,不然寫入文件中
同時後臺線程將這些文件進行合併和排序到一個更大的文件中(若是文件是壓縮的ÿ#xFF0C;則須要先解壓)
當全部的Map結果都被複制和合並後,就會調用Reduce方法
Reduce結果會寫入到HDFS中
通常的原則是給shuffle分配儘量多的內存,但前提是要保證Map、Reduce任務有足夠的內存
對於Map,主要就是避免把文件寫入磁盤,例如使用Combiner,增大io.sort.mb的值
對於Reduce,主要是把Map的結果儘量地保存到內存中,一樣也是要避免把中間結果寫入磁盤。默認狀況下,全部的內存都是分配給Reduce方法的,若是Reduce方法不怎&##x4E48;消耗內存,能夠mapred.inmem.merge.threshold設成0,mapred.job.reduce.input.buffer.percent設成1.0
在任務監控中可經過Spilled records counter來監控寫入磁盤的數,但這個值是包括map和reduce的
對於IO方面,能夠Map的結果可使用壓縮,同時增大buffer size(io.file.buffer.size,默認4kb)
屬性 | 默認值 | 描述 |
---|---|---|
io.sort.mb | 100 | 映射輸出分類時所使用緩衝區的大小. |
io.sort.record.percent | 0.05 | 剩餘空間用於映射輸出自身記錄.在1.X發佈後去除此屬性.隨機代碼用於使用映射全部內存並記錄信息. |
io.sort.spill.percent | 0.80 | 針對映射輸出內存緩衝和記錄索引的閾值使用比例. |
io.sort.factor | 10 | 文件分類時合併流的最大數量。此屬性也用於reduce。一般把數字設爲100. |
min.num.spills.for.combine | 3 | 組合運行所需最小溢出文件數目. |
mapred.compress.map.output | false | 壓縮映射輸出. |
mapred.map.output.compression.codec | DefaultCodec | 映射輸出所需的壓縮解編碼器. |
mapred.reduce.parallel.copies | 5 | 用於向reducer傳送映射輸出的線程數目. |
mapred.reduce.copy.backoff | 300 | 時間的最大數量,以秒爲單位,這段時間內若reducer失敗則會反覆嘗試傳輸 |
io.sort.factor | 10 | 組合運行所需最大溢出文件數目. |
mapred.job.shuffle.input.buffer.percent | 0.70 | 隨機複製階段映射輸出緩衝器的堆棧大小比例 |
mapred.job.shuffle.merge.percent | 0.66 | 用於啓動合併輸出進程和磁盤傳輸的映射輸出緩衝器的閥值使用比例 |
mapred.inmem.merge.threshold | 1000 | 用於啓動合併輸出和磁盤傳輸進程的映射輸出的閥值數目。小於等於0意味着沒有門檻,而溢出行爲由 mapred.job.shuffle.merge.percent單獨管理. |
mapred.job.reduce.input.buffer.percent | 0.0 | 用於減小內存映射輸出的堆棧大小比例,內存中映射大小不得超出此值。若reducer須要較少內存則能夠提升該值. |
export LIBJARS=$MYLIB/commons-lang-2.3.jar, <other_jars_used_by_remote_components>hadoop jar prohadoop-0.0.1-SNAPSHOT.jar org.aspress.prohadoop.c3. WordCountUsingToolRunner -libjars $LIBJARS<input_path><output_path>
hadoop jar prohadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.aspress.prohadoop.c3. WordCountUsingToolRunner <input_path>The dependent libraries are now included inside the application JAR file
通常仍是上面的好,指定依賴能夠利用Public Cache,若是是包含依賴,則每次都須要拷貝