Hbase客戶端編程(Eclipse)

Hadoop 踩坑記(四)java

Hbase客戶端編程(Eclipse)shell

環境

關於 Hbase 的安裝與配置以及 Eclipse 的配置請參考前兩篇文章apache

本系列選用的 hbase 版本爲 1.4.13編程

本系列選用的 hadoop 版本爲 2.8.5服務器

請注意包名、服務器等個性化配置app

引入jar包

須要將 Hbase 中與客戶端相關的 jar 包引入到 Build Pathide

理論上只須要將 org.apache.hadoop.hbase.* 相關包引入便可,但實際操做中仍是遇到了缺乏狀況,所以將 hbase 的 lib 目錄下的全部 jar 包都引入了函數

這種粗暴的方法源於本人對於 java 開發的瞭解太少,經驗豐富的朋友應該能夠按需引入。oop

表的建立與刪除

一個示例的 Student 表結構以下圖所示ui

表1

代碼以下

package wit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
public class HBaseTest {
//聲明靜態配置 HBaseConfiguration
   static Configuration cfg=HBaseConfiguration.create();
    //建立學生表
   public static void createStdTable() throws Exception {
   cfg.set("hbase.zookeeper.quorum","hadoop1-ali,hadoop2-hw");
      //數據表名
    String tablename="Student";
    //列簇名列表
    String[] columnFamilys= new String[] {"Std","Course"};
    //創建鏈接
    Connection con = ConnectionFactory.createConnection(cfg);
    //得到Admin對象
    Admin admin = con.getAdmin();
    //得到表對象
    TableName tName  = TableName.valueOf(tablename);
    //判斷表是否存在
        if (admin.tableExists(tName)) {
            System.out.println("table Exists!");
            System.exit(0);
        }
        else{
        HTableDescriptor tableDesc = new HTableDescriptor(tName);
        //添加列簇
        for(String cf:columnFamilys)
        {
          HColumnDescriptor cfDesc = new HColumnDescriptor(cf);
          if(cf.equals("Course"))//設置課程的最大歷史版本
            cfDesc.setMaxVersions(3);
          tableDesc.addFamily(cfDesc);
        }
        //建立表
            admin.createTable(tableDesc);
            System.out.println("create table success!");
        }
        admin.close();
        con.close();
    }
   public static void  main (String [] agrs) throws Throwable {
      try {
        createStdTable();
      }
      catch (Exception e) {
        e.printStackTrace();
      }
  }

}

其中

cfg.set("hbase.zookeeper.quorum","hadoop1-ali,hadoop2-hw");

導入了 hbase-site.xml 文件中的服務器配置,理論上應該要將該文件引用爲項目配置,但本人水平所限,沒有作到,所以採用代碼中手動配置的方法,後面的代碼中也同樣採用了此種方式。

運行上述代碼,獲得 create table success! 的提示即爲建立成功

刪除表的代碼以下

package wit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class Delete {
//聲明靜態配置 HBaseConfiguration
   static Configuration cfg=HBaseConfiguration.create();
   public static void DeleteTable() throws Exception{
   cfg.set("hbase.zookeeper.quorum","hadoop1-ali,hadoop2-hw");
   Connection con = ConnectionFactory.createConnection(cfg);
   //得到表對象
   TableName tablename = TableName.valueOf("Student");
    //得到Admin對象
   Admin admin = con.getAdmin();
   if(admin.tableExists(tablename)){
     try
       {
        admin.disableTable(tablename);
        admin.deleteTable(tablename);
       }catch(Exception ex){
         ex.printStackTrace();
       }
    }
 }
   public static void  main (String [] agrs) throws Throwable {
    try {
      DeleteTable();
    }
    catch (Exception e) {
      e.printStackTrace();
    }
  }
}

表模式的修改

增長新的列

下面的代碼會在表中新增列 Test

public static void  AddStdColFamily () throws Throwable {
    Connection con = ConnectionFactory.createConnection(cfg);
    //得到表對象
    TableName tablename = TableName.valueOf("Student");
    //得到Admin對象
    Admin admin = con.getAdmin();
    HColumnDescriptor newCol = new HColumnDescriptor("Test");
    newCol.setMaxVersions(3);
    if(admin.tableExists(tablename)){
      try
        {
         admin.disableTable(tablename);
         admin.addColumn(tablename, newCol);
        }catch(Exception ex){
          ex.printStackTrace();
       }
    }
    admin.enableTable(tablename);
    admin.close();
    con.close();
}

修改列簇屬性

修改 Test 列簇的最大歷史版本數爲 5

