Hadoop上路-03_Hadoop JavaAPI

一。Eclipse安裝

1.下載解壓

下載:http://www.eclipse.org/downloads/  java

解壓:SHELL$ sudo tar -zxvf eclipse.tar.gz  算法

 

2.快捷方式

右鍵Ubuntu桌面,建立啓動器apache

 

 

3.建立一個JavaProject

 

 

4.添加必須jar

所有jar均可以在%Hadoop安裝目錄%/share/hadoop目錄中找到。api

 

 

 

二。基本操做

這裏僅限FileSystem中的方法,其數量繁多,具體查看API數組

1.遍歷目錄和文件 listStatus

// 威格靈博客:www.cuiweiyou.com
package com.cuiweiyou.hadooptest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

public class HdfsTest {

    private static FileSystem hdfs;
    
    @Test
    public void test() throws Exception {
        // 1.建立配置器 
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        // 2.建立文件系統 
        hdfs = FileSystem.get(conf);
        // 3.遍歷HDFS上的文件和目錄 
        FileStatus[] fs = hdfs.listStatus(new Path("hdfs:/")); 
        if (fs.length > 0) { 
            for (FileStatus f : fs) { 
                showDir(f);
            }
        }
    }

    private static void showDir(FileStatus fs) throws Exception {
        Path path = fs.getPath();
        System.out.println(path);
        // 若是是目錄
        //if (fs.isDir()) {    //已過時
        if (fs.isDirectory()) {
            FileStatus[] f = hdfs.listStatus(path);
            if (f.length > 0) {
                for (FileStatus file : f) {
                    showDir(file);
                }
            }
        }
    }
}

  

 

2.遍歷文件 listFiles

    @Test
    public void test() throws Exception {
        // 1.配置器
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        // 2.文件系統
        hdfs = FileSystem.get(conf);
        // 3.遍歷HDFS上的文件
        RemoteIterator<LocatedFileStatus> fs = hdfs.listFiles(new Path("hdfs:/"), true);
        while(fs.hasNext()){
            System.out.println(fs.next());
        }
    }

  

 

3.判斷存在 exists

    @Test
    public void test() throws Exception {
        // 1.建立配置器  
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        //2.建立文件系統  
        FileSystem hdfs = FileSystem.get(conf);  
        //3.建立可供hadoop使用的文件系統路徑
        Path file = new Path("hdfs:/test.txt");  
        // 4.判斷文件是否存在(文件目標路徑)    
        System.out.println("文件存在:" + hdfs.exists(file));
    }

 

4.判斷目錄/文件 isDirectory/isFile

