MapReduce的運行流程概述

MapReduce處理數據的大體流程

InputFormat調用RecordReader,從輸入目錄的文件中,讀取一組數據,封裝爲keyin-valuein對象html

②將封裝好的key-value,交給Mapper.map()------>將處理的結果寫出 keyout-valueout編程

ReduceTask啓動Reducer,使用Reducer.reduce()處理Mapper的keyout-valueoutapp

④OutPutFormat調用RecordWriter,將Reducer處理後的keyout-valueout寫出到文件框架

關於這些名詞的解釋參考我以前的文章MapReduce計算框架的核心編程思想oop

示例

需求: 統計/hello目錄中每一個文件的單詞數量
a-p開頭的單詞放入到一個結果文件中,
q-z開頭的單詞放入到另一個結果文件中。
spa

例如:
/hello/a.txt,文件大小200M
hello,hi,hadoop
hive,hadoop,hive,
zoo,spark,wow
zoo,spark,wow
...線程

/hello/b.txt,文件大小100M
hello,hi,hadoop
zoo,spark,wow
...code

1. Map階段(運行MapTask,將一個大的任務切分爲若干小任務,處理輸出階段性的結果)

①切片(切分數據)
/hello/a.txt 200M
/hello/b.txt 100Morm

默認的切分策略是以文件爲單位,以文件的塊大小(128M)片大小進行切片!
split0:/hello/a.txt,0-128M
split1: /hello/a.txt,128M-200M
split2: /hello/b.txt,0M-100Mhtm

②運行MapTask(進程),每一個MapTask負責一片數據

split0:/hello/a.txt,0-128M--------MapTask1
split1: /hello/a.txt,128M-200M--------MapTask2
split2: /hello/b.txt,0M-100M--------MapTask3

③讀取數據階段

在MR中,全部的數據必須封裝爲key-value
MapTask1,2,3都會初始化一個InputFormat(默認TextInputFormat),每一個InputFormat對象負責建立一個RecordReader(LineRecordReader)對象,
RecordReader負責從每一個切片的數據中讀取數據,封裝爲key-value

LineRecordReader: 將文件中的每一行封裝爲一個key(offset)-value(當前行的內容)
舉例:
hello,hi,hadoop----->(0,hello,hi,hadoop)
hive,hadoop,hive----->(20,hive,hadoop,hive)
zoo,spark,wow----->(30,zoo,spark,wow)
zoo,spark,wow----->(40,zoo,spark,wow)

④進入Mapper的map()階段

map()是Map階段的核心處理邏輯! 單詞統計! map()會循環調用,對輸入的每一個Key-value都進行處理!
輸入:(0,hello,hi,hadoop)
輸出:(hello,1),(hi,1),(hadoop,1)

輸入:(20,hive,hadoop,hive)
輸出:(hive,1),(hadoop,1),(hive,1)

輸入:(30,zoo,spark,wow)
輸出:(zoo,1),(spark,1),(wow,1)

輸入:(40,zoo,spark,wow)
輸出:(zoo,1),(spark,1),(wow,1)

⑤將MapTask輸出的記錄進行分區(分組、分類)

在Mapper輸出後,調用Partitioner,對Mapper輸出的key-value進行分區,分區後也會排序(默認字典順序排序)
分區規則:

  • a-p開頭的單詞放入到一個區
  • q-z開頭的單詞放入到另外一個區
    MapTask1:
    0號區: (hadoop,1),(hadoop,1),(hello,1),(hi,1),(hive,1),(hive,1)
    1號區: (spark,1),(spark,1),(wow,1) ,(wow,1),(zoo,1)(zoo,1)

MapTask2:
0號區: ...
1號區: ...

MapTask3:
0號區: (hadoop,1),(hello,1),(hi,1),
1號區: (spark,1),(wow,1),(zoo,1)

2.Reduce階段

①由於需求是生成兩個結果文件,因此咱們須要啓動兩個ReduceTask
ReduceTask啓動後,會啓動shuffle線程,從MapTask中拷貝相應分區的數據!

ReduceTask1: 只負責0號區
將三個MapTask,生成的0號區數據所有拷貝到ReduceTask所在的機器!
(hadoop,1),(hadoop,1),(hello,1),(hi,1),(hive,1),(hive,1)
(hadoop,1),(hello,1),(hi,1),

ReduceTask2: 只負責1號區
將三個MapTask,生成的1號區數據所有拷貝到ReduceTask所在的機器!
(spark,1),(spark,1),(wow,1) ,(wow,1),(zoo,1)(zoo,1)
(spark,1),(wow,1),(zoo,1)

②sort

ReduceTask1: 只負責0號區進行排序:
(hadoop,1),(hadoop,1),(hadoop,1),(hello,1),(hello,1),(hi,1),(hi,1),(hive,1),(hive,1)
ReduceTask2: 只負責1號區進行排序:
(spark,1),(spark,1),(spark,1),(wow,1) ,(wow,1),(wow,1),(zoo,1),(zoo,1)(zoo,1)

③reduce
ReduceTask1---->Reducer----->reduce(一次讀入一組數據)

何爲一組數據: key相同的爲一組數據
輸入: (hadoop,1),(hadoop,1),(hadoop,1)
輸出: (hadoop,3)

輸入: (hello,1),(hello,1)
輸出: (hello,2)

輸入: (hi,1),(hi,1)
輸出: (hi,2)

輸入:(hive,1),(hive,1)
輸出: (hive,2)

ReduceTask2---->Reducer----->reduce(一次讀入一組數據)

輸入: (spark,1),(spark,1),(spark,1)
輸出: (spark,3)

輸入: (wow,1) ,(wow,1),(wow,1)
輸出: (wow,3)

輸入:(zoo,1),(zoo,1)(zoo,1)
輸出: (zoo,3)

④調用OutPutFormat中的RecordWriter將Reducer輸出的記錄寫出
ReduceTask1---->OutPutFormat(默認TextOutPutFormat)---->RecordWriter(LineRecoreWriter)
LineRecoreWriter將一個key-value以一行寫出,key和alue之間使用\t分割
在輸出目錄中,生成文件part-r-0000
hadoop 3
hello 2
hi 2
hive 2

ReduceTask2---->OutPutFormat(默認TextOutPutFormat)------>RecordWriter(LineRecoreWriter)
LineRecoreWriter將一個key-value以一行寫出,key和alue之間使用\t分割
在輸出目錄中,生成文件part-r-0001
spark 3
wow 3
zoo 3

3、MR總結

Map階段(MapTask): 切片(Split)-----讀取數據(Read)-------交給Mapper處理(Map)------分區和排序(sort)

Reduce階段(ReduceTask): 拷貝數據(copy)------排序(sort)-----合併(reduce)-----寫出(write)

相關文章
相關標籤/搜索