大數據技術暑期實習三___大數據和Hadoop的大體概念及Ubuntu環境下Hadoop搭建及應用

1.  大數據java

大數據是指沒法在必定時間範圍內用常規工具進行捕捉、管理和處理的數據集合,須要新的處理模式才能具備更強的決策力、洞察發現力和流程優化能力的海量、高增加率和多樣化的信息資產。linux

主要解決海量數據的存儲和分析計算問題。大數據的特色爲(4V):Volume大量、Velocity高速、Variety多樣、Value低價值密度。其核心技術即分佈式存儲,分佈式處理。git

大數據幫助人們進行精準化定製及預測,典型的案例如超市系統的啤酒和尿布的例子,電商平臺的推薦系統,交通系統對路況數據的預測等。github

2.  Hadoopweb

廣義上來講,Hadoop一般是指一個更普遍的概念——Hadoop生態圈。Hadoop的優點:apache

A.    高可靠性: Hadoop底層維護多個數據副本,因此即便Hadoop某個計算元素或存儲出現故障,也不會致使數據的丟失。app

B.    高擴展性:在集羣間分配任務數據,可方便的擴展數以千計的節點。dom

C.    高效性:在MapReduce的思想下,Hadoop是並行工做的,以加快任務處理速度。eclipse

D.    高容錯性:可以自動將失敗的任務從新分配。分佈式

如今的hadoop版本通常爲2.x相比1.x中MapReduce既負責計算又負責資源調度,2.x中新加入了yarn,他負責資源調度,而MapReduce僅負責計算,即從封閉式到可調度外部資源,能更好的適應需求。

  2.1   HDFS

    HDFS即分佈式文件存儲。

      1)NameNode(nn):存儲文件的元數據,如文件名,文件目錄結構,文件屬性(生成時間、副本數、文件權限),以及每一個文件的塊列表和塊所在的DataNode等。

      2)DataNode(dn):在本地文件系統存儲文件塊數據,以及塊數據的校驗和。

      3)SecondaryNameNode(2nn):用來監控HDFS狀態的輔助後臺程序,每隔一段時間獲取HDFS元數據的快照。

  2.2 MapReduce

    MapReduce將計算過程分爲兩個階段:Map和Reduce,

      1)Map階段並行處理輸入數據

      2)Reduce階段對Map結果進行彙總

3.  Hadoop環境搭建及wordcount程序運行

  3.1 Linux環境中Hadoop僞分佈式的搭建

      參考林子雨的博客

    

ps:今天在從新調試Hadoop運行環境時,出現了DataNode,NameNode, SecondaryNameNode未能同時啓動的狀況,
後來發現時由於從新配置Hadoop時,格式化NameNode次數過多致使的,Hadoop根目錄下tmp文件夾中的臨時文件中DataNode,NameNode的ID不一致,沒法匹配。
解決:關閉Hadoop進程後,清空tmp文件夾中內容,從新格式化NameNode,重啓Hadoop便可,若輸入jps命令發現SecondaryNameNode不能啓動,則能夠嘗試重啓Hadoop進程,還不行的話,能夠重啓一下虛擬機。

 

  3.2  eclipse中hadoop插件的配置。(先把Hadoop啓動)

     插件下載地址

    

 解壓後,將此jar包直接放在虛擬機中eclipse安裝根目錄,plugin文件夾下

重啓eclipse,啓動eclipse以後會有hdfs的文件系統,打開mapreduce視窗,若沒有該mapreduce視窗選項,建議重裝eclipse或檢查Hadoop版本是否匹配

個人Hadoop版本爲2.7.1,通常比此版本低的2.x版本應該都沒問題。

 

 

新建一個Hadoop鏈接,若是你是按照上述方式搭建的Hadoop,這兩個端口就是這樣

若是不是的話,左邊的端口號要與Hadoop的xml配置文件dfs-site.xml中保持一致,右邊的host端口號要與core-site.xml保持一致

 

   3.3 實例

  實例一WordCount

在本地新建一個test.txt文件,內容爲

hello world 
hello hadoop
world count

將linux本地 test.txt 上傳到HDFS上的/mydata/in目錄下。若HDFS目錄不存在,需提早建立。

