HBase數據遷移到Kafka實戰

1.概述

在實際的應用場景中,數據存儲在HBase集羣中,可是因爲一些特殊的緣由,須要將數據從HBase遷移到Kafka。正常狀況下,通常都是源數據到Kafka,再有消費者處理數據,將數據寫入HBase。可是,若是逆向處理,如何將HBase的數據遷移到Kafka呢?今天筆者就給你們來分享一下具體的實現流程。html

2.內容

通常業務場景以下,數據源頭產生數據,進入Kafka,而後由消費者(如Flink、Spark、Kafka API)處理數據後進入到HBase。這是一個很典型的實時處理流程。流程圖以下:app

 

 上述這類實時處理流程,處理數據都比較容易,畢竟數據流向是順序處理的。可是,若是將這個流程逆向,那麼就會遇到一些問題。分佈式

2.1 海量數據

HBase的分佈式特性,集羣的橫向拓展,HBase中的數據每每都是百億、千億級別,或者數量級更大。這類級別的數據,對於這類逆向數據流的場景,會有個很麻煩的問題,那就是取數問題。如何將這海量數據從HBase中取出來?ide

2.2 沒有數據分區

咱們知道HBase作數據Get或者List<Get>很快,也比較容易。而它又沒有相似Hive這類數據倉庫分區的概念,不能提供某段時間內的數據。若是要提取最近一週的數據,可能全表掃描,經過過濾時間戳來獲取一週的數據。數量小的時候,可能問題不大,而數據量很大的時候,全表去掃描HBase很困難。函數

3.解決思路

對於這類逆向數據流程,如何處理。其實,咱們能夠利用HBase Get和List<Get>的特性來實現。由於HBase經過RowKey來構建了一級索引,對於RowKey級別的取數,速度是很快的。實現流程細節以下:工具

 

 數據流程如上圖所示,下面筆者爲你們來剖析每一個流程的實現細節,以及注意事項。oop

3.1 Rowkey抽取

咱們知道HBase針對Rowkey取數作了一級索引,因此咱們能夠利用這個特性來展開。咱們能夠將海量數據中的Rowkey從HBase表中抽取,而後按照咱們制定的抽取規則和存儲規則將抽取的Rowkey存儲到HDFS上。學習

這裏須要注意一個問題,那就是關於HBase Rowkey的抽取,海量數據級別的Rowkey抽取,建議採用MapReduce來實現。這個得益於HBase提供了TableMapReduceUtil類來實現,經過MapReduce任務,將HBase中的Rowkey在map階段按照指定的時間範圍進行過濾,在reduce階段將rowkey拆分爲多個文件,最後存儲到HDFS上。大數據

這裏可能會有同窗有疑問,都用MapReduce抽取Rowkey了,爲啥不直接在掃描處理列簇下的列數據呢?這裏,咱們在啓動MapReduce任務的時候,Scan HBase的數據時只過濾Rowkey(利用FirstKeyOnlyFilter來實現),不對列簇數據作處理,這樣會快不少。對HBase RegionServer的壓力也會小不少。spa

Row Column
row001 info:name
row001 info:age
row001 info:sex
row001 info:sn

這裏舉個例子,好比上表中的數據,其實咱們只須要取出Rowkey(row001)。可是,實際業務數據中,HBase表描述一條數據可能有不少特徵屬性(例如姓名、性別、年齡、身份證等等),可能有些業務數據一個列簇下有十幾個特徵,可是他們卻只有一個Rowkey,咱們也只須要這一個Rowkey。那麼,咱們使用FirstKeyOnlyFilter來實現就很合適了。

/**
 * A filter that will only return the first KV from each row.
 * <p>
 * This filter can be used to more efficiently perform row count operations.
 */

這個是FirstKeyOnlyFilter的一段功能描述,它用於返回第一條KV數據,官方其實用它來作計數使用,這裏咱們稍加改進,把FirstKeyOnlyFilter用來作抽取Rowkey。

3.2 Rowkey生成

