對數據庫HBASE的操做有shell端和java API兩種方式。
在此以前要先說一下HBASE的結構及其數據存儲結構:
HBASE是基於HDFS的,是一種NoSQL的數據庫。它的數據模型以下所示:
Row Key |
Timestamp |
Column Family |
URI |
Parser |
r1 |
t3 |
url=http://www.taobao.com |
title=每天特價 |
t2 |
host=taobao.com |
|
t1 |
|
|
r2 |
t5 |
url=http://www.alibaba.com |
content=天天… |
t4 |
host=alibaba.com |
|
RowKey:行鍵,也是表的主鍵。表中的記錄是按照Row Key排序。
Timestamp:時間戳,每次數據操做對應的時間戳,能夠看做數據的version number。
ColumnFamily:列簇,Table在水平方向有一個或者多個Column Family組成,一個Column Family中能夠由任意多個Column組成,即Column Family支持動態擴展,無需預先定義Column的數量以及類型,全部Column均以二進制格式存儲,用戶須要自行進行類型轉換。
shell中輸入 hbase shell進入hbase操做,假設這樣的person表:
rowid,username,userid,birth,phone,sex
建立該表須要使用:
create 'person','username','userid','birth','phone','sex'
第一個是表名,後面都是屬性,rowkey列不須要本身建立和設定列名。
ps,hbase shell 中,都沒有分號作結尾。並且表名列名基本都加引號,這和SQL不同。
加入單個數據,咱們使用put命令。注意put命令的操做,只能一行一列的單個值去添加,不能一次加一行:
put 'person','1','username','Aran'
表示person表中行號(row key)爲1列名爲username的值爲Aran。
若是清空表,須要:
若是刪除表,須要:
disable 'person'
drop 'person'
Hbase中的查詢分爲兩種:以rowkey方式的行查詢,和以值爲基礎遍歷.
rowkey方式:
值查詢:
scan 'person',FILTER=>"ValueFilter(=,'binary:1992-2-12')"
Hbase支持cvs格式批量導入:
相似這樣格式的文件
1,User1,8237764069450,2001-8-19,682318616,1
能夠在shell中(不是hbase shell中)導入:
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv-Dimporttsv.separator=","-Dimporttsv.columns=HBASE_ROW_KEY,username,userid,birth,phone,sex person hdfs://192.168.70.28:9000/dataImport/HbaseTable-1.cvs
分割符爲,而且,導入不用寫rowkey,默認第一個就是rowkey。
就介紹這麼多。詳細的請參考:
對於開發者而言,經過shell去操做數據仍是不多見。咱們能夠使用JAVA API。
新建Java項目,添加的包有Hadoop下的Hadoop-core和Hbase/lib/下的全部jar。
只寫了查詢相關的Func,直接上代碼:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
publicclassHbaseAPI{
privatestaticConfiguration conf =null;
static{
conf =HBaseConfiguration.create();
conf.set("hbase.zookeeper.property.clientPort","2181");
conf.set("hbase.zookeeper.quorum","192.168.70.29");
conf.set("hbase.master","192.168.70.28:60000");
//timeout time
conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,1200000);
}
//query by rowkey
publicstaticvoidQueryByrowKey(String tableName,String rowKey){
try{
HTable table =newHTable(conf,tableName);
Get g =newGet(rowKey.getBytes());
long start =System.currentTimeMillis();
Result r = table.get(g);
/*System.out.println("column:rowkey"
+ "====value:" + new String(r.getRow()));
for (KeyValue keyValue : r.raw()) {
System.out.println("column:" + new String(keyValue.getFamily())
+ "====value:" + new String(keyValue.getValue()));
}*/
long end = System.currentTimeMillis();
System.out.println(end-start);
}catch(IOException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//query by condition
publicstaticvoidQueryByCondition(String tableName,String columnName,String columnValue){
try{
HTable table =newHTable(conf,tableName);
Filter filter =newSingleColumnValueFilter(columnName.getBytes(),null,CompareOp.EQUAL,columnValue.getBytes());
Scan s =newScan();
s.setFilter(filter);
long start =System.currentTimeMillis();
ResultScanner rs = table.getScanner(s);
int hang =0;
for(Result r : rs){
/*System.out.println(new String(r.getRow()));
System.out.println("column:rowkey"
+ "====value:" + new String(r.getRow()));
for (KeyValue keyValue : r.raw()) {
System.out.println("column:" + new String(keyValue.getFamily())
+ "====value:" + new String(keyValue.getValue()));
}*/
hang++;
}
long end =System.currentTimeMillis();
System.out.println("rownum:"+hang);
System.out.println(end-start);
}catch(IOException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
publicstaticvoid main(String[] args){
//HbaseAPI.QueryByrowKey("person1000", "12307999");
//HbaseAPI.QueryByCondition("person1000", "username", "User2312397");
HbaseAPI.QueryByCondition("person1000","sex","0");
}
}
特別說下conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, 1200000)表示的是每次訪問最大時長,超過這個時長自動終止。
上文中的QueryByCondition並不許確,SingleColumnValueFilter的四個參數分別是列簇名、列名、過濾條件,過濾值。因爲上文中的Hbase表每一個列都是列簇,因此沒有問題,現提供同時有列簇和列的查詢:
publicstaticvoidQueryByCondition(String tableName,String familyName,String columnName,String columnValue){
try{
HTable table =newHTable(conf,tableName);
Filter filter =newSingleColumnValueFilter(familyName.getBytes(),columnName.getBytes(),CompareOp.EQUAL,columnValue.getBytes());
Scan s =newScan();
s.setCaching(1000);
s.setFilter(filter);
long start =System.currentTimeMillis();
ResultScanner rs = table.getScanner(s);
int hang =0;
for(Result r : rs){
System.out.println(newString(r.getRow()));
System.out.println("column:rowkey"
+"====value:"+newString(r.getRow()));
for(KeyValue keyValue : r.raw()){
System.out.println("column:"+newString(keyValue.getFamily())+":"+newString(keyValue.getQualifier())
+"====value:"+newString(keyValue.getValue()));
}
hang++;
}
long end =System.currentTimeMillis();
System.out.println("rownum:"+hang);
System.out.println(end-start);
}catch(IOException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
代碼中有一個s.setCaching(),這涉及到了Hbase 的Scanner Caching功能:
base.client.scanner.caching配置項能夠設置HBase scanner一次從服務端抓取的數據條數,默認狀況下一次一條。經過將其設置成一個合理的值,能夠減小scan過程當中next()的時間開銷,代價是scanner須要經過客戶端的內存來維持這些被cache的行記錄。
有三個地方能夠進行配置:1)在HBase的conf配置文件中進行配置;2)經過調用HTable.setScannerCaching(int scannerCaching)進行配置;3)經過調用Scan.setCaching(int caching)進行配置。三者的優先級愈來愈高。
參考文件: