HBase2實戰:HBase Flink和Kafka整合

1.概述

Apache官方發佈HBase2已經有一段時間了,HBase2中包含了許多個Features,從官方JIRA來看,大約有4500+個ISSUES(查看地址),從版本上來看是一個很是大的版本了。本篇博客將爲你們介紹HBase2的新特性,以及如何在實戰中與Flink、Kafka等組件進行整合。html

2.內容

HBase2有哪些新特性值得咱們去關注,這裏給你們列舉部分特定。java

2.1 部分新特性預覽

2.1.1 Region分配優化

在HBase中遇到比較頻繁的問題就是RIT問題,而在新特性中,對於Region的管理和分配有了新的調整。AssignmentManager基於ProcedureV2實現,能夠快速的分配Region,另外維護Region的State存儲再也不依賴Zookeeper,可以更好的面對Region長時間的RIT問題。shell

具體參考JIRA單:[HBASE-14614]、[HBASE-17844]、[HBASE-14350]apache

2.1.2 Offheap優化

在HBase2中減小了對Heap內存的使用,改成Offheap內存,減小垃圾的產生,以及減小GC的停頓時間。json

參考JIRA單:[HBASE-11425]bootstrap

2.1.3 Compaction優化

在HBase2中,引入了MemStore新的實現類CompactingMemstore,這個類和默認的DefaultMemStore類的區別在於實現了在內存中進行Compaction。併發

CompactingMemstore中,數據是經過Segment做爲單位進行組織的,一個MemStore中包含多個Segment。數據最開始寫入時會進入到一個處理Active狀態的Segment中,這個Segment是能夠被修改的。當該Active狀態的Segment中的數據達到閥值後,不是直接Flush到HDFS的HFile文件中,而是先Flush到內存中的一個不可修改的Segment中。CompactingMemstore會在後臺將多個不可修改的Segment合併爲一個更大、更緊湊的Segment。異步

若是RegionServer須要把MemStore中的數據Flush到磁盤,會先選擇其餘類型的MemStore,而後在選擇CompactingMemstore。這是因爲CompactingMemstore對內存的管理更加高效,因此延長CompactingMemstore的生命週期能夠減小總的I/O。當CompactingMemstore被Flush到磁盤時,不可修改的Segment會被移到一個快照中進行合併,而後寫入HFile。jsp

參考JIRA單:[HBASE-15991]ide

2.1.4 RegionServer Group

在引入RegionServer Group以前,HBase默認使用StochasticLoadBalancer策略將表的Region移到到RegionServer裏面。在HBase2中,能夠將RegionServer劃分到多個邏輯組中,這樣能夠提供多租戶的能力。

參考JIRA單:[HBASE-6721]、[HBASE-16430]、[HBASE-17589]、[HBASE-17350]、[HBASE-17349]

2.1.5 Add new AsyncRpcClient

在HBase2中,客戶端請求改成異步RPC機制,再也不是同步Wait,這樣能大大有效的提升客戶端請求的併發量,有效的提升資源利用率。

參考JIRA單:[HBASE-13784]、[HBASE-12684]

3.實戰整合

瞭解了HBase2的一些新特性以後,如何將HBase2運用到實際項目中去,下面將爲你們介紹如何將HBase整合到Flink和Kafka中。數據流向以下圖所示:

 

3.1 基礎環境

整合環境以下所示:

  • JDK1.8
  • HBase-2.1.1
  • Flink-1.7.1
  • Kafka-2.1.0

3.1.1 依賴JAR

整合實戰項目,須要依賴的JAR信息以下:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.7.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.12</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.4</version>
</dependency>

建議使用Maven來管理,能夠很方便的將上述依賴信息配置到pom.xml文件中。

3.2 數據準備

準備數據源,將數據寫入到Kafka集羣,經過Flink進行消費,進行業務邏輯處理,而後將處理後的結果寫入到HBase進行落地。數據準備的實現代碼以下:

public class JProducer extends Thread {

    public static void main(String[] args) {
        JProducer jproducer = new JProducer();
        jproducer.start();
    }
    
    @Override
    public void run() {
        producer();
    }

    private void producer() {
        Properties props = config();
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            String json = "{\"id\":" + i + ",\"ip\":\"192.168.0." + i + "\",\"date\":" + new Date().toString() + "}";
            String k = "key" + i;
            producer.send(new ProducerRecord<String, String>("flink_topic", k, json));
        }
        producer.close();
    }

    private Properties config() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");
        props.put("acks", "1");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("partitioner.class", "org.smartloli.kafka.connector.flink.producer.TestSimplePartitioner");
        return props;
    }
}

經過上述應用程序,將生產的消息數據寫入到Kafka的Topic中,準備好數據源。

3.3 處理數據並落地到HBase

使用Flink消費Kafka集羣中剛剛準備好的數據源,而後進行邏輯處理後,將結果寫入到HBase集羣進行存儲,具體實現代碼以下:

public class FlinkHBase {

    private static String zkServer = "dn1,dn2,dn3";
    private static String port = "2181";
    private static TableName tableName = TableName.valueOf("testflink");
    private static final String cf = "ke";
    private static final String topic = "flink_topic";

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), configByKafka()));
        transction.rebalance().map(new MapFunction<String, Object>() {
            private static final long serialVersionUID = 1L;

            public String map(String value) throws IOException {
                write2HBase(value);
                return value;
            }
        }).print();
        try {
            env.execute();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public static Properties configByKafka() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");
        props.put("group.id", "kv_flink");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void write2HBase(String value) throws IOException {
        Configuration config = HBaseConfiguration.create();

        config.set("hbase.zookeeper.quorum", zkServer);
        config.set("hbase.zookeeper.property.clientPort", port);
        config.setInt("hbase.rpc.timeout", 30000);
        config.setInt("hbase.client.operation.timeout", 30000);
        config.setInt("hbase.client.scanner.timeout.period", 30000);

        Connection connect = ConnectionFactory.createConnection(config);
        Admin admin = connect.getAdmin();
        if (!admin.tableExists(tableName)) {
            admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(cf)));
        }
        Table table = connect.getTable(tableName);
        TimeStamp ts = new TimeStamp(new Date());
        Date date = ts.getDate();
        Put put = new Put(Bytes.toBytes(date.getTime()));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("test"), Bytes.toBytes(value));
        table.put(put);
        table.close();
        connect.close();
    }
}

將該應用程序提交到Flink集羣,經過Flink消費Kafka集羣中的數據,成功執行該應用程序後,能夠到HBase集羣進行驗證,看數據是否有寫入成功。

3.4 數據驗證

進入到HBase集羣,執行hbase shell命令進入到Console界面,而後執行以下命令查看數據是否有寫入成功:

hbase(main):009:0> scan 'testflink',LIMIT=>2

執行上述命令,結果以下所示:

4.總結

HBase2發佈的新特性頗有必要去研究和剖析,對於優化HBase集羣或多或少有些許幫助。經過研究這些新特性,來幫助咱們有效的應用到實戰項目中。

5.結束語

這篇博客就和你們分享到這裏,若是你們在研究學習的過程中有什麼問題,能夠加羣進行討論或發送郵件給我,我會盡我所能爲您解答,與君共勉!

另外,博主出書了《Kafka並不難學》,喜歡的朋友或同窗, 能夠在公告欄那裏點擊購買連接購買博主的書進行學習,在此感謝你們的支持。

相關文章
相關標籤/搜索