Apache Flink在惟品會的實踐

做者:王新春docker

整理:郭旭策安全

本文來自於王新春在2018年7月29日 Flink China社區線下 Meetup·上海站的分享。王新春目前在惟品會負責實時平臺相關內容,主要包括實時計算框架和提供實時基礎數據,以及機器學習平臺的工做。以前在美團點評,也是負責大數據平臺工做。他已經在大數據實時處理方向積累了豐富的工做經驗。bash

本文主要內容主要包括如下幾個方面:微信

  1. 惟品會實時平臺現狀網絡

  2. Apache Flink(如下簡稱Flink)在惟品會的實踐架構

  3. Flink On K8S框架

  4. 後續規劃機器學習

1、惟品會實時平臺現狀

目前在惟品會實時平臺並非一個統一的計算框架,而是包括 Storm,Spark,Flink 在內的三個主要計算框架。因爲歷史緣由,當前在 Storm 平臺上的 job 數量是最多的,可是從去年開始,業務重心逐漸切換到 Flink 上面,因此今年在 Flink 上面的應用數量有了大幅增長。oop

實時平臺的核心業務包含八大部分:實時推薦做爲電商的重點業務,包含多個實時特徵;大促看板,包含各類維度的統計指標(例如:各類維度的訂單、UV、轉化率、漏斗等),供領導層、運營、產品決策使用;實時數據清洗,從用戶埋點收集來數據,進行實時清洗和關聯,爲下游的各個業務提供更好的數據;此外還有互聯網金融、安全風控、與友商比價等業務,以及 Logview、Mercury、Titan 做爲內部服務的監控系統、VDRC 實時數據同步系統等。性能

實時平臺的職責主要包括實時計算平臺和實時基礎數據。實時計算平臺在 Storm、Spark、Flink 等計算框架的基礎上,爲監控、穩定性提供了保障,爲業務開發提供了數據的輸入與輸出。實時基礎數據包含對上游埋點的定義和規範化,對用戶行爲數據、MySQL 的 Binlog 日誌等數據進行清洗、打寬等處理,爲下游提供質量保證的數據。

在架構設計上,包括兩大數據源。一種是在App、微信、H5等應用上的埋點數據,原始數據收集後發送到在kafka中;另外一種是線上實時數據的 MySQL Binlog 日誌。數據在計算框架裏面作清洗關聯,把原始的數據經過實時ETL爲下游的業務應用(包括離線寬表等)提供更易於使用的數據。

2、Flink在惟品會的實踐

場景一:Dataeye實時看板

Dataeye 實時看板是支持須要對全部的埋點數據、訂單數據等進行實時計算時,具備數據量大的特色,而且須要統計的維度有不少,例如全站、二級平臺、部類、檔期、人羣、活動、時間維度等,提升了計算的複雜程度,統計的數據輸出指標每秒鐘能夠達到幾十萬。

以 UV 計算爲例,首先對 Kafka 內的埋點數據進行清洗,而後與Redis數據進行關聯,關聯好的數據寫入Kafka中;後續 Flink 計算任務消費 Kafka 的關聯數據。一般任務的計算結果的量也很大(因爲計算維度和指標特別多,能夠達到上千萬),數據輸出經過也是經過 Kafka 做爲緩衝,最終使用同步任務同步到 HBase 中,做爲實時數據展現。同步任務會對寫入 HBase 的數據限流和同類型的指標合併,保護 HBase。與此同時還有另外一路計算方案做爲容災。

在以 Storm 進行計算引擎中進行計算時,須要使用 Redis 做爲中間狀態的存儲,而切換到 Flink 後,Flink 自身具有狀態存儲,節省了存儲空間;因爲不須要訪問 Redis,也提高了性能,總體資源消耗下降到了原來的1/3。

在將計算任務從 Storm 逐步遷移到Flink的過程當中,對兩路方案前後進行遷移,同時將計算任務和同步任務分離,緩解了數據寫入 HBase 的壓力。

切換到 Flink 後也須要對一些問題進行追蹤和改進。對於 FlinkKafkaConsumer,因爲業務緣由對 kafka 中的 Aotu Commit 進行修改,以及對 offset 的設定,須要本身實現支持 kafka 集羣切換的功能。對不帶 window 的state 數據須要手動清理。還有計算框架的通病——數據傾斜問題須要處理。同時對於同步任務追數問題,Storm能夠從 Redis 中取值,Flink 只能等待。

場景二:Kafka數據落地HDFS

以前都是經過 Spark Streaming 的方式去實現,如今正在逐步切換到 Flink 上面,經過 OrcBucketingTableSink 將埋點數據落地到 HDFS上 的 Hive 表中。在 Flink 處理中單 Task Write 可達到3.5K/s左右,使用 Flink 後資源消耗下降了90%,同時將延遲30s下降到了3s之內。目前還在作 Flink 對 Spark Bucket Table 的支持。