抽取的Rowkey如何生成,這裏可能根據實際的數量級來確認Reduce個數。建議生成Rowkey文件時,切合實際的數據量來算Reduce的個數。儘可能不用爲了使用方便就一個HDFS文件,這樣後面很差維護。舉個例子,好比HBase表有100GB,咱們能夠拆分爲100個文件。

3.3 數據處理

在步驟1中,按照抽取規則和存儲規則,將數據從HBase中經過MapReduce抽取Rowkey並存儲到HDFS上。而後,咱們在經過MapReduce任務讀取HDFS上的Rowkey文件,經過List<Get>的方式去HBase中獲取數據。拆解細節以下:

 

 Map階段,咱們從HDFS讀取Rowkey的數據文件,而後經過批量Get的方式從HBase取數,而後組裝數據發送到Reduce階段。在Reduce階段,獲取來自Map階段的數據,寫數據到Kafka,經過Kafka生產者回調函數,獲取寫入Kafka狀態信息,根據狀態信息判斷數據是否寫入成功。若是成功,記錄成功的Rowkey到HDFS,便於統計成功的進度;若是失敗,記錄失敗的Rowkey到HDFS,便於統計失敗的進度。

3.4 失敗重跑

經過MapReduce任務寫數據到Kafka中,可能會有失敗的狀況,對於失敗的狀況,咱們只須要記錄Rowkey到HDFS上,當任務執行完成後,再去程序檢查HDFS上是否存在失敗的Rowkey文件,若是存在,那麼再次啓動步驟10,即讀取HDFS上失敗的Rowkey文件,而後再List<Get> HBase中的數據,進行數據處理後,最後再寫Kafka,以此類推,直到HDFS上失敗的Rowkey處理完成爲止。

 

4.實現代碼

這裏實現的代碼量也並不複雜,下面提供一個僞代碼,能夠在此基礎上進行改造(例如Rowkey的抽取、MapReduce讀取Rowkey並批量Get HBase表,而後在寫入Kafka等)。示例代碼以下:

public class MRROW2HDFS {

    public static void main(String[] args) throws Exception {

        Configuration config = HBaseConfiguration.create(); // HBase Config info
        Job job = Job.getInstance(config, "MRROW2HDFS");
        job.setJarByClass(MRROW2HDFS.class);
        job.setReducerClass(ROWReducer.class);

        String hbaseTableName = "hbase_tbl_name";

        Scan scan = new Scan();
        scan.setCaching(1000);
        scan.setCacheBlocks(false);
        scan.setFilter(new FirstKeyOnlyFilter());

        TableMapReduceUtil.initTableMapperJob(hbaseTableName, scan, ROWMapper.class, Text.class, Text.class, job);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/rowkey.list")); // input you storage rowkey hdfs path
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

    public static class ROWMapper extends TableMapper<Text, Text> {

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {

            for (Cell cell : value.rawCells()) {
                // Filter date range
                // context.write(...);
            }
        }
    }
    
    public static class ROWReducer extends Reducer<Text,Text,Text,Text>{
        private Text result = new Text();
        
        @Override
        protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
            for(Text val:values){
                result.set(val);
                context.write(key, result);
            }
        }
    }
}

5.總結

整個逆向數據處理流程,並不算複雜,實現也是很基本的MapReduce邏輯,沒有太複雜的邏輯處理。在處理的過程當中,須要幾個細節問題,Rowkey生成到HDFS上時,可能存在行位空格的狀況,在讀取HDFS上Rowkey文件去List<Get>時,最好對每條數據作個過濾空格處理。另外,就是對於成功處理Rowkey和失敗處理Rowkey的記錄,這樣便於任務失敗重跑和數據對帳。能夠知曉數據遷移進度和完成狀況。同時,咱們可使用Kafka Eagle監控工具來查看Kafka寫入進度。

6.結束語

這篇博客就和你們分享到這裏,若是你們在研究學習的過程中有什麼問題,能夠加羣進行討論或發送郵件給我,我會盡我所能爲您解答,與君共勉!

另外,博主出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同窗, 能夠在公告欄那裏點擊購買連接購買博主的書進行學習,在此感謝你們的支持。關注下面公衆號,根據提示,可免費獲取書籍的教學視頻。

相關文章
相關標籤/搜索