Week 25 of 2018 - A Hadoop MR Program - Weather Data Analysis

Weather data

The weather data lives under /pub/data/noaa/ on ftp.ncdc.noaa.gov and is fetched over FTP; I wrote a small FTP crawler program to download it (a sketch of such a crawler appears after the directory listing below). An excerpt of the 1950 data looks like this:

0171999999939931950010100005+36000-094167SAO  +038399999V02015859001550042749N008000599+01174+01065102474ADDGF108995999999999999999999MA1999999097934MW1455EQDN01 07000JPWTH 1QNNG11 1 00014K11 1 00051L11 1 00500M11 1 28920N11 1 07000Q11 1 10247S11 1 00053V11 1 01099W11 1 54003
0165999999939931950010101005+36000-094167SAO  +038399999V02015859002650036649N011200599+01174+01005102474ADDGF108995999999999999999999MA1999999097934EQDN01 00000JPWTH 1QNNG11 1 00012K11 1 00050L11 1 00700M11 1 28920N11 1 00000Q11 1 10247S11 1 00053V11 1 01099W11 1 54005
0165999999939931950010102005+36000-094167SAO  +038399999V02022559002150036649N011200599+01224+01005102474ADDGF108995999999999999999999MA1999999097934EQDN01 00000JPWTH 1QNNG11 1 00012K11 1 00050L11 1 00700M11 1 28920N11 1 00000Q11 1 10247S11 1 00054V11 1 01099W11 1 66004
0171999999939931950010103005+36000-094167SAO  +038399999V02022559002150024449N008000599+01224+01005102474ADDGF108995999999999999999999MA1999999097934MW1455EQDN01 07000JPWTH 1QNNG11 1 00008K11 1 00050L11 1 00500M11 1 28920N11 1 07000Q11 1 10247S11 1 00054V11 1 01099W11 1 66004
0171999999939931950010104005+36000-094167SAO  +038399999V02022559002650021349N008000599+01174+01005102474ADDGF108995999999999999999999MA1999999097934MW1455EQDN01 07000JPWTH 1QNNG11 1 00007K11 1 00050L11 1 00500M11 1 28920N11 1 07000Q11 1 10247S11 1 00053V11 1 01099W11 1 66005
0171999999939931950010105005+36000-094167SAO  +038399999V02020359003150018349N008000599+01174+01065102444ADDGF108995999999999999999999MA1999999097904MW1455EQDN01 07000JPWTH 1QNNG11 1 00006K11 1 00051L11 1 00500M11 1 28910N11 1 07000Q11 1 10244S11 1 00053V11 1 01099W11 1 56006
0171999999939931950010106005+36000-094167SAO  +038399999V02022559001550018349N008000599+01174+01065102444ADDGF108995999999999999999999MA1999999097904MW1455EQDN01 07000JPWTH 1QNNG11 1 00006K11 1 00051L11 1 00500M11 1 28910N11 1 07000Q11 1 10244S11 1 00053V11 1 01099W11 1 66003
0171999999939931950010107005+36000-094167SAO  +038399999V02020359002650012249N004800599+01174+01065102404ADDGF108995999999999999999999MA1999999097874MW1455EQDN01 07000JPWTH 1QNNG11 1 00004K11 1 00051L11 1 00300M11 1 28900N11 1 07000Q11 1 10240S11 1 00053V11 1 01099W11 1 56005
0177999999939931950010108005+36000-094167SAO  +038399999V02027059001050003049N001600599+01174+01065102474ADDGF108995999999999999999999MA1999999097934MW1455MW2515EQDN01 03370JPWTH 1QNNG11 1 00001K11 1 00051L11 1 00100M11 1 28920N11 1 03370Q11 1 10247S11 1 00053V11 1 01099W11 1 77002
0177999999939931950010109005+36000-094167SAO  +038399999V02018059000550003049N001600599+01114+01115102444ADDGF108995999999999999999999MA1999999097904MW1455MW2515EQDN01 03370JPWTH 1QNNG11 1 00001K11 1 00052L11 1 00100M11 1 28910N11 1 03370Q11 1 10244S11 1 00052V11 1 01099W11 1 55001
0177999999939931950010110005+36000-094167SAO  +038399999V02015859001050000049N000000599+01064+01065102374ADDGF108995999999999999999999MA1999999097834MW1455MW2515EQDN01 03370JPWTH 1QNNG11 1 00000K11 1 00051L11 1 00000M11 1 28890N11 1 03370Q11 1 10237S11 1 00051V11 1 01099W11 1 54002
0177999999939931950010111005+36000-094167SAO  +038399999V02013559001550000049N000000599+01064+01065102344ADDGF108995999999999999999999MA1999999097834MW1455MW2515EQDN01 03370JPWTH 1QNNG11 1 00000K11 1 00051L11 1 00000M11 1 28890N11 1 03370Q11 1 10234S11 1 00051V11 1 01099W11 1 44003
0171999999939931950010112005+36000-094167SAO  +038399999V02018059001550003049N001600599+01114+01115102374ADDGF108995999999999999999999MA1999999097834MW1455EQDN01 07000JPWTH 1QNNG11 1 00001K11 1 00052L11 1 00100M11 1 28890N11 1 07000Q11 1 10237S11 1 00052V11 1 01099W11 1 55003
0171999999939931950010113005+36000-094167SAO  +038399999V02022559001550012249N003200599+01174+01115102404ADDGF108995999999999999999999MA1999999097874MW1455EQDN01 07000JPWTH 1QNNG11 1 00004K11 1 00052L11 1 00200M11 1 28900N11 1 07000Q11 1 10240S11 1 00053V11 1 01099W11 1 66003
0171999999939931950010114005+36000-094167SAO  +038399999V02099999000050012249N003200599+01224+01115102474ADDGF108995999999999999999999MA1999999097934MW1455EQDN01 07000JPWTH 1QNNG11 1 00004K11 1 00052L11 1 00200M11 1 28920N11 1 07000Q11 1 10247S11 1 00054V11 1 01099W11 1 00000

The fetched data is actually laid out on disk like this:

[root@amd data]# tree |more
.
└── noaa
    ├── 1901
    │   ├── 029070-99999-1901.gz
    │   ├── 029500-99999-1901.gz
    │   ├── 029600-99999-1901.gz
    │   ├── 029720-99999-1901.gz
    │   ├── 029810-99999-1901.gz
    │   └── 227070-99999-1901.gz
    ├── 1902
    │   ├── 029070-99999-1902.gz
    │   ├── 029500-99999-1902.gz
    │   ├── 029600-99999-1902.gz
    │   ├── 029720-99999-1902.gz
    │   ├── 029810-99999-1902.gz
    │   └── 227070-99999-1902.gz
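
For reference, here is a minimal sketch of the kind of FTP crawler mentioned above, assuming the Apache Commons Net library (commons-net) is available; the package, class name and anonymous login details are illustrative assumptions, not the original program:

package com.jc.demo.hadoop.ftp;                               // hypothetical package, for illustration only

import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;

import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

public class NoaaFtpFetcher {

    public static void main(String[] args) throws Exception {
        String year = args.length > 0 ? args[0] : "1950";     // fetch one year's directory at a time
        String remoteDir = "/pub/data/noaa/" + year;
        Files.createDirectories(Paths.get("data/noaa/" + year));

        FTPClient ftp = new FTPClient();
        ftp.connect("ftp.ncdc.noaa.gov");
        ftp.login("anonymous", "anonymous@example.com");      // the NOAA FTP server allows anonymous access
        ftp.enterLocalPassiveMode();
        ftp.setFileType(FTP.BINARY_FILE_TYPE);                // .gz files must be transferred in binary mode

        for (FTPFile remote : ftp.listFiles(remoteDir)) {
            if (!remote.isFile()) {
                continue;
            }
            String local = "data/noaa/" + year + "/" + remote.getName();
            try (OutputStream out = new FileOutputStream(local)) {
                ftp.retrieveFile(remoteDir + "/" + remote.getName(), out);
            }
        }
        ftp.logout();
        ftp.disconnect();
    }
}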

So I wrote a shell script to concatenate all of the data into a single txt file.
zcat_all.sh

#!/bin/bash
for file in ./data/noaa/*
do
    if test -f $file
    then
        echo "$file is a regular file"
    fi
    if test -d $file
    then
        beginSecond=$(date "+%s") 
        echo "traversing $file"
        
        for gzfile in $file/*
        do
               zcat $gzfile >> all.txt
        done
        endTime=`date +%Y%m%d%H%M%S` 
        endSecond=$(date "+%s") 
        
        echo "$endTime  finished traversing $file, cost $((endSecond-beginSecond)) s"
    fi
done

Now we have a single file, all.txt, containing a century of weather data. Next we write a MapReduce (MR) program to find the maximum temperature for each year.

A MapReduce (MR) program to find the maximum temperature

As a rule, an MR program should first pass local tests on a small amount of data before being run on the cluster; testing directly on the cluster wastes a lot of time.

The project's pom file

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jc.demo</groupId>
    <artifactId>jc-demo-hadoop</artifactId>
    <version>0.0.1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>oneapp-archetype-test</name>
    <url>http://www.jevoncode.com</url>




    <repositories>
        <repository>
            <id>aliyun repository</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun plugin repository</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </pluginRepository>
    </pluginRepositories>
    <properties>
        <!-- Encoding for every file in the project -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <!-- Compile-time encoding -->
        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>

        <!-- Compiling Time JDK Version -->
        <java.version>1.7</java.version>

        <!-- Test -->
        <junit.version>4.12</junit.version>


        <!-- Logging -->
        <slf4j.version>1.7.21</slf4j.version>
        <logback.version>1.1.7</logback.version>


    </properties>


    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.hamcrest/hamcrest-all -->
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-all</artifactId>
            <version>1.3</version>
            <scope>test</scope>
        </dependency>

        <!-- Logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- logback -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>


        <!-- MapReduce testing toolkit (MRUnit) -->
        <dependency>
            <groupId>org.apache.mrunit</groupId>
            <artifactId>mrunit</artifactId>
            <version>1.1.0</version>
            <classifier>hadoop2</classifier>
            <scope>test</scope>
        </dependency>


    </dependencies>

    <!-- Per-environment packaging -->
    <profiles>
        <profile>
            <!-- Packaging command: mvn package -Pdev -->
            <id>dev</id> <!-- development environment -->
            <properties>
                <env>development</env>
            </properties>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
        </profile>
        <profile>
            <!-- Packaging command: mvn package -Ptest -->
            <id>test</id> <!-- test environment -->
            <properties>
                <env>test</env>
            </properties>
        </profile>
        <profile>
            <!-- Packaging command: mvn package -Pprod -->
            <id>prod</id> <!-- production environment -->
            <properties>
                <env>prod</env>
            </properties>
        </profile>
    </profiles>

    <build>
        <finalName>${project.artifactId}-${project.version}-${env}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptor>src/main/assembly/dep.xml</descriptor>
                </configuration>
                <executions>
                    <execution>
                        <id>create-archive</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <filters>
            <filter>src/main/environment/${env}.properties</filter>
        </filters>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.ftl</include>
                    <include>**/*.xml</include>
                </includes>
                <!-- whether to filter (substitute) properties in these resources -->
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
                <!-- whether to filter (substitute) properties in these resources -->
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>system.properties</include>
                </includes>
                <!-- whether to filter (substitute) properties in these resources -->
                <filtering>true</filtering>
            </resource>
        </resources>

    </build>
</project>

Writing the mapper

The weather data parser class

package com.jc.demo.hadoop.yarn.mr;

import org.apache.hadoop.io.Text;

public class NcdcRecordParser {

    private static final int MISSING_TEMPERATURE = 9999;   // the NCDC format records a missing reading as 9999
    private String year;
    private int airTemperature;                             // in tenths of a degree Celsius
    private String quality;

    public void parse(String record) {
        year = record.substring(15, 19);                    // characters 16-19 of the fixed-width record hold the year
        String airTemperatureString;
        if (record.charAt(87) == '+') {                     // character 88 is the sign of the air temperature
            airTemperatureString = record.substring(88, 92);
        } else {
            airTemperatureString = record.substring(87, 92);
        }
        airTemperature = Integer.parseInt(airTemperatureString);
        quality = record.substring(92, 93);                 // character 93 is the quality code
    }

    public void parse(Text record){
        parse(record.toString());
    }

    public boolean isValidTemperature(){
        return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]");
    }

    public String getYear() {
        return year;
    }

    public int getAirTemperature() {
        return airTemperature;
    }
}
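
The original post exercises this parser only indirectly through the mapper test further down. As an extra sketch that is not part of the original code, a standalone JUnit test for NcdcRecordParser, reusing the same 1950 record as the mapper test, might look like this:

package com.jc.demo.hadoop.yarn.mr;

import org.junit.Test;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

public class NcdcRecordParserTest {

    @Test
    public void parsesYearAndTemperature() {
        NcdcRecordParser parser = new NcdcRecordParser();
        // same 1950 record as used in MaxTemperatureMapperTest below
        parser.parse("0097999999949101950010121005+42550-092400SAO  +026599999V02022559004650030549N004800599+00724+00395999999ADDMW1055EQDN01" +
                " 08100JPWTH 1QNNG11 1 00010K11 1 00039L11 1 00300N11 1 08100S11 1 00045W11 1 66009");

        assertThat(parser.getYear(), is("1950"));
        assertThat(parser.getAirTemperature(), is(72));   // tenths of a degree Celsius, i.e. 7.2°C
        assertTrue(parser.isValidTemperature());
    }
}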

The mapper

package com.jc.demo.hadoop.yarn.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private NcdcRecordParser parser = new NcdcRecordParser();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//        String line = value.toString();
//        String year = line.substring(15, 19);
//        int airTemperature = Integer.parseInt(line.substring(87, 92));
//        context.write(new Text(year), new IntWritable(airTemperature));
        parser.parse(value);
        if(parser.isValidTemperature()){
            context.write(new Text(parser.getYear()),new IntWritable(parser.getAirTemperature()));
        }
    }
}

The mapper unit test class

package com.jc.demo.hadoop.yarn.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

import java.io.IOException;

public class MaxTemperatureMapperTest {

    @Test
    public void processesValidRecord() throws IOException, InterruptedException {
        Text value = new Text("0097999999949101950010121005+42550-092400SAO  +026599999V02022559004650030549N004800599+00724+00395999999ADDMW1055EQDN01" +
                " 08100JPWTH 1QNNG11 1 00010K11 1 00039L11 1 00300N11 1 08100S11 1 00045W11 1 66009");
        new MapDriver<LongWritable, Text, Text, IntWritable>()//
                .withMapper(new MaxTemperatureMapper())//
                .withInput(new LongWritable(0), value)//
                .withOutput(new Text("1950"), new IntWritable(72))      // expected output: the reading at 21:00 on Jan 1, 1950 is 72, i.e. 7.2°C (the value is in tenths of a degree Celsius)
                .runTest();
    }

    @Test
    public void ignoresMissingTemperatureRecord() throws IOException, InterruptedException {
        Text value = new Text("0036999999949101950030716005+42550-092400SAO  +026599999V02015859008850228649N016000599+99999+99999999999QNNG11 1 00075L" +
                "11 1 01000W11 1 54017");
        new MapDriver<LongWritable, Text, Text, IntWritable>()//
                .withMapper(new MaxTemperatureMapper())//
                .withInput(new LongWritable(0), value)//
//                .withOutput(new Text("1950"), new IntWritable(9999))      // there should be no output, because this record is invalid
                .runTest();
    }
}

Writing the reducer

The reducer

package com.jc.demo.hadoop.yarn.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MaxTemperatureReducer extends Reducer<Text,IntWritable,Text,IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for(IntWritable value:values){
            maxValue = Math.max(maxValue,value.get());
        }
        context.write(key,new IntWritable(maxValue));
    }
}

The reducer unit test class

package com.jc.demo.hadoop.yarn.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;

public class MaxTemperatureReducerTest {

    @Test
    public void returnsMaximumIntegerInValues() throws IOException {
        new ReduceDriver<Text, IntWritable, Text, IntWritable>()//
                .withReducer(new MaxTemperatureReducer())//
                .withInput(new Text("1950"), Arrays.asList(new IntWritable(10), new IntWritable(5), new IntWritable(34)))//
                .withOutput(new Text("1950"), new IntWritable(34))//
                .runTest();
    }
}

Writing the driver

Because Hadoop has a lot of configuration options, it is convenient to launch the MR program through ToolRunner, which calls GenericOptionsParser internally to take care of all of the configuration. When launching, you can pass explicit configuration options or leave them out.

The driver

package com.jc.demo.hadoop.yarn.mr;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Launched from the command line, for example:
 * hadoop jar jc-demo-hadoop-0.0.1.0-SNAPSHOT-development.jar com.jc.demo.hadoop.yarn.mr.MaxTemperatureDriver -conf hadoop-cluster.xml /user/jevoncode/input/all.txt max-temp
 * -conf supplies configuration dynamically; without it, the configuration is taken from $HADOOP_HOME/etc/hadoop by default
 */
public class MaxTemperatureDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output> \n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }


        Job job = Job.getInstance(getConf(), "Max temperature");
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
        System.exit(exitCode);
    }
}

The driver unit test class

package com.jc.demo.hadoop.yarn.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

public class MaxTemperatureDriverTest {

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","file:///");
        conf.set("mapreduce.framework.name","local");
        conf.setInt("mapreduce.task.io.sort.mb",1);

        Path input = new Path("src/test/resources/mr/ncdc/input/all.txt");
        Path output = new Path("src/test/resources/mr/ncdc/output");

        FileSystem fs = FileSystem.getLocal(conf);
        fs.delete(output, true);                            // delete any old output first

        MaxTemperatureDriver driver = new MaxTemperatureDriver();
        driver.setConf(conf);

        int exitCode = driver.run(new String[]{input.toString(), output.toString()});
        assertThat(exitCode,is(0));

    }
}

Here we test with a small all.txt (the test data is attached at the end of this article).
The run produces a part-r-00000 file whose content is:

1950    350

Running the MR program

After writing and testing the MR program, we can package it (jc-demo-hadoop-0.0.1.0-SNAPSHOT-development.jar) and run it on the machines. There are two ways to run it:

Local execution

This runs hadoop against local files: Hadoop is used, but not the Hadoop cluster, so no YARN application exists. all.txt is a file in the local directory, and when the run finishes a max-temp directory is created to hold the result.
The configuration file hadoop-local.xml is as follows:

<?xml version="1.0" ?>
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>file:///</value>
    </property>

    <property>
        <name>mapreduce.framework.name</name>
        <value>local</value>
    </property>

</configuration>
[cdata@s1 ~]$ ll
total 26480
-rw-r--r-- 1 cdata cdata  2385449 Jun 24 00:30 all.txt
-rw-r--r-- 1 cdata cdata     1482 Jun 24 00:26 hadoop-cluster.xml
-rw-r--r-- 1 cdata cdata      388 Jun 24 00:26 hadoop-localhost.xml
-rw-r--r-- 1 cdata cdata      260 Jun 24 00:26 hadoop-local.xml
-rw-r--r-- 1 cdata cdata    17655 Jun 24 00:24 jc-demo-hadoop-0.0.1.0-SNAPSHOT-development.jar
hadoop jar jc-demo-hadoop-0.0.1.0-SNAPSHOT-development.jar com.jc.demo.hadoop.yarn.mr.MaxTemperatureDriver -conf hadoop-local.xml all.txt max-temp
[cdata@s1 ~]$ ls -al max-temp/
total 16
drwxrwxr-x 2 cdata cdata   84 Jun 24 00:31 .
drwx------ 5 cdata cdata 4096 Jun 24 00:31 ..
-rw-r--r-- 1 cdata cdata    9 Jun 24 00:31 part-r-00000
-rw-r--r-- 1 cdata cdata   12 Jun 24 00:31 .part-r-00000.crc
-rw-r--r-- 1 cdata cdata    0 Jun 24 00:31 _SUCCESS
-rw-r--r-- 1 cdata cdata    8 Jun 24 00:31 ._SUCCESS.crc

On a single virtual machine, after running on the s1 server for almost a week, the job had only processed 93784637440+33554432 bytes of data (about 87 GB), out of a total file size of 279970000130 bytes (about 260 GB).

Running on the cluster

This way a YARN application is created, and high availability is handled as well (except for YARN itself).
The configuration file hadoop-cluster.xml is as follows:

<?xml version="1.0" ?>
<configuration>
    <!-- Specifies the HDFS nameservice; ns is the fixed nameservice id covering the two namenodes -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://ns</value>
    </property>
    <!-- The HDFS nameservice is ns, consistent with core-site.xml -->
    <property>
        <name>dfs.nameservices</name>
        <value>ns</value>
    </property>
    <!-- ns contains two namenodes, nn1 and nn2, each configured in detail below -->
    <property>
        <name>dfs.ha.namenodes.ns</name>
        <value>nn1,nn2</value>
    </property>
    <!-- RPC address of nn1 -->
    <property>
        <name>dfs.namenode.rpc-address.ns.nn1</name>
        <value>s1.jevoncode.com:9000</value>
    </property>
    <!-- HTTP address of nn1 -->
    <property>
        <name>dfs.namenode.http-address.ns.nn1</name>
        <value>s1.jevoncode.com:50070</value>
    </property>
    <!-- RPC address of nn2 -->
    <property>
        <name>dfs.namenode.rpc-address.ns.nn2</name>
        <value>s2.jevoncode.com:9000</value>
    </property>
    <!-- HTTP address of nn2 -->
    <property>
        <name>dfs.namenode.http-address.ns.nn2</name>
        <value>s2.jevoncode.com:50070</value>
    </property>

    <!-- HA must be configured in order to use hdfs://ns, otherwise an UnknownHostException is thrown -->
    <!-- Enable automatic failover when a namenode fails -->
    <property>
        <name>dfs.ha.automatic-failover.enabled</name>
        <value>true</value>
    </property>
    <!-- The failover implementation (client failover proxy provider) -->
    <property>
        <name>dfs.client.failover.proxy.provider.ns</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>



    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>

    <property>
        <name>yarn.resourcemanager.address</name>
        <value>s1.jevoncode.com:8032</value>
    </property>

</configuration>

The run looks like this (the file must first be uploaded to HDFS, then the job is launched with hadoop jar):

[cdata@s1 ~]$ hdfs dfs -mkdir /user/
[cdata@s1 ~]$ hdfs dfs -mkdir /user/jevoncode/ 
[cdata@s1 ~]$ hdfs dfs -mkdir /user/jevoncode/input 
[cdata@s1 ~]$ hdfs dfs -put all.txt /user/jevoncode/input/
[cdata@s1 ~]$ hadoop jar jc-demo-hadoop-0.0.1.0-SNAPSHOT-development.jar com.jc.demo.hadoop.yarn.mr.MaxTemperatureDriver -conf hadoop-cluster.xml /user/jevoncode/input/all.txt max-temp
18/06/24 00:45:13 INFO input.FileInputFormat: Total input paths to process : 1
18/06/24 00:45:15 INFO mapreduce.JobSubmitter: number of splits:1
18/06/24 00:45:16 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1529756862370_0003
18/06/24 00:45:17 INFO impl.YarnClientImpl: Submitted application application_1529756862370_0003
18/06/24 00:45:17 INFO mapreduce.Job: The url to track the job: http://s1.jevoncode.com:8088/proxy/application_1529756862370_0003/
18/06/24 00:45:17 INFO mapreduce.Job: Running job: job_1529756862370_0003
...

The result file part-r-00000 ends up in the HDFS directory /user/cdata/max-temp.
The input /user/jevoncode/input/all.txt, an entire 20th century of weather data at about 260 GB, took roughly two hours to process; I had added an extra disk, so parallel disk reads and writes were faster.
To visualize the result, first convert the format, for example with awk and sed; the pipeline below transposes the two-column (year, max temperature) output into two comma-separated rows:

cat part-r-00000|awk '{for(i=0;++i<=NF;)a[i]=a[i]?a[i] FS $i:$i}END{for(i=0;i++<NF;)print a[i]}' |sed 's/ /,/g'

Result:

1901,1902,1903,1904,1905,1906,1907,1908,1909,1910,1911,1912,1913,1914,1915,1916,1917,1918,1919,1920,1921,1922,1923,1924,1925,1926,1927,1928,1929,1930,1931,1932,1933,1934,1935,1936,1937,1938,1939,1940,1941,1942,1943,1944,1945,1946,1947,1948,1949,1950,1951,1952,1953,1954,1955,1956,1957,1958,1959,1960,1961,1962,1963,1964,1965,1966,1967,1968,1969,1970,1971,1972,1973,1974,1975,1976,1977,1978,1979,1980,1981,1982,1983,1984,1985,1986,1987,1988,1989,1990,1991,1992,1993,1994,1995,1996,1997,1998,1999
317,244,289,256,283,294,283,289,278,294,306,322,300,333,294,278,317,322,378,294,283,278,294,294,317,489,489,378,328,400,461,489,489,478,478,550,489,489,489,489,462,479,485,507,496,494,490,601,511,494,511,544,506,506,500,511,489,489,500,490,600,489,489,489,489,506,489,489,474,489,478,483,580,560,561,570,580,550,600,600,580,617,616,617,611,607,607,607,600,607,607,605,567,568,567,561,564,568,568

Drop it into an ECharts option:

option = {
    xAxis: {
        type: 'category',
        name:'year',
        data: [1901,1902,1903,1904,1905,1906,1907,1908,1909,1910,1911,1912,1913,1914,1915,1916,1917,1918,1919,1920,1921,1922,1923,1924,
1925,1926,1927,1928,1929,1930,1931,1932,1933,1934,1935,1936,1937,1938,1939,1940,1941,1942,1943,1944,1945,1946,1947,1948,
1949,1950,1951,1952,1953,1954,1955,1956,1957,1958,1959,1960,1961,1962,1963,1964,1965,1966,1967,1968,1969,1970,1971,1972,
1973,1974,1975,1976,1977,1978,1979,1980,1981,1982,1983,1984,1985,1986,1987,1988,1989,1990,1991,1992,1993,1994,1995,1996,
1997,1998,1999]
    },
    yAxis: {
        type: 'value',
        name:'x10°C'
    },
    series: [{
        data: [317,244,289,256,283,294,283,289,278,294,306,322,300,333,294,278,317,322,378,294,283,278,294,294,317,489,489,378,328,400,
461,489,489,478,478,550,489,489,489,489,462,479,485,507,496,494,490,601,511,494,511,544,506,506,500,511,489,489,500,490,
600,489,489,489,489,506,489,489,474,489,478,483,580,560,561,570,580,550,600,600,580,617,616,617,611,607,607,607,600,607,
607,605,567,568,567,561,564,568,568],
        type: 'line',
        smooth: true
    }]
};

(line chart of the yearly maximum temperatures, 1901-1999)

The 317 for 1901 is not 317°C; the number has to be divided by 10, so the maximum temperature in 1901 was 31.7°C. The highest value of the 20th century, 61.7°C, occurred in 1982, and the lowest annual maximum was 24.4°C in 1902. You can see that temperatures rise over the century. Is it the greenhouse effect, or just more accurate measurements? That is beside the point here.

Troubleshooting

1. The application got stuck, its state never changing.

It eventually turned out that the NodeManager had not started; the error message was this:

org.apache.hadoop.yarn.exceptions.YarnRuntimeException: org.apache.hadoop.yarn.exceptions.YarnRuntimeException: Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed, Message from ResourceManager: NodeManager from  s4.jevoncode.com doesn't satisfy minimum allocations, Sending SHUTDOWN signal to the NodeManager.
    at org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl.serviceStart(NodeStatusUpdaterImpl.java:203)
    at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:120)
    at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceStart(NodeManager.java:272)
    at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartNodeManager(NodeManager.java:496)
    at org.apache.hadoop.yarn.server.nodemanager.NodeManager.main(NodeManager.java:543)
Caused by: org.apache.hadoop.yarn.exceptions.YarnRuntimeException: Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed, Message from ResourceManager: NodeManager from  s4.jevoncode.com doesn't satisfy minimum allocations, Sending SHUTDOWN signal to the NodeManager.
    at org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl.registerWithRM(NodeStatusUpdaterImpl.java:278)
    at org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl.serviceStart(NodeStatusUpdaterImpl.java:197)
    ... 6 more
2018-06-24 23:16:58,432 INFO org.apache.hadoop.yarn.server.nodemanager.NodeManager: SHUTDOWN_MSG:

At first glance it looked like insufficient memory, but my yarn.scheduler.minimum-allocation-mb was already set to 512, and mapreduce.reduce.memory.mb and mapreduce.map.memory.mb were also 512. That was not it; in the end the problem was this CPU setting:

<!-- Number of CPU vcores -->
    <property>
        <!--     Number of vcores that can be allocated for containers. This is used by the RM scheduler when allocating resources for containers. This is not used to limit the number of CPUs used by YARN containers. If it is set to -1 and yarn.nodemanager.resource.detect-hardware-capabilities is true, it is automatically determined from the hardware in case of Windows and Linux. In other cases, number of vcores is 8 by default.-->
      <name>yarn.nodemanager.resource.cpu-vcores</name>
      <value>-1</value>
    </property>

It was left at the default of 8 vcores, but each of my virtual machines has only 2 cores, so the NodeManager failed to start; YARN does not automatically detect the node's total memory or number of CPUs. It needs to be configured as:

<!-- Number of CPU vcores -->
    <property>
        <!--     Number of vcores that can be allocated for containers. This is used by the RM scheduler when allocating resources for containers. This is not used to limit the number of CPUs used by YARN containers. If it is set to -1 and yarn.nodemanager.resource.detect-hardware-capabilities is true, it is automatically determined from the hardware in case of Windows and Linux. In other cases, number of vcores is 8 by default.-->
      <name>yarn.nodemanager.resource.cpu-vcores</name>
      <value>1</value>
    </property>

Even after that change it still failed; in the end there was a memory problem after all, namely this:

<property>
        <!--    Amount of physical memory, in MB, that can be allocated for containers. If set to -1 and yarn.nodemanager.resource.detect-hardware-capabilities is true, it is automatically calculated(in case of Windows and Linux). In other cases, the default is 8192MB.-->
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>-1</value>
      </property>

This is the total amount of physical memory that can be allocated to containers. My physical machine only has 8 GB in total and each virtual machine only 2 GB, so I set it to:

<property>
        <!--    Amount of physical memory, in MB, that can be allocated for containers. If set to -1 and yarn.nodemanager.resource.detect-hardware-capabilities is true, it is automatically calculated(in case of Windows and Linux). In other cases, the default is 8192MB.-->
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>1024</value>
      </property>