自己寫 Hadoop 的 DBOutputFormat 類

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Custom Hadoop {@link OutputFormat} that hands every task a
 * {@link TestRecordWriter}, so job output is written to a database over JDBC
 * rather than to files.
 *
 * <p>Project: aggressiondatafilter. Author: Huang Chuancong.
 * Created: 2013-12-09 22:21:35.
 *
 * <p>NOTE(review): the JDBC driver, URL, user name and password are hard-coded
 * constants. That keeps the example self-contained, but the credentials should
 * be moved into the job configuration before this is used for real work.
 *
 * @param <K> output key type; must implement {@link DBWritable}
 * @param <V> output value type
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Test<K extends DBWritable, V> extends OutputFormat<K, V> {

	private static final String DB_URL = "jdbc:mysql://10.5.82.224:3306/ale";
	private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
	private static final String USER_NAME = "test";
	private static final String PASSWORD = "hcc199056";

	/**
	 * Performs no validation of the output specification.
	 *
	 * @param context the job being submitted (unused)
	 */
	@Override
	public void checkOutputSpecs(JobContext context) throws IOException,
			InterruptedException {
		// Nothing to check: the target database is fixed by the constants above.
	}

	/**
	 * Returns a {@link FileOutputCommitter} built from the job's file output path.
	 *
	 * <p>NOTE(review): {@code FileOutputFormat.getOutputPath} is passed through
	 * unchecked; presumably it is {@code null} when the job sets no output
	 * path - confirm the committer tolerates that.
	 *
	 * @param context the task attempt requesting a committer
	 * @return the committer for this task attempt
	 */
	@Override
	public OutputCommitter getOutputCommitter(TaskAttemptContext context)
			throws IOException, InterruptedException {
		return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
				context);
	}

	/**
	 * Creates the writer that persists this task's records to the database.
	 *
	 * @param context the task attempt requesting a writer (unused)
	 * @return a new, never-{@code null} {@link TestRecordWriter}
	 * @throws IOException if the writer cannot be constructed, for example
	 *         because the JDBC driver class is not on the classpath
	 */
	@Override
	public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
			throws IOException, InterruptedException {
		try {
			return new TestRecordWriter<K, V>(DRIVER_CLASS, DB_URL, USER_NAME, PASSWORD);
		} catch (SQLException | ClassNotFoundException e) {
			// Fail the task here with the real cause; returning null would only
			// surface later as an unrelated NullPointerException in the framework.
			throw new IOException("Unable to create TestRecordWriter for " + DB_URL, e);
		}
	}

}

