【小白視角】大數據基礎實踐(五) MapReduce編程基礎操做

這是我參與8月更文挑戰的第7天,活動詳情查看:8月更文挑戰前端

1. MapReduce 簡介

1.1 起源

在函數式語言裏,map表示對一個列表(List)中的每一個元素作計算,reduce表示對一個列表中的每一個元素作迭代計算。java

它們具體的計算是經過傳入的函數來實現的,map和reduce提供的是計算的框架。web

  • 在MapReduce裏,map處理的是原始數據,每條數據之間互相沒有關係;
  • 到了reduce階段,數據是以key後面跟着若干個value來組織的,這些value有相關性,至少它們都在一個key下面,因而就符合函數式語言裏map和reduce的基本思想了。
  • 「map」和「reduce」的概念和它們的主要思想,都是從函數式編程語言借用來的,還有從矢量編程語言裏借來的特性。極大地方便了編程人員在不會分佈式並行編程的狀況下,將本身的程序運行在分佈式系統上。

1.2 模型簡介

  1. MapReduce將複雜的、運行於大規模集羣上的並行計算過程高度地抽象到了兩個函數:MapReduce
  2. 編程容易,不須要掌握分佈式並行編程細節,也能夠很容易把本身的程序運行在分佈式系統上,完成海量數據的計算
  3. MapReduce採用「分而治之」策略,一個存儲在分佈式文件系統中的大規模數據集,會被切分紅許多獨立的分片(split),這些分片能夠被多個Map任務並行處理
  4. MapReduce設計的一個理念就是「計算向數據靠攏」,而不是「數據向計算靠攏」,由於,移動數據須要大量的網絡傳輸開銷
  5. MapReduce框架採用了Master/Slave架構,包括一個Master和若干個SlaveMaster上運行JobTracker(yarn上ResourceManager),Slave上運行TaskTracker(yarn上Nodemanager)
  6. Hadoop框架是用Java實現的,可是,MapReduce應用程序則不必定要用Java來寫

1.3 MRv1體系結構

MapReduce體系結構主要由四個部分組成,分別是:Client、JobTracker、TaskTracker以及Taskapache

在這裏插入圖片描述 結點說明:編程

  • Client

用戶編寫的MapReduce程序經過Client提交到JobTracker端,用戶可經過Client提供的一些接口查看做業運行狀態。markdown

  • JobTracker

JobTracker負責資源監控和做業調度;JobTracker 監控全部TaskTrackerJob的健康情況,一旦發現失敗,就將相應的任務轉移到其餘節點;JobTracker會跟蹤任務的執行進度、資源使用量等信息,並將這些信息告訴任務調度器(TaskScheduler),而調度器會在資源出現空閒時,選擇合適的任務去使用這些資源。網絡

  • TaskTracker

TaskTracker會週期性地經過「心跳」將本節點上資源的使用狀況和任務的運行進度彙報給JobTracker,同時接收JobTracker 發送過來的命令並執行相應的操做(如啓動新任務、殺死任務等)。TaskTracker使用「slot」等量劃分本節點上的資源量(CPU、內存等)。一個Task獲取到一個slot後纔有機會運行,而Hadoop調度器的做用就是將各個TaskTracker上的空閒slot分配給Task使用。slot 分爲Map slotReduce slot兩種,分別供Map TaskReduce Task使用。架構

  • Task

Task分爲Map TaskReduce Task兩種,均由TaskTracker啓動。app

結構缺點:框架

  • 存在單點故障
  • JobTracker「大包大攬」致使任務太重(任務多時內存開銷大,上限4000節點)
  • 容易出現內存溢出(分配資源只考慮MapReduce任務數,不考慮CPU、內存)
  • 資源劃分不合理(強制劃分爲slot ,包括Map slot和Reduce slot)

1.4 YARN

1.4.1 YARN體系結構