場景三:實時的ETL

對於 ETL 處理工做而言,存在的一個痛點就是字典表存儲在 HDFS 中,而且是不斷變化的,而實時的數據流須要與字典表進行 join。字典表的變化是由離線批處理任務引發的,目前的作法是使用ContinuousFileMonitoringFunctionContinuousFileReaderOperator 定時監聽 HDFS 數據變化,不斷地將新數據刷入,使用最新的數據去作 join 實時數據。

咱們計劃作更加通用的方式,去支持 Hive 表和 stream 的 join,實現Hive表數據變化以後,數據自動推送的效果。

3、Flink On K8S

在惟品會內部有一些不一樣的計算框架,有實時計算的,有機器學習的,還有離線計算的,因此須要一個統一的底層框架來進行管理,所以將 Flink 遷移到了 K8S 上。

在 K8S 上使用了思科的網絡組件,每一個docker容器都有獨立的 ip,對外也是可見的。實時平臺的融合器總體架構以下圖所示。

惟品會在K8S上的實現方案與 Flink 社區提供的方案差別仍是很大的。惟品會使用 K8S StatefulSet 模式部署,內部實現了cluster相關的一些接口。一個job對應一個mini cluster,而且支持HA。對於Flink來講,使用 StatefulSet 的最大的緣由是 pod 的 hostname 是有序的;這樣潛在的好處有:

1.hostname爲-0和-1的pod能夠直接指定爲jobmanager;可使用一個statefulset啓動一個cluster,而deployment必須2個;Jobmanager和TaskManager分別獨立的deployment。

  1. pod因爲各類緣由fail後,因爲StatefulSet從新拉起的pod的hostname不變,集羣recover的速度理論上能夠比deployment更快(deployment每次主機名隨機)。

容器的entrypoint

因爲要由主機名來判斷是啓動jobmanager仍是taskmanager,所以須要在entrypoint中去匹配設置的jobmanager的主機名是否有一致。 傳入參數爲:"cluster ha";則自動根據主機名判斷啓動那個角色;也能夠直接指定角色名稱 docker-entrypoint.sh的腳本內容以下:

#!/bin/sh 
# If unspecified, the hostname of the container is taken as the JobManager address

ACTION_CMD="$1"

