前言:在具體執行Hadoop程序的時候,咱們要根據不一樣的狀況來設置Map的個數。除了設置固定的每一個節點上可運行的最大map個數外,咱們還須要控制真正執行Map操做的任務個數。
1.如何控制實際運行的map任務個數
咱們知道,文件在上傳到Hdfs文件系統的時候,被切分紅不一樣的Block塊(默認大小爲64MB)。可是每一個Map處理的分塊有時候並非系統的物理Block塊大小。實際處理的輸入分塊的大小是根據InputSplit來設定的,那麼InputSplit是怎麼獲得的呢?
java
InputSplit=Math.max(minSize, Math.min(maxSize, blockSize) 其中:minSize=mapred.min.split.size maxSize=mapred.max.split.size
咱們經過改變InputFormat中分片的多少來控制實際使用的Map數量,而控制InputFormat中的分片多少就須要控制每一個InputSplit分片的大小
2.如何控制每一個split分片的大小
Hadoop默認的輸入格式是TextInputFormat,他裏邊定義了文件讀取的方式和分片的方式。咱們打開他的源文件(org.apache.hadoop.mapreduce.lib.input包中):apache
package org.apache.hadoop.mapreduce.lib.input; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.SplittableCompressionCodec; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; public class TextInputFormat extends FileInputFormat<LongWritable, Text> { @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { return new LineRecordReader(); } @Override protected boolean isSplitable(JobContext context, Path file) { CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file); if (null == codec) { return true; } return codec instanceof SplittableCompressionCodec; } }
經過源代碼,咱們發現TextInputFormat繼承了FileInputFormat,而在TextInputFormat中,咱們並無發現具體的進行文件切分的部分,TextInputFormat應該是採用了FileInputFormat默認的InputSplit方法。所以,咱們打開FileInputFormat的源代碼,在其中發現:
ide
public static void setMinInputSplitSize(Job job,long size) { job.getConfiguration().setLong("mapred.min.split.size", size); } public static long getMinSplitSize(JobContext job) { return job.getConfiguration().getLong("mapred.min.split.size", 1L); } public static void setMaxInputSplitSize(Job job,long size) { job.getConfiguration().setLong("mapred.max.split.size", size); } public static long getMaxSplitSize(JobContext context) { return context.getConfiguration().getLong("mapred.max.split.size",Long.MAX_VALUE); }
如上咱們能夠看到,Hadoop在這裏實現了對mapred.min.split.size和mapred.max.split.size的定義,且默認值分別爲1和Long的最大。所以,咱們在程序只需從新賦值給這兩個值就能夠控制InputSplit分片的大小了。
3.假如咱們想要設置的分片大小爲10MB
則咱們能夠在MapReduce程序的驅動部分添加以下代碼:
oop
TextInputFormat.setMinInputSplitSize(job,1024L);//設置最小分片大小 TextInputFormat.setMaxInputSplitSize(job,1024×1024×10L);//設置最大分片大小