初識MapReduce的應用場景(附JAVA和Python代碼)

從這篇文章開始,我會開始系統性地輸出在大數據踩坑過程當中的積累,後面會涉及到實戰項目的具體操做,目前的規劃是按照系列來更新,力爭作到一個系列在5篇文章以內總結出最核心的乾貨,若是是涉及到理論方面的文章,會以畫圖的方式來說解,若是是涉及到操做方面,會以實際的代碼來演示。java

這篇是MapReduce系列的第一篇,初識MapReduce的應用場景,在文章後面會有關於代碼的演示。python

前言

Hadoop做爲Apache旗下的一個以Java語言實現的分佈式計算開源框架,其由兩個部分組成,一個是分佈式的文件系統HDFS,另外一個是批處理計算框架MapReduce。這篇文章做爲MapReduce系列的第一篇文章,會從MapReduce的產生背景、框架的計算流程、應用場景和演示Demo來說解,主要是讓你們對MapReduce的這個批計算框架有個初步的瞭解及簡單的部署和使用。git

目錄

MapReduce的產生背景

MapReduce的計算流程

MapReduce的框架架構

MapReduce的生命週期

應用場景

演示Demo

MapReduce的產生背景

Google 在2004年的時候在 MapReduce: Simplified Data Processing on Large Clusters 這篇論文中提出了MapReduce 的功能特性和設計理念,設計MapReduce 的出發點就是爲了解決如何把大問題分解成獨立的小問題,再並行解決。例如,MapReduce的經典使用場景之一就是對一篇長文進行詞頻統計,統計過程就是先把文章分爲一句一句,而後進行分割,最後進行詞的數量統計。github

MapReduce的架構圖

MapReduce的架構圖
這裏的Client和TaskTracker我都使用一個來簡化了,在實際中是會有很個Client和TaskTracker的。

咱們來說解下不一樣的組件做用算法

Client

Client的含義是指用戶使用MapReduce程序經過Client來提交任務到Job Tracker上,同時用戶也可使用Client來查看一些做業的運行狀態。apache

Job Tracker

這個負責的是資源監控和做業調度。JobTracker會監控着TaskTracker和做業的健康情況,會把失敗的任務轉移到其餘節點上,同時也監控着任務的執行進度、資源使用量等狀況,會把這些消息通知任務調度器,而調度器會在資源空閒的時候選擇合適的任務來使用這些資源。編程

任務調度器是一個可插拔的模塊,用戶能夠根據本身的須要來設計相對應的調度器。bash

TaskTracker

TaskTracker會週期性地經過Hearbeat來向Job Tracker彙報本身的資源使用狀況和任務的運行進度。會接受來自於JobTaskcker的指令來執行操做(例如啓動新任務、殺死任務之類的)。架構

TaskTracker中經過的是slot來進行等量劃分一個節點上資源量,只用Task得到slot的時候纔有機會去運行。調度器的做用就是進行將空閒的slot分配給Task使用,能夠配置slot的數量來進行限定Task上的併發度。併發

Task

Task分爲Map TaskReduce Task,在MapReduce中的 split 就是一個 Map Task,split 的大小能夠設置的,由 mapred.max.spilt.size 參數來設置,默認是 Hadoop中的block的大小,在Hadoop 2.x中默認是128M,在Hadoop 1.x中默認是64M

Task中的設置能夠這麼設置,通常來說,會把一個文件設置爲一個split,若是是小文件,那麼就會存在不少的Map Task,這是特別浪費資源的,若是split切割的數據塊的量大,那麼會致使跨節點去獲取數據,這樣也是消耗不少的系統資源的。

MapReduce的生命週期

MapReduce的生命週期

一共分爲5個步驟:

  1. 做業的提交和初始化

由用戶提交做業以前,須要先把文件上傳到HDFS上,JobClient使用upload來加載關於打包好的jar包,JobClientRPC建立一個JobInProcess來進行管理任務,而且建立一個TaskProcess來管理控制關於每個Task

  1. JobTracker調度任務

JobTracker會調度和管理任務,一發現有空閒資源,會按照一個策略選擇一個合適的任務來使用該資源。

任務調度器有兩個點:一個是保證做業的順利運行,若是有失敗的任務時,會轉移計算任務,另外一個是若是某一個Task的計算結果落後於同一個Task的計算結果時,會啓動另外一個Task來作計算,最後去計算結果最塊的那個。

  1. 任務運行環境

TaskTracker會爲每個Task來準備一個獨立的JVM從而避免不一樣的Task在運行過程當中的一些影響,同時也使用了操做系統來實現資源隔離防止Task濫用資源。

  1. 執行任務

每一個Task的任務進度經過RPC來彙報給TaskTracker,再由TaskTracker彙報給JobTracker。

  1. 任務結束,寫入輸出的文件到HDFS中。

MapReduce 的計算流程

先來看一張圖,系統地瞭解下 MapReduce 的運算流程。

MapReduce的運算流程

爲了方便你們理解,從新畫了一張新的圖,演示的是關於如何進行把一個長句進行分割,最後進行詞頻的統計(已忽略掉標點符號)。

簡單的實操例子

整個過程就是先讀取文件,接着進行split切割,變成一個一個的詞,而後進行 map task 任務,排列出全部詞的統計量,接着 sorting 排序,按照字典序來排,接着就是進行 reduce task,進行了詞頻的彙總,最後一步就是輸出爲文件。例如圖中的 spacedong 就出現了兩次。

其中對應着的是 Hadoop Mapreduce 對外提供的五個可編程組件,分別是InputFormatMapperPartitionerReduceOutputFormat,後續的文章會詳細講解這幾個組件。

