從這篇文章開始,我會開始系統性地輸出在大數據踩坑過程當中的積累,後面會涉及到實戰項目的具體操做,目前的規劃是按照系列來更新,力爭作到一個系列在5
篇文章以內總結出最核心的乾貨,若是是涉及到理論方面的文章,會以畫圖的方式來說解,若是是涉及到操做方面,會以實際的代碼來演示。java
這篇是MapReduce
系列的第一篇,初識MapReduce
的應用場景,在文章後面會有關於代碼的演示。python
Hadoop
做爲Apache
旗下的一個以Java
語言實現的分佈式計算開源框架,其由兩個部分組成,一個是分佈式的文件系統HDFS
,另外一個是批處理計算框架MapReduce
。這篇文章做爲MapReduce
系列的第一篇文章,會從MapReduce
的產生背景、框架的計算流程、應用場景和演示Demo
來說解,主要是讓你們對MapReduce
的這個批計算框架有個初步的瞭解及簡單的部署和使用。git
MapReduce
的產生背景MapReduce
的計算流程MapReduce
的框架架構MapReduce
的生命週期Demo
Google
在2004年的時候在 MapReduce: Simplified Data Processing on Large Clusters 這篇論文中提出了MapReduce
的功能特性和設計理念,設計MapReduce
的出發點就是爲了解決如何把大問題分解成獨立的小問題,再並行解決。例如,MapReduce
的經典使用場景之一就是對一篇長文進行詞頻統計,統計過程就是先把文章分爲一句一句,而後進行分割,最後進行詞的數量統計。github
咱們來說解下不一樣的組件做用算法
Client
的含義是指用戶使用MapReduce
程序經過Client
來提交任務到Job Tracker
上,同時用戶也可使用Client
來查看一些做業的運行狀態。apache
這個負責的是資源監控和做業調度。JobTracker
會監控着TaskTracker
和做業的健康情況,會把失敗的任務轉移到其餘節點上,同時也監控着任務的執行進度、資源使用量等狀況,會把這些消息通知任務調度器,而調度器會在資源空閒的時候選擇合適的任務來使用這些資源。編程
任務調度器是一個可插拔的模塊,用戶能夠根據本身的須要來設計相對應的調度器。bash
TaskTracker
會週期性地經過Hearbeat
來向Job Tracker
彙報本身的資源使用狀況和任務的運行進度。會接受來自於JobTaskcker
的指令來執行操做(例如啓動新任務、殺死任務之類的)。架構
在TaskTracker
中經過的是slot
來進行等量劃分一個節點上資源量,只用Task
得到slot
的時候纔有機會去運行。調度器的做用就是進行將空閒的slot
分配給Task
使用,能夠配置slot
的數量來進行限定Task上的併發度。併發
Task分爲Map Task
和Reduce Task
,在MapReduce
中的 split
就是一個 Map Task
,split
的大小能夠設置的,由 mapred.max.spilt.size
參數來設置,默認是 Hadoop
中的block
的大小,在Hadoop 2.x
中默認是128M
,在Hadoop 1.x
中默認是64M
。
在Task
中的設置能夠這麼設置,通常來說,會把一個文件設置爲一個split
,若是是小文件,那麼就會存在不少的Map Task
,這是特別浪費資源的,若是split
切割的數據塊的量大,那麼會致使跨節點去獲取數據,這樣也是消耗不少的系統資源的。
一共分爲5個步驟:
由用戶提交做業以前,須要先把文件上傳到HDFS
上,JobClient
使用upload
來加載關於打包好的jar
包,JobClient
會RPC
建立一個JobInProcess
來進行管理任務,而且建立一個TaskProcess
來管理控制關於每個Task
。
JobTracker
會調度和管理任務,一發現有空閒資源,會按照一個策略選擇一個合適的任務來使用該資源。
任務調度器有兩個點:一個是保證做業的順利運行,若是有失敗的任務時,會轉移計算任務,另外一個是若是某一個Task的計算結果落後於同一個Task的計算結果時,會啓動另外一個Task來作計算,最後去計算結果最塊的那個。
TaskTracker會爲每個Task來準備一個獨立的JVM從而避免不一樣的Task在運行過程當中的一些影響,同時也使用了操做系統來實現資源隔離防止Task濫用資源。
每一個Task的任務進度經過RPC來彙報給TaskTracker,再由TaskTracker彙報給JobTracker。
先來看一張圖,系統地瞭解下 MapReduce
的運算流程。
爲了方便你們理解,從新畫了一張新的圖,演示的是關於如何進行把一個長句進行分割,最後進行詞頻的統計(已忽略掉標點符號)。
整個過程就是先讀取文件,接着進行split
切割,變成一個一個的詞,而後進行 map task
任務,排列出全部詞的統計量,接着 sorting
排序,按照字典序來排,接着就是進行 reduce task
,進行了詞頻的彙總,最後一步就是輸出爲文件。例如圖中的 spacedong
就出現了兩次。
其中對應着的是 Hadoop Mapreduce
對外提供的五個可編程組件,分別是InputFormat
、Mapper
、Partitioner
、Reduce
和OutputFormat
,後續的文章會詳細講解這幾個組件。
用一句話簡單地總結就是,Mapreduce
的運算過程就是進行拆解-排序-彙總,解決的就是統計的問題,使用的思想就是分治的思想。
MapReduce
的產生是爲了把某些大的問題分解成小的問題,而後解決小問題後,大問題也就解決了。那麼通常有什麼樣的場景會運用到這個呢?那可多了去,簡單地列舉幾個經典的場景。
URL
的訪問頻率搜索引擎的使用中,會遇到大量的URL的訪問,因此,可使用 MapReduce
來進行統計,得出(URL
,次數)結果,在後續的分析中可使用。
Map
函數去分析文件格式是(詞,文檔號)的列表,Reduce
函數就分析這個(詞,文檔號),排序全部的文檔號,輸出(詞,list
(文檔號)),這個就能夠造成一個簡單的倒排索引,是一種簡單的算法跟蹤詞在文檔中的位置。
在各類的文檔分析,或者是不一樣的場景中,常常會遇到關於 Top K
的問題,例如輸出這篇文章的出現前5
個最多的詞彙。這個時候也可使用 MapReduce
來進行統計。
Demo
今天的代碼演示從Python
和Java
兩個版本的演示,Python
版本的話即是不使用封裝的包,Java
版本的話則是使用了Hadoop
的封裝包。接下來便進行演示一個MapReduce
的簡單使用,如何進行詞彙統計。
Java
版本代碼txt
格式的。文件名是WordMRDemo.txt
,內容是下面簡短的一句話,以空格分割開:hello my name is spacedong welcome to the spacedong thank you
Hadoop
的依賴包//這裏使用的是2.6.5的依賴包,你可使用其餘版本的
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.5</version>
</dependency>
複製代碼
WordMapper.java
文件,代碼的做用是進行以空格的形式進行分詞。public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Mapper.Context context)
throws java.io.IOException, InterruptedException {
String line = value.toString();
//StringTokenizer默認按照空格來切
StringTokenizer st = new StringTokenizer(line);
while (st.hasMoreTokens()) {
String world = st.nextToken();
//map輸出
context.write(new Text(world), new IntWritable(1));
}
}
}
複製代碼
WordReduce.java
文件,做用是進行詞彙的統計。public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> iterator, Context context)
throws java.io.IOException ,InterruptedException {
int sum = 0 ;
for(IntWritable i:iterator){
sum+=i.get();
}
context.write(key, new IntWritable(sum));
}
}
複製代碼
WordMRDemo.java
文件,做用是運行Job
,開始分析句子。public class WordMRDemo {
public static void main(String[] args) {
Configuration conf = new Configuration();
//設置mapper的配置,既就是hadoop/conf/mapred-site.xml的配置信息
conf.set("mapred.job.tracker", "hadoop:9000");
try {
//新建一個Job工做
Job job = new Job(conf);
//設置運行類
job.setJarByClass(WordMRDemo.class);
//設置要執行的mapper類
job.setMapperClass(WordMapper.class);
//設置要執行的reduce類
job.setReducerClass(WordReduce.class);
//設置輸出key的類型
job.setMapOutputKeyClass(Text.class);
//設置輸出value的類型
job.setMapOutputValueClass(IntWritable.class);
//設置ruduce任務的個數,默認個數爲一個(通常reduce的個數越多效率越高)
//job.setNumReduceTasks(2);
//mapreduce 輸入數據的文件/目錄,注意,這裏能夠輸入的是目錄。
FileInputFormat.addInputPath(job, new Path("F:\\BigDataWorkPlace\\data\\input"));
//mapreduce 執行後輸出的數據目錄,不能預先存在,不然會報錯。
FileOutputFormat.setOutputPath(job, new Path("F:\\BigDataWorkPlace\\data\\out"));
//執行完畢退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
複製代碼
WordMRDemo.java
文件,而後獲得的結果是out
文件夾內的內容,它長這個樣子:
打開part-r-00000
文件的內容以下
map.py
文件,進行詞彙的切割。for line in sys.stdin:
time.sleep(1000)
ss = line.strip().split(' ')
for word in ss:
print '\t'.join([word.strip(), '1'])
複製代碼
red.py
文件,進行詞彙的統計。cur_word = None
sum = 0
for line in sys.stdin:
ss = line.strip().split('\t')
if len(ss) != 2:
continue
word, cnt = ss
if cur_word == None:
cur_word = word
if cur_word != word:
print '\t'.join([cur_word, str(sum)])
cur_word = word
sum = 0
sum += int(cnt)
print '\t'.join([cur_word, str(sum)])
複製代碼
run.sh
文件,直接運行便可。HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_1="/test.txt"
OUTPUT_PATH="/output"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py" \
-reducer "python red.py" \
-file ./map.py \
-file ./red.py
複製代碼
以上的是演示demo
的核心代碼,完整的代碼能夠上github
的代碼倉庫上獲取。 倉庫地址爲:https://github.com/spacedong/bigDataNotes
以上的文章是MapReduce
系列的第一篇,下篇預告是MapReduce的編程模型
,敬請期待!
參考資料:
Hadoop的技術內幕:深刻解析MapReduce架構設計及實現原理