# if use cluster model, pod ${JOB_CLUSTER_NAME}-0,${JOB_CLUSTER_NAME}-1 as jobmanager
if [ ${ACTION_CMD} == "cluster" ]; then
  jobmanagers=(${JOB_MANGER_HOSTS//,/ })
  ACTION_CMD="taskmanager"
  for i in ${!jobmanagers[@]}
  do
      if [ "$(hostname -s)" == "${jobmanagers[i]}" ]; then
          ACTION_CMD="jobmanager"
          echo "pod hostname match jobmanager config host, change action to jobmanager."
      fi
  done
fi

# if ha model, replace ha configuration
if [ "$2" == "ha" ]; then
  sed -i -e "s|high-availability.cluster-id: cluster-id|high-availability.cluster-id: ${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|high-availability.zookeeper.quorum: localhost:2181|high-availability.zookeeper.quorum: ${FLINK_ZK_QUORUM}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|state.backend.fs.checkpointdir: checkpointdir|state.backend.fs.checkpointdir: hdfs:///user/flink/flink-checkpoints/${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|high-availability.storageDir: hdfs:///flink/ha/|high-availability.storageDir: hdfs:///user/flink/ha/${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
fi

if [ ${ACTION_CMD} == "help" ]; then
    echo "Usage: $(basename "$0") (cluster ha|jobmanager|taskmanager|local|help)"
    exit 0
elif [ ${ACTION_CMD} == "jobmanager" ]; then
    JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
    echo "Starting Job Manager"
    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
    sed -i -e "s/jobmanager.heap.mb: 1024/jobmanager.heap.mb: ${JOB_MANAGER_HEAP_MB}/g" "$FLINK_CONF_DIR/flink-conf.yaml"

    echo "config file: " && grep '^[^\n#]' "$FLINK_CONF_DIR/flink-conf.yaml"
    exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground cluster

elif [ ${ACTION_CMD} == "taskmanager" ]; then
    TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
    echo "Starting Task Manager"

    sed -i -e "s/taskmanager.heap.mb: 1024/taskmanager.heap.mb: ${TASK_MANAGER_HEAP_MB}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
    sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_CONF_DIR/flink-conf.yaml"

    echo "config file: " && grep '^[^\n#]' "$FLINK_CONF_DIR/flink-conf.yaml"
    exec "$FLINK_HOME/bin/taskmanager.sh" start-foreground
elif [ ${ACTION_CMD} == "local" ]; then
    echo "Starting local cluster"
    exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground local
fi

exec "$@"
複製代碼

entrypoint變量說明

鏡像的docker entrypoint腳本里面須要設置的環境變量設置說明:

環境變量名稱 參數 示例****內容 說明
JOB_MANGER_HOSTS StatefulSet.name-0,StatefulSet.name-1 flink-cluster-0,flink-cluster-1 JM的主機名,短主機名;能夠不用FQDN
FLINK_CLUSTER_IDENT namespace/StatefulSet.name default/flink-cluster 用來作zk ha設置和hdfs checkpiont的根目錄
TASK_MANAGER_NUMBER_OF_TASK_SLOTS containers.resources.cpu.limits 2 TM的slot數量,根據resources.cpu.limits來設置
FLINK_ZK_QUORUM env:FLINK_ZK_QUORUM ...:2181 HA ZK的地址
JOB_MANAGER_HEAP_MB env:JOB_MANAGER_HEAP_MBvalue:containers.resources.memory.limit -1024 4096 JM的Heap大小,因爲存在堆外內存,須要小於container.resources.memory.limits;不然容易OOM kill
TASK_MANAGER_HEAP_MB env:TASK_MANAGER_HEAP_MB value: containers.resources.memory.limit -1024 4096 TM的Heap大小,因爲存在Netty的堆外內存,須要小於container.resources.memory.limits;不然容易OOM kill

使用ConfigMap維護配置

對應 Flink 集羣所依賴的 HDFS 等其餘配置,則經過建立 configmap 來管理和維護。

kubectl create configmap hdfs-conf --from-file=hdfs-site.xml --from-file=core-site.xml

[hadoop@flink-jm-0 hadoop]$ ll /home/vipshop/conf/hadoop
total 0
lrwxrwxrwx. 1 root root 20 Apr  9 06:54 core-site.xml -> ..data/core-site.xml
lrwxrwxrwx. 1 root root 20 Apr  9 06:54 hdfs-site.xml -> ..data/hdfs-site.xml
複製代碼

4、後續計劃

當前實時系統,機器學習平臺要處理的數據分佈在各類數據存儲組件中,如Kafka、Redis、Tair和HDFS等,如何方便高效的訪問,處理,共享這些數據是一個很大的挑戰,對於當前的數據訪問和解析經常須要耗費不少的精力,主要的痛點包括:

  1. 對於Kafka,Redis,Tair中的 binary(PB/Avro等格式)數據,使用者沒法快速直接的瞭解數據的 schema 與數據內容,採集數據內容及與寫入者的溝通成本很高。

  2. 因爲缺乏獨立的統一數據系統服務,對Kafka,Redis,Tair等中的binary數據訪問須要依賴寫入者提供的信息,如proto生成類,數據格式wiki定義等,維護成本高,容易出錯。

  3. 缺少 relational schema 使得使用者沒法直接基於更高效易用的 SQL 或 LINQ 層 API 開發業務。

  4. 沒法經過一個獨立的服務方便的發佈和共享數據。

  5. 實時數據沒法直接提供給Batch SQL引擎使用。

  6. 此外,對於當前大部分的數據源的訪問也缺乏審計,權限管理,訪問監控,跟蹤等特性。

UDM(統一數據管理系統) 包括 Location Manager, Schema Metastore 以及 Client Proxy 等模塊,主要的功能包括:

  1. 提供從名字到地址的映射服務,使用者經過抽象名字而不是具體地址訪問數據。

  2. 用戶能夠方便的經過Web GUI界面方便的查看數據Schema,探查數據內容。

  3. 提供支持審計,監控,溯源等附加功能的Client API Proxy。

  4. 在Spark/Flink/Storm等框架中,以最適合使用的形式提供這些數據源的封裝。

UDM的總體架構以下圖所示。

UDM的使用者包括實時,機器學習以及離線平臺中數據的生產者和使用者。在使用Sql API或Table API的時候,首先完成Schema的註冊,以後使用Sql進行開發,下降了開發代碼量。

以Spark訪問Kafka PB數據的時序圖來講明UDM的內部流程

在Flink中,使用UDMExternalCatalog來打通Flink計算框架和UDM之間的橋樑,經過實現ExternalCatalog的各個接口,以及實現各自數據源的TableSourceFactory,完成Schema和接入管控等各項功能。同時加強Flink的SQL Client的各項功能,能夠經過調用API查詢UDM的Schema,完成SQL任務的生成和提交。

更多資訊請訪問 Apache Flink 中文社區網站

相關文章
相關標籤/搜索