Hbase 整合 Hadoop 的數據遷移

上篇文章說了 Hbase 的基礎架構,都是比較理論的知識,最近我也一直在搞 Hbase 的數據遷移, 今天就來一篇實戰型的,把最近一段時間的 Hbase 整合 Hadoop 的基礎知識在梳理一遍,畢竟當初搞得時候仍是有點摸不着方向,寫下來也方便之後查閱。javascript

以前使用 Hbase 大可能是把它當作實時數據庫來作查詢使用的,大部分使用的都是 Hbase 的基礎 Api, Hbase 與 Hadoop Hive 框架的整合還真是沒系統的搞過,話很少說,先看看本文的架構圖:java

 

PS:文中提到的代碼見最後 參考資料git

 

着重點在前兩部分,後面的都是你們比較熟悉的部分了。github

 


 

 

1  Hbase 與 Hadoop 集成sql

Hbase 與 Hadoop 相關操做主要能夠分爲以下三種狀況:數據庫

 

  • 一張 hbase 表數據導入另外一張 hbase 表apache

     

  • HDFS 數據導入 Hbase 表編程

     

  • HDFS 數據(超大數據)導入 Hbase 表swift

 

以上三種狀況的數據遷移基本都是依靠 MR 程序來完成的,因此重點又回到了 MR 編程。api

 

0hbase表數據導入

 

思路:準備 MR 程序將一張 Hbase 表寫入到另外一張 Hbase 表便可。

 

注意:兩張 Hbase 表導入數據的列族信息要一致;有數據的 Hbase 在讀入數據時要注意非空判斷。

 

準備工做:


準備 user1 表 列族 爲 f1,f1 中有 age ,name屬性 ,做爲輸入表;

準備 user2 表,建立列族 f1,做爲輸出表。

 

主要代碼:

 

Mapper 端:這裏注意繼承的 是 TableMapper

 

 1 public class HBaseReadMapper extends TableMapper<Text,Put> {
 2         /**
 3          *
 4          * @param key rowkey
 5          * @param value rowkey 此行的數據  Result 類型
 6          * @param context
 7          * @throws IOException
 8          * @throws InterruptedException
 9          */
10         @Override
11         protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
12             //得到rowkey 的字節數組
13             byte[] rowkey_bytes = key.get();
14             String rowKeyStr = Bytes.toString(rowkey_bytes);
15             //準備好 put 對象 用於輸出下游
16             Put put = new Put(rowkey_bytes);
17             //text 做爲輸出的 key
18             Text text = new Text(rowKeyStr);
19             //輸出數據 - 寫數據 - 普通 構建put 對象
20             Cell[] cells = value.rawCells();
21             //將 f1 : name & age 輸出
22             for (Cell cell : cells) {
23                 //當前 cell是不是 f1
24                 //獲取列族
25                 byte[] family = CellUtil.cloneFamily(cell);
26                 String familyStr = Bytes.toString(family);
27 28                 if("f1".equals(familyStr)){
29                     //在判斷是不是 name | age
30                     put.add(cell);
31                 }
32 33                 if("f2".equals(familyStr)){
34                     put.add(cell);
35                 }
36             }
37           //注意非空判斷 否則會報錯
38             if(!put.isEmpty()){
39                 context.write(text,put);
40             }
41 42         }
43     }

 

Reduce 端 ,使用 TableReducer:

 

 1 public class HbaseWriteReducer extends TableReducer<Text,Put,ImmutableBytesWritable> {
 2  3     /**
 4      * 將 map 傳過來的數據寫出去
 5      * @param key
 6      * @param values
 7      * @param context
 8      * @throws IOException
 9      * @throws InterruptedException
10      */
11     @Override
12     protected void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
13         //設置rowkey
14         ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();
15         //設置rowkey
16         immutableBytesWritable.set(key.toString().getBytes());
17         for (Put value : values) {
18             context.write(immutableBytesWritable,value);
19         }
20     }
21 }
 

