MapReduce入門(一)單詞計數

1、MR計算模型的由來html

        MapReduce最先是由Google公司研究提出的一種面向大規模數據處理的並行計算模型和方法。Google公司設計MapReduce的初衷,主要是爲了解決其搜索引擎中大規模網頁數據的並行化處理。java

        Google公司發明了MapReduce以後,首先用其從新改寫了其搜索引擎中的Web文檔索引處理系統。但因爲MapReduce能夠廣泛應用於不少大規模數據的計算問題,所以自發明MapReduce之後,Google公司內部進一步將其普遍應用於不少大規模數據處理問題。到目前爲止,Google公司內有上萬個各類不一樣的算法問題和程序都使用MapReduce進行處理。 linux

        2003年和2004年,Google公司在國際會議上分別發表了兩篇關於Google分佈式文件系統和MapReduce的論文,公佈了 Google的GFS和MapReduce的基本原理和主要設計思想。程序員

       2004年,開源項目Lucene(搜索索引程序庫)和Nutch(搜索引擎)的創始人Doug Cutting發現MapReduce正是其所須要的解決大規模Web數據處理的重要技術,於是模仿Google MapReduce,基於Java設計開發了一個稱爲Hadoop的開源MapReduce並行計算框架和系統。web

       自此,Hadoop成爲Apache開源組織下最重要的項目,自其推出後很快獲得了全球學術界和工業界的廣泛關注,並獲得推廣和普及應用。 MapReduce的推出給大數據並行處理帶來了巨大的革命性影響,使其已經成爲事實上的大數據處理的工業標準。算法

2、MapReduce基本設計思想apache

對付大數據並行處理:分而治之:編程

       一個大數據若能夠分爲具備一樣計算過程的數據塊,而且這些數據塊之間不存在數據依賴關係,則提升處理速度的最好辦法就是採用「分而治之」的策略進行並行化計算。性能優化

       MapReduce採用了這種「分而治之」的設計思想,對相互間不具備或者有較少數據依賴關係的大數據,用必定的數據劃分方法對數據分片,而後將每一個數據分片交由一個節點去處理,最後彙總處理結果。服務器

上升到抽象模型:Map與Reduce:

       MapReduce借鑑了函數式程序設計語言Lisp的設計思想。

      用Map和Reduce兩個函數提供了高層的並行編程抽象模型和接口,程序員只要實現這兩個基本接口便可快速完成並行化程序的設計。

        MapReduce的設計目標是能夠對一組順序組織的數據元素/記錄進行處理。

        現實生活中,大數據每每是由一組重複的數據元素/記錄組成,例如,一個Web訪問日誌文件數據會由大量的重複性的訪問日誌構成,對這種順序式數據元素/記錄的處理一般也是順序式掃描處理。

MapReduce提供瞭如下的主要功能:

        數據劃分和計算任務調度:系統自動將一個做業(Job)待處理的大數據劃分爲不少個數據塊,每一個數據塊對應於一個計算任務(Task),並自動調度計算節點來處理相應的數據塊。做業和任務調度功能主要負責分配和調度計算節點(Map節點或Reduce節點),同時負責監控這些節點的執行狀態,並負責Map節點執行的同步控制。

        數據/代碼互定位:爲了減小數據通訊,一個基本原則是本地化數據處理,即一個計算節點儘量處理其本地磁盤上所分佈存儲的數據,這實現了代碼向數據的遷移;當沒法進行這種本地化數據處理時,再尋找其餘可用節點並將數據從網絡上傳送給該節點(數據向代碼遷移),但將盡量從數據所在的本地機架上尋 找可用節點以減小通訊延遲。

       系統優化:爲了減小數據通訊開銷,中間結果數據進入Reduce節點前會進行必定的合併處理;一個Reduce節點所處理的數據可能會來自多個Map節點,爲了不Reduce計算階段發生數據相關性,Map節點輸出的中間結果需使用必定的策略進行適當的劃分處理,保證相關性數據發送到同一個Reduce節點;此外,系統還進行一些計算性能優化處理,如對最慢的計算任務採用多備份執行、選最快完成者做爲結果。

        出錯檢測和恢復:以低端商用服務器構成的大規模MapReduce計算集羣中,節點硬件(主機、磁盤、內存等)出錯和軟件出錯是常態,所以 MapReduce須要能檢測並隔離出錯節點,並調度分配新的節點接管出錯節點的計算任務。同時,系統還將維護數據存儲的可靠性,用多備份冗餘存儲機制提 高數據存儲的可靠性,並能及時檢測和恢復出錯的數據