// 威格靈博客:www.cuiweiyou.com
package com.cuiweiyou.hadooptest; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Test; public class HdfsTest { private static FileSystem hdfs; @Test public void test() throws Exception { // 1.配置器 Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); // 2.文件系統 hdfs = FileSystem.get(conf); // 3.遍歷HDFS上目前擁有的文件和目錄 FileStatus[] fs = hdfs.listStatus(new Path("hdfs:/")); if (fs.length > 0) { for (FileStatus f : fs) { showDir(f); } } else{ System.out.println("沒什麼好遍歷的..."); } } private static void showDir(FileStatus fs) throws Exception { Path path = fs.getPath(); // 若是是目錄 if (fs.isDirectory()) { System.out.println("目錄:" + path); FileStatus[] f = hdfs.listStatus(path); if (f.length > 0) { for (FileStatus file : f) { showDir(file); } } } else { System.out.println("文件:" + path); } } }

  

 

5.最後修改時間 getModificationTime

// 威格靈博客:www.cuiweiyou.com
package com.cuiweiyou.hadooptest; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Test; public class HdfsTest2 { private static FileSystem hdfs; @Test public void test() throws Exception { // 1.建立配置器 Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); // 2.建立文件系統(指定爲HDFS文件系統到URI) hdfs = FileSystem.get(conf); // 3.列出HDFS上目前擁有的文件和目錄 FileStatus[] fs = hdfs.listStatus(new Path("hdfs:/")); if(fs.length>0){ for (FileStatus f : fs) { showDir(f); } } } private static void showDir(FileStatus fs) throws Exception { Path path = fs.getPath(); //獲取最後修改時間 long time = fs.getModificationTime(); System.out.println("HDFS文件的最後修改時間:"+new Date(time)); System.out.println(path); if (fs.isDirectory()) { FileStatus[] f = hdfs.listStatus(path); if(f.length>0){ for (FileStatus file : f) { showDir(file); } } } } }

  

 

6.文件備份狀態 getFileBlockLocations

// 威格靈博客:www.cuiweiyou.com
package com.cuiweiyou.hadooptest; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Test; public class HdfsTest2 { @Test public void test() throws Exception { //1.配置器 Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); //2.文件系統 FileSystem fs = FileSystem.get(conf); //3.已存在的,必須是文件 Path path = new Path("hdfs:/vigiles/dir/test3.txt"); //4.文件狀態 FileStatus status = fs.getFileStatus(path); //5.文件塊 //BlockLocation[] blockLocations = fs.getFileBlockLocations(status, 0, status.getLen()); //方法1,傳入文件的FileStatus BlockLocation[] blockLocations = fs.getFileBlockLocations(path, 0, status.getLen()); //方法2,傳入文件的Path int blockLen = blockLocations.length; System.err.println("塊數量:"+blockLen); //若是文件不夠大,就不會分塊,即獲得1 for (int i = 0; i < blockLen; i++) { //獲得塊文件大小 long sizes = blockLocations[i].getLength(); System.err.println("塊大小:"+sizes); //按照備份數量獲得所有主機名 String[] hosts = blockLocations[i].getHosts(); for (String host : hosts) { System.err.println("主機名:"+host); } //按照備份數量獲得所有主機名 String[] names = blockLocations[i].getNames(); for (String name : names) { System.err.println("IP:"+ name); } } } }

  

 

7.讀取文件 open

// 威格靈博客:www.cuiweiyou.com
package com.cuiweiyou.hadooptest; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Test; public class HdfsTest2 { @Test public void test() throws Exception { Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); FileSystem fs = FileSystem.get(conf); Path path = new Path("hdfs:/vigiles/dir/test3.txt"); FSDataInputStream is = fs.open(path); FileStatus stat = fs.getFileStatus(path); byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))]; is.readFully(0, buffer); is.close(); fs.close(); System.out.println(new String(buffer)); } }

 

8.複製上傳文件 copyFromLocalFile

@Test
    public void test() throws Exception {
        // 1.建立配置器  
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        //2.建立文件系統  
        FileSystem hdfs = FileSystem.get(conf);  
        //3.建立可供hadoop使用的文件系統路徑  
        Path src = new Path("file:/home/hadoop/桌面/copy_test.txt"); //本地目錄/文件  
        Path dst = new Path("hdfs:/");  //目標目錄/文件 
        // 4.拷貝本地文件上傳(本地文件,目標路徑)  
        hdfs.copyFromLocalFile(src, dst);  
        System.out.println("文件上傳成功至:" + conf.get("fs.default.name"));  
        // 5.列出HDFS上的文件  
        FileStatus[] fs = hdfs.listStatus(dst);  
        for (FileStatus f : fs) {   
            System.out.println(f.getPath());  
        }
        
        Path path = new Path("hdfs:/copy_test.txt");
        FSDataInputStream is = hdfs.open(path);
        FileStatus stat = hdfs.getFileStatus(path);
        byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];
        is.readFully(0, buffer);
        is.close();
        hdfs.close();
        System.out.println("文件內容:" + new String(buffer));
    }

  

另:移動上傳moveFromLocalFile,和copyFromLocalFile相似,但其操做後源文件將不存在。app

 

9.複製下載文件 copyToLocalFile

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        FileSystem hdfs = FileSystem.get(conf);  
        //建立HDFS源路徑和本地目標路徑
        Path src = new Path("hdfs:/copy_test.txt");  //目標目錄/文件 
        Path dst = new Path("file:/home/hadoop/桌面/new.txt"); //本地目錄/文件  
        //拷貝本地文件上傳(本地文件,目標路徑)  
        hdfs.copyToLocalFile(src, dst);  
    }

 另:moveToLocalFile,其操做後源文件將不存在。eclipse

 

