import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Custom Hadoop {@link OutputFormat} that writes job output directly to a MySQL
 * database instead of HDFS files.
 *
 * <p>Project: aggressiondatafilter. Author: 黃傳聰, 2013-12-9.
 *
 * <p>NOTE(review): the JDBC URL and credentials are hard-coded below; for
 * production use they should come from the job {@code Configuration}
 * (e.g. via {@code DBConfiguration}).
 *
 * @param <K> key type written to the database; must implement {@link DBWritable}
 * @param <V> value type written to the database
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Test<K extends DBWritable, V> extends OutputFormat<K, V> {
    private static final String DB_URL = "jdbc:mysql://10.5.82.224:3306/ale";
    private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
    private static final String USER_NAME = "test";
    private static final String PASSWORD = "hcc199056";

    /**
     * No output specification to validate: the database target is fixed by the
     * constants above, so this is intentionally a no-op.
     */
    @Override
    public void checkOutputSpecs(JobContext context) throws IOException,
            InterruptedException {
        // Nothing to validate — the output target is a fixed database table.
    }

    /**
     * Reuses {@link FileOutputCommitter} purely for task attempt bookkeeping;
     * this format itself produces no files.
     */
    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
                context);
    }

    /**
     * Creates the database-backed writer for this task attempt.
     *
     * <p>Fix: the original caught {@code SQLException} and
     * {@code ClassNotFoundException}, printed the stack trace, and returned
     * {@code null} — which would cause a {@code NullPointerException} inside the
     * framework on the first {@code write()}. Failures are now rethrown as
     * {@link IOException} so the task fails fast with the real cause.
     *
     * @param context the task attempt context (currently unused)
     * @return a writer that inserts each record into the aggregation tables
     * @throws IOException if the JDBC driver cannot be loaded or the writer
     *         cannot be constructed
     */
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        try {
            return new TestRecordWriter<K, V>(DRIVER_CLASS, DB_URL, USER_NAME, PASSWORD);
        } catch (SQLException | ClassNotFoundException e) {
            throw new IOException("Failed to create database record writer", e);
        }
    }
}
和重寫其餘的Format類型同樣,須要繼承OutputFormat基類。每個Format都有一個對應的RecordWriter。
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
 * {@link RecordWriter} that persists each record into the aggregation-event
 * tables ({@code t_aggression_event}, {@code t_aggression_event_subscribe},
 * {@code t_aggression_event_epc}) within a single transaction per record.
 *
 * <p>Project: aggressiondatafilter. Author: 黃傳聰, 2013-12-9.
 *
 * <p>NOTE(review): a new connection is opened and closed for every
 * {@code write()} call, which is expensive; consider opening it once in the
 * constructor and closing it in {@code close()}.
 *
 * @param <K> key type (currently unused — placeholder values are inserted)
 * @param <V> value type (currently unused — placeholder values are inserted)
 */
@InterfaceStability.Evolving
public class TestRecordWriter<K, V> extends RecordWriter<K, V> {
    private Connection connection;
    private PreparedStatement ps;
    private ResultSet rs = null;
    private final String url;
    private final String userName;
    private final String password;

    /**
     * Loads the JDBC driver and remembers the connection parameters; the
     * connection itself is opened lazily in {@link #write}.
     *
     * @param driveClass fully-qualified JDBC driver class name
     * @param url        JDBC connection URL
     * @param userName   database user
     * @param password   database password
     * @throws SQLException            reserved for future connection setup
     * @throws ClassNotFoundException  if the driver class is not on the classpath
     */
    public TestRecordWriter(String driveClass, String url, String userName, String password)
            throws SQLException, ClassNotFoundException {
        Class.forName(driveClass);
        this.url = url;
        this.userName = userName;
        this.password = password;
    }

    /**
     * Nothing to release here: the connection and statements are opened and
     * closed per {@link #write} call.
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException,
            InterruptedException {
        // No long-lived resources to release.
    }

    /**
     * Inserts one aggregation event, its subscribe row, and its EPC row as one
     * transaction; rolls back and fails the task on any SQL error.
     *
     * <p>Fixes over the original:
     * <ul>
     *   <li>a failed {@code getConnection} no longer falls through to an NPE —
     *       it is rethrown as {@link IOException};</li>
     *   <li>intermediate {@code PreparedStatement}s are closed before
     *       {@code ps} is reassigned (the original leaked them);</li>
     *   <li>a failed transaction is no longer silently swallowed after
     *       rollback — it is rethrown so the framework sees the failure.</li>
     * </ul>
     *
     * @param key   record key (currently unused; placeholder values inserted)
     * @param value record value (currently unused; placeholder values inserted)
     * @throws IOException if the connection cannot be opened or any insert fails
     */
    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
        try {
            connection = DriverManager.getConnection(url, userName, password);
            connection.setAutoCommit(false);
        } catch (SQLException e1) {
            // Fix: original printed the stack trace and continued with a null
            // connection, guaranteeing a NullPointerException below.
            throw new IOException("Could not open database connection", e1);
        }
        String sql = "insert into t_aggression_event (parent_id, action, biz_step, disposion, biz_location,read_point,biz_transtration,flag) values (?,?,?,?,?,?,?,?)";
        // Generated id of the aggregation event; -1 until retrieved.
        long retVal = -1;
        try {
            ps = connection
                    .prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
            // TODO(review): placeholder values — the commented-out original
            // mapped fields from the key/value objects instead.
            ps.setString(1, "1");
            ps.setString(2, "1");
            ps.setInt(3, 1);
            ps.setInt(4, 1);
            ps.setInt(5, 1);
            ps.setString(6, "1");
            ps.setInt(7, 1);
            ps.setBoolean(8, false);
            ps.executeUpdate();
            rs = ps.getGeneratedKeys();
            if (rs.next()) {
                retVal = rs.getLong(1);
            }
            if (retVal < 0) {
                throw new SQLException("事件Id未獲取到,集合事件信息未插入");
            }
            rs.close();
            rs = null;
            // Fix: close the statement before reassigning ps — the original
            // leaked each statement it replaced.
            ps.close();
            // Insert the subscription-report row for this event.
            sql = "insert into t_aggression_event_subscribe (event_id) values (?)";
            ps = connection.prepareStatement(sql);
            ps.setLong(1, retVal);
            ps.executeUpdate();
            ps.close();
            // Insert the EPC tag row for this event.
            sql = "insert into t_aggression_event_epc (epc, read_point, event_id, base_reader_name) values (?, ?, ?, ?)";
            ps = connection.prepareStatement(sql);
            ps.setString(1, "1");
            ps.setString(2, "1");
            ps.setLong(3, 1);
            ps.setString(4, "1");
            ps.executeUpdate();
            connection.commit();
        } catch (SQLException ex) {
            try {
                connection.rollback();
            } catch (SQLException rollbackEx) {
                // Keep the rollback failure attached to the primary error.
                ex.addSuppressed(rollbackEx);
            }
            // Fix: original swallowed the failure, so the job reported success
            // even when nothing was written.
            throw new IOException("Failed to write aggregation event", ex);
        } finally {
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException e) {
                    // Best-effort cleanup; the primary error (if any) already propagated.
                }
            }
            if (ps != null) {
                try {
                    ps.close();
                } catch (SQLException e) {
                    // Best-effort cleanup.
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    // Best-effort cleanup.
                }
            }
        }
    }
}
看DBOutputFormat的源碼也是,就是在RecordWriter中進行對數據庫的寫操做。
在Main函數中:job.setOutputFormatClass(Test.class);就能夠了。(由於個人數據庫的鏈接地址和用戶名,密碼都是寫死的)