Hadoop Pitfalls (Part 4)
HBase Client Programming (Eclipse)
For HBase installation and configuration, and for setting up Eclipse, please refer to the previous two articles in this series.
The HBase version used in this series is 1.4.13.
The Hadoop version used in this series is 2.8.5.
Note that package names, server hostnames, and other site-specific settings must be adapted to your own environment.
The client-related jar files shipped with HBase need to be added to the project's Build Path.
In theory, only the jars containing the org.apache.hadoop.hbase.* packages are required, but in practice classes still turned out to be missing, so I added every jar under HBase's lib directory instead. This brute-force approach reflects my limited Java experience; readers who know the ecosystem better should be able to add only the jars they actually need.
The structure of an example Student table is shown in the figure below.
The code to create it is as follows:
package wit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class HBaseTest {
    // Declare a static HBaseConfiguration
    static Configuration cfg = HBaseConfiguration.create();

    // Create the Student table
    public static void createStdTable() throws Exception {
        cfg.set("hbase.zookeeper.quorum", "hadoop1-ali,hadoop2-hw");
        // Table name
        String tablename = "Student";
        // Column family names
        String[] columnFamilys = new String[] {"Std", "Course"};
        // Open a connection
        Connection con = ConnectionFactory.createConnection(cfg);
        // Get an Admin object
        Admin admin = con.getAdmin();
        // Get the table name object
        TableName tName = TableName.valueOf(tablename);
        // Check whether the table already exists
        if (admin.tableExists(tName)) {
            System.out.println("table Exists!");
            System.exit(0);
        } else {
            HTableDescriptor tableDesc = new HTableDescriptor(tName);
            // Add the column families
            for (String cf : columnFamilys) {
                HColumnDescriptor cfDesc = new HColumnDescriptor(cf);
                if (cf.equals("Course")) // keep up to 3 historical versions of course scores
                    cfDesc.setMaxVersions(3);
                tableDesc.addFamily(cfDesc);
            }
            // Create the table
            admin.createTable(tableDesc);
            System.out.println("create table success!");
        }
        admin.close();
        con.close();
    }

    public static void main(String[] agrs) throws Throwable {
        try {
            createStdTable();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Here, the line
cfg.set("hbase.zookeeper.quorum","hadoop1-ali,hadoop2-hw");
supplies the ZooKeeper quorum that would normally come from hbase-site.xml. Ideally that file would be referenced as part of the project configuration so the settings are picked up automatically, but I could not get that to work, so the setting is made by hand in code; the later examples use the same approach.
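For reference, here is a minimal sketch of the file-based alternative: HBaseConfiguration.create() picks up hbase-site.xml automatically if the file is on the classpath, and Configuration.addResource can load it from an explicit location. The path below is only an example and must be adjusted to your own installation.

// Sketch: obtain the ZooKeeper quorum from hbase-site.xml instead of hard-coding it.
Configuration cfg = HBaseConfiguration.create();  // reads hbase-site.xml if it is on the classpath
// Or load the file explicitly; this path is only an example.
cfg.addResource(new org.apache.hadoop.fs.Path("/usr/local/hbase/conf/hbase-site.xml"));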
Run the code above; the message create table success! indicates that the table was created successfully.
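To double-check from code rather than from the HBase shell, a small helper like the following can be used. It is a sketch that reuses the static cfg of HBaseTest; the method name is my own and is not part of the original class.

public static void ListTables() throws Exception {
    cfg.set("hbase.zookeeper.quorum", "hadoop1-ali,hadoop2-hw");
    Connection con = ConnectionFactory.createConnection(cfg);
    Admin admin = con.getAdmin();
    // Print every table name known to the cluster; Student should be among them.
    for (TableName tn : admin.listTableNames()) {
        System.out.println(tn.getNameAsString());
    }
    admin.close();
    con.close();
}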
The code to delete the table is as follows:
package wit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class Delete {
    // Declare a static HBaseConfiguration
    static Configuration cfg = HBaseConfiguration.create();

    public static void DeleteTable() throws Exception {
        cfg.set("hbase.zookeeper.quorum", "hadoop1-ali,hadoop2-hw");
        Connection con = ConnectionFactory.createConnection(cfg);
        // Get the table name object
        TableName tablename = TableName.valueOf("Student");
        // Get an Admin object
        Admin admin = con.getAdmin();
        if (admin.tableExists(tablename)) {
            try {
                // A table must be disabled before it can be deleted
                admin.disableTable(tablename);
                admin.deleteTable(tablename);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
        // Release the connection
        admin.close();
        con.close();
    }

    public static void main(String[] agrs) throws Throwable {
        try {
            DeleteTable();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
The following code adds a new column family named Test to the table:
public static void AddStdColFamily() throws Throwable {
    Connection con = ConnectionFactory.createConnection(cfg);
    // Get the table name object
    TableName tablename = TableName.valueOf("Student");
    // Get an Admin object
    Admin admin = con.getAdmin();
    HColumnDescriptor newCol = new HColumnDescriptor("Test");
    newCol.setMaxVersions(3);
    if (admin.tableExists(tablename)) {
        try {
            admin.disableTable(tablename);
            // Add the new column family to the table
            admin.addColumn(tablename, newCol);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    admin.enableTable(tablename);
    admin.close();
    con.close();
}
Modify the Test column family so that it keeps up to 5 historical versions:
public static void ModifyStdColFamily() throws Throwable {
    Connection con = ConnectionFactory.createConnection(cfg);
    // Get the table name object
    TableName tablename = TableName.valueOf("Student");
    // Get an Admin object
    Admin admin = con.getAdmin();
    HColumnDescriptor modCol = new HColumnDescriptor("Test");
    modCol.setMaxVersions(5);
    if (admin.tableExists(tablename)) {
        try {
            admin.disableTable(tablename);
            // Replace the existing descriptor of the Test column family
            admin.modifyColumn(tablename, modCol);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    admin.enableTable(tablename);
    admin.close();
    con.close();
}
Delete the Test column family:
public static void DeleteStdColFamily() throws Throwable {
    Connection con = ConnectionFactory.createConnection(cfg);
    // Get the table name object
    TableName tablename = TableName.valueOf("Student");
    // Get an Admin object
    Admin admin = con.getAdmin();
    if (admin.tableExists(tablename)) {
        try {
            admin.disableTable(tablename);
            // Remove the Test column family (requires org.apache.hadoop.hbase.util.Bytes)
            admin.deleteColumn(tablename, Bytes.toBytes("Test"));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    admin.enableTable(tablename);
    admin.close();
    con.close();
}
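To confirm that the add/modify/delete operations above took effect, a sketch like the following prints each column family of Student together with its max-versions setting. It again reuses the same cfg; the method name is my own.

public static void PrintStdColFamilys() throws Exception {
    Connection con = ConnectionFactory.createConnection(cfg);
    Admin admin = con.getAdmin();
    // Print each column family of Student and its maxVersions setting.
    HTableDescriptor desc = admin.getTableDescriptor(TableName.valueOf("Student"));
    for (HColumnDescriptor cf : desc.getColumnFamilies()) {
        System.out.println(cf.getNameAsString() + " maxVersions=" + cf.getMaxVersions());
    }
    admin.close();
    con.close();
}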
Reading an HDFS file and writing it into the table
The HDFS file Std.txt contains:
200215125, Jim, Male, 2008-12-09, CS, 89, 78, 56
200215126, Marry, Female, 2001-2-09, AI , 79, 72, 66
200215127, Marker, Male, 2003-12-19, CE, 78, 48, 36
Note that the file must not contain blank lines; otherwise an error is raised when the data is read.
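If you would rather have the job tolerate blank lines instead of cleaning the file, a guard like the following at the top of the map function (a sketch, not part of the code below) simply skips them:

// Sketch: skip blank lines so an empty line in Std.txt does not crash the mapper.
if (value.toString().trim().isEmpty()) {
    return;
}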
The code is shown below. The map phase reads each line of Std.txt, uses the student ID as the key and the remaining student information as the value, and emits this pair as intermediate output. The reduce phase writes the intermediate results into the HBase Student table, so the reducer extends TableReducer; in the main function, TableMapReduceUtil.initTableReducerJob is used to bind the reducer to the Student table.
package wit;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class StdHdfsToHBase {

    public static class HDFSMap extends Mapper<Object, Text, Text, Text> {
        // map function: read each line of std.txt on HDFS
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // The student ID (the field before the first comma) becomes the row key
            String stdRowKey = value.toString().split(",")[0];
            System.out.println(stdRowKey);
            // Everything after the student ID becomes the value
            String stdInfo = value.toString().substring(stdRowKey.length() + 1);
            System.out.println(stdInfo);
            context.write(new Text(stdRowKey), new Text(stdInfo));
        }
    }

    public static class HDFSReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Put put = new Put(Bytes.toBytes(key.toString()));
            for (Text val : values) {
                // trim() removes the space left behind by the ", " separators
                String[] stdInfo = val.toString().split(",");
                put.addColumn("Std".getBytes(), "Name".getBytes(), stdInfo[0].trim().getBytes());
                put.addColumn("Std".getBytes(), "gender".getBytes(), stdInfo[1].trim().getBytes());
                put.addColumn("Std".getBytes(), "birth".getBytes(), stdInfo[2].trim().getBytes());
                put.addColumn("Std".getBytes(), "dept".getBytes(), stdInfo[3].trim().getBytes());
                put.addColumn("Course".getBytes(), "math".getBytes(), Bytes.toBytes(Long.parseLong(stdInfo[4].trim())));
                put.addColumn("Course".getBytes(), "arts".getBytes(), Bytes.toBytes(Long.parseLong(stdInfo[5].trim())));
                put.addColumn("Course".getBytes(), "phy".getBytes(), Bytes.toBytes(Long.parseLong(stdInfo[6].trim())));
                // Write the student record into the HBase table
                context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop1-ali,hadoop2-hw");
        Job job = Job.getInstance(conf, "StdHdfsToHBase");
        job.setJarByClass(StdHdfsToHBase.class);
        // Set up the mapper
        job.setMapperClass(HDFSMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Set up the reducer, bound to the Student table
        TableMapReduceUtil.initTableReducerJob("Student", HDFSReducer.class, job);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        // Input directory that contains std.txt
        FileInputFormat.addInputPath(job, new Path("hdfs://hadoop1-ali:9000/input/std"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
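After the job finishes, the imported data can be read back with the plain client API. The sketch below is a verification helper of my own, not part of the original job; it fetches one row, and since the course scores were written as 8-byte longs via Bytes.toBytes(Long.parseLong(...)), they must be decoded with Bytes.toLong. It additionally needs the imports org.apache.hadoop.hbase.client.Table, Get and Result.

// Sketch: read one row of Student back to verify the MapReduce import.
public static void ReadStdRow() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "hadoop1-ali,hadoop2-hw");
    Connection con = ConnectionFactory.createConnection(conf);
    Table table = con.getTable(TableName.valueOf("Student"));
    Get get = new Get(Bytes.toBytes("200215125"));
    Result result = table.get(get);
    String name = Bytes.toString(result.getValue(Bytes.toBytes("Std"), Bytes.toBytes("Name")));
    long math = Bytes.toLong(result.getValue(Bytes.toBytes("Course"), Bytes.toBytes("math")));
    System.out.println(name + " math=" + math);
    table.close();
    con.close();
}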
Originally published on 陳十一's blog.