A while back, in my previous post, I described the problems I ran into while building a fully distributed Hadoop environment in my spare time, and the issues I hit afterwards when operating HDFS through Eclipse. Recently I wanted to go a step further and learn how Hadoop can work with a MySQL database. Opinions on this are divided online, and many people will probably laugh: using something as "heavyweight" as Hadoop to operate a database? Hadoop's HDFS excels at distributed file storage, and that view is entirely correct; database operations should be "safe, lightweight, and fast", so driving them with Hadoop seems to defy common sense. Why learn it, then? Take a step back: back when Access databases still held a certain share, many people chose plain files as their data store, with every insert, delete, query, and update done against files; a single file might serve as a whole database or a single table. For operations with low real-time requirements and small data volumes, using Hadoop to operate a database is actually worth considering. In any case, everyone has their own view, and it also depends on the requirements of the company you work for. Below I describe one particularly annoying problem I ran into: yet another ClassNotFound issue.
First, be clear about your runtime environment: figure out whether your code actually runs on the server side or locally, and only then model your code on the appropriate examples.
Reference article: http://www.cnblogs.com/xia520pi/archive/2012/06/12/2546261.html. The simple structure of the project:
Below are the three kinds of ClassNotFound problems you can hit when running locally.
(1) The MySQL driver cannot be found. This is easy to fix: add the official MySQL driver jar to your project, as shown in the red box in the figure above.
(2) Handling the JDBC jar
Although the program is compiled and run from Eclipse, it is ultimately submitted to the Hadoop cluster, so the JDBC jar must be made available to the cluster. There are two ways:
<1> Add the jar under ${HADOOP_HOME}/lib on every node and restart the cluster. This is the more primitive approach.
Our Hadoop installation lives in "/usr/hadoop", so put the jar under "/usr/hadoop/lib" and restart. Remember to do this on every node in the cluster, because in distributed execution the program runs locally on each node.
<2> Create a "/lib" directory in the cluster's distributed file system, upload the JDBC jar there, and add the statement shown in the sketch below to the main program; this guarantees that every node in the cluster can use the jar. Be clear that the jar then lives on HDFS, not on the local file system.
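In isolation, option <2> boils down to the DistributedCache call that also appears in the full listings later in this post. A minimal sketch (the class name is illustrative; it assumes the driver jar has already been uploaded to HDFS, e.g. with hadoop fs -put mysql-connector-java-5.1.18-bin.jar /lib/):

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class JdbcJarSetup {
    public static JobConf newConfWithJdbcJar() throws IOException {
        JobConf conf = new JobConf();
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.56.10:9000/"), conf);
        // Ships the jar from HDFS into every task's classpath via the
        // distributed cache, so no node needs the driver installed locally.
        DistributedCache.addFileToClassPath(
                new Path("hdfs://192.168.56.10:9000/lib/mysql-connector-java-5.1.18-bin.jar"),
                conf, fs);
        return conf;
    }
}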
(3) The entity class mapped to the database table cannot be found (the problem this post focuses on): StudentRecord.class not found....
The source code that triggers this problem is as follows:
package cn.hadoop.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;

import cn.hadoop.db.DBAccessReader.Student.DBInputMapper;

public class DBAccessReader {

    public static class Student implements Writable, DBWritable {
        public int id;
        public String name;
        public String sex;
        public int age;

        public Student() {
        }

        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setInt(1, this.id);
            statement.setString(2, this.name);
            statement.setString(3, this.sex);
            statement.setInt(4, this.age);
        }

        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            this.id = resultSet.getInt(1);
            this.name = resultSet.getString(2);
            this.sex = resultSet.getString(3);
            this.age = resultSet.getInt(4);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(this.id);
            Text.writeString(out, this.name);
            Text.writeString(out, this.sex);
            out.writeInt(this.age);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readInt();
            this.name = Text.readString(in);
            this.sex = Text.readString(in);
            this.age = in.readInt();
        }

        @Override
        public String toString() {
            return new String("Student [id=" + id + ", name=" + name + ", sex=" + sex
                    + ", age=" + age + "]");
        }

        public static class DBInputMapper extends MapReduceBase implements
                Mapper<LongWritable, cn.hadoop.db.DBAccessReader.Student, LongWritable, Text> {

            @Override
            public void map(LongWritable key, cn.hadoop.db.DBAccessReader.Student value,
                    OutputCollector<LongWritable, Text> collector,
                    Reporter reporter) throws IOException {
                collector.collect(new LongWritable(value.id), new Text(value.toString()));
            }
        }
    }

    public static void main(String[] args) throws IOException {

        JobConf conf = new JobConf(DBAccessReader.class);
        conf.set("mapred.job.tracker", "192.168.56.10:9001");

        FileSystem fileSystem = FileSystem.get(
                URI.create("hdfs://192.168.56.10:9000/"), conf);

        DistributedCache.addFileToClassPath(
                new Path("hdfs://192.168.56.10:9000/lib/mysql-connector-java-5.1.18-bin.jar"),
                conf, fileSystem);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        conf.setInputFormat(DBInputFormat.class);

        FileOutputFormat.setOutputPath(conf, new Path(
                "hdfs://192.168.56.10:9000/user/studentInfo"));

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.56.109:3306/school", "root", "1qaz2wsx");

        String[] fields = { "id", "name", "sex", "age" };

        DBInputFormat.setInput(conf, cn.hadoop.db.DBAccessReader.Student.class, "student", null,
                "id", fields);

        conf.setMapperClass(DBInputMapper.class);
        conf.setReducerClass(IdentityReducer.class);

        JobClient.runJob(conf);
    }
}
When it is run, the following error is reported:
The error seems clear enough: the entity class Student cannot be found. But I read the code over and over, and the class is plainly right there, so why can't it be found? This confused me for a long time, and nothing I tried worked, until I finally zeroed in on the log output. It shows clearly that the server side is trying to locate the jar for the DBAccessReader job; we obviously never uploaded one, so of course it cannot be found, and the job fails. The culprit is right here at the top of the main method:
JobConf conf = new JobConf(DBAccessReader.class);
conf.set("mapred.job.tracker", "192.168.56.10:9001");
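The warning in the log, "No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).", already hints at the two ways out. A hedged sketch of both (the jar path in option A is a placeholder, not something from this post):

// Option A: keep submitting to the real cluster, but package the project into
// a jar first and point the JobConf at it (the path below is hypothetical).
JobConf clusterConf = new JobConf(DBAccessReader.class);
clusterConf.set("mapred.job.tracker", "192.168.56.10:9001");
clusterConf.setJar("/home/hadoop/DBAccessReader.jar");

// Option B: drop the jobtracker setting entirely, so the job runs in the
// LocalJobRunner, where Eclipse's classpath already contains Student.
JobConf localConf = new JobConf();

Option B is what the corrected code below takes.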
After modifying the code as follows, the problem was solved:
package cn.hadoop.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;

import cn.hadoop.db.DBAccessReader.Student.DBInputMapper;

public class DBAccessReader {

    public static class Student implements Writable, DBWritable {
        public int id;
        public String name;
        public String sex;
        public int age;

        public Student() {
        }

        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setInt(1, this.id);
            statement.setString(2, this.name);
            statement.setString(3, this.sex);
            statement.setInt(4, this.age);
        }

        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            this.id = resultSet.getInt(1);
            this.name = resultSet.getString(2);
            this.sex = resultSet.getString(3);
            this.age = resultSet.getInt(4);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(this.id);
            Text.writeString(out, this.name);
            Text.writeString(out, this.sex);
            out.writeInt(this.age);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readInt();
            this.name = Text.readString(in);
            this.sex = Text.readString(in);
            this.age = in.readInt();
        }

        @Override
        public String toString() {
            return new String("Student [id=" + id + ", name=" + name + ", sex="
                    + sex + ", age=" + age + "]");
        }

        public static class DBInputMapper extends MapReduceBase
                implements
                Mapper<LongWritable, cn.hadoop.db.DBAccessReader.Student, LongWritable, Text> {

            @Override
            public void map(LongWritable key,
                    cn.hadoop.db.DBAccessReader.Student value,
                    OutputCollector<LongWritable, Text> collector,
                    Reporter reporter) throws IOException {
                collector.collect(new LongWritable(value.id),
                        new Text(value.toString()));
            }
        }
    }

    public static void main(String[] args) throws IOException {

        JobConf conf = new JobConf();
        FileSystem fileSystem = FileSystem.get(
                URI.create("hdfs://192.168.56.10:9000/"), conf);

        DistributedCache.addFileToClassPath(
                new Path("hdfs://192.168.56.10:9000/lib/mysql-connector-java-5.1.18-bin.jar"),
                conf, fileSystem);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        conf.setInputFormat(DBInputFormat.class);

        FileOutputFormat.setOutputPath(conf, new Path(
                "hdfs://192.168.56.10:9000/user/studentInfo"));

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.56.109:3306/school", "root", "1qaz2wsx");

        String[] fields = { "id", "name", "sex", "age" };

        DBInputFormat.setInput(conf, cn.hadoop.db.DBAccessReader.Student.class,
                "student", null, "id", fields);

        conf.setMapperClass(DBInputMapper.class);
        conf.setReducerClass(IdentityReducer.class);

        JobClient.runJob(conf);
    }
}
Here is the log output printed at run time:
Mar 13, 2016 5:39:57 PM org.apache.hadoop.util.NativeCodeLoader <clinit> WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Mar 13, 2016 5:39:57 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles WARNING: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
Mar 13, 2016 5:39:57 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles WARNING: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
Mar 13, 2016 5:39:57 PM org.apache.hadoop.filecache.TrackerDistributedCacheManager downloadCacheObject INFO: Creating mysql-connector-java-5.1.18-bin.jar in /tmp/hadoop-hadoop/mapred/local/archive/2605709384407216388_-2048973133_91096108/192.168.56.10/lib-work-2076365714246383853 with rwxr-xr-x
Mar 13, 2016 5:39:58 PM org.apache.hadoop.filecache.TrackerDistributedCacheManager downloadCacheObject INFO: Cached hdfs://192.168.56.10:9000/lib/mysql-connector-java-5.1.18-bin.jar as /tmp/hadoop-hadoop/mapred/local/archive/2605709384407216388_-2048973133_91096108/192.168.56.10/lib/mysql-connector-java-5.1.18-bin.jar
Mar 13, 2016 5:39:58 PM org.apache.hadoop.filecache.TrackerDistributedCacheManager localizePublicCacheObject INFO: Cached hdfs://192.168.56.10:9000/lib/mysql-connector-java-5.1.18-bin.jar as /tmp/hadoop-hadoop/mapred/local/archive/2605709384407216388_-2048973133_91096108/192.168.56.10/lib/mysql-connector-java-5.1.18-bin.jar
Mar 13, 2016 5:39:58 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: Running job: job_local_0001
Mar 13, 2016 5:39:59 PM org.apache.hadoop.mapred.Task initialize INFO: Using ResourceCalculatorPlugin : null
Mar 13, 2016 5:39:59 PM org.apache.hadoop.mapred.MapTask runOldMapper INFO: numReduceTasks: 1
Mar 13, 2016 5:39:59 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: io.sort.mb = 100
Mar 13, 2016 5:39:59 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: data buffer = 79691776/99614720
Mar 13, 2016 5:39:59 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: record buffer = 262144/327680
Mar 13, 2016 5:39:59 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: map 0% reduce 0%
Mar 13, 2016 5:40:04 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush INFO: Starting flush of map output
Mar 13, 2016 5:40:04 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill INFO: Finished spill 0
Mar 13, 2016 5:40:04 PM org.apache.hadoop.mapred.Task done INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
Mar 13, 2016 5:40:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO:
Mar 13, 2016 5:40:04 PM org.apache.hadoop.mapred.Task sendDone INFO: Task 'attempt_local_0001_m_000000_0' done.
Mar 13, 2016 5:40:05 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: map 100% reduce 0%
Mar 13, 2016 5:40:05 PM org.apache.hadoop.mapred.Task initialize INFO: Using ResourceCalculatorPlugin : null
Mar 13, 2016 5:40:05 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO:
Mar 13, 2016 5:40:05 PM org.apache.hadoop.mapred.Merger$MergeQueue merge INFO: Merging 1 sorted segments
Mar 13, 2016 5:40:05 PM org.apache.hadoop.mapred.Merger$MergeQueue merge INFO: Down to the last merge-pass, with 1 segments left of total size: 542 bytes
Mar 13, 2016 5:40:05 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO:
Mar 13, 2016 5:40:06 PM org.apache.hadoop.mapred.Task done INFO: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
Mar 13, 2016 5:40:06 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO:
Mar 13, 2016 5:40:06 PM org.apache.hadoop.mapred.Task commit INFO: Task attempt_local_0001_r_000000_0 is allowed to commit now
Mar 13, 2016 5:40:06 PM org.apache.hadoop.mapred.FileOutputCommitter commitTask INFO: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.56.10:9000/user/studentInfo
Mar 13, 2016 5:40:08 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO: reduce > reduce
Mar 13, 2016 5:40:08 PM org.apache.hadoop.mapred.Task sendDone INFO: Task 'attempt_local_0001_r_000000_0' done.
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: map 100% reduce 100%
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: Job complete: job_local_0001
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Counters: 20
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: File Input Format Counters
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Bytes Read=0
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: File Output Format Counters
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Bytes Written=513
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: FileSystemCounters
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: FILE_BYTES_READ=1592914
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: HDFS_BYTES_READ=1579770
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: FILE_BYTES_WRITTEN=3270914
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: HDFS_BYTES_WRITTEN=513
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Map-Reduce Framework
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Reduce input groups=9
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Map output materialized bytes=546
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Combine output records=0
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Map input records=9
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Reduce shuffle bytes=0
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Reduce output records=9
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Spilled Records=18
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Map output bytes=522
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Total committed heap usage (bytes)=231874560
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Map input bytes=9
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Combine input records=0
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Map output records=9
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: SPLIT_RAW_BYTES=75
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Reduce input records=9
This is the result of the run:
That completes the job of connecting Hadoop to the database, reading a table, and writing out its contents. Of course, this is only a simple demonstration that would not appear in a real project as-is, but it helps us get familiar with the flow of operating a database from Hadoop. Below is sample code for the other direction, where Hadoop processes files and writes the results into the database; it is much the same as the code above:
package cn.hadoop.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class WriteDB {

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf();

        FileSystem fileSystem = FileSystem.get(
                URI.create("hdfs://192.168.56.10:9000/"), conf);
        DistributedCache.addFileToClassPath(
                new Path("hdfs://192.168.56.10:9000/lib/mysql-connector-java-5.1.18-bin.jar"),
                conf, fileSystem);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(DBOutputFormat.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Combine.class);
        conf.setReducerClass(Reduce.class);

        FileInputFormat.setInputPaths(conf, new Path(
                "hdfs://192.168.56.10:9000/user/db_in"));

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.56.109:3306/school?characterEncoding=UTF-8",
                "root", "1qaz2wsx");

        String[] fields = { "word", "number" };

        DBOutputFormat.setOutput(conf, "wordcount", fields);
        JobClient.runJob(conf);
    }
}

class Map extends MapReduceBase implements
        Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);

    private Text word = new Text();

    @Override
    public void map(Object key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}

class Combine extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, WordRecord, Text> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<WordRecord, Text> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        WordRecord wordcount = new WordRecord();
        wordcount.word = key.toString();
        wordcount.number = sum;
        output.collect(wordcount, new Text());
    }
}

class WordRecord implements Writable, DBWritable {

    public String word;
    public int number;

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, this.word);
        statement.setInt(2, this.number);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.word = resultSet.getString(1);
        this.number = resultSet.getInt(2);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, this.word);
        out.writeInt(this.number);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.word = Text.readString(in);
        this.number = in.readInt();
    }
}
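Analogously, WriteDB assumes the output table wordcount(word, number) already exists, since DBOutputFormat only generates INSERT statements and does not create tables. A hypothetical setup sketch, with column types inferred from WordRecord (String word, int number):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical helper mirroring CreateStudentTable above; nothing in the post
// shows the real DDL.
public class CreateWordcountTable {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.56.109:3306/school?characterEncoding=UTF-8",
                "root", "1qaz2wsx");
        Statement st = conn.createStatement();
        st.executeUpdate("CREATE TABLE IF NOT EXISTS wordcount ("
                + "word VARCHAR(64), number INT)");
        st.close();
        conn.close();
    }
}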
The log output from the run is as follows:
Mar 13, 2016 6:09:31 PM org.apache.hadoop.util.NativeCodeLoader <clinit> WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Mar 13, 2016 6:09:31 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles WARNING: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
Mar 13, 2016 6:09:31 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles WARNING: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
Mar 13, 2016 6:09:31 PM org.apache.hadoop.mapred.FileInputFormat listStatus INFO: Total input paths to process : 2
Mar 13, 2016 6:09:32 PM org.apache.hadoop.filecache.TrackerDistributedCacheManager downloadCacheObject INFO: Creating mysql-connector-java-5.1.18-bin.jar in /tmp/hadoop-hadoop/mapred/local/archive/-8205516116475251282_-2048973133_91096108/192.168.56.10/lib-work-1371358416408211818 with rwxr-xr-x
Mar 13, 2016 6:09:33 PM org.apache.hadoop.filecache.TrackerDistributedCacheManager downloadCacheObject INFO: Cached hdfs://192.168.56.10:9000/lib/mysql-connector-java-5.1.18-bin.jar as /tmp/hadoop-hadoop/mapred/local/archive/-8205516116475251282_-2048973133_91096108/192.168.56.10/lib/mysql-connector-java-5.1.18-bin.jar
Mar 13, 2016 6:09:33 PM org.apache.hadoop.filecache.TrackerDistributedCacheManager localizePublicCacheObject INFO: Cached hdfs://192.168.56.10:9000/lib/mysql-connector-java-5.1.18-bin.jar as /tmp/hadoop-hadoop/mapred/local/archive/-8205516116475251282_-2048973133_91096108/192.168.56.10/lib/mysql-connector-java-5.1.18-bin.jar
Mar 13, 2016 6:09:33 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: Running job: job_local_0001
Mar 13, 2016 6:09:33 PM org.apache.hadoop.mapred.Task initialize INFO: Using ResourceCalculatorPlugin : null
Mar 13, 2016 6:09:33 PM org.apache.hadoop.mapred.MapTask runOldMapper INFO: numReduceTasks: 1
Mar 13, 2016 6:09:33 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: io.sort.mb = 100
Mar 13, 2016 6:09:34 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: data buffer = 79691776/99614720
Mar 13, 2016 6:09:34 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: record buffer = 262144/327680
Mar 13, 2016 6:09:34 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush INFO: Starting flush of map output
Mar 13, 2016 6:09:34 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill INFO: Finished spill 0
Mar 13, 2016 6:09:34 PM org.apache.hadoop.mapred.Task done INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
Mar 13, 2016 6:09:34 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: map 0% reduce 0%
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO: hdfs://192.168.56.10:9000/user/db_in/file2.txt:0+41
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.Task sendDone INFO: Task 'attempt_local_0001_m_000000_0' done.
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.Task initialize INFO: Using ResourceCalculatorPlugin : null
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.MapTask runOldMapper INFO: numReduceTasks: 1
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: io.sort.mb = 100
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: data buffer = 79691776/99614720
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: record buffer = 262144/327680
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush INFO: Starting flush of map output
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill INFO: Finished spill 0
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.Task done INFO: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
Mar 13, 2016 6:09:37 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: map 100% reduce 0%
Mar 13, 2016 6:09:39 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO: hdfs://192.168.56.10:9000/user/db_in/file1.txt:0+24
Mar 13, 2016 6:09:39 PM org.apache.hadoop.mapred.Task sendDone INFO: Task 'attempt_local_0001_m_000001_0' done.
Mar 13, 2016 6:09:39 PM org.apache.hadoop.mapred.Task initialize INFO: Using ResourceCalculatorPlugin : null
Mar 13, 2016 6:09:39 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO:
Mar 13, 2016 6:09:39 PM org.apache.hadoop.mapred.Merger$MergeQueue merge INFO: Merging 2 sorted segments
Mar 13, 2016 6:09:39 PM org.apache.hadoop.mapred.Merger$MergeQueue merge INFO: Down to the last merge-pass, with 2 segments left of total size: 116 bytes
Mar 13, 2016 6:09:39 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO:
Mar 13, 2016 6:09:41 PM org.apache.hadoop.mapred.Task done INFO: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
Mar 13, 2016 6:09:42 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO: reduce > reduce
Mar 13, 2016 6:09:42 PM org.apache.hadoop.mapred.Task sendDone INFO: Task 'attempt_local_0001_r_000000_0' done.
Mar 13, 2016 6:09:42 PM org.apache.hadoop.mapred.FileOutputCommitter cleanupJob WARNING: Output path is null in cleanup
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: map 100% reduce 100%
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: Job complete: job_local_0001
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Counters: 19
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: File Input Format Counters
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Bytes Read=65
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: File Output Format Counters
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Bytes Written=0
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: FileSystemCounters
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: FILE_BYTES_READ=2389740
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: HDFS_BYTES_READ=2369826
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: FILE_BYTES_WRITTEN=4905883
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Map-Reduce Framework
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Reduce input groups=7
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Map output materialized bytes=124
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Combine output records=9
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Map input records=5
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Reduce shuffle bytes=0
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Reduce output records=7
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Spilled Records=18
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Map output bytes=104
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Total committed heap usage (bytes)=482291712
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Map input bytes=65
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Combine input records=10
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Map output records=10
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: SPLIT_RAW_BYTES=198
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Reduce input records=9
The results in the database are as follows:
All of the code above was personally tested and run by me; for the Hadoop version and server environment details, see my previous post.