Hadoop解析--MapReduce

從本篇博客開始我們一塊兒來具體瞭解Hadoop的每個部分。咱們在上篇博客中介紹了HDFS,MapReduce,MapReduce爲了更有效率事實上是創建在HDFS之上的。有了分佈式的文件系統,咱們就能在這個系統之上更有效率地進行分佈式的計算。咱們看看它是咱麼實現更優秀的分佈式計算。java

優點

第一。限制大小算法

        因爲HDFS對本地的文件大小作了限制,這樣咱們本地一個任務處理的量是有限的。儘管咱們可以改變這個值。但是也爲更好的運行任務打下了堅實的基礎,分片的處理方式。不不過度開。還有限制。這種思想使咱們欠缺的,分開不過攻克了問你。而限制,是在優化解決方式。數據庫


第二。備份網絡

        HDFS對所有的文件,都會進行備份,這樣就會下降很是多麻煩。咱們以往對文件的備份還原一直是個頭疼的問題。尤爲是數據量上來以後。這件事情變得愈來愈不可控,而HDFS爲計算數據作了備份。這樣咱們的失誤率就會降低,在一臺機器文件毀壞的狀況下。不影響咱們的計算,這就下降了查詢日誌的時間(相對傳統數據庫的備份策略)app


第三。本地計算分佈式

        MapReduce中,所有的計算,都是在本地完畢,及時有計算需要外來數據。也是集合好後完畢。這樣保證了咱們最高效的帶寬利用。使咱們對數據的處理能力隨着集羣數目的增大而線性增大。函數


第四,預處理oop

        在計算的過程當中,假設咱們對數據的處理結果每次都要控制機進行彙總,和咱們可以對計算出的數據,進行預處理,固然是預處理的效果好些,這樣至關於減輕了控制機的壓力。這種設計在前臺js裏也有涉及,咱們經過js讓客戶機運行部分代碼,減輕咱們server的壓力,這種效果,天然是比較優秀的!post


第五,心跳性能

        在MapReduce過程當中。心跳對咱們的幫助也很是大,它幫助咱們維護計算的可靠性,幫助咱們屏蔽一部分因機器故障形成的計算失敗,至關於心跳是咱們計算過程當中主要的保證!

原理

那麼mapreduce是怎麼作的呢。咱們看看這幅原理圖:


再看看一些細節上的圖,幫咱們這裏瞭解下詳細是怎麼執行的:


源代碼

有了前面的認識。咱們經過代碼看看,咱們要秉着一個原則,就是這是簡單的分治法的應用。因此這一切都不復雜,map就是分治法的分。reduce就是分治法的治,將大問題打散成小問題,最後整合小問題的結果:

map:

public static class Map extends MapReduceBase implements 
            Mapper<LongWritable, Text, Text, IntWritable> { 
        private final static IntWritable one = new IntWritable(1); 
        private Text word = new Text();
        public void map(LongWritable key, Text value, 
                OutputCollector<Text, IntWritable> output, Reporter reporter) 
                throws IOException { 
            String line = value.toString(); 
            StringTokenizer tokenizer = new StringTokenizer(line); 
            while (tokenizer.hasMoreTokens()) { 
                word.set(tokenizer.nextToken()); 
                output.collect(word, one); 
            } 
        } 
    }

reduce:

 public static class Reduce extends MapReduceBase implements 
            Reducer<Text, IntWritable, Text, IntWritable> { 
        public void reduce(Text key, Iterator<IntWritable> values, 
                OutputCollector<Text, IntWritable> output, Reporter reporter) 
                throws IOException { 
            int sum = 0; 
            while (values.hasNext()) { 
                sum += values.next().get(); 
            } 
            output.collect(key, new IntWritable(sum)); 
        } 
    }

任務運行的方法:

public static void main(String[] args) throws Exception { 
        JobConf conf = new JobConf(WordCount.class); 
        conf.setJobName("wordcount");
        conf.setOutputKeyClass(Text.class); 
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class); 
        conf.setCombinerClass(Reduce.class); 
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class); 
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0])); 
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf); 
    } 


任務方法解析:

首先解說一下 Job 的 初始化過程 。

main 函數調用 Jobconf 類來對 MapReduce Job 進行初始化,而後調用 setJobName() 方法命名這個 Job 。

對Job進行合理的命名有助於 更快 地找到Job,以便在JobTracker和Tasktracker的頁面中對其進行 監視 。

JobConf conf = new JobConf(WordCount. class ); conf.setJobName("wordcount" );

接着設置Job輸出結果<key,value>的中key和value數據類型。因爲結果是<單詞,個數>。因此key設置爲"Text"類型,至關於Java中String類型。

Value設置爲"IntWritable"。至關於Java中的int類型。


conf.setOutputKeyClass(Text.class );
conf.setOutputValueClass(IntWritable.class );

而後設置Job處理的Map(拆分)、Combiner(中間結果合併)以及Reduce(合併)的相關處理類。這裏用Reduce類來進行Map產生的中間結果合併。避免給網絡傳輸數據產生壓力。
conf.setMapperClass(Map.class );
conf.setCombinerClass(Reduce.class );
conf.setReducerClass(Reduce.class );

接着就是調用setInputPath()和setOutputPath()設置輸入輸出路徑。
conf.setInputFormat(TextInputFormat.class );
conf.setOutputFormat(TextOutputFormat.class );

總結:

        不論什麼技術都是一種思想的體現,而這個世界。咱們最主要的一個算法就是分治法。這是咱們拿在手裏的一本百科全書,差點兒可以解決咱們80%的問題。而性能的問題尤爲如此,咱們通過了幾百萬年的演變,咱們成爲了地球上的強大智慧生物,咱們自己就具備幾百萬年延續本身生命的強大競爭力。及咱們幾千年文明的積澱。咱們現在遇到的問題,前人用文字書寫在書上,咱們必定可以找到。或者咱們現在的生活,這個社會,也必定有這個問題的縮影。

相關文章
相關標籤/搜索