高可用Hadoop平臺-啓航

1.概述

  在上篇博客中,咱們搭建了《配置高可用Hadoop平臺》,接下來咱們就能夠駕着Hadoop這艘巨輪在大數據的海洋中遨遊了。工欲善其事,必先利其器。是的,沒錯;咱們開發須要有開發工具(IDE);本篇文章,我打算講解如何搭建和使用開發環境,以及編寫和講解WordCount這個例子,給即將在Hadoop的海洋馳騁的童鞋入個門。上次,我在《網站日誌統計案例分析與實現》中說會將源碼放到Github,後來,我考慮了下,決定將《高可用的Hadoop平臺》作一個系列,後面基於這個平臺,我會單獨寫一篇來贅述具體的實現過程,和在實現過程當中遇到的一些問題,以及解決這些問題的方案。下面咱們開始今天的啓航html

2.啓航

  IDE:JBoss Developer Studio 8.0.0.GA (Eclipse的升級版,Redhat公司出的)java

  JDK:1.7(或1.8)linux

  Hadoop2x-eclipse-plugin:這個插件,本地單元測試或本身作學術研究比較好用git

  插件下載地址:https://github.com/smartdengjie/hadoop2x-eclipse-plugingithub

  因爲JBoss Developer Studio 8基本適合於Retina屏,因此,咱們這裏直接使用JBoss Developer Studio 8,JBoss Developer Studio 7對Retina屏的支持不是很完美,這裏就不贅述了。apache

  附上一張IDE的截圖:oracle

2.1安裝插件

  下面咱們開始安裝插件,首先展現首次打開的界面,以下圖所示:app

  而後,咱們到上面給的Github的地址,clone整個工程,裏面有編譯好的jar和源碼,可自行選擇(使用已存在的和本身編譯對應的版本),這裏我直接使用編譯好的版本。咱們將jar放到IDE的plugins目錄下,以下圖所示:dom

  接着,咱們重啓IDE,界面出現以下圖所示的,即表示插件添加成功,若沒有,查看IDE的啓動日誌,根據異常日誌定位出緣由。eclipse

2.2設置Hadoop插件

  配置信息以下所示(已在圖中說明):

  添加本地的hadoop源碼目錄:

  到這裏,IDE和插件的搭建就完成了,下面咱們進入一段簡單的開發,hadoop的源碼中提供了許多example讓我學習,這裏我以WordCount爲例子來講明:

3.WordCount

  首先咱們看下hadoop的源碼文件目錄,以下圖所示:

3.1源碼解讀

package cn.hdfs.mr.example;

import java.io.IOException;
import java.util.Random;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hdfs.utils.ConfigUtils;

/**
 * 
 * @author dengjie
 * @date 2015年03月13日
 * @description Wordcount的例子是一個比較經典的mapreduce例子,能夠叫作Hadoop版的hello world。
 *              它將文件中的單詞分割取出,而後shuffle,sort(map過程),接着進入到彙總統計
 *              (reduce過程),最後寫道hdfs中。基本流程就是這樣。
 */
public class WordCount {

    private static Logger log = LoggerFactory.getLogger(WordCount.class);

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    /*
     * 源文件:a b b
     * 
     * map以後:
     * 
     * a 1
     * 
     * b 1
     * 
     * b 1
     */
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());// 整行讀取
        while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());// 按空格分割單詞
        context.write(word, one);// 每次統計出來的單詞+1
        }
    }
    }

    /*
     * reduce以前:
     * 
     * a 1
     * 
     * b 1
     * 
     * b 1
     * 
     * reduce以後:
     * 
     * a 1
     * 
     * b 2
     */
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
        sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
    Configuration conf1 = new Configuration();
    Configuration conf2 = new Configuration();
    long random1 = new Random().nextLong();// 重定下輸出目錄1
    long random2 = new Random().nextLong();// 重定下輸出目錄2
    log.info("random1 -> " + random1 + ",random2 -> " + random2);
    Job job1 = new Job(conf1, "word count1");
    job1.setJarByClass(WordCount.class);
    job1.setMapperClass(TokenizerMapper.class);// 指定Map計算的類
    job1.setCombinerClass(IntSumReducer.class);// 合併的類
    job1.setReducerClass(IntSumReducer.class);// Reduce的類
    job1.setOutputKeyClass(Text.class);// 輸出Key類型
    job1.setOutputValueClass(IntWritable.class);// 輸出值類型  

    Job job2 = new Job(conf2, "word count2");
    job2.setJarByClass(WordCount.class);
    job2.setMapperClass(TokenizerMapper.class);
    job2.setCombinerClass(IntSumReducer.class);
    job2.setReducerClass(IntSumReducer.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);
    // FileInputFormat.addInputPath(job, new
    // Path(String.format(ConfigUtils.HDFS.WORDCOUNT_IN, "test.txt")));
    // 指定輸入路徑
    FileInputFormat.addInputPath(job1, new Path(String.format(ConfigUtils.HDFS.WORDCOUNT_IN, "word")));
    // 指定輸出路徑
    FileOutputFormat.setOutputPath(job1, new Path(String.format(ConfigUtils.HDFS.WORDCOUNT_OUT, random1)));
    FileInputFormat.addInputPath(job2, new Path(String.format(ConfigUtils.HDFS.WORDCOUNT_IN, "word")));
    FileOutputFormat.setOutputPath(job2, new Path(String.format(ConfigUtils.HDFS.WORDCOUNT_OUT, random2)));

    boolean flag1 = job1.waitForCompletion(true);// 執行完MR任務後退出應用
    boolean flag2 = job1.waitForCompletion(true);
    if (flag1 && flag2) {
        System.exit(0);
    } else {
        System.exit(1);
    }

    }
}

4.總結

  這篇文章就和你們分享到這裏,若是在研究的過程有什麼問題,能夠加羣討論或發送郵件給我,我會盡我所能爲您解答,與君共勉!

相關文章
相關標籤/搜索