做者:肖強(TalkingData 資深工程師)java
在 2017 年上半年之前,TalkingData 的 App Analytics 和 Game Analytics 兩個產品,流式框架使用的是自研的 td-etl-framework。該框架下降了開發流式任務的複雜度,對於不一樣的任務只須要實現一個 changer 鏈便可,而且支持水平擴展,性能尚可,曾經能夠知足業務需求。spring
可是到了 2016 年末和 2017 年上半年,發現這個框架存在如下重要侷限:apache
TalkingData 這兩款產品主要爲各種移動端 App 和遊戲提供數據分析服務,隨着近幾年業務量不斷擴大,須要選擇一個性能更強、功能更完善的流式引擎來逐步升級咱們的流式服務。調研從 2016 年末開始,主要是從 Flink、Heron、Spark streaming 中做選擇。api
最終,咱們選擇了 Flink,主要基於如下幾點考慮:網絡
咱們最開始是以 standalone cluster 的模式部署。從 2017 年上半年開始,咱們逐步把 Game Analytics 中一些小流量的 etl-job 遷移到 Flink,到 4 月份時,已經將產品接收各版本 SDK 數據的 etl-job 徹底遷移至 Flink,並整合成了一個 job。造成了以下的數據流和 stream graph:框架
圖1. Game Analytics-etl-adaptor 遷移至 Flink 後的數據流圖ide
圖2. Game Analytics-etl 的 stream graph性能
在上面的數據流圖中,flink-job 經過 Dubbo 來調用 etl-service,從而將訪問外部存儲的邏輯都抽象到了 etl-service 中,flink-job 則不需考慮複雜的訪存邏輯以及在 job 中自建 Cache,這樣既完成了服務的共用,又減輕了 job 自身的 GC 壓力。測試
此外咱們自構建了一個 monitor 服務,由於當時的 1.1.3 版本的 Flink 可提供的監控 metric 少,並且因爲其 Kafka-connector 使用的是 Kafka08 的低階 API,Kafka 的消費 offset 並無提交的 ZK 上,所以咱們須要構建一個 monitor 來監控 Flink 的 job 的活性、瞬時速度、消費淤積等 metric,並接入公司 owl 完成監控告警。優化
這時候,Flink 的 standalone cluster 已經承接了來自 Game Analytics 的全部流量,日均處理消息約 10 億條,總吞吐量達到 12 TB 每日。到了暑假的時候,日均日誌量上升到了 18 億條天天,吞吐量達到了約 20 TB 每日,TPS 峯值爲 3 萬。
在這個過程當中,咱們又遇到了 Flink 的 job 消費不均衡、在 standalone cluster 上 job 的 deploy 不均衡等問題,而形成線上消費淤積,以及集羣無端自動重啓而自動重啓後 job 沒法成功重啓。(咱們將在第三章中詳細介紹這些問題中的典型表現及當時的解決方案。)
通過一個暑假後,咱們認爲 Flink 經受了考驗,所以開始將 App Analytics 的 etl-job 也遷移到 Flink 上。造成了以下的數據流圖:
圖3. App Analytics-etl-adaptor 的標準 SDK 處理工做遷移到 Flink 後的數據流圖
圖4. App Analytics-etl-flink job 的 stream graph
2017 年 3 月開始有大量用戶開始遷移至統一的 JSON SDK,新版 SDK 的 Kafka topic 的峯值流量從年中的 8 K/s 上漲至了年末的 3 W/s。此時,整個 Flink standalone cluster 上一共部署了兩款產品的 4 個 job,日均吞吐量達到了 35 TB。
這時遇到了兩個很是嚴重的問題:
這些問題逼迫着咱們不得不將兩款產品的 job 拆分到兩個 standalone cluster 中,並對 Flink 作一次較大的版本升級,從 1.1.3(中間過分到 1.1.5)升級成 1.3.2。最終升級至 1.3.2 在 18 年的 Q1 完成,1.3.2 版本引入了增量式的 checkpoint 提交而且在性能和穩定性上比 1.1.x 版本作了巨大的改進。升級以後,Flink 集羣基本穩定,儘管還有消費不均勻等問題,可是基本能夠在業務量增長時經過擴容機器來解決。
由於 standalone cluster 的資源隔離作的並不優秀,並且還有 deploy job 不均衡等問題,加上社區上使用 Flink on yarn 已經很是成熟,所以咱們在 18 年的 Q4 就開始計劃將 Flink 的 standalone cluster 遷移至 Flink on yarn 上,而且 Flink 在最近的版本中對於 batch 的提高較多,咱們還規劃逐步使用 Flink 來逐步替換如今的批處理引擎。
圖5. Flink on yarn cluster 規劃
如圖 5,將來的 Flink on yarn cluster 將能夠完成流式計算和批處理計算,集羣的使用者能夠經過一個構建 service 來完成 stream/batch job 的構建、優化和提交,job 提交後,根據使用者所在的業務團隊及服務客戶的業務量分發到不一樣的 yarn 隊列中,此外,集羣須要一個完善的監控系統,採集用戶的提交記錄、各個隊列的流量及負載、各個 job 的運行時指標等等,並接入公司的 OWL。
從 19 年的 Q1 開始,咱們將 App Analytics 的部分 stream job 遷移到了 Flink on yarn 1.7 中,又在 19 年 Q2 前完成了 App Analytics 全部處理統一 JSON SDK 的流任務遷移。當前的 Flink on yarn 集羣的峯值處理的消息量達到 30 W/s,日均日誌吞吐量達約到 50 億條,約 60 TB。在 Flink 遷移到 on yarn 以後,由於版本的升級性能有所提高,且 job 之間的資源隔離確實優於 standalone cluster。遷移後咱們使用 Prometheus+Grafana 的監控方案,監控更方便和直觀。
咱們將在後續將 Game Analytics 的 Flink job 和日誌導出的 job 也遷移至該 on yarn 集羣,預計能夠節約 1/4 的機器資源。
在 Flink 實踐的過程當中,咱們一路上遇到了很多坑,咱們挑出其中幾個重點坑作簡要講解。
在咱們實現 Flink 的 operator 的 function 時,通常均可以繼承 AbstractRichFunction,其已提供生命週期方法 open()/close(),因此 operator 依賴的資源的初始化和釋放應該經過重寫這些方法執行。當咱們初始化一些資源,如 spring context、dubbo config 時,應該儘量使用單例對象持有這些資源且(在一個 TaskManager 中)只初始化 1 次,一樣的,咱們在 close 方法中應當(在一個 TaskManager 中)只釋放一次。
static 的變量應該慎重使用,不然很容易引發 job cancel 而相應的資源沒有釋放進而致使 job 重啓遇到問題。規避 static 變量來初始化可使用 org.apache.flink.configuration.Configuration(1.3)或者 org.apache.flink.api.java.utils.ParameterTool(1.7)來保存咱們的資源配置,而後經過 ExecutionEnvironment 來存放(Job提交時)和獲取這些配置(Job運行時)。
示例代碼:
Flink 1.3 設置及註冊配置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Configuration parameters = new Configuration(); parameters.setString("zkConnects", zkConnects); parameters.setBoolean("debug", debug); env.getConfig().setGlobalJobParameters(parameters);
獲取配置(在 operator 的 open 方法中)。
@Override public void open(Configuration parameters) throws Exception { super.open(parameters); ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); Configuration globConf = (Configuration) globalParams; debug = globConf.getBoolean("debug", false); String zks = globConf.getString("zkConnects", ""); //.. do more .. }
Flink 1.7 設置及註冊配置:
ParameterTool parameters = ParameterTool.fromArgs(args); // set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(parameters);
獲取配置:
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); parameters.getRequired("input"); // .. do more ..
如前文所述,當 Flink 的 job 的上下游 Task(的 subTask)分佈在不一樣的 TaskManager 節點上時(也就是上下游 operator 沒有 chained 在一塊兒,且相對應的 subTask 分佈在了不一樣的 TaskManager 節點上),就須要在 operator 的數據傳遞時申請和釋放 network buffer 並經過網絡 I/O 傳遞數據。
其過程簡述以下:上游的 operator 產生的結果會經過 RecordWriter 序列化,而後申請 BufferPool 中的 Buffer 並將序列化後的結果寫入 Buffer,此後 Buffer 會被加入 ResultPartition 的 ResultSubPartition 中。ResultSubPartition 中的 Buffer 會經過 Netty 傳輸至下一級的 operator 的 InputGate 的 InputChannel 中,一樣的,Buffer 進入 InputChannel 前一樣須要到下一級 operator 所在的 TaskManager 的 BufferPool 申請,RecordReader 讀取 Buffer 並將其中的數據反序列化。BufferPool 是有限的,在 BufferPool 爲空時 RecordWriter / RecordReader 所在的線程會在申請 Buffer 的過程當中等待一段時間,具體原理能夠參考:[1], [2]。
簡要截圖以下:
圖6. Flink 的網絡棧, 其中 RP 爲 ResultPartition、RS 爲 ResultSubPartition、IG 爲 InputGate、IC 爲 inputChannel
在使用 Flink 1.1.x 和 1.3.x 版本時,若是咱們的 network buffer 的數量配置的不充足且數據的吞吐量變大的時候,就會遇到以下現象:
圖7. 上游 operator 阻塞在獲取 network buffer 的 requestBuffer() 方法中
圖8. 下游的 operator 阻塞在等待新數據輸入
圖9. 下游的 operator 阻塞在等待新數據輸入
咱們的工做線程(RecordWriter 和 RecordReader 所在的線程)的大部分時間都花在了向 BufferPool 申請 Buffer 上,這時候 CPU 的使用率會劇烈的抖動,使得 Job 的消費速度降低,在 1.1.x 版本中甚至會阻塞很長的一段時間,觸發整個 job 的背壓,從而形成較嚴重的業務延遲。
這時候,咱們就須要經過上下游 operator 的並行度來計算 ResultPartition 和 InputGate 中所須要的 buffer 的個數,以配置充足的 taskmanager.network.numberOfBuffers。
圖10. 不一樣的 network buffer 對 CPU 使用率的影響
當配置了充足的 network buffer 數時,CPU 抖動能夠減小,Job 消費速度有所提升。
在 Flink 1.5 以後,在其 network stack 中引入了基於信用度的流量傳輸控制(credit-based flow control)機制[2],該機制大限度的避免了在向 BufferPool 申請 Buffer 的阻塞現象,咱們初步測試 1.7 的 network stack 的性能確實比 1.3 要高。
但這畢竟還不是最優的狀況,由於若是藉助 network buffer 來完成上下游的 operator 的數據傳遞不能夠避免的要通過序列化/反序列化的過程,並且信用度的信息傳遞有必定的延遲性和開銷,而這個過程能夠經過將上下游的 operator 鏈成一條 operator chain 而避免。
所以咱們在構建咱們流任務的執行圖時,應該儘量多的讓 operator 都 chain 在一塊兒,在 Kafka 資源容許的狀況下能夠擴大 Kafka 的 partition 而使得 source operator 和後繼的 operator 鏈在一塊兒,但也不能一味擴大 Kafka topic 的 partition,應根據業務量和機器資源作好取捨。更詳細的關於 operator 的 training 和 task slot 的調優能夠參考: [4]。
在上一節中咱們知道,Flink 的分佈在不一樣節點上的 Task 的數據傳輸必須通過序列化/反序列化,所以序列化/反序列化也是影響 Flink 性能的一個重要因素。Flink 自有一套類型體系,即 Flink 有本身的類型描述類(TypeInformation)。Flink 但願可以掌握儘量多的進出 operator 的數據類型信息,並使用 TypeInformation 來描述,這樣作主要有如下 2 個緣由:
整體上來講,Flink 推薦咱們在 operator 間傳遞的數據是 POJOs 類型,對於 POJOs 類型,Flink 默認會使用 Flink 自身的 PojoSerializer 進行序列化,而對於 Flink 沒法本身描述或推斷的數據類型,Flink 會將其識別爲 GenericType,並使用 Kryo 進行序列化。Flink 在處理 POJOs 時更高效,此外 POJOs 類型會使得 stream 的 grouping/joining/aggregating 等操做變得簡單,由於可使用如: dataSet.keyBy("username") 這樣的方式直接操做數據流中的數據字段。
除此以外,咱們還能夠作進一步的優化:
dataStream.flatMap(new MyOperator()).returns(MyClass.class)
returns 方法最終會調用 TypeExtractor.createTypeInfo(typeClass) ,用以構建咱們自定義的類型的 TypeInformation。createTypeInfo 方法在構建 TypeInformation 時,若是咱們的類型知足 POJOs 的規則或 Flink 中其餘的基本類型的規則,會盡量的將咱們的類型「翻譯」成 Flink 熟知的類型如 POJOs 類型或其餘基本類型,便於 Flink 自行使用更高效的序列化方式。
//org.apache.flink.api.java.typeutils.PojoTypeInfo @Override @PublicEvolving @SuppressWarnings("unchecked") public TypeSerializer<T> createSerializer(ExecutionConfig config) { if (config.isForceKryoEnabled()) { return new KryoSerializer<>(getTypeClass(), config); } if (config.isForceAvroEnabled()) { return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass()); } return createPojoSerializer(config); }
對於 Flink 沒法「翻譯」的類型,則返回 GenericTypeInfo,並使用 Kryo 序列化:
//org.apache.flink.api.java.typeutils.TypeExtractor @SuppressWarnings({ "unchecked", "rawtypes" }) private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy, ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { checkNotNull(clazz); // 嘗試將 clazz轉換爲 PrimitiveArrayTypeInfo, BasicArrayTypeInfo, ObjectArrayTypeInfo // BasicTypeInfo, PojoTypeInfo 等,具體源碼已省略 //... //若是上述嘗試不成功 , 則return a generic type return new GenericTypeInfo<OUT>(clazz); }
在咱們的實踐中,最初爲了擴展性,在 operator 之間傳遞的數據爲 JsonNode,可是咱們發現性能達不到預期,所以將 JsonNode 改爲了符合 POJOs 規範的類型,在 1.1.x 的 Flink 版本上直接得到了超過 30% 的性能提高。在咱們調用了 Flink 的 Type Hint 和 env.getConfig().enableForceAvro() 後,性能獲得進一步提高。這些方法一直沿用到了 1.3.x 版本。
在升級至 1.7.x 時,若是使用 env.getConfig().enableForceAvro() 這個配置,咱們的代碼會引發校驗空字段的異常。所以咱們取消了這個配置,並嘗試使用 Kyro 進行序列化,而且註冊咱們的類型的全部子類到 Flink 的 ExecutionEnvironment 中,目前看性能尚可,並優於舊版本使用 Avro 的性能。可是最佳實踐還須要通過比較和壓測 KryoSerializerAvroUtils.getAvroUtils().createAvroSerializerPojoSerializer 才能總結出來,你們仍是應該根據本身的業務場景和數據類型來合理挑選適合本身的 serializer。
結合咱們以前的使用經驗,Flink 的 standalone cluster 在發佈具體的 job 時,會有必定的隨機性。舉個例子,若是當前集羣總共有 2 臺 8 核的機器用以部署 TaskManager,每臺機器上一個 TaskManager 實例,每一個 TaskManager 的 TaskSlot 爲 8,而咱們的 job 的並行度爲 12,那麼就有可能會出現下圖的現象:
第一個 TaskManager 的 slot 全被佔滿,而第二個 TaskManager 只使用了一半的資源!資源嚴重不平衡,隨着 job 處理的流量加大,必定會形成 TM1 上的 task 消費速度慢,而 TM2 上的 task 消費速度遠高於 TM1 的 task 的狀況。假設業務量的增加迫使咱們不得不擴大 job 的並行度爲 24,而且擴容2臺性能更高的機器(12核),在新的機器上,咱們分別部署 slot 數爲 12 的 TaskManager。通過擴容後,集羣的 TaskSlot 的佔用可能會造成下圖:
新擴容的配置高的機器並無去承擔更多的 Task,老機器的負擔仍然比較嚴重,資源本質上仍是不均勻!
除了 standalone cluster 模式下 job 的發佈策略形成不均衡的狀況外,還有資源隔離差的問題。由於咱們在一個 cluster 中每每會部署不止一個 job,而這些 job 在每臺機器上都共用 JVM,天然會形成資源的競爭。起初,咱們爲了解決這些問題,採用了以下的解決方法:
這些解決方法增長了實例數和集羣數,進而增長了維護成本。所以咱們決定要遷移到 on yarn 上,目前看 Flink on yarn 的資源分配和資源隔離確實比 standalone 模式要優秀一些。
Flink 在 2016 年時僅爲星星之火,而只用短短兩年的時間就成長爲了當前最爲煊赫一時的流處理平臺,並且大有統一批與流之勢。通過兩年的實踐,Flink 已經證實了它可以承接 TalkingData 的 App Analytics 和 Game Analytics 兩個產品的流處理需求。接下來咱們會將更復雜的業務和批處理遷移到 Flink 上,完成集羣部署和技術棧的統一,最終實現圖 5 中 Flink on yarn cluster 的規劃,以更少的成原本支撐更大的業務量。
本文爲雲棲社區原創內容,未經容許不得轉載。