假設咱們須要處理一批有關天氣的數據,其格式以下: html
0067011990999991950051507+0000+ java 0043011990999991950051512+0022+ 緩存 0043011990999991950051518-0011+ app 0043012650999991949032412+0111+ 分佈式 0043012650999991949032418+0078+ ide 0067011990999991937051507+0001+ 函數 0043011990999991937051512-0002+ oop 0043011990999991945051518+0001+ spa 0043012650999991945032412+0002+ 命令行 0043012650999991945032418+0078+ |
如今須要統計出每一年的最高溫度。
Map-Reduce主要包括兩個步驟:Map和Reduce
每一步都有key-value對做爲輸入和輸出:
對於上面的例子,在map過程,輸入的key-value對以下:
(0, 0067011990999991950051507+0000+) (33, 0043011990999991950051512+0022+) (66, 0043011990999991950051518-0011+) (99, 0043012650999991949032412+0111+) (132, 0043012650999991949032418+0078+) (165, 0067011990999991937051507+0001+) (198, 0043011990999991937051512-0002+) (231, 0043011990999991945051518+0001+) (264, 0043012650999991945032412+0002+) (297, 0043012650999991945032418+0078+) |
在map過程當中,經過對每一行字符串的解析,獲得年-溫度的key-value對做爲輸出:
(1950, 0) (1950, 22) (1950, -11) (1949, 111) (1949, 78) (1937, 1) (1937, -2) (1945, 1) (1945, 2) (1945, 78) |
在reduce過程,將map過程當中的輸出,按照相同的key將value放到同一個列表中做爲reduce的輸入
(1950, [0, 22, –11]) (1949, [111, 78]) (1937, [1, -2]) (1945, [1, 2, 78]) |
在reduce過程當中,在列表中選擇出最大的溫度,將年-最大溫度的key-value做爲輸出:
(1950, 22) (1949, 111) (1937, 1) (1945, 78) |
其邏輯過程可用以下圖表示:
通常遵循如下格式
map: (K1, V1) -> list(K2, V2)
public interface Mapper<K1,V1,K2,V2> extends JobConfigurable, Closeable{ void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)throw IOException; }
reduce: (K2,list(v)) -> list(K3,V3)
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable { void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output, Reporter reporter) throws IOException; }對於上面的例子,則實現的mapper以下:
public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { @Override public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(25) == '+') { airTemperature = Integer.parseInt(line.substring(26, 30)); } else { airTemperature = Integer.parseInt(line.substring(25, 30)); } output.collect(new Text(year), new IntWritable(airTemperature)); } }實現的reducer以下:
public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int maxValue = Integer.MIN_VALUE; while (values.hasNext()) { maxValue = Math.max(maxValue, values.next().get()); } output.collect(key, new IntWritable(maxValue)); } }
欲運行上面實現的Mapper和Reduce,則須要生成一個Map-Reduce的任務(Job),基本包括如下三部分:‘
欲配置JobConf,須要大體瞭解Hadoop運行Job的基本原理:
public interface Partitioner<K2, V2> extends JobConfigurable { int getPartition(K2 key, V2 value, int numPartitions); }下圖大概描述了Map-Reduce的Job運行的基本原理:
下面咱們討論JobConf,其有不少的項能夠進行配置:
固然不用全部的都設置,由上面的例子,能夠編寫Map-Reduce程序以下:
public class MapTemperature { public static void main (String[] args)throws IOException{ if(args.length != 2){ System.err.println("Usage: MaxTemperature <input path> <output path>"); System.exit(-1); } JobConf conf = new JobConf(MaxTemperature.class); conf.setJobName("Max temperature"); FileInputFormat.addInputPath(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); conf.setMapperClass(MaxTemperatureMapper.class); conf.setReducerClass(MaxTemperatureReducer.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); JobClient.runJob(conf); } }
提交任務後,runJob每隔一秒鐘輪詢一次job的進度,將進度返回到命令行,直到任務運行完畢。
在建立task以前,job調度器首先從共享文件系統中得到JobClient計算出的input split。其爲每一個input split建立一個map task。每一個task被分配一個ID。
TaskTracker週期性的向JobTracker發送heartbeat。
在heartbeat中,TaskTracker告知JobTracker其已經準備運行一個新的task,JobTracker將分配給其一個task。
在JobTracker爲TaskTracker選擇一個task以前,JobTracker必須首先按照優先級選擇一個Job,在最高優先級的Job中選擇一個task。TaskTracker有固定數量的位置來運行map task 或者 reduce task。
默認的調度器對待map task優先於reduce task
當選擇reduce task的時候,JobTracker並不在多個task之間進行選擇,而是直接取下一個,由於reduce task沒有數據本地化的概念。
首先,TaskTracker將此job的jar從共享文件系統中拷貝到TaskTracker的文件系統中。
TaskTracker從distributed cache中將job運行所須要的文件拷貝到本地磁盤。
其次,其爲每一個task建立一個本地的工做目錄,將jar解壓縮到文件目錄中。
其三,其建立一個TaskRunner來運行task。
TaskRunner建立一新的JVM來運行task。
被建立的child JVM和TaskTracker通訊來報告運行進度。
MapRunnable從input split中讀取一個個的record,而後依次調用Mapper的map函數,將結果輸出。
map的輸出並非直接寫入硬盤,而是將其寫入緩存memory buffer。
當buffer中數據的到達必定的大小,一個背景線程將數據開始寫入硬盤。
在寫入硬盤以前,內存中的數據經過partitioner分紅多個partition。
在同一個partition中,背景線程會將數據按照key在內存中排序。
每次從內存向硬盤flush數據,都生成一個新的spill文件。
當此task結束以前,全部的spill文件被合併爲一個整的被partition的並且排好序的文件。
reducer能夠經過http協議請求map的輸出文件,tracker.http.threads能夠設置http服務線程數。
當map task結束後,其通知TaskTracker,TaskTracker通知JobTracker。
對於一個job,JobTracker知道TaskTracer和map輸出的對應關係。
reducer中一個線程週期性的向JobTracker請求map輸出的位置,直到其取得了全部的map輸出。
reduce task須要其對應的partition的全部的map輸出。
reduce task中的copy過程即當每一個map task結束的時候就開始拷貝輸出,由於不一樣的map task完成時間不一樣。
reduce task中有多個copy線程,能夠並行拷貝map輸出。
當不少map輸出拷貝到reduce task後,一個背景線程將其合併爲一個大的排好序的文件。
當全部的map輸出都拷貝到reduce task後,進入sort過程,將全部的map輸出合併爲大的排好序的文件。
最後進入reduce過程,調用reducer的reduce函數,處理排好序的輸出的每一個key,最後的結果寫入HDFS。
當JobTracker得到最後一個task的運行成功的報告後,將job得狀態改成成功。
當JobClient從JobTracker輪詢的時候,發現此job已經成功結束,則向用戶打印消息,從runJob函數中返回。
hadoop運行痕跡http://www.cnblogs.com/forfuture1978/archive/2010/11/23/1884967.html