10.建立目錄 mkdirs

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        FileSystem hdfs = FileSystem.get(conf);
        //建立目錄
        hdfs.mkdirs(new Path("hdfs:/eminem"));
    }

 

11.建立目錄/文件 create

  @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        FileSystem hdfs = FileSystem.get(conf);
        
        // 使用HDFS數據輸出流(寫)對象 在HDSF上根目錄建立一個文件夾,其內再建立文件
        FSDataOutputStream out = hdfs.create(new Path("hdfs:/vigiles/eminem.txt"));
        // 在文件中寫入一行數據,必須使用UTF-8
        out.write("痞子阿姆,Hello !".getBytes("UTF-8"));
        
        out = hdfs.create(new Path("/vigiles/alizee.txt"));
        out.write("艾莉婕,Hello !".getBytes("UTF-8"));
        
        out.close();
        
        FSDataInputStream is = hdfs.open(new Path("hdfs:/vigiles/alizee.txt"));
        FileStatus stat = hdfs.getFileStatus(new Path("hdfs:/vigiles/alizee.txt"));
        byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];
        is.readFully(0, buffer);
        is.close();
        hdfs.close();
        System.out.println(new String(buffer));
    }

 

12.建立空文件 createNewFile

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        FileSystem hdfs = FileSystem.get(conf);
        //建立空文件
        hdfs.createNewFile(new Path("hdfs:/newfile.txt"));
    }

 

13.寫入文件 append

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        FileSystem hdfs = FileSystem.get(conf);
        //建立空文件
        FSDataOutputStream out = hdfs.append(new Path("hdfs:/newfile.txt"));
        out.write("使用append方法寫入文件\n".getBytes("UTF-8"));
        out.close();
        
        out = hdfs.append(new Path("/newfile.txt"));
        out.write("再次寫入!!!\n".getBytes("UTF-8"));
        out.close();
    }

 

14.重命名文件 rename

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        FileSystem fs = FileSystem.get(conf);
        //重命名:fs.rename(源文件,新文件)
        boolean rename = fs.rename(new Path("/copy_test.txt"), new Path("/copy.txt"));
        System.out.println(rename);
    }

 

15.刪除文件 delete

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.jop.tracker", "192.168.1.240:9001");
        FileSystem fs = FileSystem.get(conf);
        //判斷刪除(路徑,true。false=非空時不刪除,拋RemoteException、IOException異常)
        boolean delete = fs.delete(new Path("hdfs:/test.txt"), true);
        System.out.println("執行刪除:"+delete);
        //FileSystem關閉時執行
        boolean exit = fs.deleteOnExit(new Path("/out.txt"));
        System.out.println("執行刪除:"+exit);
        fs.close();
    }

 

 

三。MapReduce經常使用算法

 

1.計數

1)數據準備

 

2)代碼