3、MapReduce的編寫

1.加入依賴jar包--編寫pom.xml

<properties>
    <org.apache.hadoop.version>2.7.5</org.apache.hadoop.version>
</properties>

<!--分佈式計算-->
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${org.apache.hadoop.version}</version>
    </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>

    <!--分佈式存儲-->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${org.apache.hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${org.apache.hadoop.version}</version>
    </dependency>
    </dependencies>

2.在resources中添加core-site.xml文件,配置內容以下:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://master2:9000</value>
    </property>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.address</name>
        <value>master2:8030</value>
    </property>
    <property>
        <name>fs.hdfs.impl</name>
        <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
    </property>
</configuration>

master2:表示我集羣中NameNode的主機名,填寫主機名須要在本地機中的hosts中添加IP配置,

若是不配置,請填寫主機名所對應的IP。

 

3..mapreduce函數(類)的編寫,有三個類分別是表明map、reduce、job

3.1:編寫WCMapper類(map)繼承Mapper並重寫map這個方法,具體內容以下:

package com.day01;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * java基本數據類型步支持序列化--存放在內存中不在磁盤中(不能被持久化)
 * 用來處理map任務:映射
 * map任務接收的是kv,輸出的也是kv 1(行號),hello world
 * 第一個泛型表示:輸入key的數據類型 輸入的數據至關於文件開頭的偏移量(行號)沒有實際意義
 * 第二個泛型表示:輸入value的數據類型 輸入的文件的一行內容
 * 第三個泛型表示:輸出key的數據類型 輸出的key是一個字符串
 * 第四個泛型表示:輸出value的數據類型
 *
 * LongWritable:等價於java中的long
 * Text :等價與java中的string
 * IntWritable:等價於java中的int
 *
 * XXXWritable 是hadoop定義的基本數據類型,至關於對java中的數據類型作一個封裝,同時序列化(能夠網絡傳輸以及存儲到磁盤上)
 *
 */
public class WCMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
    //map方法每次執行一行數據,會被循環調用map方法(有多少行就調用多少次)
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //將一行數據(text類型)變爲string類型
        String line = value.toString();
        String[] words = line.split(" ");
        //定義value
        IntWritable one = new IntWritable(1);
        //便利單詞,輸出word 1
        for (int i = 0; i < words.length; i++) {
            Text keyOut = new Text(words[i]);
            //輸出word 1
            context.write(keyOut,one);
        }
    }
}

3.二、編寫WCReducer類(reduce)繼承Reducer並重寫reduce這個方法,具體內容以下:

package com.day01;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * 用來處理reduce任務:合併
 * 在reduce端框架會將相同的key的value放在一個集合(迭代器)
 */
public class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    //每次處理一個key,會被循環調用,有多少個key就會調用幾回
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //獲取迭代器
        Iterator<IntWritable> iterator = values.iterator();
        int count = 0;
        while (iterator.hasNext()){
            IntWritable one = iterator.next();
            count+=one.get();
        }
        //context的write只接受hadoop的數據類型,不接受java的數據類型
        context.write(key,new IntWritable(count));
    }
}

3.三、編寫WCJob類(job),具體內容以下:

package com.day01;

import com.google.common.io.Resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.util.ArrayList;
//mapred是hadoop的1.X的包,mapreduce是2.X的API

/**
 * 測試-設定任務的運行
 * 輸入與輸出的路徑
 */
public class WCJob {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration coreSiteConf = new Configuration();
        coreSiteConf.addResource(Resources.getResource("core-site.xml"));
        //設置一個任務,後面是job的名稱
        Job job = Job.getInstance(coreSiteConf, "wc");
        //設置job的運行類,就是此類
        job.setJarByClass(WCJob.class);
        //設置Map和Reduce處理類
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        //設置map輸出類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //設置job/reduce輸出類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //設置任務的輸入路徑
        FileInputFormat.addInputPath(job, new Path("/wc"));
        //設置任務的輸出路徑--保存結果(這個目錄必須是不存在的目錄)
        //刪除存在的文件
        deleteFileName("/wcout");