啓動類,將 user1 中 f1 列族下 age,name數值寫入到 user2 中:

 1 public class Hbase2HbaseMR extends Configured implements Tool {
 2  3    public static void main(String[] args) throws Exception {
 4        Configuration configuration = HBaseConfiguration.create();
 5        //設置 hbase 的zk地址
 6        configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181,hadoop104:2181");
 7        int run = ToolRunner.run(configuration, new Hbase2HbaseMR(), args);
 8        System.exit(run);
 9    }
10     @Override
11     public int run(String[] strings) throws Exception {
12         Job job = Job.getInstance(super.getConf());
13         job.setJarByClass(Hbase2HbaseMR.class);
14         //mapper
15         TableMapReduceUtil.initTableMapperJob(TableName.valueOf("user"),new Scan(), HBaseReadMapper.class,Text.class,Put.class,job);
16         //reducer
17         TableMapReduceUtil.initTableReducerJob("user2",HbaseWriteReducer.class,job);
18         boolean b = job.waitForCompletion(true);
19 20         return b?0:1;
21     }
22 }

 

 

0HDFS 導入到Hbase

 

思路:準備 MR 程序將 HDFS 數據寫入到另外一張 Hbase 表便可。

 

注意:

 

讀入的是 Mapper 是 HDFS 操做,寫出的 Reduce 是 Hbase 操做;

HDFS 數據格式要與 Hbase 表對應

 

準備工做:


準備 HDFS 上數據 ;

準備 user2 表,建立列族 f1,做爲輸出表。

 

主要代碼:

 

Mapper 端,使用常規 Mapper

 

 1 public class HdfsMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
 2  3         /**
 4          * HDFS -- Hbase
 5          *
 6          * @param key
 7          * @param value
 8          * @param context
 9          * @throws IOException
10          * @throws InterruptedException
11          */
12         @Override
13         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
14             //數據原樣輸出
15             context.write(value,NullWritable.get());
16         }
17     }

 

 

Reduce 端,使用 TableReducer :

 

 1 public static class HBASEReducer extends TableReducer<Text,NullWritable,ImmutableBytesWritable>{
 2         @Override
 3         protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
 4             /**
 5              * key --> 一行數據
 6              * 樣例數據:
 7              * 07 zhangsan 18
 8              * 08 lisi 25
 9              * 09 wangwu 20
10              *
11              */
12             //按格式拆分
13             String[] split = key.toString().split("\t");
14             //構建 put 對象
15             Put put = new Put(Bytes.toBytes(split[0]));
16             put.addColumn("f1".getBytes(),"name".getBytes(),split[1].getBytes());
17             put.addColumn("f1".getBytes(),"age".getBytes(),split[2].getBytes());
18             context.write(new ImmutableBytesWritable(split[0].getBytes()),put);
19         }
20     }
21  

 

 

啓動類:

 

 1 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
 2             Configuration configuration = HBaseConfiguration.create();
 3             //設置 hbase zk 地址
 4             configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181,hadoop104:2181");
 5             Job job = Job.getInstance(configuration);
 6             job.setJarByClass(Hdfs2HbaseMR.class);
 7             //輸入文件路徑
 8             FileInputFormat.addInputPath(job,new Path("hdfs://hadoop102:9000/hbase/input"));
 9             job.setMapperClass(HdfsMapper.class);
10             job.setMapOutputKeyClass(Text.class);
11             job.setMapOutputValueClass(NullWritable.class);
12             //指定輸出到 Hbase 的 表名
13             TableMapReduceUtil.initTableReducerJob("user2",HBASEReducer.class,job);
14             //設置 reduce 個數
15             job.setNumReduceTasks(1);
16             boolean b = job.waitForCompletion(true);
17             System.exit(b?0:1);
18         }

 

 

0HDFS 大數據導入Hbase

 

思路:與 2 中的數據導入不一樣的是此次的數據量比較大,使用常規的 MR 可能耗時很是的長,而且一直佔用資源。

 