架構思想 在這裏插入圖片描述 體系結構 在這裏插入圖片描述 ResourceManager • 處理客戶端請求 • 啓動/監控ApplicationMaster • 監控NodeManager • 資源分配與調度 NodeManager • 單個節點上的資源管理 • 處理來自ResourceManger的命令 • 處理來自ApplicationMaster的命令 ApplicationMaster • 爲應用程序申請資源,並分配給內部任務 • 任務調度、監控與容錯

1.4.2 YARN工做流程

在這裏插入圖片描述 步驟1:用戶編寫客戶端應用程序,向YARN提交應用程序,提交的內容包括ApplicationMaster程序、啓動ApplicationMaster的命令、用戶程序等 步驟2:YARN中的ResourceManager負責接收和處理來自客戶端的請求,爲應用程序分配一個容器,在該容器中啓動一個ApplicationMaster 步驟3:ApplicationMaster被建立後會首先向ResourceManager註冊 步驟4:ApplicationMaster採用輪詢的方式向ResourceManager申請資源 步驟5:ResourceManager以「容器」的形式向提出申請的ApplicationMaster分配資源 步驟6:在容器中啓動任務(運行環境、腳本) 步驟7:各個任務向ApplicationMaster彙報本身的狀態和進度 步驟8:應用程序運行完成後,ApplicationMasterResourceManager的應用程序管理器註銷並關閉本身

2. MapReduce 工做流程

在這裏插入圖片描述 ➢ 不一樣的Map任務之間不會進行通訊 ➢ 不一樣的Reduce任務之間也不會發生任何信息交換 ➢ 用戶不能顯式地從一臺機器向另外一臺機器發送消息 ➢ 全部的數據交換都是經過MapReduce框架自身去實現的

在這裏插入圖片描述

例子 在這裏插入圖片描述 在這裏插入圖片描述

3. Java Api要點

  • Writable

Hadoop 自定義的序列化接口。當要在進程間傳遞對象或持久化對象的時候,就須要序列化對象成字節流,反之當要將接收到或從磁盤讀取的字節流轉換爲對象,就要進行反序列化。Map 和 Reduce 的 key、value 數據格式均爲 Writeable 類型,其中 key 還需實現WritableComparable 接口。Java 基本類型對應 writable 類型的封裝以下:

Java primitive Writable implementation
boolean BooleanWritable
byte ByteWritable
int ShortWritable
float FloatWritable
long LongWritable
double DoubleWritable
enum EnumWritable
Map MapWritable

(2)InputFormat 用於描述輸入數據的格式。提供兩個功能:

getSplits()數據分片,按照某個策略將輸入數據切分紅若干個 split,以便肯定Map任務個數以及對應的 splitcreateRecordReader(),將某個split解析成一個個 key-value 對。 FileInputFormat 是全部以文件做爲數據源的 InputFormat 實現基類,小文件不會進行分片,記錄讀取調用子類 TextInputFormat 實現;

  • TextInputFormat 是默認處理類,處理普通文本文件,以文件中每一行做爲一條記錄,行起始偏移量爲 key,每一行文本爲 value;
  • CombineFileInputFormat 針對小文件設計,能夠合併小文件;
  • KeyValueTextInputFormat 適合處理一行兩列並以tab做爲分隔符的數據;
  • NLineInputFormat 控制每一個 split 中的行數。

(3)OutputFormat

主要用於描述輸出數據的格式。Hadoop 自帶多種 OutputFormat 的實現。

  • TextOutputFormat 默認的輸出格式,key 和 value 中間用 tab 分隔;
  • SequenceFileOutputFormat,將 key 和 value 以 SequenceFile 格式輸出;
  • SequenceFileAsOutputFormat,將 key 和 value 以原始二進制格式輸出;
  • MapFileOutputFormat,將 key 和 value 寫入 MapFile 中;
  • MultipleOutputFormat,默認狀況下 Reducer 會產生一個輸出,用該格式能夠實現一個Reducer 多個輸出。

(4)Mapper/Reducer

封裝了應用程序的處理邏輯,主要由 map、reduce 方法實現。

