Flink 使用(一)——從kafka中讀取數據寫入到HBASE中

一、前言

  本文是在《如何計算實時熱門商品》[1]一文上作的擴展,僅在功能上驗證了利用Flink消費Kafka數據,把處理後的數據寫入到HBase的流程,其具體性能未作調優。此外,文中並未就Flink處理邏輯作過多的分析,只因引文(若不特殊說明,文中引文皆指《如何計算實時熱門商品》一文)中寫的很詳細了,故僅給出博主調試犯下的錯。文中如有錯誤,歡迎大夥留言指出,謝謝html

  源碼在GitHub上,地址:https://github.com/L-Wg/flinkExamplegit

  環境:Flink 1.6+Kafka 1.1+HBase 1.2github

       OpenJDK 1.8+Maven 3.5.2apache

二、獲取數據

  本文是將Kafka做爲數據源(目前業界比較流行的作法),數據的格式和引文的格式一致,數據類型爲POJO。爲添加源,通常是實現接口SourceFunction<T>,可是Flink與Kafka的連接器(connector),Flink社區已經作好了,咱們只需在pom文件中加入相應的依賴便可。這裏有值得注意的一點是:flink-connector-kafka-*.jar是有版本要求的,其具體的要求能夠參加Flink官網connector一節[2]。代碼以下:bootstrap

DataStream<UserBehaviorSchema> dataStream=env.addSource(new FlinkKafkaConsumer010<UserBehaviorSchema>(
                topic,
                new UserBehaviorSerial(),
                properties
        ).setStartFromEarliest());

 其中,在代碼中需指定的有:要消費的topic、數據序列化的對象以及配置,其中,配置可指定bootstrap.servers便可,其餘配置按需設置。調用setStarFromEarliest()是爲讓Flink從頭消費指定topic中數據,這樣寫的好處是:只要你Kafka topic中存在數據,測試時就不用從新往kafka裏寫數據了。固然調用該方法不單單是這個做用,其在業務上的使用需根據需求。此外,Flink中還有諸多指定消費kafka的方法,詳情請見官網[2]ide

這裏值得說的一點是獲取數據後,dataStream的值是不變的,不會由於作過flatmap等操做後就會改變。性能

三、數據轉換

  對Flink 代碼的分析過程見引文,此處僅有如下幾點需說明的:測試

  1.  如果kafka中的數據是本身按照由於數據格式隨機生成的,請不要按照博主代碼中customWaterExtractor()類的寫法去定義watermark和timestamp,由於代碼中的currentTimeStamp的值可能也是隨機的,因此就會形成程序不報錯可是卡死等待的狀況。spa

  2.  timestamp的值要和數據源中數據保持相同的數據級。調試

public static class customWaterExtractor implements AssignerWithPeriodicWatermarks<UserBehaviorSchema>{

        private static final long serialVersionUID = 298015256202705122L;

        private final long maxOutOrderness=3500;
        private long currentTimeStamp=Long.MIN_VALUE;

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentTimeStamp-maxOutOrderness);
        }

        @Override
        public long extractTimestamp(UserBehaviorSchema element, long previousElementTimestamp) {
//          此處須要注意的點:timestamp獲得的值是否乘以1000轉換爲毫秒,須要根據消息中被指定爲timestamp字段的單位。
            long timeStamp=element.timestamp*1000;
            currentTimeStamp=Math.max(timeStamp,currentTimeStamp);
            return timeStamp;
        }
    }

  3.  在返回的結果類ResultEvent中,使用sinking字段去保存HotTopN的名次,其默認值爲0。  

 四、數據存儲

  本文中是經過extends RichSinkFunction來實現將數據寫入HBase中,其中,@Override的invoke()方法是針對每條數據都會調用的,其他的open()、close()方法,從日誌上看是否是針對每條數據都會調用。對open()方法用於打開連接,最好實現鏈接池避免連接過多,此處HBase的connection已自身實現不用單獨實現。

  數據寫入HBase時,有兩點建議:

  1.  將數據寫入HBase的表中時,最好先作好表的預分區工做,避免後期由於表的split形成性能降低以及維護上的困難;

  2.  爲加快HBase的查詢速度,能夠將制定字段做爲HBase表的rowkey,文中是指定時間戳和排名做爲表的rowkey,至於二級索引等暫不在此處討論。

五、參考文獻連接:

  [1]http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/

  [2]https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html

相關文章
相關標籤/搜索