咱們能夠先將 Hadoop 上存儲的 HDFS 文件轉換成 HFile 文件,HFile 文件就是 Hbase 底層存儲的類型,轉換完成後,再將轉換好的 HFile 文件指定給對應的 Hbase 表便可。這就是 bulkload 的方式批量加載數據,大體流程以下:

 

 

 

注意:

 

因爲是文件類型轉換,不作計算操做,因此只須要讀入的 Mapper 操做,,不須要Reduce操做;

 

文件類型轉換後 還須要作 Hbase 表與 HFile 文件的映射

 

準備工做:


準備 HDFS 上數據 ;

準備 user2 表,建立列族 f1,做爲輸出表。

 

主要代碼:

 

Mapper 端,使用常規 Mapper

 

 1 public class Hdfs2HFileMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put> {
 2     @Override
 3     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 4         String[] split = value.toString().split("\t");
 5         //封裝輸出類型
 6         Put put = new Put(split[0].getBytes());
 7         put.addColumn("f1".getBytes(),"name".getBytes(),split[1].getBytes());
 8         put.addColumn("f1".getBytes(),"age".getBytes(),split[2].getBytes());
 9         // 將封裝好的put對象輸出,rowkey 使用 immutableBytesWritable
10         context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])),put);
11     }
12 }

 

 

啓動類:

 

 1 /**
 2  *
 3  * 將HDFS文件寫成Hfile格式輸出
 4  */
 5 public class Hdfs2HileOut extends Configured implements Tool {
 6  7     public static void main(String[] args) throws Exception {
 8         Configuration configuration = HBaseConfiguration.create();
 9         configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181,hadoop104:2181");
10         int run = ToolRunner.run(configuration, new Hdfs2HileOut(), args);
11         System.exit(run);
12     }
13     @Override
14     public int run(String[] strings) throws Exception {
15         Configuration conf = super.getConf();
16         Job job = Job.getInstance(conf);
17         job.setJarByClass(Hdfs2HileOut.class);
18         FileInputFormat.addInputPath(job,new Path("hdfs://hadoop102:9000/hbase/input"));
19 20         job.setMapperClass(Hdfs2HFileMapper.class);
21         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
22         job.setMapOutputValueClass(Put.class);
23         Connection connection = ConnectionFactory.createConnection(conf);
24         Table table = connection.getTable(TableName.valueOf("user2"));
25         //使MR能夠向user2表中,增量增長數據
26         HFileOutputFormat2.configureIncrementalLoad(job,table,connection.getRegionLocator(TableName.valueOf("user2")));
27         //數據寫回到HDFS 寫成HFILE -》 因此指定輸出格式爲Hfile
28         job.setOutputFormatClass(HFileOutputFormat2.class);
29         //HFile 輸出的路徑,用於與表映射的輸入參數
30         HFileOutputFormat2.setOutputPath(job,new Path("hdfs://hadoop102:9000/hbase/out_hfile2"));
31         //開始執行
32         boolean b = job.waitForCompletion(true);
33         return b? 0: 1;
34     }
35 }

 

 

加載類:

 

public class LoadHFile2Hbase {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181,hadoop104:2181");
        //獲取數據庫鏈接
        Connection connection = ConnectionFactory.createConnection(configuration);
        Table table = connection.getTable(TableName.valueOf("user2"));
        //構建 LoadIncrementalHfiles 加載 Hfile文件
        LoadIncrementalHFiles loadIncrementalHFiles = new LoadIncrementalHFiles(configuration);
        // 加載上一步輸出的HFile 與表作映射
        loadIncrementalHFiles.doBulkLoad(new Path("hdfs://hadoop102:9000/hbase/out_hfile2"),connection.getAdmin(),table,connection.getRegionLocator(TableName.valueOf("user2")));
    }
}

 

 

至此,HDFS 數據遷移至 Hbase 完成。

 

 


 

 

2   Hbase 與 Hive 集成

 