和重寫其他的 Format 類型一樣,需要繼承 OutputFormat 基類。
每個 Format 都有一個對應的 RecordWriter:
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * {@link RecordWriter} that stores each record in the database over JDBC.
 *
 * <p>Every call to {@link #write} opens its own connection and runs three
 * inserts in a single transaction: the event row, a subscription row that
 * references the generated event id, and an EPC tag row for the same event.
 *
 * <p>Project: aggressiondatafilter. Author: Huang Chuancong.
 * Created: 2013-12-09 22:27:56.
 *
 * <p>NOTE(review): opening a connection per record is expensive; consider
 * keeping one connection open and releasing it in {@link #close}.
 *
 * @param <K> key type (not yet read by {@link #write})
 * @param <V> value type (not yet read by {@link #write})
 */
@InterfaceStability.Evolving
public class TestRecordWriter<K, V> extends RecordWriter<K, V>{

	private static final String INSERT_EVENT_SQL = "insert into t_aggression_event (parent_id, action, biz_step, disposion, biz_location,read_point,biz_transtration,flag) values (?,?,?,?,?,?,?,?)";
	private static final String INSERT_SUBSCRIBE_SQL = "insert into t_aggression_event_subscribe (event_id) values (?)";
	private static final String INSERT_EPC_SQL = "insert into t_aggression_event_epc (epc, read_point, event_id, base_reader_name) values (?, ?, ?, ?)";

	private final String url;
	private final String userName;
	private final String password;

	/**
	 * Loads the JDBC driver and remembers the connection settings; no
	 * connection is opened until {@link #write} is called.
	 *
	 * @param driveClass fully qualified name of the JDBC driver class to load
	 * @param url JDBC connection URL
	 * @param userName database user name
	 * @param password database password
	 * @throws SQLException declared so existing callers that catch it keep
	 *         compiling; the current body does not throw it
	 * @throws ClassNotFoundException if {@code driveClass} cannot be loaded
	 */
	public TestRecordWriter(String driveClass, String url, String userName, String password) throws SQLException, ClassNotFoundException {
		Class.forName(driveClass);
		this.url = url;
		this.userName = userName;
		this.password = password;
	}

	/**
	 * Does nothing: each {@link #write} call closes the resources it opened.
	 *
	 * @param context the task attempt being closed (unused)
	 */
	@Override
	public void close(TaskAttemptContext context) throws IOException,
			InterruptedException {
		// No state is held between writes, so there is nothing to release.
	}

	/**
	 * Inserts one event, its subscription row and its EPC tag row atomically.
	 *
	 * <p>TODO: {@code key} and {@code value} are not mapped onto the columns
	 * yet; fixed placeholder values are inserted instead.
	 *
	 * @param key record key (currently unused)
	 * @param value record value (currently unused)
	 * @throws IOException if connecting or any insert fails; the transaction
	 *         is rolled back before the exception is raised
	 */
	@Override
	public void write(K key, V value) throws IOException, InterruptedException {
		try (Connection conn = DriverManager.getConnection(url, userName, password)) {
			conn.setAutoCommit(false);
			try {
				long eventId = insertEvent(conn);
				insertSubscription(conn, eventId);
				insertEventEpc(conn, eventId);
				conn.commit();
			} catch (SQLException e) {
				rollback(conn, e);
				throw e;
			}
		} catch (SQLException e) {
			// Surface the failure to the framework so the task fails instead of
			// silently dropping the record.
			throw new IOException("Failed to write aggregation event to the database", e);
		}
	}

	/** Inserts the event row and returns its database-generated id. */
	private long insertEvent(Connection conn) throws SQLException {
		try (PreparedStatement ps = conn.prepareStatement(INSERT_EVENT_SQL, Statement.RETURN_GENERATED_KEYS)) {
			ps.setString(1, "1");
			ps.setString(2, "1");
			ps.setInt(3, 1);
			ps.setInt(4, 1);
			ps.setInt(5, 1);
			ps.setString(6, "1");
			ps.setInt(7, 1);
			ps.setBoolean(8, false);
			ps.executeUpdate();

			long eventId = -1;
			try (ResultSet keys = ps.getGeneratedKeys()) {
				if (keys.next()) {
					eventId = keys.getLong(1);
				}
			}
			if (eventId < 0) {
				throw new SQLException("事件Id未獲取到,集合事件信息未插入");
			}
			return eventId;
		}
	}

	/** Inserts the subscription report row for the given event. */
	private void insertSubscription(Connection conn, long eventId) throws SQLException {
		try (PreparedStatement ps = conn.prepareStatement(INSERT_SUBSCRIBE_SQL)) {
			ps.setLong(1, eventId);
			ps.executeUpdate();
		}
	}

	/** Inserts the EPC tag row, linked to the given event. */
	private void insertEventEpc(Connection conn, long eventId) throws SQLException {
		try (PreparedStatement ps = conn.prepareStatement(INSERT_EPC_SQL)) {
			ps.setString(1, "1");
			ps.setString(2, "1");
			// Link to the event just inserted rather than a fixed id.
			ps.setLong(3, eventId);
			ps.setString(4, "1");
			ps.executeUpdate();
		}
	}

	/** Rolls back, attaching any rollback failure to the original error. */
	private void rollback(Connection conn, SQLException cause) {
		try {
			conn.rollback();
		} catch (SQLException rollbackFailure) {
			cause.addSuppressed(rollbackFailure);
		}
	}

}
看 DBOutputFormat 的源碼也是如此,就是在 RecordWriter 中對數據庫進行寫操作。在 Main 函數中調用 job.setOutputFormatClass(Test.class); 就可以了(因為我的數據庫連接地址、用戶名和密碼都是寫死的)。
相關文章
相關標籤/搜索