// 威格靈博客:www.cuiweiyou.com
1 package com.cuiweiyou.hadooptest; 2 3 import java.io.IOException; 4 import java.util.StringTokenizer; 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.IntWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.Mapper; 11 import org.apache.hadoop.mapreduce.Reducer; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 14 15 /* 16 * 單詞計數 17 */ 18 public class WordCount { 19 20 /* 21 * 先通過mapper運算,而後纔是reducer。 22 * 內部類:映射器 Mapper<Key_IN, Value_IN, Key_OUT, Value_OUT> 23 */ 24 public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> { 25 26 //計數,查到一個就佔個坑 27 private static final IntWritable one = new IntWritable(1); 28 //文本 29 private Text word = new Text(); 30 31 /** 32 * 重寫map方法,實現理想效果 33 * MyMapper的實例只有一個,但實例的這個map方法卻一直在執行 34 * Key1:本行首字符在全文中的索引。Value1:本行的文本。context:上下文對象 35 * 這裏K一、V1像這樣[K,V] 36 **/ 37 public void map(Object Key1, Text Value1, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { 38 //拆分字符串,返回單詞集合。默認以空格拆分 39 StringTokenizer itr = new StringTokenizer(Value1.toString()); 40 //遍歷一行的所有單詞 41 while (itr.hasMoreTokens()) { 42 //將文本轉爲臨時Text變量 43 this.word.set(itr.nextToken()); 44 //將單詞保存到上下文對象中(單詞,佔坑),輸出 45 context.write(this.word, one); 46 } 47 } 48 } 49 50 /************************************************************************ 51 * 在Mapper後,Reducer前,有個shuffle過程,會根據k2將對應的v2歸併爲v2[...] * 52 *************************************************************************/ 53 54 /* 55 * mapper結束後,執行如今的reducer。 56 * 內部類:拆分器 Reducer<Key_IN, Value_IN, Key_OUT, Value_OUT> 57 */ 58 public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 59 60 //個數統計 61 private IntWritable result = new IntWritable(); 62 63 /** 64 * 重寫reduce方法,實現理想效果 65 * MyReducer的實例也只有一個,但實例的這個reduce方法卻一直在執行 66 * Key2:單詞。Values2:value的集合,也就是[1,1,1,...]。context:上下文對象 67 * 這裏這裏K二、V2像這樣[K,V[1,1,1,...]] 68 **/ 69 public void reduce(Text Key2, Iterable<IntWritable> Values2, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { 70 int sum = 0; 71 //累加V2的元素,有多少個 1 ,即有多少個指定單詞 72 for (IntWritable val : Values2) { 73 sum += val.get(); 74 } 75 this.result.set(sum); 76 //終於將單詞和總個數再次輸出 77 context.write(Key2, this.result); 78 } 79 } 80 81 public static void main(String[] args) throws Exception { 82 // 聲明配置信息 83 Configuration conf = new Configuration(); 84 conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); 85 // 建立做業 86 Job job = new Job(conf, "word count"); 87 job.setJarByClass(WordCount.class); 88 // 設置mr 89 job.setMapperClass(MyMapper.class); 90 job.setReducerClass(MyReducer.class); 91 // 設置輸出類型,和Context上下文對象write的參數類型一致 92 job.setOutputKeyClass(Text.class); 93 job.setOutputValueClass(IntWritable.class); 94 // 設置輸入輸出路徑 95 FileInputFormat.addInputPath(job, new Path("hdfs:/input")); //文件已經存在 96 FileOutputFormat.setOutputPath(job, new Path("hdfs:/output")); //還沒有存在 97 // 執行 98 System.exit(job.waitForCompletion(true) ? 0 : 1); 99 } 100 }

 

3)結果

 

 

2.排序

1)數據準備

 