        FileOutputFormat.setOutputPath(job, new Path("/wcout"));
        //運行任務 true:表示打印詳情
        boolean flag = job.waitForCompletion(true);
        if (flag){
            System.out.println(flag);
            readContent("/wcout/part-r-00000");
        }else {
            System.out.println(flag+",讀取文件失敗");
        }
    }

    //刪除已經存在在hdfs上面的文件文件
    private static void deleteFileName(String path) throws IOException {
        //將要刪除的文件
        Path fileName = new Path(path);
        Configuration entries = new Configuration();
        //解析core-site-master2.xml文件
        entries.addResource(Resources.getResource("core-site.xml"));
        //獲取客戶端文件系統
        FileSystem fileSystem = FileSystem.get(entries);
        if (fileSystem.exists(fileName)){
            System.out.println(fileName+"已經存在,正在刪除它...");
            boolean flag = fileSystem.delete(fileName, true);
            if (flag){
                System.out.println(fileName+"刪除成功");
            }else {
                System.out.println(fileName+"刪除失敗");
                return;
            }
        }
        //關閉資源
        fileSystem.close();
    }

    //讀取文件內容
    private static void readContent(String path) throws IOException {
        //將要讀取的文件路徑
        Path fileName = new Path(path);
        ArrayList<String> returnValue = new ArrayList<String>();
        Configuration configuration = new Configuration();
        configuration.addResource(Resources.getResource("core-site.xml"));
        //獲取客戶端系統文件
        FileSystem fileSystem = FileSystem.get(configuration);
        //open打開文件--獲取文件的輸入流用於讀取數據
        FSDataInputStream inputStream = fileSystem.open(fileName);
        InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
        //一行一行的讀取數據
        LineNumberReader lineNumberReader = new LineNumberReader(inputStreamReader);
        //定義一個字符串變量用於接收每一行的數據
        String str = null;
        //判斷什麼時候沒有數據
        while ((str=lineNumberReader.readLine())!=null){
            returnValue.add(str);
        }
        //打印數據到控制檯
        System.out.println("文件內容以下:");
        for (String read :
                returnValue) {
            System.out.println(read);
        }
        //關閉資源
        lineNumberReader.close();
        inputStream.close();
        inputStreamReader.close();
    }
}

-----在本地運行

a.在你的集羣的hdfs上建立/wc

hadoop fs -mkdir /wc

b.將兩個文件寫入內容,內容間以空格隔開,並將文件put到hdfs中的/wc上

c.在C:\Windows\System32中添加hadoop.dll與winutils.exe文件

hadoop.dll與winutils.exe這兩個文件的連接:https://pan.baidu.com/s/10xa7wC3BwlH3oF7DoYFE1A 

提取碼:c7ie
d.在D:\soft\hadoop\hadoop-2.7.5\bin中添加hadoop.dll與winutils.exe文件
e.將idea中core-site.xml中關於yarn的配置刪除掉、

-通過以上a-b-c-d-e操做以後,直接Run,出現如下內容就恭喜你成功!!!!!!!

 

 

----------在linux上運行
將寫的項目打包
1.點擊左下角的方框,再店家maven projects-->找到想要打包的項目
-->點擊Lifecycle-->雙擊package就會開始打包
2.將jar包上傳並更名(可改可不改)
3.開始運行jar包
例如運行mrdemo.jar中的WEJob類
hadoop jar mrdemo.jar com.day01.WCJob

 

也能夠在web中輸入http://192.168.228.13:8088/cluster/apps/FINISHED

192.168.228.13:修改爲本身的主機IP,出現SUCCEEDED表示執行成功

過程當中出現下面異常,在網上查看說是hadoop的一個BUG

java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1252)
        at java.lang.Thread.join(Thread.java:1326)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:716)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:476)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:652)

異常問題:

保證本地運行成功

上傳jar包到hdfs上運行找不到類異常--沒有將第三方依賴一塊兒打包

---將第三方jar包放在lib裏面,再打包(就會自動加載lib裏面的jar包)就解決了--以下圖

還有其餘異常,歡迎分享一塊兒解決....

相關文章
相關標籤/搜索