上篇文章說了 Hbase 的基礎架構,都是比較理論的知識,最近我也一直在搞 Hbase 的數據遷移, 今天就來一篇實戰型的,把最近一段時間的 Hbase 整合 Hadoop 的基礎知識在梳理一遍,畢竟當初搞得時候仍是有點摸不着方向,寫下來也方便之後查閱。javascript
以前使用 Hbase 大可能是把它當作實時數據庫來作查詢使用的,大部分使用的都是 Hbase 的基礎 Api, Hbase 與 Hadoop Hive 框架的整合還真是沒系統的搞過,話很少說,先看看本文的架構圖:java
PS:文中提到的代碼見最後 參考資料git
着重點在前兩部分,後面的都是你們比較熟悉的部分了。github
1 Hbase 與 Hadoop 集成sql
Hbase 與 Hadoop 相關操做主要能夠分爲以下三種狀況:數據庫
一張 hbase 表數據導入另外一張 hbase 表apache
HDFS 數據導入 Hbase 表編程
HDFS 數據(超大數據)導入 Hbase 表swift
以上三種狀況的數據遷移基本都是依靠 MR 程序來完成的,因此重點又回到了 MR 編程。api
01 hbase表數據導入
思路:準備 MR 程序將一張 Hbase 表寫入到另外一張 Hbase 表便可。
注意:兩張 Hbase 表導入數據的列族信息要一致;有數據的 Hbase 在讀入數據時要注意非空判斷。
準備工做:
準備 user1 表 列族 爲 f1,f1 中有 age ,name屬性 ,做爲輸入表;
準備 user2 表,建立列族 f1,做爲輸出表。
主要代碼:
Mapper 端:這裏注意繼承的 是 TableMapper
1 public class HBaseReadMapper extends TableMapper<Text,Put> { 2 /** 3 * 4 * @param key rowkey 5 * @param value rowkey 此行的數據 Result 類型 6 * @param context 7 * @throws IOException 8 * @throws InterruptedException 9 */ 10 @Override 11 protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { 12 //得到rowkey 的字節數組 13 byte[] rowkey_bytes = key.get(); 14 String rowKeyStr = Bytes.toString(rowkey_bytes); 15 //準備好 put 對象 用於輸出下游 16 Put put = new Put(rowkey_bytes); 17 //text 做爲輸出的 key 18 Text text = new Text(rowKeyStr); 19 //輸出數據 - 寫數據 - 普通 構建put 對象 20 Cell[] cells = value.rawCells(); 21 //將 f1 : name & age 輸出 22 for (Cell cell : cells) { 23 //當前 cell是不是 f1 24 //獲取列族 25 byte[] family = CellUtil.cloneFamily(cell); 26 String familyStr = Bytes.toString(family); 27 28 if("f1".equals(familyStr)){ 29 //在判斷是不是 name | age 30 put.add(cell); 31 } 32 33 if("f2".equals(familyStr)){ 34 put.add(cell); 35 } 36 } 37 //注意非空判斷 否則會報錯 38 if(!put.isEmpty()){ 39 context.write(text,put); 40 } 41 42 } 43 }
Reduce 端 ,使用 TableReducer:
1 public class HbaseWriteReducer extends TableReducer<Text,Put,ImmutableBytesWritable> { 2 3 /** 4 * 將 map 傳過來的數據寫出去 5 * @param key 6 * @param values 7 * @param context 8 * @throws IOException 9 * @throws InterruptedException 10 */ 11 @Override 12 protected void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException { 13 //設置rowkey 14 ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(); 15 //設置rowkey 16 immutableBytesWritable.set(key.toString().getBytes()); 17 for (Put value : values) { 18 context.write(immutableBytesWritable,value); 19 } 20 } 21 }
啓動類,將 user1 中 f1 列族下 age,name數值寫入到 user2 中:
1 public class Hbase2HbaseMR extends Configured implements Tool { 2 3 public static void main(String[] args) throws Exception { 4 Configuration configuration = HBaseConfiguration.create(); 5 //設置 hbase 的zk地址 6 configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181,hadoop104:2181"); 7 int run = ToolRunner.run(configuration, new Hbase2HbaseMR(), args); 8 System.exit(run); 9 } 10 @Override 11 public int run(String[] strings) throws Exception { 12 Job job = Job.getInstance(super.getConf()); 13 job.setJarByClass(Hbase2HbaseMR.class); 14 //mapper 15 TableMapReduceUtil.initTableMapperJob(TableName.valueOf("user"),new Scan(), HBaseReadMapper.class,Text.class,Put.class,job); 16 //reducer 17 TableMapReduceUtil.initTableReducerJob("user2",HbaseWriteReducer.class,job); 18 boolean b = job.waitForCompletion(true); 19 20 return b?0:1; 21 } 22 }
02 HDFS 導入到Hbase
思路:準備 MR 程序將 HDFS 數據寫入到另外一張 Hbase 表便可。
注意:
讀入的是 Mapper 是 HDFS 操做,寫出的 Reduce 是 Hbase 操做;
HDFS 數據格式要與 Hbase 表對應
準備工做:
準備 HDFS 上數據 ;
準備 user2 表,建立列族 f1,做爲輸出表。
主要代碼:
Mapper 端,使用常規 Mapper
1 public class HdfsMapper extends Mapper<LongWritable,Text,Text,NullWritable>{ 2 3 /** 4 * HDFS -- Hbase 5 * 6 * @param key 7 * @param value 8 * @param context 9 * @throws IOException 10 * @throws InterruptedException 11 */ 12 @Override 13 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 14 //數據原樣輸出 15 context.write(value,NullWritable.get()); 16 } 17 }
Reduce 端,使用 TableReducer :
1 public static class HBASEReducer extends TableReducer<Text,NullWritable,ImmutableBytesWritable>{ 2 @Override 3 protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { 4 /** 5 * key --> 一行數據 6 * 樣例數據: 7 * 07 zhangsan 18 8 * 08 lisi 25 9 * 09 wangwu 20 10 * 11 */ 12 //按格式拆分 13 String[] split = key.toString().split("\t"); 14 //構建 put 對象 15 Put put = new Put(Bytes.toBytes(split[0])); 16 put.addColumn("f1".getBytes(),"name".getBytes(),split[1].getBytes()); 17 put.addColumn("f1".getBytes(),"age".getBytes(),split[2].getBytes()); 18 context.write(new ImmutableBytesWritable(split[0].getBytes()),put); 19 } 20 } 21
啓動類:
1 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 2 Configuration configuration = HBaseConfiguration.create(); 3 //設置 hbase zk 地址 4 configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181,hadoop104:2181"); 5 Job job = Job.getInstance(configuration); 6 job.setJarByClass(Hdfs2HbaseMR.class); 7 //輸入文件路徑 8 FileInputFormat.addInputPath(job,new Path("hdfs://hadoop102:9000/hbase/input")); 9 job.setMapperClass(HdfsMapper.class); 10 job.setMapOutputKeyClass(Text.class); 11 job.setMapOutputValueClass(NullWritable.class); 12 //指定輸出到 Hbase 的 表名 13 TableMapReduceUtil.initTableReducerJob("user2",HBASEReducer.class,job); 14 //設置 reduce 個數 15 job.setNumReduceTasks(1); 16 boolean b = job.waitForCompletion(true); 17 System.exit(b?0:1); 18 }
03 HDFS 大數據導入Hbase
思路:與 2 中的數據導入不一樣的是此次的數據量比較大,使用常規的 MR 可能耗時很是的長,而且一直佔用資源。
咱們能夠先將 Hadoop 上存儲的 HDFS 文件轉換成 HFile 文件,HFile 文件就是 Hbase 底層存儲的類型,轉換完成後,再將轉換好的 HFile 文件指定給對應的 Hbase 表便可。這就是 bulkload 的方式批量加載數據,大體流程以下:
注意:
因爲是文件類型轉換,不作計算操做,因此只須要讀入的 Mapper 操做,,不須要Reduce操做;
文件類型轉換後 還須要作 Hbase 表與 HFile 文件的映射
準備工做:
準備 HDFS 上數據 ;
準備 user2 表,建立列族 f1,做爲輸出表。
主要代碼:
Mapper 端,使用常規 Mapper
1 public class Hdfs2HFileMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put> { 2 @Override 3 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 4 String[] split = value.toString().split("\t"); 5 //封裝輸出類型 6 Put put = new Put(split[0].getBytes()); 7 put.addColumn("f1".getBytes(),"name".getBytes(),split[1].getBytes()); 8 put.addColumn("f1".getBytes(),"age".getBytes(),split[2].getBytes()); 9 // 將封裝好的put對象輸出,rowkey 使用 immutableBytesWritable 10 context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])),put); 11 } 12 }
啓動類:
1 /** 2 * 3 * 將HDFS文件寫成Hfile格式輸出 4 */ 5 public class Hdfs2HileOut extends Configured implements Tool { 6 7 public static void main(String[] args) throws Exception { 8 Configuration configuration = HBaseConfiguration.create(); 9 configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181,hadoop104:2181"); 10 int run = ToolRunner.run(configuration, new Hdfs2HileOut(), args); 11 System.exit(run); 12 } 13 @Override 14 public int run(String[] strings) throws Exception { 15 Configuration conf = super.getConf(); 16 Job job = Job.getInstance(conf); 17 job.setJarByClass(Hdfs2HileOut.class); 18 FileInputFormat.addInputPath(job,new Path("hdfs://hadoop102:9000/hbase/input")); 19 20 job.setMapperClass(Hdfs2HFileMapper.class); 21 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 22 job.setMapOutputValueClass(Put.class); 23 Connection connection = ConnectionFactory.createConnection(conf); 24 Table table = connection.getTable(TableName.valueOf("user2")); 25 //使MR能夠向user2表中,增量增長數據 26 HFileOutputFormat2.configureIncrementalLoad(job,table,connection.getRegionLocator(TableName.valueOf("user2"))); 27 //數據寫回到HDFS 寫成HFILE -》 因此指定輸出格式爲Hfile 28 job.setOutputFormatClass(HFileOutputFormat2.class); 29 //HFile 輸出的路徑,用於與表映射的輸入參數 30 HFileOutputFormat2.setOutputPath(job,new Path("hdfs://hadoop102:9000/hbase/out_hfile2")); 31 //開始執行 32 boolean b = job.waitForCompletion(true); 33 return b? 0: 1; 34 } 35 }
加載類:
public class LoadHFile2Hbase { public static void main(String[] args) throws Exception { Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181,hadoop104:2181"); //獲取數據庫鏈接 Connection connection = ConnectionFactory.createConnection(configuration); Table table = connection.getTable(TableName.valueOf("user2")); //構建 LoadIncrementalHfiles 加載 Hfile文件 LoadIncrementalHFiles loadIncrementalHFiles = new LoadIncrementalHFiles(configuration); // 加載上一步輸出的HFile 與表作映射 loadIncrementalHFiles.doBulkLoad(new Path("hdfs://hadoop102:9000/hbase/out_hfile2"),connection.getAdmin(),table,connection.getRegionLocator(TableName.valueOf("user2"))); } }
至此,HDFS 數據遷移至 Hbase 完成。
2 Hbase 與 Hive 集成
hbase 與 hive 相關的數據遷移工做分爲兩種:
hive 表結果 ---> hbase 表
hbase 表數據 ---> hive 表
這部分操做沒有代碼,在 hive 和 hbase 客戶端就能完成操做
01 準備工做
1 首先須要將 Hbase下的5個包拷貝到 hive lib 下,建議使用軟鏈接的形式:
ln -s /home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-client-1.2.0-cdh5.14.2.jar /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-client-1.2.0-cdh5.14.2.jar ln -s /home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-hadoop2-compat-1.2.0-cdh5.14.2.jar /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-hadoop2-compat-1.2.0-cdh5.14.2.jar ln -s home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-hadoop-compat-1.2.0-cdh5.14.2.jar /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-hadoop-compat-1.2.0-cdh5.14.2.jar ln -s home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-it-1.2.0-cdh5.14.2.jar /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-it-1.2.0-cdh5.14.2.jar ln -s home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-server-1.2.0-cdh5.14.2.jar /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-server-1.2.0-cdh5.14.2.jar
2 修改 Hive 的配置文件 hive-site.xml 添加本身的 zk 信息:
<property> <name>hive.zookeeper.quorum</name> <value>hadoop102,hadoop103,hadoop104</value> </property> <property> <name>hbase.zookeeper.quorum</name> <value>hadoop102,hadoop103,hadoop104</value> </property>
3 修改 Hive 的配置文件 hive-env.sh 添加以下信息:
export HADOOP_HOME=/kkb/install/hadoop-2.6.0-cdh5.14.2/ export HBASE_HOME=/kkb/install/servers/hbase-1.2.0-cdh5.14.2 export HIVE_CONF_DIR=/kkb/install/hive-1.1.0-cdh5.14.2/conf
至此 準備工做完成。
02 hive表導入hbase
hive 中建立管理表(內部表)與hbase 表完成映射則hive管理表的數據會添加到 hbase 表中 ,命令以下:
create table course.hbase_score(id int,cname string,score int) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties("hbase.columns.mapping" = "cf:name,cf:score") tblproperties("hbase.table.name" = "hbase_score");
從命令中能夠看出 hbase.table.name 是指的 hbase 表名,hbase.columns.mapping 則值的對應列族下的字段,而 hive 表的 id 則會做爲hbase表的 rowkey 進行存儲。
經過向內部表插入數據便可完成數據查詢結果的導入。
insert overwrite table course.hbase_score select id,cname,score from course.score;
最後查看 hbase 表便可看到數據。
03 hbase表導入hive
hbase 結果映射到 hive表比較簡單,建立 hive 外部表便可:
CREATE external TABLE hbase2hive(id int, name string, score int) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:name,cf:score") TBLPROPERTIES("hbase.table.name" ="hbase_hive_score");
從命令中能夠看出 hbase.table.name 是指的 hbase 表名,hbase.columns.mapping 的值則對應hive表的字段,而 hive 表的 id 則會做取 hbase表的 rowkey 進行存儲。
至此,Hbase 與 Hive 的數據遷移就完成了。
3 Hbase 協處理器和基礎 api
關於基礎api這部分比較詳細的介紹就在代碼中了,再此咱們就簡單說一下Hbase 協處理器。
協處理器是爲了解決Hbase早期版本的一些問題,如創建二次索引、複雜過濾器、求和計數分組計數等類sql操做以及訪問控制等。
Hbase 提供兩類協處理器:
協處理器的加載方式
01 靜態加載實現
經過修改 hbase-site.xml 這個文件來實現, 如啓動全局 aggregation,能過操縱全部的表數據。只須要在hbase-site.xml裏面添加如下配置便可,修改完配置以後須要重啓HBase集羣。
<property> <name>hbase.coprocessor.user.region.classes</name> <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value> </property>
爲全部table加載了一個 cp class,能夠用」 ,」分割加載多個 class。
02 動態加載實現
啓用表aggregation,只對特定的表生效。
下面以協處理器 observer 爲例來簡單說下操做過程:
1 建立 兩張 hbase 表,user1 ,user2:
create 'user1','info; create 'user2','info';
2 協處理器代碼開發,完成往 user1 表插入數據時,先往 user2 表插入數據,代碼以下:
1 public class MyProcessor extends BaseRegionObserver { 2 @Override 3 public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { 4 //獲取鏈接 5 Configuration configuration = HBaseConfiguration.create(); 6 configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181:hadoop104:2181"); 7 Connection connection = ConnectionFactory.createConnection(configuration); 8 //涉及多個版本得問題 9 List<Cell> cells = put.get("info".getBytes(), "name".getBytes()); 10 //將user1表的name 數據也插入到 user2 中 11 Cell nameCell = cells.get(0); 12 Put put1 = new Put(put.getRow()); 13 put1.add(nameCell); 14 Table table = connection.getTable(TableName.valueOf("user2")); 15 table.put(put1); 16 table.close(); 17 connection.close(); 18 } 19 }
3 將開發好的項目打包上傳到 HDFS ,路徑自定,假設是:
hdfs://hadoop102:9000/processor/processor.jar
4 將 jar 包掛載到 user1 表:
disable 'user1'; alter 'user1',METHOD => 'table_att','Coprocessor'=>'hdfs://hadoop102:9000/processor/processor.jar|com.bigdata.comprocessor.MyProcessor|1001|'; enabled 'user1';
com.bigdata.comprocessor.MyProcessor : 你程序的全類名;
1001 :協處理器編號,自定義便可,表中協處理器的編號不能重複。
5 測試向 user1 中插入數據,user2 是否有數據:
1 public class TestObserver { 2 3 @Test 4 public void testPut() throws IOException { 5 6 //獲取鏈接 7 Configuration configuration = HBaseConfiguration.create(); 8 configuration.set("hbase.zookeeper.quorum", "hadoop102:2181,hadoop103:2181,hadoop104:2181"); 9 //建立鏈接對象 10 Connection connection = ConnectionFactory.createConnection(configuration); 11 Table proc1 = connection.getTable(TableName.valueOf("user1")); 12 Put put = new Put("1110001112".getBytes()); 13 14 put.addColumn("info".getBytes(),"name".getBytes(),"hello".getBytes()); 15 put.addColumn("info".getBytes(),"gender".getBytes(),"male".getBytes()); 16 put.addColumn("info".getBytes(),"nationality".getBytes(),"test".getBytes()); 17 proc1.put(put); 18 proc1.close(); 19 connection.close(); 20 System.out.println("success"); 21 22 } 23 }
關於協處理器卸載:
disable 'user1' alter 'user1',METHOD=>'table_att_unset',NAME=>'coprocessor$1' enable 'user1'
協處理器 observer 大體開發流程就是這樣的。關於基礎 api 放在參考資料的項目中了。
至此,還留有一個問題就是 hbase 的 endpoint 協處理器,其實它解決的問題及時實現 min、 max、 avg、 sum、 distinct、 group by 等sql功能,這個問題咱們放在下期,下期介紹一個基於 hbase 框架之上的框架 -- phoenix,Phoenix之於 Hbase ,就像 hive 之於 Hadoop,會完美的實現 hbase 的 sql 查詢操做。
項目代碼地址: https://github.com/fanpengyi/hbase-api
-- THE END --