2)代碼

 1 package com.cuiweiyou.hadooptest;
 2 
 3 import *
 4 
 5 //hadoop默認排序:
 6 //若是k二、v2類型是Text-文本,結果是按照字典順序
 7 //若是k二、v2類型是LongWritable-數字,結果是按照數字大小順序
 8 
 9 public class TestSort {
10     /**
11      * 內部類:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
12      */
13     public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
14         /**
15          * 重寫map方法
16          */
17         public void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
18             //這裏v1轉爲k2-數字類型,捨棄k1。null爲v2
19             context.write(new LongWritable(Long.parseLong(v1.toString())), NullWritable.get());
20             //由於v1可能重複,這時,k2也是可能有重複的
21         }
22     }
23 
24     /*** 在此方法執行前,有個shuffle過程,會根據k2將對應的v2歸併爲v2[...] ***/
25 
26     /**
27      * 內部類:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
28      */
29     public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
30         /**
31          * 重寫reduce方法
32          */
33         protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException, InterruptedException {
34             //k2=>k3, v2[...]捨棄。null => v3
35             context.write(k2, NullWritable.get());
36             //此時,k3若是發生重複,根據默認算法會發生覆蓋,即最終僅保存一個k3
37         }
38     }
39 
40     public static void main(String[] args) throws Exception {
41         // 聲明配置信息
42         Configuration conf = new Configuration();
43         conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
44 
45         // 建立做業
46         Job job = new Job(conf, "Test Sort");
47         job.setJarByClass(TestSort.class);
48 
49         // 設置mr
50         job.setMapperClass(MyMapper.class);
51         job.setReducerClass(MyReducer.class);
52 
53         // 設置輸出類型,和Context上下文對象write的參數類型一致
54         job.setOutputKeyClass(LongWritable.class);
55         job.setOutputValueClass(NullWritable.class);
56 
57         // 設置輸入輸出路徑
58         FileInputFormat.setInputPaths(job, new Path("/input/"));
59         FileOutputFormat.setOutputPath(job, new Path("/out"));
60 
61         // 執行
62         System.exit(job.waitForCompletion(true) ? 0 : 1);
63     }
64 }

 

3)結果

 

 

3.去重

 1     /*
 2      * 內部類:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
 3      */
 4     public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
 5         /****
 6          * 重寫map方法
 7         ****/
 8         public void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
 9             //由於咱們讀入的數據就是一行一個數字,直接使用
10             //這個數字有幾個都無所謂,只有知道有這麼一個數字便可,因此輸出的v2爲null
11             context.write(new LongWritable(Long.parseLong(v1.toString())), NullWritable.get());
12         }
13     }
14     
15     /** 在此方法執行前,有個shuffle過程,會根據k2將對應的v2歸併爲v2[...] **/
16 
17     /*
18      * 內部類:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
19      */
20     public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
21         /****
22          * 重寫reduce方法
23         ****/
24         protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException, InterruptedException {
25             //此時,k3(即眼前的k2)若是發生重複,根據默認算法會發生覆蓋,即最終僅保存一個k3,達到去重到效果,而v3是null無所謂
26             context.write(k2, NullWritable.get());
27 
28         }
29     }

 

4.過濾

 1     /*
 2      * 內部類:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
 3      */
 4     public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
 5         String tmp = "8238";
 6         
 7         /**
 8          * 重寫map方法。k1:行首字符索引,v1:這一行文本
 9         **/
10         protected void map(LongWritable k1, Text v1, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException ,InterruptedException {
11             System.out.println(v1+", "+tmp);
12             //若是行文本是指定值,過濾之
13             if(v1.toString().equals(tmp)){
14                 System.out.println("有了");
15                 //保存(按照泛型限制,k2是Text,v2是Nullritable)
16                 context.write(v1, NullWritable.get());
17             }
18         }
19     }
20 
21     /*
22      * 內部類:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
23      */
24     public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
25         /**
26          * 重寫reduce方法
27         **/
28         protected void reduce(Text k2, Iterable<NullWritable> v2, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException ,InterruptedException {
29             context.write(k2, NullWritable.get());
30         }
31     }

 若是報錯:oop

  Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, received org.apache.hadoop.io.Text
必定要檢查main方法裏:ui

// 設置輸出類型,和Context上下文對象write的參數類型一致
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

 

5.TopN

1)數值最大

 1     // map(泛型定義了輸入和輸出類型)
 2     public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
 3 
 4         // 首先建立一個臨時變量,保存一個可存儲的最小值:Long.MIN_VALUE=-9223372036854775808
 5         long temp = Long.MIN_VALUE;
 6 
 7         // 找出最大值。這個map不斷迭代v1,最終保存最大值
 8         protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
 9             
