Apache官方發佈HBase2已經有一段時間了,HBase2中包含了許多個Features,從官方JIRA來看,大約有4500+個ISSUES(查看地址),從版本上來看是一個很是大的版本了。本篇博客將爲你們介紹HBase2的新特性,以及如何在實戰中與Flink、Kafka等組件進行整合。html
HBase2有哪些新特性值得咱們去關注,這裏給你們列舉部分特定。java
在HBase中遇到比較頻繁的問題就是RIT問題,而在新特性中,對於Region的管理和分配有了新的調整。AssignmentManager基於ProcedureV2實現,能夠快速的分配Region,另外維護Region的State存儲再也不依賴Zookeeper,可以更好的面對Region長時間的RIT問題。shell
具體參考JIRA單:[HBASE-14614]、[HBASE-17844]、[HBASE-14350]apache
在HBase2中減小了對Heap內存的使用,改成Offheap內存,減小垃圾的產生,以及減小GC的停頓時間。json
參考JIRA單:[HBASE-11425]bootstrap
在HBase2中,引入了MemStore新的實現類CompactingMemstore,這個類和默認的DefaultMemStore類的區別在於實現了在內存中進行Compaction。併發
CompactingMemstore中,數據是經過Segment做爲單位進行組織的,一個MemStore中包含多個Segment。數據最開始寫入時會進入到一個處理Active狀態的Segment中,這個Segment是能夠被修改的。當該Active狀態的Segment中的數據達到閥值後,不是直接Flush到HDFS的HFile文件中,而是先Flush到內存中的一個不可修改的Segment中。CompactingMemstore會在後臺將多個不可修改的Segment合併爲一個更大、更緊湊的Segment。異步
若是RegionServer須要把MemStore中的數據Flush到磁盤,會先選擇其餘類型的MemStore,而後在選擇CompactingMemstore。這是因爲CompactingMemstore對內存的管理更加高效,因此延長CompactingMemstore的生命週期能夠減小總的I/O。當CompactingMemstore被Flush到磁盤時,不可修改的Segment會被移到一個快照中進行合併,而後寫入HFile。jsp
參考JIRA單:[HBASE-15991]ide
在引入RegionServer Group以前,HBase默認使用StochasticLoadBalancer策略將表的Region移到到RegionServer裏面。在HBase2中,能夠將RegionServer劃分到多個邏輯組中,這樣能夠提供多租戶的能力。
參考JIRA單:[HBASE-6721]、[HBASE-16430]、[HBASE-17589]、[HBASE-17350]、[HBASE-17349]
在HBase2中,客戶端請求改成異步RPC機制,再也不是同步Wait,這樣能大大有效的提升客戶端請求的併發量,有效的提升資源利用率。
參考JIRA單:[HBASE-13784]、[HBASE-12684]
瞭解了HBase2的一些新特性以後,如何將HBase2運用到實際項目中去,下面將爲你們介紹如何將HBase整合到Flink和Kafka中。數據流向以下圖所示:
整合環境以下所示:
整合實戰項目,須要依賴的JAR信息以下:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.12</artifactId> <version>1.7.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.7.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-hbase_2.12</artifactId> <version>1.7.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.7.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.4</version> </dependency>
建議使用Maven來管理,能夠很方便的將上述依賴信息配置到pom.xml文件中。
準備數據源,將數據寫入到Kafka集羣,經過Flink進行消費,進行業務邏輯處理,而後將處理後的結果寫入到HBase進行落地。數據準備的實現代碼以下:
public class JProducer extends Thread { public static void main(String[] args) { JProducer jproducer = new JProducer(); jproducer.start(); } @Override public void run() { producer(); } private void producer() { Properties props = config(); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { String json = "{\"id\":" + i + ",\"ip\":\"192.168.0." + i + "\",\"date\":" + new Date().toString() + "}"; String k = "key" + i; producer.send(new ProducerRecord<String, String>("flink_topic", k, json)); } producer.close(); } private Properties config() { Properties props = new Properties(); props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092"); props.put("acks", "1"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("partitioner.class", "org.smartloli.kafka.connector.flink.producer.TestSimplePartitioner"); return props; } }
經過上述應用程序,將生產的消息數據寫入到Kafka的Topic中,準備好數據源。
使用Flink消費Kafka集羣中剛剛準備好的數據源,而後進行邏輯處理後,將結果寫入到HBase集羣進行存儲,具體實現代碼以下:
public class FlinkHBase { private static String zkServer = "dn1,dn2,dn3"; private static String port = "2181"; private static TableName tableName = TableName.valueOf("testflink"); private static final String cf = "ke"; private static final String topic = "flink_topic"; public static void main(String[] args) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), configByKafka())); transction.rebalance().map(new MapFunction<String, Object>() { private static final long serialVersionUID = 1L; public String map(String value) throws IOException { write2HBase(value); return value; } }).print(); try { env.execute(); } catch (Exception ex) { ex.printStackTrace(); } } public static Properties configByKafka() { Properties props = new Properties(); props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092"); props.put("group.id", "kv_flink"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; } public static void write2HBase(String value) throws IOException { Configuration config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum", zkServer); config.set("hbase.zookeeper.property.clientPort", port); config.setInt("hbase.rpc.timeout", 30000); config.setInt("hbase.client.operation.timeout", 30000); config.setInt("hbase.client.scanner.timeout.period", 30000); Connection connect = ConnectionFactory.createConnection(config); Admin admin = connect.getAdmin(); if (!admin.tableExists(tableName)) { admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(cf))); } Table table = connect.getTable(tableName); TimeStamp ts = new TimeStamp(new Date()); Date date = ts.getDate(); Put put = new Put(Bytes.toBytes(date.getTime())); put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("test"), Bytes.toBytes(value)); table.put(put); table.close(); connect.close(); } }
將該應用程序提交到Flink集羣,經過Flink消費Kafka集羣中的數據,成功執行該應用程序後,能夠到HBase集羣進行驗證,看數據是否有寫入成功。
進入到HBase集羣,執行hbase shell命令進入到Console界面,而後執行以下命令查看數據是否有寫入成功:
hbase(main):009:0> scan 'testflink',LIMIT=>2
執行上述命令,結果以下所示:
HBase2發佈的新特性頗有必要去研究和剖析,對於優化HBase集羣或多或少有些許幫助。經過研究這些新特性,來幫助咱們有效的應用到實戰項目中。
這篇博客就和你們分享到這裏,若是你們在研究學習的過程中有什麼問題,能夠加羣進行討論或發送郵件給我,我會盡我所能爲您解答,與君共勉!
另外,博主出書了《Kafka並不難學》,喜歡的朋友或同窗, 能夠在公告欄那裏點擊購買連接購買博主的書進行學習,在此感謝你們的支持。