轉自:http://www.cnblogs.com/liqizhou/archive/2012/05/16/2503458.htmlhtml
前之前帖子介紹,怎樣讀取文本數據源和多個數據源的合併:http://www.cnblogs.com/liqizhou/archive/2012/05/15/2501835.html
mysql
這一個博客介紹一下MapReduce怎樣讀取關係數據庫的數據,選擇的關係數據庫爲MySql,由於它是開源的軟件,因此你們用的比較多。之前上學的時候就沒有用過開源的軟件,直接用盜版,也至關與免費,且比開源好用,例如向oracle,windows7等等。如今工做了,因爲公司考慮成本的問題,因此都用成開源的,ubuntu,mysql等,本人如今支持開源,特別像hadoop這樣的東西,真的太好了,不但可使用軟件,也能夠讀到源代碼。話不說多了。sql
hadoop技術推出一首曾遭到關係數據庫研究者的挑釁和批評,認爲MapReduce不具備關係數據庫中的結構化數據存儲和處理能力。爲此,hadoop社區和研究人員作了多的努力,在hadoop0.19版支持MapReduce訪問關係數據庫,如:mysql,MySQL、PostgreSQL、Oracle 等幾個數據庫系統。數據庫
Hadoop訪問關係數據庫主要經過一下接口實現的:DBInputFormat類,包所在位置:org.apache.hadoop.mapred.lib.db 中。DBInputFormat 在 Hadoop 應用程序中經過數據庫供應商提供的 JDBC接口來與數據庫進行交互,而且可使用標準的 SQL 來讀取數據庫中的記錄。學習DBInputFormat首先必須知道二個條件。apache
在使用 DBInputFormat 以前,必須將要使用的 JDBC 驅動拷貝到分佈式系統各個節點的$HADOOP_HOME/lib/目錄下。ubuntu
MapReduce訪問關係數據庫時,大量頻繁的從MapReduce程序中查詢和讀取數據,這大大的增長了數據庫的訪問負載,所以,DBInputFormat接口僅僅適合讀取小數據量的數據,而不適合處理數據倉庫。要處理數據倉庫的方法有:利用數據庫的Dump工具將大量待分析的數據輸出爲文本,並上傳的Hdfs中進行處理,處理的方法可參考:http://www.cnblogs.com/liqizhou/archive/2012/05/15/2501835.htmlwindows
DBInputFormat 類中包含如下三個內置類oracle
protected class DBRecordReader implementsRecordReader<LongWritable, T>:用來從一張數據庫表中讀取一條條元組記錄。app
2.public static class NullDBWritable implements DBWritable,Writable:主要用來實現 DBWritable 接口。DBWritable接口要實現二個函數,第一是write,第二是readFileds,這二個函數都不難理解,一個是寫,一個是讀出全部字段。原型以下:
分佈式
public void write(PreparedStatement statement) throwsSQLException;public void readFields(ResultSet resultSet) throws SQLException;
protected static class DBInputSplit implements InputSplit:主要用來描述輸入元組集合的範圍,包括 start 和 end 兩個屬性,start 用來表示第一條記錄的索引號,end 表示最後一條記錄的索引號.
下面對怎樣使用 DBInputFormat 讀取數據庫記錄進行詳細的介紹,具體步驟以下:
DBConfiguration.configureDB (JobConf job, StringdriverClass, String dbUrl, String userName, String passwd)函數,配置JDBC 驅動,數據源,以及數據庫訪問的用戶名和密碼。MySQL 數據庫的 JDBC 的驅動爲「com.mysql.jdbc.Driver」,數據源爲「jdbc:mysql://localhost/testDB」,其中testDB爲訪問的數據庫。useName通常爲「root」,passwd是你數據庫的密碼。
DBInputFormat.setInput(JobConf job, Class<?extends DBWritable> inputClass, String tableName, String conditions,String orderBy, String... fieldNames),這個方法的參數很容易看懂,inputClass實現DBWritable接口。,string tableName表名, conditions表示查詢的條件,orderby表示排序的條件,fieldNames是字段,這至關與把sql語句拆分的結果。固然也能夠用sql語句進行重載。etInput(JobConf job, Class<?extends DBWritable> inputClass, String inputQuery, StringinputCountQuery)。
編寫MapReduce函數,包括Mapper 類、Reducer 類、輸入輸出文件格式等,而後調用JobClient.runJob(conf)。
上面講了理論,下面舉個例子:假設 MySQL 數據庫中有數據庫student,假設數據庫中的字段有「id」,「name」,「gender","number"。
第一步要實現DBwrite和write數據接口。代碼以下:
public class StudentRecord implements Writable, DBWritable{ int id; String name; String gender; String number; @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub this.id = in.readInt(); this.gender = Text.readString(in); this.name = in.readString(); this.number = in.readString(); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeInt(this.id); Text.writeString(out,this.name); out.writeInt(this.gender); out.writeInt(this.number); } @Override public void readFields(ResultSet result) throws SQLException { // TODO Auto-generated method stub this.id = result.getInt(1); this.name = result.getString(2); this.gender = result.getString(3); this.number = result.getString(4); } @Override public void write(PreparedStatement stmt) throws SQLException{ // TODO Auto-generated method stub stmt.setInt(1, this.id); stmt.setString(2, this.name); stmt.setString(3, this.gender); stmt.setString(4, this.number); } @Override public String toString() { // TODO Auto-generated method stub return new String(this.name + " " + this.gender + " " +this.number); }
第二步,實現Map和Reduce類
public class DBAccessMapper extends MapReduceBase implements Mapper<LongWritable, TeacherRecord, LongWritable, Text> { @Override public void map(LongWritable key, TeacherRecord value, OutputCollector<LongWritable, Text> collector, Reporter reporter) throws IOException { // TODO Auto-generated method stub new collector.collect(new LongWritable(value.id), new Text(value .toString())); } }
第三步:主函數的實現,函數
public class DBAccessReader { public static void main(String[] args) throws IOException { JobConf conf = new JobConf(DBAccessReader.class); conf.setOutputKeyClass(LongWritable.class); conf.setOutputValueClass(Text.class); conf.setInputFormat(DBInputFormat.class); FileOutputFormat.setOutputPath(conf, new Path("dboutput")); DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver", "jdbc:mysql://localhost/school","root","123456"); String [] fields = {"id", "name", "gender", "number"}; DBInputFormat.setInput(conf, StudentRecord.class,"Student",null "id", fields); conf.setMapperClass(DBAccessMapper.class); conf.setReducerClass(IdentityReducer.class); JobClient.runJob(conf); } }
每每對於數據處理的結果的數據量通常不會太大,可能適合hadoop直接寫入數據庫中。hadoop提供了相應的數據庫直接輸出的計算髮結果。
DBOutFormat: 提供數據庫寫入接口。
DBRecordWriter:提供向數據庫中寫入的數據記錄的接口。
DBConfiguration:提供數據庫配置和建立連接的接口。
DBOutFormat提供一個靜態方法setOutput(job,String table,String ...filedNames);該方法的參數很容易看懂。假設要插入一個Student的數據,其代碼爲
public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); JobConf conf = new JobConf(); conf.setOutputFormat(DBOutputFormat.class); DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver", "jdbc:mysql://localhost/school","root","123456"); DBOutputFormat.setOutput(conf,"Student", 456, "liqizhou", "man", "20004154578"); JobClient.runJob(conf);