10             // 將文本轉數值
11             long val = Long.parseLong(v1.toString());
12             // 若是v1比臨時變量大,則保存v1的值
13             if (temp < val) {
14                 temp = val;
15             }
16         }
17 
18         /** ---此方法在所有的map任務結束後執行一次。這時僅輸出臨時變量到最大值--- **/
19         protected void cleanup(Context context) throws IOException, InterruptedException {
20             context.write(new LongWritable(temp), NullWritable.get());
21             System.out.println("文件讀取完畢,保存最大值");    //輸出兩次,對應兩個文本文件
22         }
23     }
24 
25     // reduce
26     public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
27         // 臨時變量
28         Long temp = Long.MIN_VALUE;
29 
30         // 由於一個文件獲得一個最大值,咱們有兩個txt文件會獲得兩個值。再次將這些值比對,獲得最大的
31         protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException, InterruptedException {
32 
33             long val = Long.parseLong(k2.toString());
34             // 若是k2比臨時變量大,則保存k2的值
35             if (temp < val) {
36                 temp = val;
37             }
38         }
39 
40         /** !!!此方法在所有的reduce任務結束後執行一次。這時僅輸出惟一最大值!!! **/
41         protected void cleanup(Context context) throws IOException, InterruptedException {
42             context.write(new LongWritable(temp), NullWritable.get());
43         }
44     }

 

2)數值前5

 1     // map
 2     public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
 3 
 4         // 首先建立一個臨時變量,保存一個可存儲的最小值:Long.MIN_VALUE=-9223372036854775808
 5         long temp = Long.MIN_VALUE;
 6         // Top5存儲空間,咱們取前5個
 7         long[] tops;
 8 
 9         /** 這個方法在run中調用,在所有map以前執行一次 **/
10         protected void setup(Context context) {
11             // 初始化數組長度爲5
12             tops = new long[5];
13         }
14 
15         // 找出最大值
16         protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
17             
18             // 將文本轉數值
19             final long val = Long.parseLong(v1.toString());
20             // 保存在0索引
21             tops[0] = val;
22             // 排序後最大值在最後一個索引,這樣從[5]到[0]依次減少。每執行一次map,最小的[0]都會賦予新值
23             Arrays.sort(tops);
24         }
25 
26         /** ---此方法在所有到map任務結束後執行一次。輸出map後獲得的前5個最大值--- **/
27         protected void cleanup(Context context) throws IOException, InterruptedException {
28             for (int i = 0; i < tops.length; i++) {
29                 context.write(new LongWritable(tops[i]), NullWritable.get());
30             }
31         }
32     }
33 
34     // reduce
35     public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
36         Long temp = Long.MIN_VALUE;
37         long[] tops;
38 
39         /** 次方法在run中調用,在所有map以前執行一次 **/
40         protected void setup(Context context) {
41             tops = new long[5];
42         }
43 
44         // 由於每一個文件都獲得5個值,再次將這些值比對,獲得最大的
45         protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException, InterruptedException {
46             long top = Long.parseLong(k2.toString());
47             tops[0] = top;
48             Arrays.sort(tops);
49         }
50 
51         /** ---此方法在所有到reduce任務結束後執行一次--- **/
52         protected void cleanup(Context context) throws IOException, InterruptedException {
53             for (int i = 0; i < tops.length; i++) {
54                 context.write(new LongWritable(tops[i]), NullWritable.get());
55             }
56         }
57     }

 

3)數量最大

 

    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object Key1, Text Value1, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            String[] strings = Value1.toString().split(" ");
            for (String str : strings) {
                this.word.set(str);
                context.write(this.word, one);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        //臨時變量,保存最大數量的單詞
        private String keyer;    //注意這裏不能用Hadoop的類型,如Text 
        private IntWritable valer;    //這裏最好也是基本的java數據類型,如int
        //計數
        private Integer temp = Integer.MIN_VALUE;

        public void reduce(Text Key2, Iterable<IntWritable> Values2, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            //統計數量
            for (IntWritable val : Values2) {
                sum += val.get();
            }
            //保存最大數量值
            if (sum > temp) {
                temp = sum;

                keyer = Key2.toString();
                valer = new IntWritable(temp);
            }
        }

        //最終輸出最大數量的單詞
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new Text(keyer), valer);
        }
    }

 