hbase 與 hive 相關的數據遷移工做分爲兩種:

 

  • hive 表結果 ---> hbase 表

     

  • hbase 表數據 --->  hive 表

 

這部分操做沒有代碼,在 hive 和 hbase 客戶端就能完成操做

 

01 準備工做

 

 

1 首先須要將 Hbase下的5個包拷貝到 hive lib 下,建議使用軟鏈接的形式:

 

ln -s /home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-client-1.2.0-cdh5.14.2.jar  /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-client-1.2.0-cdh5.14.2.jar   
ln -s /home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-hadoop2-compat-1.2.0-cdh5.14.2.jar  /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-hadoop2-compat-1.2.0-cdh5.14.2.jar
ln -s home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-hadoop-compat-1.2.0-cdh5.14.2.jar       /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-hadoop-compat-1.2.0-cdh5.14.2.jar          
ln -s home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-it-1.2.0-cdh5.14.2.jar       /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-it-1.2.0-cdh5.14.2.jar   
ln -s home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-server-1.2.0-cdh5.14.2.jar        /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-server-1.2.0-cdh5.14.2.jar

 

 

2 修改 Hive 的配置文件 hive-site.xml 添加本身的 zk 信息:

 

<property>
    <name>hive.zookeeper.quorum</name>
    <value>hadoop102,hadoop103,hadoop104</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>hadoop102,hadoop103,hadoop104</value>
  </property>
 

3 修改 Hive 的配置文件 hive-env.sh 添加以下信息:

 

export HADOOP_HOME=/kkb/install/hadoop-2.6.0-cdh5.14.2/
export HBASE_HOME=/kkb/install/servers/hbase-1.2.0-cdh5.14.2
export HIVE_CONF_DIR=/kkb/install/hive-1.1.0-cdh5.14.2/conf
 

至此 準備工做完成。

 

02 hive表導入hbase

 

hive 中建立管理表(內部表)與hbase 表完成映射則hive管理表的數據會添加到 hbase 表中 ,命令以下:

 

create table course.hbase_score(id int,cname string,score int) 
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'  
with serdeproperties("hbase.columns.mapping" = "cf:name,cf:score") 
tblproperties("hbase.table.name" = "hbase_score");
 

從命令中能夠看出 hbase.table.name 是指的 hbase 表名,hbase.columns.mapping 則值的對應列族下的字段,而 hive 表的 id 則會做爲hbase表的 rowkey 進行存儲。

 

經過向內部表插入數據便可完成數據查詢結果的導入。

 

insert overwrite table course.hbase_score select id,cname,score from course.score;

 

最後查看 hbase 表便可看到數據。

 

03 hbase表導入hive 

 

hbase 結果映射到 hive表比較簡單,建立 hive 外部表便可:

 

CREATE external TABLE hbase2hive(id int, name string, score int) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:name,cf:score") 
TBLPROPERTIES("hbase.table.name" ="hbase_hive_score");
 

從命令中能夠看出 hbase.table.name 是指的 hbase 表名,hbase.columns.mapping 的值則對應hive表的字段,而 hive 表的 id 則會做取 hbase表的 rowkey 進行存儲。

 

至此,Hbase 與 Hive 的數據遷移就完成了。

 


 

 

3  Hbase 協處理器和基礎 api

 

關於基礎api這部分比較詳細的介紹就在代碼中了,再此咱們就簡單說一下Hbase 協處理器。

 

協處理器是爲了解決Hbase早期版本的一些問題,如創建二次索引、複雜過濾器、求和計數分組計數等類sql操做以及訪問控制等。

 

Hbase 提供兩類協處理器:

 

  • observer 相似數據庫的觸發器,我的理解相似攔截器的功能;

     

  • endpoint 相似數據庫的存儲過程,能夠實現類sql的統計操做。

 

 

協處理器的加載方式

 

01 靜態加載實現

 