(5)Partitioner

根據 map 輸出的 key 進行分區,經過 getPartition()方法返回分區值,默認使用哈希函

數。分區的數目與一個做業的reduce任務的數目是同樣的。HashPartitioner是默認的Partioner。

4. 實驗過程

一、計數統計類應用 仿照 WordCount 例子,編寫「TelPubXxx」類實現對撥打公共服務號碼的電話信息的統計。給出的一個文本輸入文件以下,第一列爲電話號碼、第二列爲公共服務號碼,中間以空格隔開。 13718855152 11216810117315 110 39451849 112 13718855153 110 13718855154 112 18610117315 114 18610117315 114 MapReduce 程序執行後輸出結果以下,電話號碼之間用「|」鏈接: 110 13718855153|16810117315 112 13718855154|39451849|13718855152 114 18610117315|18610117315 在這裏插入圖片描述 在這裏插入圖片描述 在這裏插入圖片描述 運行成功 在這裏插入圖片描述

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TelPubZqc {
    public static class TelMap extends Mapper<Object, Text, Text, Text> {
        private Text pub = new Text();
        private Text tel = new Text();
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            //Map (Key Value)
            String[] s=value.toString().split(" ");
            tel.set(s[0]);
            pub.set(s[1]);
            context.write(pub,tel);
        }
    }
    public static class TelReducer extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder s= new StringBuilder();
            for (Text val : values) {
                if(s.toString().equals("")){
                    s.append(val.toString());
                }
                else s.append("|").append(val.toString());
            }
            result.set(String.valueOf(s));
            context.write(key, result);// 輸出結果
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();// 加載hadoop配置
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        String[] otherArgs = new String[]{"input/input.txt","output/outputTel"};
        if (otherArgs.length < 2) {
            System.err.println("Usage: PubTel <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");// 設置環境參數
        job.setJarByClass(TelPubZqc.class);// 設置程序主類
        job.setMapperClass(TelMap.class);// 設置用戶實現的Mapper類
        job.setCombinerClass(TelReducer.class);
        job.setReducerClass(TelReducer.class);// 設置用戶實現的Reducer類
        job.setOutputKeyClass(Text.class);// 設置輸出key類型
        job.setOutputValueClass(Text.class); // 設置輸出value類型
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));// 添加輸入文件路徑
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));// 設置輸出文件路徑
        System.exit(job.waitForCompletion(true) ? 0 : 1); // 提交做業並等待結束
    }
}
複製代碼

二、兩表聯結 Join 應用 仿照單表關聯例子,編寫「RelationXxx」類實現多表關聯。中文文本文件轉成 UTF-8 編碼格式,不然會亂碼。 輸入 score.txt:

studentid classid score
s003001 fd3003 84
s003001 fd3004 90
s003002 fd2001 71
s002001 fd1001 66
s001001 fd1001 98
s001001 fd1002 60
輸入 major.txt:
classid classname deptname
-- -- --
fd1001 數據挖掘 數學系
fd2001 電子工程 電子系
fd2002 電子技術 電子系
fd3001 大數據 計算機系
fd3002 網絡工程 計算機系
fd3003 Java 應用 計算機系
fd3004 web 前端 計算機系
輸出結果:
classid classname deptname
-- -- --
fd1001 數據挖掘 數學系
fd1001 數據挖掘 數學系
fd2001 電子工程 電子系
fd3003 Java 應用 計算機系
fd3004 web 前端 計算機系
在這裏插入圖片描述

將其中須要的東西傳到hdfs中去。

在這裏插入圖片描述

沒有報錯。查看結果

