大數據第三課-WordCount 本地運行和集羣運行

1、MapReduce編程思想

mapReduce編程模型的總結:java

MapReduce的開發一共有八個步驟其中map階段分爲2個步驟,shuffle階段4個步驟,reduce階段分爲2個步驟node

Map階段2個步驟

第一步:設置inputFormat類,將咱們的數據切分紅keyvalue對,輸入到第二步apache

第二步:自定義map邏輯,處理咱們第一步的輸入數據,而後轉換成新的keyvalue對進行輸出編程

shuffle階段4個步驟

第三步:對輸出的keyvalue對進行分區。相同key的數據發送到同一個reduce裏面去,相同key合併,value造成一個集合windows

第四步:對不一樣分區的數據按照相同的key進行排序網絡

第五步:對分組後的數據進行規約(combine操做),下降數據的網絡拷貝(可選步驟)app

第六步:對排序後的額數據進行分組,分組的過程當中,將相同keyvalue放到一個集合當中maven

reduce階段2個步驟

第七步:對多個map的任務進行合併,排序,寫reduce函數本身的邏輯,對輸入的keyvalue對進行處理,轉換成新的keyvalue對進行輸出ide

第八步:設置outputformat將輸出的keyvalue對數據進行保存到文件中函數

2、Hadoop中的基本數據類型

 

hadoop沒有沿用java當中基本的數據類型,而是本身進行封裝了一套數據類型,其本身封裝的類型與java的類型對應以下

Java類型

Hadoop Writable類型

Boolean

BooleanWritable

Byte

ByteWritable

Int

IntWritable

Float

FloatWritable

Long

LongWritable

Double

DoubleWritable

String

Text

Map

MapWritable

Array

ArrayWritable

byte[]

BytesWritable

 

3、單詞統計的本地運行實現(要在windows下配置了hadoop環境)

一、添加須要的依賴包(pom.xml文件):添加如下內容

 

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.14.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0-cdh5.14.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>7.0.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!--   <verbal>true</verbal>-->
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

 

二、建立如下三個文件

 

 三、編寫MyMapper類

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * 自定義mapper類須要繼承Mapper,有四個泛型,
 * keyin: k1   行偏移量 Long
 * valuein: v1   一行文本內容   String
 * keyout: k2   每個單詞   String
 * valueout : v2   1         int
 * 在hadoop當中沒有沿用Java的一些基本類型,使用本身封裝了一套基本類型
 * long ==>LongWritable
 * String ==> Text
 * int ==> IntWritable
 *
 */
public class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
    /**
     * 繼承mapper以後,覆寫map方法,每次讀取一行數據,都會來調用一下map方法
     * @param key:對應k1
     * @param value:對應v1
     * @param context 上下文對象。承上啓下,承接上面步驟發過來的數據,經過context將數據發送到下面的步驟裏面去
     * @throws IOException
     * @throws InterruptedException
     * k1   v1
     * 0;hello,world
     *
     * k2 v2
     * hello 1
     * world   1
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //獲取咱們的一行數據
        String line = value.toString();
        String[] split = line.split(",");
        Text text = new Text();
        IntWritable intWritable = new IntWritable(1);
        for (String word : split) {
            //將每一個單詞出現都記作1次
            //key2 Text類型
            //v2 IntWritable類型
            text.set(word);
            //將咱們的key2 v2寫出去到下游
            context.write(text,intWritable);
        }
    }
}

 四、編寫MyReduce類

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    //第三步:分區   相同key的數據發送到同一個reduce裏面去,相同key合併,value造成一個集合
    /**
     * 繼承Reducer類以後,覆寫reduce方法
     * @param key
     * @param values
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int result = 0;
        for (IntWritable value : values) {
            //將咱們的結果進行累加
            result += value.get();
        }
        //繼續輸出咱們的數據
        IntWritable intWritable = new IntWritable(result);
        //將咱們的數據輸出
        context.write(key,intWritable);
    }
}

 五、編寫WordCount類

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
這個類做爲mr程序的入口類,這裏面寫main方法
 */
public class WordCount extends Configured implements Tool {
    /**
     * 實現Tool接口以後,須要實現一個run方法,
     * 這個run方法用於組裝咱們的程序的邏輯,其實就是組裝八個步驟
     *
     * @param args
     * @return
     * @throws Exception
     */
    @Override
    public int run(String[] args) throws Exception {
        //獲取Job對象,組裝咱們的八個步驟,每個步驟都是一個class類
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf, "mrdemo1");

        //實際工做當中,程序運行完成以後通常都是打包到集羣上面去運行,打成一個jar包
        //若是要打包到集羣上面去運行,必須添加如下設置
        job.setJarByClass(WordCount.class);

        //第一步:讀取文件,解析成key,value對,k1:行偏移量 v1:一行文本內容
        job.setInputFormatClass(TextInputFormat.class);
        //指定咱們去哪個路徑讀取文件
        TextInputFormat.addInputPath(job, new Path("file:///E:\\BigDataCode\\Java\\data\\wc\\input"));
        //第二步:自定義map邏輯,接受k1   v1 轉換成爲新的k2   v2輸出
        job.setMapperClass(MyMapper.class);
        //設置map階段輸出的key,value的類型,其實就是k2 v2的類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //第三步到六步:分區,排序,規約,分組都省略
        //第七步:自定義reduce邏輯
        job.setReducerClass(MyReducer.class);
        //設置key3 value3的類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //第八步:輸出k3 v3 進行保存
        job.setOutputFormatClass(TextOutputFormat.class);
        //必定要注意,輸出路徑是須要不存在的,若是存在就報錯
        TextOutputFormat.setOutputPath(job, new Path("file:///E:\\BigDataCode\\Java\\data\\wc\\out_result"));
        //提交job任務
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
        /***
         * 第一步:讀取文件,解析成key,value對,k1   v1
         * 第二步:自定義map邏輯,接受k1   v1 轉換成爲新的k2   v2輸出
         * 第三步:分區。相同key的數據發送到同一個reduce裏面去,key合併,value造成一個集合
         * 第四步:排序   對key2進行排序。字典順序排序
         * 第五步:規約 combiner過程 調優步驟 可選
         * 第六步:分組
         * 第七步:自定義reduce邏輯接受k2   v2 轉換成爲新的k3   v3輸出
         * 第八步:輸出k3 v3 進行保存
         *
         *
         */
    }

    /*
    做爲程序的入口類
      */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("hello", "world");
        //提交run方法以後,獲得一個程序的退出狀態碼
        int run = ToolRunner.run(configuration, new WordCount(), args);
        //根據咱們 程序的退出狀態碼,退出整個進程
        System.exit(run);
    }

}

4、單詞統計的集羣運行實現(打jar包)

一、將WordCount類中的路徑更改爲集羣上的路徑

 

 二、打  jar 包

 

 

 

三、拷貝jar包到集羣上

 

 

 

 四、運行jar包,執行如下語句(後面那個com.yyy.wordcount.WordCount路徑根據本身的設置:能夠在本身IDEA中的文件上右鍵->Copy Reference)

 yarn jar yarn-1.0-SNAPSHOT.jar com.yyy.wordcount.WordCount

五、查看運行  http://node01:8088/cluster

 

六、查看運行結果

 

能夠下載下來看一下對不對。

而後結束。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。

相關文章
相關標籤/搜索