經過修改 hbase-site.xml 這個文件來實現, 如啓動全局 aggregation,能過操縱全部的表數據。只須要在hbase-site.xml裏面添加如下配置便可,修改完配置以後須要重啓HBase集羣。

 

<property>
  <name>hbase.coprocessor.user.region.classes</name>
  <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
</property>

 

爲全部table加載了一個 cp class,能夠用」 ,」分割加載多個 class。

 

02 動態加載實現

 

啓用表aggregation,只對特定的表生效。

 

下面以協處理器 observer 爲例來簡單說下操做過程:

 

1 建立 兩張 hbase 表,user1 ,user2:

 

create 'user1','info;
create 'user2','info';

 

2 協處理器代碼開發,完成往 user1 表插入數據時,先往 user2 表插入數據,代碼以下:

 

 1 public class MyProcessor extends BaseRegionObserver {
 2     @Override
 3     public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
 4 //獲取鏈接
 5         Configuration configuration = HBaseConfiguration.create();
 6         configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181:hadoop104:2181");
 7         Connection connection = ConnectionFactory.createConnection(configuration);
 8 //涉及多個版本得問題
 9         List<Cell> cells = put.get("info".getBytes(), "name".getBytes());
10 //將user1表的name 數據也插入到 user2 中
11         Cell nameCell = cells.get(0);
12         Put put1 = new Put(put.getRow());
13         put1.add(nameCell);
14         Table table = connection.getTable(TableName.valueOf("user2"));
15         table.put(put1);
16         table.close();
17         connection.close();
18     }
19 } 
 

3 將開發好的項目打包上傳到 HDFS ,路徑自定,假設是:

 

hdfs://hadoop102:9000/processor/processor.jar
 

4 將 jar 包掛載到 user1 表:

 

disable 'user1';
alter 'user1',METHOD => 'table_att','Coprocessor'=>'hdfs://hadoop102:9000/processor/processor.jar|com.bigdata.comprocessor.MyProcessor|1001|';
enabled 'user1';
 

com.bigdata.comprocessor.MyProcessor : 你程序的全類名;

 

1001 :協處理器編號,自定義便可,表中協處理器的編號不能重複。

 

5 測試向 user1 中插入數據,user2 是否有數據:

 

 1 public class TestObserver {
 2  3     @Test
 4     public void testPut() throws IOException {
 5  6         //獲取鏈接
 7         Configuration configuration = HBaseConfiguration.create();
 8         configuration.set("hbase.zookeeper.quorum", "hadoop102:2181,hadoop103:2181,hadoop104:2181");
 9         //建立鏈接對象
10         Connection connection = ConnectionFactory.createConnection(configuration);
11         Table proc1 = connection.getTable(TableName.valueOf("user1"));
12         Put put = new Put("1110001112".getBytes());
13 14         put.addColumn("info".getBytes(),"name".getBytes(),"hello".getBytes());
15         put.addColumn("info".getBytes(),"gender".getBytes(),"male".getBytes());
16         put.addColumn("info".getBytes(),"nationality".getBytes(),"test".getBytes());
17         proc1.put(put);
18         proc1.close();
19         connection.close();
20         System.out.println("success");
21 22     }
23 }

 

關於協處理器卸載:

 

disable 'user1'
alter 'user1',METHOD=>'table_att_unset',NAME=>'coprocessor$1'
enable 'user1'

 

協處理器 observer 大體開發流程就是這樣的。關於基礎 api 放在參考資料的項目中了。

 

至此,還留有一個問題就是 hbase 的 endpoint 協處理器,其實它解決的問題及時實現 min、 max、 avg、 sum、 distinct、 group by 等sql功能,這個問題咱們放在下期,下期介紹一個基於 hbase 框架之上的框架 -- phoenix,Phoenix之於 Hbase ,就像 hive 之於 Hadoop,會完美的實現 hbase 的 sql 查詢操做。

 

項目代碼地址: https://github.com/fanpengyi/hbase-api  

 

-- THE END --

相關文章
相關標籤/搜索