public static void  ModifyStdColFamily () throws Throwable {
  Connection con = ConnectionFactory.createConnection(cfg);
  //得到表對象
  TableName tablename = TableName.valueOf("Student");
  //得到Admin對象
   Admin admin = con.getAdmin();
   HColumnDescriptor modCol = new HColumnDescriptor("Test");
   modCol.setMaxVersions(5);
   if(admin.tableExists(tablename)){
     try
     {
       admin.disableTable(tablename);
       admin.modifyColumn(tablename, modCol);
      }catch(Exception ex){
          ex.printStackTrace();
      }
     }
      admin.enableTable(tablename);
      admin.close();
      con.close();
}

刪除列

刪除 Test 列

public static void  DeleteStdColFamily() throws Throwable {
   Connection con = ConnectionFactory.createConnection(cfg);
   //得到表對象
   TableName tablename = TableName.valueOf("Student");
   //得到Admin對象
   Admin admin = con.getAdmin();
 if(admin.tableExists(tablename)){
     try
     {
       admin.disableTable(tablename);
       admin.deleteColumn(tablename, Bytes.toBytes("Test"));
      }catch(Exception ex){
         ex.printStackTrace();
       }
    }
    admin.enableTable(tablename);
    admin.close();
    con.close();
}

在表中插入和修改數據(略)

與MapReduce集成

讀取 hdfs 文件後寫入數據表

hdfs 文件 Std.txt 內容爲

200215125, Jim, Male, 2008-12-09, CS, 89, 78, 56
200215126, Marry, Female, 2001-2-09, AI , 79, 72, 66
200215127, Marker, Male, 2003-12-19, CE, 78, 48, 36

這裏須要注意的是,此文件不能有空行,不然讀取數據時會報錯

代碼以下,Map 過程讀取 Std.txt 文件中的每一行,而後將學生的學號設置爲 key,學生的其它信息設置爲 value 後,寫出到中間結果。Reduce 過程負責將 Map 造成的中間結果寫入到 HBase 的 Student 表中,故 Reduce 繼承至TableReducer,在 main 函數中使用

package wit;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class StdHdfsToHBase {
  public static class HDFSMap extends Mapper<Object, Text, Text, Text> {
      //實現map函數,讀取hdfs上的std.txt文件
    public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException
{
    //取出學生的學號爲rowKey
    String stdRowKey = value.toString().split(",")[0];
     System.out.println(stdRowKey);
    //學號後面的學生信息爲value
    String stdInfo =
value.toString().substring(stdRowKey.length()+1);
     System.out.println(stdInfo);
    context.write(new Text(stdRowKey), new Text(stdInfo));
}
   }
  public static class HDFSReducer extends TableReducer<Text, Text,
ImmutableBytesWritable>{
     @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
        Put put = new Put(key.getBytes());
        for (Text val : values) {
         String[] stdInfo = val.toString().split(",");
             put.addColumn("Std".getBytes(), "Name".getBytes(), 
stdInfo[0].getBytes());
             put.addColumn("Std".getBytes(), "gender".getBytes(), 
stdInfo[1].getBytes());
             put.addColumn("Std".getBytes(), "birth".getBytes(), 
stdInfo[2].getBytes());
             put.addColumn("Std".getBytes(), "dept".getBytes(), 
stdInfo[3].getBytes());
             put.addColumn("Course".getBytes(), "math".getBytes(), 
Bytes.toBytes(Long.parseLong(stdInfo[4])));
             put.addColumn("Course".getBytes(), "arts".getBytes(), 
Bytes.toBytes(Long.parseLong(stdInfo[5])));
             put.addColumn("Course".getBytes(), "phy".getBytes(), 
Bytes.toBytes(Long.parseLong(stdInfo[6])));
             //寫入學生信息到HBase表
           context.write(new ImmutableBytesWritable(key.getBytes()), put); 
          }
    }
}
  public static void main(String[] args) throws
    IOException, ClassNotFoundException, InterruptedException {
      Configuration conf = HBaseConfiguration.create();
      conf.set("hbase.zookeeper.quorum", "hadoop1-ali,hadoop2-hw");
      Job job = Job.getInstance(conf, "StdHdfsToHBase");
      job.setJarByClass(StdHdfsToHBase.class);
      // 設置Map
      job.setMapperClass(HDFSMap.class); 
      job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(Text.class);
          //設置Reducer
      TableMapReduceUtil.initTableReducerJob("Student",
      HDFSReducer.class, job);
          job.setOutputKeyClass(ImmutableBytesWritable.class);
          job.setOutputValueClass(Put.class);
      //設置std.txt的輸入目錄
      FileInputFormat.addInputPath(job, new
      Path("hdfs://hadoop1-ali:9000/input/std"));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

原文來自 陳十一的博客

相關文章
相關標籤/搜索