用一句話簡單地總結就是,Mapreduce的運算過程就是進行拆解-排序-彙總,解決的就是統計的問題,使用的思想就是分治的思想。

MapReduce的應用場景

MapReduce 的產生是爲了把某些大的問題分解成小的問題,而後解決小問題後,大問題也就解決了。那麼通常有什麼樣的場景會運用到這個呢?那可多了去,簡單地列舉幾個經典的場景。

計算URL的訪問頻率

搜索引擎的使用中,會遇到大量的URL的訪問,因此,可使用 MapReduce 來進行統計,得出(URL,次數)結果,在後續的分析中可使用。

倒排索引

Map 函數去分析文件格式是(詞,文檔號)的列表,Reduce 函數就分析這個(詞,文檔號),排序全部的文檔號,輸出(詞,list(文檔號)),這個就能夠造成一個簡單的倒排索引,是一種簡單的算法跟蹤詞在文檔中的位置。

Top K 問題

在各類的文檔分析,或者是不一樣的場景中,常常會遇到關於 Top K 的問題,例如輸出這篇文章的出現前5個最多的詞彙。這個時候也可使用 MapReduce來進行統計。

演示Demo

今天的代碼演示從PythonJava兩個版本的演示,Python版本的話即是不使用封裝的包,Java版本的話則是使用了Hadoop的封裝包。接下來便進行演示一個MapReduce的簡單使用,如何進行詞彙統計。

Java版本代碼

  • 先是準備一個數據集,包含着已經切割好的詞彙,這裏咱們設置文件的格式是txt格式的。文件名是WordMRDemo.txt,內容是下面簡短的一句話,以空格分割開:

hello my name is spacedong welcome to the spacedong thank you

  • 引入Hadoop的依賴包
//這裏使用的是2.6.5的依賴包,你可使用其餘版本的
       <dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>2.6.5</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.6.5</version>
		</dependency>
複製代碼
  • 新建WordMapper.java文件,代碼的做用是進行以空格的形式進行分詞。
public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper.Context context)
            throws java.io.IOException, InterruptedException {
        String line = value.toString();
        //StringTokenizer默認按照空格來切
        StringTokenizer st = new StringTokenizer(line);
        while (st.hasMoreTokens()) {
            String world = st.nextToken();
            //map輸出
            context.write(new Text(world), new IntWritable(1));
        }
    }
}
複製代碼
  • 新建WordReduce.java文件,做用是進行詞彙的統計。
public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> iterator, Context context)
            throws java.io.IOException ,InterruptedException {
        int sum = 0 ;
        for(IntWritable i:iterator){
            sum+=i.get();
        }
        context.write(key, new IntWritable(sum));
    }
  }
複製代碼
  • 新建WordMRDemo.java文件,做用是運行Job,開始分析句子。
public class WordMRDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        //設置mapper的配置,既就是hadoop/conf/mapred-site.xml的配置信息
        conf.set("mapred.job.tracker", "hadoop:9000");
        try {
            //新建一個Job工做
            Job job = new Job(conf);
            //設置運行類
            job.setJarByClass(WordMRDemo.class);
            //設置要執行的mapper類
            job.setMapperClass(WordMapper.class);
            //設置要執行的reduce類
            job.setReducerClass(WordReduce.class);
            //設置輸出key的類型
            job.setMapOutputKeyClass(Text.class);
            //設置輸出value的類型
            job.setMapOutputValueClass(IntWritable.class);
            //設置ruduce任務的個數,默認個數爲一個(通常reduce的個數越多效率越高)
            //job.setNumReduceTasks(2);
            //mapreduce 輸入數據的文件/目錄,注意,這裏能夠輸入的是目錄。
            FileInputFormat.addInputPath(job, new Path("F:\\BigDataWorkPlace\\data\\input"));
            //mapreduce 執行後輸出的數據目錄,不能預先存在,不然會報錯。
            FileOutputFormat.setOutputPath(job, new Path("F:\\BigDataWorkPlace\\data\\out"));
            //執行完畢退出
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
複製代碼
  • 最後執行WordMRDemo.java文件,而後獲得的結果是out文件夾內的內容,它長這個樣子:
    out的文件目錄

打開part-r-00000文件的內容以下

具體的文件內容

Python代碼版本

  • 新建map.py文件,進行詞彙的切割。
for line in sys.stdin:
    time.sleep(1000)
    ss = line.strip().split(' ')
    for word in ss:
        print '\t'.join([word.strip(), '1'])
複製代碼
  • 新建red.py文件,進行詞彙的統計。
cur_word = None
sum = 0

for line in sys.stdin:
	ss = line.strip().split('\t')
	if len(ss) != 2:
		continue
	word, cnt = ss

	if cur_word == None:
		cur_word = word

	if cur_word != word:
		print '\t'.join([cur_word, str(sum)])
		cur_word = word
		sum = 0

	sum += int(cnt)

print '\t'.join([cur_word, str(sum)])
複製代碼
  • 新建run.sh文件,直接運行便可。
HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

INPUT_FILE_PATH_1="/test.txt"
OUTPUT_PATH="/output"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py" \
    -reducer "python red.py" \
    -file ./map.py \
    -file ./red.py
複製代碼

以上的是演示demo的核心代碼,完整的代碼能夠上github的代碼倉庫上獲取。 倉庫地址爲:https://github.com/spacedong/bigDataNotes

以上的文章是MapReduce系列的第一篇,下篇預告是MapReduce的編程模型,敬請期待!

參考資料:

Hadoop的技術內幕:深刻解析MapReduce架構設計及實現原理

image
相關文章
相關標籤/搜索