12做業MapReduce

1.mapreduce定義和優缺點?
MapReduce是一個分佈式運算程序的編程框架,是用戶開發「基於Hadoop的數據分析應用」的核心框架
MapReduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分佈式運算程序,併發行在一個Hadoop集羣上。
優勢:
1).MapReduce易於編程
它簡單的實現一些接口,就能夠完成一個分佈式程序,這個分佈式程序能夠分佈到大量廉價的PC機器上運行。
2).良好的擴展性
當你的計算資源不能獲得知足的時候,你能夠經過簡單的增長機器來擴展它的計算能力。
3).高容錯性
其中一臺機器掛了,它能夠把上面的計算任務轉移到另外一個節點上運行,不至於這個任務運行失敗,並且這個過程不須要人工參與,而徹底是由Hadoop內部完成的。
4).適合PB級以上海量數據的離線處理
能夠實現上千臺服務器集羣併發工做,提供數據處理能力。
缺點
1)不擅長實時計算
MapReduce沒法像Mysql同樣,在毫秒或秒級返回結果。
2)不擅長流式計算
流式計算的輸入數據是動態的,而MapReduce的輸入數據是靜態的,不能動態變化。這是由於MapReduce自身的設計特色決定了數據源必須是靜態的。
3)不擅長DAG計算
多個應用程序存在依賴關係,後一個應用程序的輸入爲前一個的輸出。在這種狀況下,MapReduce並非不能作,而是使用後,每個MapReduce做業的輸出結果都會寫入磁盤,會形成大量的磁盤IO,致使性能很是低下。
2.mapreduce的數據類型
Java類型    Hadoop Writable類型
Boolean        BooleanWritable
Byte        ByteWritable
Int            IntWritable
Float        FloatWritable
Long        LongWritable
Double        DoubleWritable
String        Text
Map            MapWritable
Array        ArrayWritable
3.查看官方的wordcount代碼樣例
    package com.huawei.hdfs;
     
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
     
    import java.io.IOException;
     
    public class HWMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
     
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            Text keyout=new Text();
            IntWritable valueout=new IntWritable();
            //以空格隔斷
            String[] arr=value.toString().split(" ");//用空格分開
            for(String s: arr){
                keyout.set(s);
                valueout.set(1);
                context.write(keyout,valueout);
            }
     
        }
    }

重寫reducer

    package com.huawei.hdfs;
     
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
     
    import java.io.IOException;
     
     
    public class HWReducer extends org.apache.hadoop.mapreduce.Reducer<Text,IntWritable,Text,IntWritable> {
     
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count =0;
            for(IntWritable iw:values){
                count+=iw.get();
            }
            context.write(key,new IntWritable(count));
        }
    }

編寫main函數

    package com.huawei.hdfs;
     
     
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     
     
    import java.io.IOException;
     
    public class HWAPP {
        public static void main(String[] args) throws Exception {
            Configuration conf=new Configuration();
            conf.set("fs.defaultFS","file:///");//本地須要這個,集羣須要將這個註釋掉
            Job job =Job.getInstance(conf);
     
            job.setJobName("HWAPP");                        //設置job名稱
     
     
            job.setInputFormatClass(TextInputFormat.class);//設置輸入格式
            FileInputFormat.addInputPath(job,new Path(args[0])); //設置輸入路徑
            FileOutputFormat.setOutputPath(job,new Path(args[1]));//設置輸出路徑
     
            job.setJarByClass(HWAPP.class);                //設置執行的class文件
            job.setMapperClass(HWMapper.class);
            job.setReducerClass(HWReducer.class);
     
     
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
     
            job.setNumReduceTasks(1);                       //設置reduce的個數
            job.setOutputKeyClass(Text.class);              //設置輸出的key格式
            job.setOutputValueClass(IntWritable.class);     //設置輸出的value格式
            job.waitForCompletion(false);
        }
    }
4.本身實現wordcount代碼
package com.jinghang.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
/**
 * KEYIN:LongWritable(偏移量)
 * VALUEIN:Text  (文本中每一行的內容)
 * KEYOUT: Text (某一個單詞做爲key)
 * VALUEOUT:IntWritable (單詞出現的個數)
 */
public class WcMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
  //map輸出的key值
    private Text keyText= new Text();
    //map輸出的value值;
    private IntWritable one = new IntWritable(1);
    @Override
    protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
        //        super.map(key, value, context);
        //獲取文件中的行數據
        String line = value.toString();
        String[] fileds = line.split(" ");
        //分割字符串(根據空格分割字符串)
        for(String filed : fileds){
            keyText.set(filed);
            context.write(keyText,one);
        }
    }



}


package com.jinghang.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * KEYIN: Text map端的輸出的key做爲reduce的輸入key
 * VALUEIN: IntWritable map端的輸出做爲reduce的輸入value
 * KEYOUT: Text 以單詞做爲輸出的key值
 * VALUEOUT:IntWritable 統計單詞出現的總數,做爲輸出的value
 */
public class WcReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable total = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//        super.reduce(key, values, context);
        int sum = 0; //統計單詞出現的總數 {Text("hadoop"),IntWriter(1),Text("hadoop"),IntWriter(1),Text("hadoop"),IntWriter(1)}
        for (IntWritable value : values) {
            //累加,統計單詞出現總次數
            sum += value.get();
        }
        total.set(sum);
        //key,value 寫入到上下文中(context)
        context.write(key,total);
    }


}


package com.jinghang.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WcDriver {
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        //獲取一個job實例
        Job job = Job.getInstance(new Configuration());

        //設置本程序的jar包類的路徑
        job.setJarByClass(WcDriver.class);

        //設置map類和reduce類
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReducer.class);

        //設置map輸出的key和value類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //設置Reduce的輸出的key和value類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //設置處理文本的輸入和輸出路徑
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        //提交job任務
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0:1);
    }
}java

相關文章
相關標籤/搜索