在這裏插入圖片描述

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class RelationZqc {
    public static int time = 0;
    public static class RelationMap extends Mapper<Object, Text, Text, Text> {
        private Text classID = new Text();
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String filename=((FileSplit)context.getInputSplit()).getPath().getName();
            String[] s = value.toString().split(" ");
            if(filename.equals("score.txt")){
                classID.set(s[1]);
                String val="1," + s[0] + "," + s[2];
                context.write(classID,new Text(val));
            }
            else if (filename.equals("major.txt")){
                if(!s[0].equals("classid")){
                    classID.set(s[0]);
                    String val = "2," + s[1] + "," + s[2];
                    context.write(classID,new Text(val));
                }
            }
        }
    }

    public static class RelationReduce extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String[][] studentTable=new String[10][2];
            String[] data;
            String classID = "nil";
            if(time == 0){
                context.write(new Text("classid"), new Text("classname deptname studentid score"));
                time++;
            }
            int cnt = 0;
            for (Text val : values) {
                data = val.toString().split(",");
                if(data[0].equals("1")){
                    studentTable[cnt][0] = data[1];
                    studentTable[cnt][1] = data[2];
                    cnt = cnt + 1;
                }
                else if(data.length == 3 && data[0].equals("2")){
                    classID = data[1] + " " + data[2];
                }
            }
            for(int i = 0; i < cnt; i++){
                if(classID.equals("nil")) continue;
                String s=classID+" "+studentTable[i][0]+" "+studentTable[i][1];
                result.set(s);
                context.write(key, result);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();// 加載hadoop配置
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        String[] otherArgs = new String[]{"input/score.txt", "input/major.txt", "output/outputRelationZqc"};
// String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: Relation <in> <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "RelationZqc");// 設置環境參數
        job.setJarByClass(RelationZqc.class);// 設置程序主類
        job.setMapperClass(RelationMap.class);// 設置用戶實現的Mapper類
        job.setReducerClass(RelationReduce.class);// 設置用戶實現的Reducer類
        job.setOutputKeyClass(Text.class);// 設置輸出key類型
        job.setOutputValueClass(Text.class); // 設置輸出value類型
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));// 添加輸入文件路徑
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));// 設置輸出文件路徑
        System.exit(job.waitForCompletion(true) ? 0 : 1); // 提交做業並等待結束
    }
}
複製代碼

三、簡單排序類應用編寫 MapReduce 程序「SortXxx」 類,要求輸入文件 sort1.txt、sort2.txt、sort3.txt 內容,由程序隨機生成若干條數據並存儲到 HDFS 上,每條數據佔一行,數據能夠是日期也能夠是數字;輸出結果爲兩列數據,第一列是輸入文件中的原始數據,第二列是該數據的排位。 在這裏插入圖片描述 運行成功 在這裏插入圖片描述

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class SortZqc {
    public static class SortMap extends Mapper<Object,Text,IntWritable,IntWritable>{
        private static IntWritable data = new IntWritable();
        //實現map函數
        public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
            String line=value.toString();
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1));
        }
    }

    public static class SortReduce extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
        IntWritable n = new IntWritable(1);  //用n表明位次
        public void reduce(IntWritable key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
            for(IntWritable val:values){
                context.write(key,n);
                n = new IntWritable(n.get()+1);
            }
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();// 加載hadoop配置
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        String[] otherArgs = new String[]{"input/sort1.txt","input/sort2.txt","input/sort3.txt","output/outputSortZqc"};
        if (otherArgs.length < 2) {
            System.err.println("Usage: data sort <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "data sort");// 設置環境參數
        job.setJarByClass(SortZqc.class);// 設置程序主類
        job.setMapperClass(SortMap.class);// 設置用戶實現的Mapper類
        job.setCombinerClass(SortReduce.class);
        job.setReducerClass(SortReduce.class);// 設置用戶實現的Reducer類
        job.setOutputKeyClass(IntWritable.class);// 設置輸出key類型
        job.setOutputValueClass(IntWritable.class); // 設置輸出value類型
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));// 添加輸入文件路徑
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));// 設置輸出文件路徑
        System.exit(job.waitForCompletion(true) ? 0 : 1); // 提交做業並等待結束
    }

}
複製代碼

最後

小生凡一,期待你的關注。

相關文章
相關標籤/搜索