大數據學習(二)-------- MapReduce

前提已經安裝好hadoop的hdfs集羣,能夠查看html

https://www.cnblogs.com/tree1123/p/10683570.htmlnode

Mapreduce是hadoop的運算框架,能夠對hdfs中的數據分開進行計算,先執行不少maptask,在執行reducetask,這個過程當中任務的執行須要一個任務調度的平臺,就是yarn。windows

1、安裝YARN集羣app

yarn集羣中有兩個角色:框架

主節點:Resource Manager  1臺分佈式

從節點:Node Manager   N臺ide

 

Resource Manager通常安裝在一臺專門的機器上oop

Node Manager應該與HDFS中的data node重疊在一塊兒code

修改配置文件:yarn-site.xmlorm

<property>
<name>yarn.resourcemanager.hostname</name>
<value>主機名</value>
</property>

<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>2048</value>
</property>

<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value>
</property>

而後scp到全部機器,修改主節點hadoop的slaves文件,列入要啓動nodemanager的機器,配好免密

而後,就能夠用腳本啓動yarn集羣:

sbin/start-yarn.sh

中止:

sbin/stop-yarn.sh

頁面:http://主節點:8088 看看node manager節點是否識別

開發一個提交job到yarn的客戶端類,mapreduce全部jar和自定義類,打成jar包上傳到hadoop集羣中的任意一臺機器上,運行jar包中的(YARN客戶端類

hadoop jar ......JobSubmitter

2、開發mapreduce程序
注意理解分而治之的思想,先進行map:映射,對應,個數不變。 reduce:化簡,合併,將一系列數據,化簡爲一個值。

主要須要開發:

map階段的進、出數據,

reduce階段的進、出數據,

類型都應該是實現了HADOOP序列化框架的類型,如:

String對應Text

Integer對應IntWritable

Long對應LongWritable

例子wordcount代碼:

WordcountMapper

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // 切單詞
        String line = value.toString();
        String[] words = line.split(" ");
        for(String word:words){
            context.write(new Text(word), new IntWritable(1));
            
        }
    }
}

WordcountReducer

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    
    
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
    
        
        int count = 0;
        
        Iterator<IntWritable> iterator = values.iterator();
        while(iterator.hasNext()){
            
            IntWritable value = iterator.next();
            count += value.get();
        }
        
        context.write(key, new IntWritable(count));
        
    }

}







public class JobSubmitter {
    
    public static void main(String[] args) throws Exception {
        
        // 在代碼中設置JVM系統參數,用於給job對象來獲取訪問HDFS的用戶身份
        System.setProperty("HADOOP_USER_NAME", "root");
        
        
        Configuration conf = new Configuration();
        // 一、設置job運行時要訪問的默認文件系統
        conf.set("fs.defaultFS", "hdfs://hdp-01:9000");
        // 二、設置job提交到哪去運行
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "hdp-01");
        // 三、若是要從windows系統上運行這個job提交客戶端程序,則須要加這個跨平臺提交的參數
        conf.set("mapreduce.app-submission.cross-platform","true");
        
        Job job = Job.getInstance(conf);
        
        // 一、封裝參數:jar包所在的位置
        job.setJar("d:/wc.jar");
        //job.setJarByClass(JobSubmitter.class);
        
        // 二、封裝參數: 本次job所要調用的Mapper實現類、Reducer實現類
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        
        // 三、封裝參數:本次job的Mapper實現類、Reducer實現類產生的結果數據的key、value類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        
        
        Path output = new Path("/wordcount/output");
        FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"),conf,"root");
        if(fs.exists(output)){
            fs.delete(output, true);
        }
        
        // 四、封裝參數:本次job要處理的輸入數據集所在路徑、最終結果的輸出路徑
        FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
        FileOutputFormat.setOutputPath(job, output);  // 注意:輸出路徑必須不存在
        
        
        // 五、封裝參數:想要啓動的reduce task的數量
        job.setNumReduceTasks(2);
        
        // 六、提交job給yarn
        boolean res = job.waitForCompletion(true);
        
        System.exit(res?0:-1);
        
    }
    
    

}

MR還有一些高級的用法:自定義類型,自定義Partitioner,Combiner,排序,倒排索引,自定義GroupingComparator

3、mapreduce與yarn的核心機制

yarn是一個分佈式程序的運行調度平臺

yarn中有兩大核心角色:

一、Resource Manager

接受用戶提交的分佈式計算程序,併爲其劃分資源

管理、監控各個Node Manager上的資源狀況,以便於均衡負載

 

二、Node Manager

管理它所在機器的運算資源(cpu + 內存)

負責接受Resource Manager分配的任務,建立容器、回收資源

Mapreduce工做機制:

劃分輸入切片——》 環形緩衝區 ——》 分區排序 ——》Combiner 局部聚合——》shuffle ——》GroupingComparator——》輸出

相關文章
相關標籤/搜索