hadoop fs -mkdir -p /mydata/in
hadoop fs
-put /usr/local/hadoop/test.txt /mydata/in

刷新eclipse的hdfs文件系統發現文件已經上傳成功

新建一個MapReduce項目,編寫java代碼

import java.io.IOException;  
import java.util.StringTokenizer;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
/**
 * 
 * @author 20163490 王敬斯
 * @data 2019/9/3
 * @todo Simple counting with Hadoop
 */

public class WordCount {  
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  
        Job job = Job.getInstance();  
        job.setJobName("WordCount");  //    Job名稱
        //mapreduce調用
        job.setJarByClass(WordCount.class);  
        job.setMapperClass(doMapper.class);  
        job.setReducerClass(doReducer.class);  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(IntWritable.class);  
        Path in = new Path("hdfs://localhost:9000/mydata/in/test.txt");  //文件輸入絕對路徑
        Path out = new Path("hdfs://localhost:9000/mydata/out");      //文件輸出絕對路徑
        FileInputFormat.addInputPath(job, in);  
        FileOutputFormat.setOutputPath(job, out);  
        System.exit(job.waitForCompletion(true) ? 0 : 1);  
    }  
    //Mapper 文本切割
    public static class doMapper extends Mapper<Object, Text, Text, IntWritable>{      
        /* 第一個Object表示輸入key的類型
         * 第二個Text表示輸入value的類型
         * 第三個Text表示表示輸出鍵的類型
         * 第四個IntWritable表示輸出值的類型
         */
        public static final IntWritable one = new IntWritable(1);  
        public static Text word = new Text();  
        @Override  
        protected void map(Object key, Text value, Context context)  //
                    throws IOException, InterruptedException {  
            StringTokenizer tokenizer = new StringTokenizer(value.toString(), "\t");  //經過空格進行分類
                word.set(tokenizer.nextToken());  
                context.write(word, one);  
        }  
    }  
  //Reduce 彙總計數,參數同Map
    public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable>{  
        private IntWritable result = new IntWritable();  
        @Override  
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)  
        throws IOException, InterruptedException {  
        int sum = 0;  
        //循環遍歷
        for (IntWritable value : values) {  
        sum += value.get();  //value同樣則sum+1
        }  
        result.set(sum);  
        context.write(key, result);  //輸出結果
        }  
    }  
}  
WordCount

運行選擇run on hadoop

刷新hdfs文件,發如今out輸出文件夾中有輸出文件

  實例二  Pi值計算

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.Random;

/**
 * 在正方形內生成的樣本點越多,計算Pi值越精確,這樣,這個問題就很適合用Hadoop來處理啦。假設要在正方形內生成1000萬個點,能夠設置10個Map任務,每一個Map任務處理100萬個點,也能夠設置100個Map任務,每一個Map任務處理10萬個點。
 */
public class CalPI {
    public static class PiMapper
            extends Mapper<Object, Text, Text, IntWritable>{

        private static Random rd = new Random();

        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            int pointNum = Integer.parseInt(value.toString());

            for(int i = 0; i < pointNum; i++){
                // 取隨機數
                double x = rd.nextDouble();
                double y = rd.nextDouble();
                // 計算與(0.5,0.5)的距離,若是小於0.5就在單位圓裏面
                x -= 0.5;
                y -= 0.5;
                double distance = Math.sqrt(x*x + y*y);

                IntWritable result = new IntWritable(0);
                if (distance <= 0.5){
                    result = new IntWritable(1);
                }


                context.write(value, result);
            }
        }
    }

    public static class PiReducer
            extends Reducer<Text,IntWritable,Text,DoubleWritable> {
        private DoubleWritable result = new DoubleWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {

            double pointNum = Double.parseDouble(key.toString());
            double sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum/pointNum*4);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,"calculate pi");
        job.setJarByClass(CalPI.class);
        job.setMapperClass(PiMapper.class);
