這個和用舊API讀取和寫入數據庫的步驟差不多,只需要改用新式的API即可。但是要注意幾個細節性問題。先看一下我修改的那個示例代碼。這個代碼是Hadoop自帶的示例代碼,只不過我把它改成了用新API操作而已。
DBConfiguration.configureDB(getConf(), driverClassName, url, USER_NAME, PASSWORD);
Job job = new Job(getConf(), "hah");
1. 這兩句的位置是需要注意的地方:其次序千萬不要顛倒。因為
DBConfiguration.configureDB修改的是conf(也就是Configuration conf = getConf())中的內容,而我們生成Job實例時需要使用該conf;Job在構造時會複製一份conf,之後再對原conf所做的修改不會反映到Job中。
如果二者交換,新建立的Job實例就無法使用conf中的全部配置信息(數據庫驅動、URL、用戶名和密碼),從而出現IOException異常。
2.
DBOutputFormat.setOutput(job, "Pageview", "url", "pageview");
其中Pageview是表的名字,而url和pageview是表中字段的名字(此處一定要和數據庫表中的字段名一致)。表名的大小寫也要與建表語句一致,因為在Linux上的MySQL默認區分表名的大小寫。
3. 用於讀寫數據庫數據的實體類要實現Writable和DBWritable接口。
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * This is a demonstrative program, which uses DBInputFormat for reading * the input data from a database, and DBOutputFormat for writing the data * to the database. * <br> * The Program first creates the necessary tables, populates the input table * and runs the mapred job. * <br> * The input data is a mini access log, with a <code><url,referrer,time> * </code> schema.The output is the number of pageviews of each url in the log, * having the schema <code><url,pageview></code>. * * When called with no arguments the program starts a local HSQLDB server, and * uses this database for storing/retrieving the data. 
* * But in this program we use MySQL Database , so the HSQLDB isn't used; * * 鏈接數據庫,創建數據庫表,想數據庫表中插入數據,取出數據,存放到其他的表中 */ public class DBCountPageView1 extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(DBCountPageView1.class); private Connection connection; private boolean initialized = false; private static final String[] AccessFieldNames = {"url", "referrer", "time"}; private static final String[] PageviewFieldNames = {"url", "pageview"}; private static final String DB_URL = "jdbc:mysql://"; private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver"; private static final String USER_NAME = "test"; private static final String PASSWORD = "hcc199056"; private void createConnection(String driverClassName , String url) throws Exception { Class.forName(driverClassName); connection = DriverManager.getConnection(url,USER_NAME,PASSWORD); connection.setAutoCommit(false); } private void shutdown() { try { connection.commit(); connection.close(); }catch (Throwable ex) { LOG.warn("Exception occurred while closing connection :" + StringUtils.stringifyException(ex)); } } private void initialize(String driverClassName, String url) throws Exception { if(!this.initialized) { // if(driverClassName.equals(DRIVER_CLASS)) { // startHsqldbServer(); // } createConnection(driverClassName, url); dropTables(); // 建立數據庫表 createTables(); populateAccess(); this.initialized = true; } } private void dropTables() { String dropAccess = "DROP TABLE Access"; String dropPageview = "DROP TABLE Pageview"; try { Statement st = connection.createStatement(); st.executeUpdate(dropAccess); st.executeUpdate(dropPageview); connection.commit(); st.close(); }catch (SQLException ex) { //ignore } } private void createTables() throws SQLException { String createAccess = "CREATE TABLE " + "Access(url VARCHAR(100) NOT NULL," + " referrer VARCHAR(100)," + " time BIGINT NOT NULL, " + " PRIMARY KEY (url, time))"; String createPageview = "CREATE TABLE " + "Pageview(url 
VARCHAR(100) NOT NULL," + " pageview BIGINT NOT NULL, " + " PRIMARY KEY (url))"; Statement st = connection.createStatement(); try { st.executeUpdate(createAccess); st.executeUpdate(createPageview); connection.commit(); } finally { st.close(); } } /** * Populates the Access table with generated records. *向數據庫表中插入數據 */ private void populateAccess() throws SQLException { PreparedStatement statement = null ; try { statement = connection.prepareStatement( "INSERT INTO Access(url, referrer, time)" + " VALUES (?, ?, ?)"); Random random = new Random(); int time = random.nextInt(50) + 50; final int PROBABILITY_PRECISION = 100; // 1 / 100 final int NEW_PAGE_PROBABILITY = 15; // 15 / 100 //Pages in the site : String[] pages = {"/a", "/b", "/c", "/d", "/e", "/f", "/g", "/h", "/i", "/j"}; //linkMatrix[i] is the array of pages(indexes) that page_i links to. int[][] linkMatrix = {{1,5,7}, {0,7,4,6,}, {0,1,7,8}, {0,2,4,6,7,9}, {0,1}, {0,3,5,9}, {0}, {0,1,3}, {0,2,6}, {0,2,6}}; //a mini model of user browsing a la pagerank int currentPage = random.nextInt(pages.length); String referrer = null; for(int i=0; i<time; i++) { statement.setString(1, pages[currentPage]); statement.setString(2, referrer); statement.setLong(3, i); statement.execute(); int action = random.nextInt(PROBABILITY_PRECISION); //go to a new page with probability NEW_PAGE_PROBABILITY / PROBABILITY_PRECISION if(action < NEW_PAGE_PROBABILITY) { currentPage = random.nextInt(pages.length); // a random page referrer = null; } else { referrer = pages[currentPage]; action = random.nextInt(linkMatrix[currentPage].length); currentPage = linkMatrix[currentPage][action]; } } connection.commit(); }catch (SQLException ex) { connection.rollback(); throw ex; } finally { if(statement != null) { statement.close(); } } } /**Verifies the results are correct */ private boolean verify() throws SQLException { //check total num pageview String countAccessQuery = "SELECT COUNT(*) FROM Access"; String sumPageviewQuery = "SELECT 
SUM(pageview) FROM Pageview"; Statement st = null; ResultSet rs = null; try { st = connection.createStatement(); rs = st.executeQuery(countAccessQuery); rs.next(); long totalPageview = rs.getLong(1); rs = st.executeQuery(sumPageviewQuery); rs.next(); long sumPageview = rs.getLong(1); LOG.info("totalPageview=" + totalPageview); LOG.info("sumPageview=" + sumPageview); return totalPageview == sumPageview && totalPageview != 0; }finally { if(st != null) st.close(); if(rs != null) rs.close(); } } /** Holds a <url, referrer, time > tuple */ static class AccessRecord implements Writable, DBWritable { String url; String referrer; long time; @Override public void readFields(DataInput in) throws IOException { this.url = Text.readString(in); this.referrer = Text.readString(in); this.time = in.readLong(); } @Override public void write(DataOutput out) throws IOException { Text.writeString(out, url); Text.writeString(out, referrer); out.writeLong(time); } @Override public void readFields(ResultSet resultSet) throws SQLException { this.url = resultSet.getString(1); this.referrer = resultSet.getString(2); this.time = resultSet.getLong(3); } @Override public void write(PreparedStatement statement) throws SQLException { statement.setString(1, url); statement.setString(2, referrer); statement.setLong(3, time); } } /** Holds a <url, pageview > tuple */ static class PageviewRecord implements Writable, DBWritable { String url; long pageview; public PageviewRecord(String url, long pageview) { this.url = url; this.pageview = pageview; } @Override public void readFields(DataInput in) throws IOException { this.url = Text.readString(in); this.pageview = in.readLong(); } @Override public void write(DataOutput out) throws IOException { Text.writeString(out, url); out.writeLong(pageview); } @Override public void readFields(ResultSet resultSet) throws SQLException { this.url = resultSet.getString(1); this.pageview = resultSet.getLong(2); } @Override public void write(PreparedStatement statement) 
throws SQLException { statement.setString(1, url); statement.setLong(2, pageview); } @Override public String toString() { return url + " " + pageview; } } /** * Mapper extracts URLs from the AccessRecord (tuples from db), * and emits a <url,1> pair for each access record. */ static class PageviewMapper extends Mapper<LongWritable, AccessRecord, Text, LongWritable> { LongWritable ONE = new LongWritable(1L); @Override protected void map(LongWritable key, AccessRecord value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub Text oKey = new Text(value.url); context.write(oKey, ONE); } } /** * Reducer sums up the pageviews and emits a PageviewRecord, * which will correspond to one tuple in the db. */ static class PageviewReducer extends Reducer<Text, LongWritable, PageviewRecord, NullWritable> { NullWritable n = NullWritable.get(); @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub long sum = 0L; for(LongWritable value : values){ sum += value.get(); } context.write(new PageviewRecord(key.toString(), sum), n); } } @Override //Usage DBCountPageView [driverClass dburl] public int run(String[] args) throws Exception { String driverClassName = DRIVER_CLASS; String url = DB_URL; if(args.length > 1) { driverClassName = args[0]; url = args[1]; } //創建鏈接並建立數據庫表 initialize(driverClassName, url); DBConfiguration.configureDB(getConf(), driverClassName, url ,USER_NAME, PASSWORD); Job job = new Job(getConf(),"hah"); job.setMapperClass(PageviewMapper.class); job.setReducerClass(PageviewReducer.class); //讀取Access表中昂的數據 DBInputFormat.setInput(job, AccessRecord.class, "access" , null, "url", "url", "referrer", "time"); //向Pageview中寫入數據 DBOutputFormat.setOutput(job, "Pageview", "url", "pageview"); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(PageviewRecord.class); 
job.setOutputValueClass(NullWritable.class); int reval = -1; // JobClient.runJob(job); reval = (job.waitForCompletion(true) ? 0 : 1); boolean correct = verify(); if(!correct) { throw new RuntimeException("Evaluation was not correct!"); } return reval; } public static void main(String[] args) throws Exception { int ret = ToolRunner.run(new Configuration(),new DBCountPageView1(), args); System.exit(ret); } }