6.單表關聯

 

    /*
        父 子
        子 孫
        1 2
        2 3
        A B
        B C
     */
    // map
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        // 拆分原始數據
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // 按製表符拆分記錄。一行拆出兩個角色
            String[] splits = v1.toString().split(" ");
            //針對無心義的換行過濾
            if (splits.length > 1) {
                // 把「父」做爲k2;「子「加下劃線區分,做爲v2
                context.write(new Text(splits[0]), new Text("_" + splits[1]));
                
                // 把「子」做爲k2;「父」輩做爲v2。就是把原兩個單詞調換位置保存
                context.write(new Text(splits[1]), new Text(splits[0]));
            }
        }

        /**
         * 父 _子
         * 子 父
         * 
         * 子 _孫
         * 孫 子
         **/
    }
    
    /**
     * k2 v2[...]
     * 子 [父,_孫]
     **/

    // reduce
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        // 拆分k2v2[...]數據
        protected void reduce(Text k2, Iterable<Text> v2, Context context) throws IOException, InterruptedException {
            String grandson = ""; // 「孫」 
            String grandfather = ""; // 「父」 

            // 從迭代中遍歷v2[...]
            for (Text man : v2) {
                String p = man.toString();
                System.out.println("獲得:" + p);
                // 若是單詞是如下劃線開始的
                if (p.startsWith("_")) {
                    grandson = p.substring(1);
                }
                // 若是單詞沒有下劃線起始
                else {
                    // 直接賦值給孫輩變量
                    grandfather = p;
                }
            }

            // 在獲得有效數據的狀況下
            if (grandson != "" && grandfather != "") {
                // 寫出獲得的結果。
                context.write(new Text(grandson), new Text(grandfather));
            }

            /**
             * k3=父,v3=孫
             **/
        }
    }

  

 

7.雙表關聯

 

// map
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        // 拆分原始數據
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // 拆分記錄
            String[] splited = v1.toString().split(" ");
            // 若是第一列是數字(使用正則判斷),就是歌曲表。先讀入那個文件由hadoop決定
            if (splited[0].matches("^[-+]?(([0-9]+)([.]([0-9]+))?|([.]([0-9]+))?)$")) {
                String id = splited[0];
                String song = splited[1];
                //v2加兩條下劃線做爲前綴標識爲歌曲
                context.write(new Text(id), new Text("__" + song));
            }
            // 不然就是歌手錶
            else {
                String singer = splited[0];
                String id = splited[1];
                //v2-加兩條橫線做爲前綴標識爲歌手
                context.write(new Text(id), new Text("--" + singer));
            }
            /**
             * 1 __Eminem 1 --LoseYourself
             **/
        }
    }

    // reduce
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        // 拆分k2v2[...]數據
        protected void reduce(Text k2, Iterable<Text> v2, Context context) throws IOException, InterruptedException {
            String song = ""; // 歌曲
            String singer = ""; // 歌手
            /**
             * 1, [__Eminem, --LoseYourself]
             **/
            for (Text text : v2) {
                String tmp = text.toString();

                if (tmp.startsWith("__")) {
                    // 若是是__開頭的是song
                    song = tmp.substring(2); // 從索引2開始截取字符串
                }
                if (tmp.startsWith("--")) {
                    // 若是是--開頭的是歌手
                    singer = tmp.substring(2);
                }
            }
            context.write(new Text(singer), new Text(song));
        }
        /**
         * k3=Eminem,v3=LoseYourself
         *
        
        Eminem    LoseYourself
        Alizee    LaIslaBonita
        Michael    YouAreNotAlone
        Manson    FuckFrankie

         *
         **/
    }

 

- end

威格靈博客:www.cuiweiyou.com
this

相關文章
相關標籤/搜索