//      job.setCombinerClass(PiReducer.class);
        job.setReducerClass(PiReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
Pi值計算

  實例三 日誌分析

需求一:去除日誌中字段長度小於等於11的日誌。

數據準備:(本身在電腦的任一日誌文件中截取十幾二十行就行)

需求二:對web訪問日誌中的各字段識別切分,去除日誌中不合法的記錄,根據統計需求,生成各種訪問請求過濾數據。

數據準備:(本身在電腦的任一日誌文件中截取十幾二十行就行)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 去除日誌中字段長度小於等於11的日誌。
 * create date 2019.9.3
 * author wangrenyi
 */
public class LogDriver {
    public static void main(String[] args) throws Exception {
        // 1 獲取 job 信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 加載 jar 包
        job.setJarByClass(LogDriver.class);
        // 3 關聯 map
        job.setMapperClass(LogMapper.class);
        // 4 設置最終輸出類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 設置 reducetask 個數爲 0
        job.setNumReduceTasks(0);
        // 5 設置輸入和輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6 提交
        job.waitForCompletion(true);
    }
}
MapperClass:
package stdu.wry.mapreduce.log1;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // 1 獲取 1 行數據
        String line = value.toString();
        // 2 解析日誌
        boolean result = parseLog(line, context);
        // 3 日誌不合法退出
        if (!result) {
            return;
        }
        // 4 設置 key
        k.set(line);
        // 5 寫出數據
        context.write(k, NullWritable.get());
    }

    // 解析日誌
    private boolean parseLog(String line, Mapper<LongWritable, Text, Text, NullWritable>.Context context) {
        // 切割
        String[] fields = line.split(" ");
        if (fields.length > 11) {
            // 系統計數器
            context.getCounter("LogMapper", "parseLog_true").increment(1);
            return true;
        } else {
            context.getCounter("LogMapper", "parseLog_false").increment(1);
            return false;
        }
    }

}
需求一
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * 對web訪問日誌中的各字段識別切分,去除日誌中不合法的記錄,根據統計需求,生成各種訪問請求過濾數據。
 * create date 2019.9.3
 * author wangrenyi
 */
public class LogDriver {
    public static void main(String[] args) throws Exception {
        // 1 獲取 job 信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 加載 jar 包
        job.setJarByClass(LogDriver.class);
        // 3 關聯 map
        job.setMapperClass(LogMapper.class);
        // 4 設置最終輸出類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 5 設置輸入和輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6 提交
        job.waitForCompletion(true);
    }
}
package stdu.wry.mapreduce.log2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * 對web訪問日誌中的各字段識別切分,去除日誌中不合法的記錄,根據統計需求,生成各種訪問請求過濾數據。
 * create date 2019.9.3
 * author wangrenyi
 */
public class LogDriver {
    public static void main(String[] args) throws Exception {
        // 1 獲取 job 信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 加載 jar 包
        job.setJarByClass(LogDriver.class);
        // 3 關聯 map
        job.setMapperClass(LogMapper.class);
        // 4 設置最終輸出類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 5 設置輸入和輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6 提交
        job.waitForCompletion(true);
    }
}
package stdu.wry.mapreduce.log2;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 獲取 1 行
        String line = value.toString();
        // 2 解析日誌是否合法
        LogBean bean = pressLog(line);
        if (!bean.isValid()) {
            return;
        }
        k.set(bean.toString());
        // 3 輸出
        context.write(k, NullWritable.get());
    }

    // 解析日誌
    private LogBean pressLog(String line) {
        LogBean logBean = new LogBean();
        // 1 截取
        String[] fields = line.split(" ");
        if (fields.length > 11) {
            // 2 封裝數據
            logBean.setRemote_addr(fields[0]);
            logBean.setRemote_user(fields[1]);
            logBean.setTime_local(fields[3].substring(1));
            logBean.setRequest(fields[6]);
            logBean.setStatus(fields[8]);
            logBean.setBody_bytes_sent(fields[9]);
            logBean.setHttp_referer(fields[10]);
            if (fields.length > 12) {
                logBean.setHttp_user_agent(fields[11] + " " + fields[12]);
            } else {
                logBean.setHttp_user_agent(fields[11]);
            }
            // 大於 400, HTTP 錯誤
            if (Integer.parseInt(logBean.getStatus()) >= 400) {
                logBean.setValid(false);
            }
        } else {
            logBean.setValid(false);
        }
        return logBean;
    }
}
需求二
相關文章
相關標籤/搜索