Mapper讀取HBase數據
package MapReduce;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import java.io.IOException;
/**
 * Mapper that reads one HBase row per call and emits it as a
 * {@link phoneInfoDBWritable} so the job's output format can persist it.
 *
 * <p>The columns with qualifiers {@code caller}, {@code callee}, {@code time},
 * {@code dur} and {@code flag} are copied into a {@link phoneInfo}; a column
 * that is absent from the row is emitted as the empty string.
 */
public class CallMapper extends TableMapper<phoneInfoDBWritable, phoneInfoDBWritable> {

    // Holder that is refilled for every row; all five fields are overwritten
    // on each call to map(), so no state leaks between rows.
    private final phoneInfo pp = new phoneInfo();
    private phoneInfoDBWritable pDB = null;

    /**
     * Extracts the call fields from a single HBase row and writes them out.
     *
     * @param key     rowkey of the current row (not needed to build the record)
     * @param value   the cells of the current row
     * @param context task context the record is written to
     * @throws IOException          if writing the record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        String caller = "";
        String callee = "";
        String time = "";
        String dur = "";
        String flag = "";

        // The rowkey used to be split inside this loop to build a value that
        // was never used; a rowkey in an unexpected format then failed the
        // task with an unchecked exception. That dead code has been removed.
        for (Cell cell : value.rawCells()) {
            String qualifier = decode(CellUtil.cloneQualifier(cell));
            switch (qualifier) {
                case "caller":
                    caller = decode(CellUtil.cloneValue(cell));
                    break;
                case "callee":
                    callee = decode(CellUtil.cloneValue(cell));
                    break;
                case "time":
                    time = decode(CellUtil.cloneValue(cell));
                    break;
                case "dur":
                    dur = decode(CellUtil.cloneValue(cell));
                    break;
                case "flag":
                    flag = decode(CellUtil.cloneValue(cell));
                    break;
                default:
                    // Any other column is not part of the exported record.
                    break;
            }
        }

        pp.setCaller(caller);
        pp.setCallee(callee);
        pp.setTime(time);
        pp.setDur(dur);
        pp.setFlag(flag);

        // The record travels as the key; the value is deliberately null.
        // NOTE(review): presumably the configured DB output format persists
        // only the key - confirm against the driver's output format.
        pDB = new phoneInfoDBWritable(pp);
        context.write(pDB, null);
    }

    /**
     * Decodes cell bytes with an explicit charset so the result does not
     * depend on the default charset of whichever node runs the task.
     * NOTE(review): assumes the table was populated with UTF-8 encoded
     * strings - confirm against the writer.
     */
    private static String decode(byte[] bytes) {
        return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
    }
}
Driver配置分發任務
package MapReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
/**
 * Driver that configures and submits the map-only job which scans the HBase
 * table {@code phone:log} with {@link CallMapper} and writes the records to
 * the {@code phone} table of the configured JDBC database.
 */
public class MRRunner {

    private static final String ZK_QUORUM = "hadoop1,hadoop2,hadoop3";
    private static final String ZK_CLIENT_PORT = "2181";

    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    private static final String JDBC_URL = "jdbc:mysql://localhost:3306/callphone";
    private static final String DB_USER = "root";
    private static final String DB_PASSWORD = "root";

    private static final String SOURCE_TABLE = "phone:log";
    private static final String TARGET_TABLE = "phone";
    private static final String[] TARGET_COLUMNS = {"caller", "callee", "time", "dur", "flag"};

    /**
     * Builds the job, waits for it to finish and exits with status 0 on
     * success or 1 on failure.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // HBase client configuration: where to find ZooKeeper.
        Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("hbase.zookeeper.quorum", ZK_QUORUM);
        hbaseConf.set("hbase.zookeeper.property.clientPort", ZK_CLIENT_PORT);

        Job job = Job.getInstance(hbaseConf, "db store");

        // JDBC connection settings and the target table/columns for the output.
        DBConfiguration.configureDB(job.getConfiguration(), JDBC_DRIVER, JDBC_URL, DB_USER, DB_PASSWORD);
        DBOutputFormat.setOutput(job, TARGET_TABLE, TARGET_COLUMNS);

        job.setJarByClass(MRRunner.class);
        job.setOutputFormatClass(DBOutputFormat.class);

        Scan tableScan = newTableScan();

        // Wire the mapper to the source HBase table.
        job.setMapperClass(CallMapper.class);
        TableMapReduceUtil.initTableMapperJob(
                SOURCE_TABLE,
                tableScan,
                CallMapper.class,
                phoneInfoDBWritable.class,
                phoneInfoDBWritable.class,
                job);

        // Map-only job: no reducer is used.
        job.setNumReduceTasks(0);

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }

    /** Creates the scan for the source table: block caching off, scanner caching set to 500. */
    private static Scan newTableScan() {
        Scan tableScan = new Scan();
        tableScan.setCacheBlocks(false);
        tableScan.setCaching(500);
        return tableScan;
    }
}
phoneInfo封裝讀取到的HBase數據
package MapReduce;
/**
 * Plain data holder for one call record read from the HBase table.
 *
 * <p>All five properties are stored as strings and are {@code null} until
 * their setter has been called.
 */
public class phoneInfo {

    private String caller;
    private String callee;
    private String time;
    private String dur;
    private String flag;

    // ---- caller ----

    public void setCaller(String caller) {
        this.caller = caller;
    }

    public String getCaller() {
        return this.caller;
    }

    // ---- callee ----

    public void setCallee(String callee) {
        this.callee = callee;
    }

    public String getCallee() {
        return this.callee;
    }

    // ---- time ----

    public void setTime(String time) {
        this.time = time;
    }

    public String getTime() {
        return this.time;
    }

    // ---- dur ----

    public void setDur(String dur) {
        this.dur = dur;
    }

    public String getDur() {
        return this.dur;
    }

    // ---- flag ----

    public void setFlag(String flag) {
        this.flag = flag;
    }

    public String getFlag() {
        return this.flag;
    }
}
phoneInfoDBWritable實現DBWritable用於存放phoneInfo對象
package MapReduce;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
* 編寫phoneInfoDBWritable類實現DBWritable,完成HBase的數據寫入到指定的MySQL的序列化
*/
public class phoneInfoDBWritable implements DBWritable {
private phoneInfo phoneinfo;
public phoneInfoDBWritable() { }
public phoneInfoDBWritable(phoneInfo phoneinfo) {
this.phoneinfo = phoneinfo;
}
public void write(PreparedStatement statement) throws SQLException {
statement.setString(1, phoneinfo.getCaller());
statement.setString(2, phoneinfo.getCallee());
statement.setString(3, phoneinfo.getTime());
statement.setString(4, phoneinfo.getDur());
statement.setString(5, phoneinfo.getFlag());
}
public void readFields(ResultSet resultSet) throws SQLException {
}
}