一、Map-Reduce的邏輯過程緩存
假設咱們須要處理一批有關天氣的數據,其格式以下:app
按照ASCII碼存儲,每行一條記錄分佈式
每一行字符從0開始計數,第15個到第18個字符爲年ide
第25個到第29個字符爲溫度,其中第25位是符號+/-函數
0067011990999991950051507+0000+oop
0043011990999991950051512+0022+命令行
0043011990999991950051518-0011+線程
0043012650999991949032412+0111+orm
0043012650999991949032418+0078+xml
0067011990999991937051507+0001+
0043011990999991937051512-0002+
0043011990999991945051518+0001+
0043012650999991945032412+0002+
0043012650999991945032418+0078+
如今須要統計出每一年的最高溫度。
Map-Reduce主要包括兩個步驟:Map和Reduce
每一步都有key-value對做爲輸入和輸出:
map階段的key-value對的格式是由輸入的格式所決定的,若是是默認的TextInputFormat,則每行做爲一個記錄進程處理,其中key爲此行的開頭相對於文件的起始位置,value就是此行的字符文本
map階段的輸出的key-value對的格式必須同reduce階段的輸入key-value對的格式相對應
對於上面的例子,在map過程,輸入的key-value對以下:
(0, 0067011990999991950051507+0000+)
(33, 0043011990999991950051512+0022+)
(66, 0043011990999991950051518-0011+)
(99, 0043012650999991949032412+0111+)
(132, 0043012650999991949032418+0078+)
(165, 0067011990999991937051507+0001+)
(198, 0043011990999991937051512-0002+)
(231, 0043011990999991945051518+0001+)
(264, 0043012650999991945032412+0002+)
(297, 0043012650999991945032418+0078+)
在map過程當中,經過對每一行字符串的解析,獲得年-溫度的key-value對做爲輸出:
(1950, 0)
(1950, 22)
(1950, -11)
(1949, 111)
(1949, 78)
(1937, 1)
(1937, -2)
(1945, 1)
(1945, 2)
(1945, 78)
在reduce過程,將map過程當中的輸出,按照相同的key將value放到同一個列表中做爲reduce的輸入
(1950, [0, 22, –11])
(1949, [111, 78])
(1937, [1, -2])
(1945, [1, 2, 78])
在reduce過程當中,在列表中選擇出最大的溫度,將年-最大溫度的key-value做爲輸出:
(1950, 22)
(1949, 111)
(1937, 1)
(1945, 78)
其邏輯過程可用以下圖表示:
image
二、編寫Map-Reduce程序
編寫Map-Reduce程序,通常須要實現兩個函數:mapper中的map函數和reducer中的reduce函數。
通常遵循如下格式:
map: (K1, V1) -> list(K2, V2)
public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
throws IOException;
}
reduce: (K2, list(V)) -> list(K3, V3)
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3, V3> output, Reporter reporter)
throws IOException;
}
對於上面的例子,則實現的mapper以下:
public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(25) == '+') {
airTemperature = Integer.parseInt(line.substring(26, 30));
} else {
airTemperature = Integer.parseInt(line.substring(25, 30));
}
output.collect(new Text(year), new IntWritable(airTemperature));
}
}
實現的reducer以下:
public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next().get());
}
output.collect(key, new IntWritable(maxValue));
}
}
欲運行上面實現的Mapper和Reduce,則須要生成一個Map-Reduce得任務(Job),其基本包括如下三部分:
輸入的數據,也即須要處理的數據
Map-Reduce程序,也即上面實現的Mapper和Reducer
此任務的配置項JobConf
欲配置JobConf,須要大體瞭解Hadoop運行job的基本原理:
Hadoop將Job分紅task進行處理,共兩種task:map task和reduce task
Hadoop有兩類的節點控制job的運行:JobTracker和TaskTracker
JobTracker協調整個job的運行,將task分配到不一樣的TaskTracker上
TaskTracker負責運行task,並將結果返回給JobTracker
Hadoop將輸入數據分紅固定大小的塊,咱們稱之input split
Hadoop爲每個input split建立一個task,在此task中依次處理此split中的一個個記錄(record)
Hadoop會盡可能讓輸入數據塊所在的DataNode和task所執行的DataNode(每一個DataNode上都有一個TaskTracker)爲同一個,能夠提升運行效率,因此input split的大小也通常是HDFS的block的大小。
Reduce task的輸入通常爲Map Task的輸出,Reduce Task的輸出爲整個job的輸出,保存在HDFS上。
在reduce中,相同key的全部的記錄必定會到同一個TaskTracker上面運行,然而不一樣的key能夠在不一樣的TaskTracker上面運行,咱們稱之爲partition
partition的規則爲:(K2, V2) –> Integer, 也即根據K2,生成一個partition的id,具備相同id的K2則進入同一個partition,被同一個TaskTracker上被同一個Reducer進行處理。
public interface Partitioner<K2, V2> extends JobConfigurable {
int getPartition(K2 key, V2 value, int numPartitions);
}
下圖大概描述了Map-Reduce的Job運行的基本原理:
image
下面咱們討論JobConf,其有不少的項能夠進行配置:
setInputFormat:設置map的輸入格式,默認爲TextInputFormat,key爲LongWritable, value爲Text
setNumMapTasks:設置map任務的個數,此設置一般不起做用,map任務的個數取決於輸入的數據所能分紅的input split的個數
setMapperClass:設置Mapper,默認爲IdentityMapper
setMapRunnerClass:設置MapRunner, map task是由MapRunner運行的,默認爲MapRunnable,其功能爲讀取input split的一個個record,依次調用Mapper的map函數
setMapOutputKeyClass和setMapOutputValueClass:設置Mapper的輸出的key-value對的格式
setOutputKeyClass和setOutputValueClass:設置Reducer的輸出的key-value對的格式
setPartitionerClass和setNumReduceTasks:設置Partitioner,默認爲HashPartitioner,其根據key的hash值來決定進入哪一個partition,每一個partition被一個reduce task處理,因此partition的個數等於reduce task的個數
setReducerClass:設置Reducer,默認爲IdentityReducer
setOutputFormat:設置任務的輸出格式,默認爲TextOutputFormat
FileInputFormat.addInputPath:設置輸入文件的路徑,可使一個文件,一個路徑,一個通配符。能夠被調用屢次添加多個路徑
FileOutputFormat.setOutputPath:設置輸出文件的路徑,在job運行前此路徑不該該存在
固然不用全部的都設置,由上面的例子,能夠編寫Map-Reduce程序以下:
public class MaxTemperature {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperature.class);
conf.setJobName("Max temperature");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
}
}
三、Map-Reduce數據流(data flow)
Map-Reduce的處理過程主要涉及如下四個部分:
客戶端Client:用於提交Map-reduce任務job
JobTracker:協調整個job的運行,其爲一個Java進程,其main class爲JobTracker
TaskTracker:運行此job的task,處理input split,其爲一個Java進程,其main class爲TaskTracker
HDFS:hadoop分佈式文件系統,用於在各個進程間共享Job相關的文件
image
3.一、任務提交
JobClient.runJob()建立一個新的JobClient實例,調用其submitJob()函數。
向JobTracker請求一個新的job ID
檢測此job的output配置
計算此job的input splits
將Job運行所需的資源拷貝到JobTracker的文件系統中的文件夾中,包括job jar文件,job.xml配置文件,input splits
通知JobTracker此Job已經能夠運行了
提交任務後,runJob每隔一秒鐘輪詢一次job的進度,將進度返回到命令行,直到任務運行完畢。
3.二、任務初始化
當JobTracker收到submitJob調用的時候,將此任務放到一個隊列中,job調度器將從隊列中獲取任務並初始化任務。
初始化首先建立一個對象來封裝job運行的tasks, status以及progress。
在建立task以前,job調度器首先從共享文件系統中得到JobClient計算出的input splits。
其爲每一個input split建立一個map task。
每一個task被分配一個ID。
3.三、任務分配
TaskTracker週期性的向JobTracker發送heartbeat。
在heartbeat中,TaskTracker告知JobTracker其已經準備運行一個新的task,JobTracker將分配給其一個task。
在JobTracker爲TaskTracker選擇一個task以前,JobTracker必須首先按照優先級選擇一個Job,在最高優先級的Job中選擇一個task。
TaskTracker有固定數量的位置來運行map task或者reduce task。
默認的調度器對待map task優先於reduce task
當選擇reduce task的時候,JobTracker並不在多個task之間進行選擇,而是直接取下一個,由於reduce task沒有數據本地化的概念。
3.四、任務執行
TaskTracker被分配了一個task,下面便要運行此task。
首先,TaskTracker將此job的jar從共享文件系統中拷貝到TaskTracker的文件系統中。
TaskTracker從distributed cache中將job運行所須要的文件拷貝到本地磁盤。
其次,其爲每一個task建立一個本地的工做目錄,將jar解壓縮到文件目錄中。
其三,其建立一個TaskRunner來運行task。
TaskRunner建立一個新的JVM來運行task。
被建立的child JVM和TaskTracker通訊來報告運行進度。
3.4.一、Map的過程
MapRunnable從input split中讀取一個個的record,而後依次調用Mapper的map函數,將結果輸出。
map的輸出並非直接寫入硬盤,而是將其寫入緩存memory buffer。
當buffer中數據的到達必定的大小,一個背景線程將數據開始寫入硬盤。
在寫入硬盤以前,內存中的數據經過partitioner分紅多個partition。
在同一個partition中,背景線程會將數據按照key在內存中排序。
每次從內存向硬盤flush數據,都生成一個新的spill文件。
當此task結束以前,全部的spill文件被合併爲一個整的被partition的並且排好序的文件。
reducer能夠經過http協議請求map的輸出文件,tracker.http.threads能夠設置http服務線程數。
3.4.二、Reduce的過程
當map task結束後,其通知TaskTracker,TaskTracker通知JobTracker。
對於一個job,JobTracker知道TaskTracer和map輸出的對應關係。
reducer中一個線程週期性的向JobTracker請求map輸出的位置,直到其取得了全部的map輸出。
reduce task須要其對應的partition的全部的map輸出。
reduce task中的copy過程即當每一個map task結束的時候就開始拷貝輸出,由於不一樣的map task完成時間不一樣。
reduce task中有多個copy線程,能夠並行拷貝map輸出。
當不少map輸出拷貝到reduce task後,一個背景線程將其合併爲一個大的排好序的文件。
當全部的map輸出都拷貝到reduce task後,進入sort過程,將全部的map輸出合併爲大的排好序的文件。
最後進入reduce過程,調用reducer的reduce函數,處理排好序的輸出的每一個key,最後的結果寫入HDFS。
image
3.五、任務結束
當JobTracker得到最後一個task的運行成功的報告後,將job得狀態改成成功。
當JobClient從JobTracker輪詢的時候,發現此job已經成功結束,則向用戶打印消息,從runJob函數中返回。