在2017年上半年之前,TalkingData的App Analytics和Game Analytics兩個產品,流式框架使用的是自研的td-etl-framework。該框架下降了開發流式任務的複雜度,對於不一樣的任務只須要實現一個changer鏈便可,而且支持水平擴展,性能尚可,曾經能夠知足業務需求。html
可是到了2016年末和2017年上半年,發現這個框架存在如下重要侷限:java
TalkingData這兩款產品主要爲各種移動端App和遊戲提供數據分析服務,隨着近幾年業務量不斷擴大,須要選擇一個性能更強、功能更完善的流式引擎來逐步升級咱們的流式服務。調研從2016年末開始,主要是從Flink、Heron、Spark streaming中做選擇。spring
最終,咱們選擇了Flink,主要基於如下幾點考慮:apache
咱們最開始是以standalone cluster的模式部署。從2017年上半年開始,咱們逐步把Game Analytics中一些小流量的etl-job遷移到Flink,到4月份時,已經將產品接收各版本SDK數據的etl-job徹底遷移至Flink,並整合成了一個job。造成了以下的數據流和stream graph:api
圖1. Game Analytics-etl-adaptor遷移至Flink後的數據流圖網絡
圖2. Game Analytics-etl的stream graph框架
在上面的數據流圖中,flink-job經過Dubbo來調用etl-service,從而將訪問外部存儲的邏輯都抽象到了etl-service中,flink-job則不需考慮複雜的訪存邏輯以及在job中自建Cache,這樣既完成了服務的共用,又減輕了job自身的GC壓力。分佈式
此外咱們自構建了一個monitor服務,由於當時的1.1.3版本的Flink可提供的監控metric少,並且因爲其Kafka-connector使用的是Kafka08的低階API,Kafka的消費offset並無提交的ZK上,所以咱們須要構建一個monitor來監控Flink的job的活性、瞬時速度、消費淤積等metric,並接入公司owl完成監控告警。ide
這時候,Flink的standalone cluster已經承接了來自Game Analytics的全部流量,日均處理消息約10億條,總吞吐量達到12TB每日。到了暑假的時候,日均日誌量上升到了18億條天天,吞吐量達到了約20TB每日,TPS峯值爲3萬。性能
在這個過程當中,咱們又遇到了Flink的job消費不均衡、在standalone cluster上job的deploy不均衡等問題,而形成線上消費淤積,以及集羣無端自動重啓而自動重啓後job沒法成功重啓。(咱們將在第三章中詳細介紹這些問題中的典型表現及當時的解決方案。)
通過一個暑假後,咱們認爲Flink經受了考驗,所以開始將App Analytics的etl-job也遷移到Flink上。造成了以下的數據流圖:
圖3. App Analytics-etl-adaptor的標準SDK處理工做遷移到Flink後的數據流圖
圖4. App Analytics-etl-flink job的stream graph
2017年3月開始有大量用戶開始遷移至統一的JSON SDK,新版SDK的Kafka topic的峯值流量從年中的8K/s 上漲至了年末的 3W/s。此時,整個Flink standalone cluster上一共部署了兩款產品的4個job,日均吞吐量達到了35TB。
這時遇到了兩個很是嚴重的問題:
1) 同一個standalone cluster中的job相互搶佔資源,而standalone cluster的模式僅僅只能經過task slot在task manager的堆內內存上作到資源隔離。同時因爲前文提到過的Flink在standalone cluster中deploy job的方式原本就會形成資源分配不均衡,從而會致使App Analytics線流量大時而引發Game Analytics線淤積的問題;
2) 咱們的source operator的並行度等同於所消費Kafka topic的partition數量,而中間作etl的operator的並行度每每會遠大於Kafka的partition數量。所以最後的job graph不可能徹底被鏈成一條operator chain,operator之間的數據傳輸必須經過Flink的network buffer的申請和釋放,而1.1.x 版本的network buffer在數據量大的時候很容易在其申請和釋放時形成死鎖,而致使Flink明明有許多消息要處理,可是大部分線程處於waiting的狀態致使業務的大量延遲。
這些問題逼迫着咱們不得不將兩款產品的job拆分到兩個standalone cluster中,並對Flink作一次較大的版本升級,從1.1.3(中間過分到1.1.5)升級成1.3.2。最終升級至1.3.2在18年的Q1完成,1.3.2版本引入了增量式的checkpoint提交而且在性能和穩定性上比1.1.x版本作了巨大的改進。升級以後,Flink集羣基本穩定,儘管還有消費不均勻等問題,可是基本能夠在業務量增長時經過擴容機器來解決。
由於standalone cluster的資源隔離作的並不優秀,並且還有deploy job不均衡等問題,加上社區上使用Flink on yarn已經很是成熟,所以咱們在18年的Q4就開始計劃將Flink的standalone cluster遷移至Flink on yarn上,而且Flink在最近的版本中對於batch的提高較多,咱們還規劃逐步使用Flink來逐步替換如今的批處理引擎。
圖5. Flink on yarn cluster規劃
如圖5,將來的Flink on yarn cluster將能夠完成流式計算和批處理計算,集羣的使用者能夠經過一個構建service來完成stream/batch job的構建、優化和提交,job提交後,根據使用者所在的業務團隊及服務客戶的業務量分發到不一樣的yarn隊列中,此外,集羣須要一個完善的監控系統,採集用戶的提交記錄、各個隊列的流量及負載、各個job的運行時指標等等,並接入公司的OWL。
從19年的Q1開始,咱們將App Analytics的部分stream job遷移到了Flink on yarn 1.7中,又在19年Q2前完成了App Analytics全部處理統一JSON SDK的流任務遷移。當前的Flink on yarn集羣的峯值處理的消息量達到30W/s,日均日誌吞吐量達約到50億條,約60TB。在Flink遷移到on yarn以後,由於版本的升級性能有所提高,且job之間的資源隔離確實優於standalone cluster。遷移後咱們使用Prometheus+Grafana的監控方案,監控更方便和直觀。
咱們將在後續將Game Analytics的Flink job和日誌導出的job也遷移至該on yarn集羣,預計能夠節約1/4的機器資源。
在Flink實踐的過程當中,咱們一路上遇到了很多坑,咱們挑出其中幾個重點坑作簡要講解。
在咱們實現Flink的operator的function時,通常均可以繼承AbstractRichFunction,其已提供生命週期方法open()/close(),因此operator依賴的資源的初始化和釋放應該經過重寫這些方法執行。當咱們初始化一些資源,如spring context、dubbo config時,應該儘量使用單例對象持有這些資源且(在一個TaskManager中)只初始化1次,一樣的,咱們在close方法中應當(在一個TaskManager中)只釋放一次。
static的變量應該慎重使用,不然很容易引發job cancel而相應的資源沒有釋放進而致使job重啓遇到問題。規避static變量來初始化可使用org.apache.flink.configuration.Configuration(1.3)或者org.apache.flink.api.java.utils.ParameterTool(1.7)來保存咱們的資源配置,而後經過ExecutionEnvironment來存放(Job提交時)和獲取這些配置(Job運行時)。
示例代碼:
Flink 1.3
設置及註冊配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration parameters = new Configuration();
parameters.setString("zkConnects", zkConnects);
parameters.setBoolean("debug", debug);
env.getConfig().setGlobalJobParameters(parameters);
複製代碼
獲取配置(在operator的open方法中)
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
Configuration globConf = (Configuration) globalParams;
debug = globConf.getBoolean("debug", false);
String zks = globConf.getString("zkConnects", "");
//.. do more ..
}
複製代碼
Flink 1.7
設置及註冊配置
ParameterTool parameters = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
複製代碼
獲取配置
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
ParameterTool parameters = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
parameters.getRequired("input");
// .. do more ..
複製代碼
如前文所述,當Flink的job 的上下游Task(的subTask)分佈在不一樣的TaskManager節點上時(也就是上下游operator沒有chained在一塊兒,且相對應的subTask分佈在了不一樣的TaskManager節點上),就須要在operator的數據傳遞時申請和釋放network buffer並經過網絡I/O傳遞數據。
其過程簡述以下:上游的operator產生的結果會經過RecordWriter序列化,而後申請BufferPool中的Buffer並將序列化後的結果寫入Buffer,此後Buffer會被加入ResultPartition的ResultSubPartition中。ResultSubPartition中的Buffer會經過Netty傳輸至下一級的operator的InputGate的InputChannel中,一樣的,Buffer進入InputChannel前一樣須要到下一級operator所在的TaskManager的BufferPool申請,RecordReader讀取Buffer並將其中的數據反序列化。BufferPool是有限的,在BufferPool爲空時RecordWriter/RecordReader所在的線程會在申請Buffer的過程當中wait一段時間,具體原理能夠參考:[1], [2]。
簡要截圖以下:
圖6. Flink的網絡棧, 其中RP爲ResultPartition、RS爲ResultSubPartition、IG爲InputGate、IC爲inputChannel。
在使用Flink 1.1.x和1.3.x版本時,若是咱們的network buffer的數量配置的不充足且數據的吞吐量變大的時候,就會遇到以下現象:
圖7. 上游operator阻塞在獲取network buffer的requestBuffer()方法中
圖8. 下游的operator阻塞在等待新數據輸入
圖9. 下游的operator阻塞在等待新數據輸入
咱們的工做線程(RecordWriter和RecordReader所在的線程)的大部分時間都花在了向BufferPool申請Buffer上,這時候CPU的使用率會劇烈的抖動,使得Job的消費速度降低,在1.1.x版本中甚至會阻塞很長的一段時間,觸發整個job的背壓,從而形成較嚴重的業務延遲。
這時候,咱們就須要經過上下游operator的並行度來計算ResultPartition和InputGate中所須要的buffer的個數,以配置充足的taskmanager.network.numberOfBuffers。
圖10. 不一樣的network buffer對CPU使用率的影響
當配置了充足的network buffer數時,CPU抖動能夠減小,Job消費速度有所提升。
在Flink 1.5以後,在其network stack中引入了基於信用度的流量傳輸控制(credit-based flow control)機制[2],該機制大限度的避免了在向BufferPool申請Buffer的阻塞現象,咱們初步測試1.7的network stack的性能確實比1.3要高。
但這畢竟還不是最優的狀況,由於若是藉助network buffer來完成上下游的operator的數據傳遞不能夠避免的要通過序列化/反序列化的過程,並且信用度的信息傳遞有必定的延遲性和開銷,而這個過程能夠經過將上下游的operator鏈成一條operator chain而避免。
所以咱們在構建咱們流任務的執行圖時,應該儘量多的讓operator都chain在一塊兒,在Kafka資源容許的狀況下能夠擴大Kafka的partition而使得source operator和後繼的operator 鏈在一塊兒,但也不能一味擴大Kafka topic的partition,應根據業務量和機器資源作好取捨。更詳細的關於operator的training和task slot的調優能夠參考: [4]。
在上一節中咱們知道,Flink的分佈在不一樣節點上的Task的數據傳輸必須通過序列化/反序列化,所以序列化/反序列化也是影響Flink性能的一個重要因素。Flink自有一套類型體系,即Flink有本身的類型描述類(TypeInformation)。Flink但願可以掌握儘量多的進出operator的數據類型信息,並使用TypeInformation來描述,這樣作主要有如下2個緣由:
整體上來講,Flink推薦咱們在operator間傳遞的數據是POJOs類型,對於POJOs類型,Flink默認會使用Flink自身的PojoSerializer進行序列化,而對於Flink沒法本身描述或推斷的數據類型,Flink會將其識別爲GenericType,並使用Kryo進行序列化。Flink在處理POJOs時更高效,此外POJOs類型會使得stream的grouping/joining/aggregating等操做變得簡單,由於可使用如:dataSet.keyBy("username") 這樣的方式直接操做數據流中的數據字段。
除此以外,咱們還能夠作進一步的優化:
1) 顯示調用returns方法,從而觸發Flink的Type Hint:
dataStream.flatMap(new MyOperator()).returns(MyClass.class)
returns方法最終會調用TypeExtractor.createTypeInfo(typeClass) ,用以構建咱們自定義的類型的TypeInformation。createTypeInfo方法在構建TypeInformation時,若是咱們的類型知足POJOs的規則或Flink中其餘的基本類型的規則,會盡量的將咱們的類型「翻譯」成Flink熟知的類型如POJOs類型或其餘基本類型,便於Flink自行使用更高效的序列化方式。
//org.apache.flink.api.java.typeutils.PojoTypeInfo
@Override
@PublicEvolving
@SuppressWarnings("unchecked")
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
if (config.isForceKryoEnabled()) {
return new KryoSerializer<>(getTypeClass(), config);
}
if (config.isForceAvroEnabled()) {
return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass());
}
return createPojoSerializer(config);
}
複製代碼
對於Flink沒法「翻譯」的類型,則返回GenericTypeInfo,並使用Kryo序列化:
//org.apache.flink.api.java.typeutils.TypeExtractor
@SuppressWarnings({ "unchecked", "rawtypes" })
private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy, ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
checkNotNull(clazz);
// 嘗試將 clazz轉換爲 PrimitiveArrayTypeInfo, BasicArrayTypeInfo, ObjectArrayTypeInfo
// BasicTypeInfo, PojoTypeInfo 等,具體源碼已省略
//...
//若是上述嘗試不成功 , 則return a generic type
return new GenericTypeInfo<OUT>(clazz);
}
複製代碼
2) 註冊subtypes: 經過StreamExecutionEnvironment或ExecutionEnvironment的實例的registerType(clazz)方法註冊咱們的數據類及其子類、其字段的類型。若是Flink對類型知道的越多,性能會更好;
3) 若是還想作進一步的優化,Flink還容許用戶註冊本身定製的序列化器,手動建立本身類型的TypeInformation,具體能夠參考Flink官網:[3];
在咱們的實踐中,最初爲了擴展性,在operator之間傳遞的數據爲JsonNode,可是咱們發現性能達不到預期,所以將JsonNode改爲了符合POJOs規範的類型,在1.1.x的Flink版本上直接得到了超過30%的性能提高。在咱們調用了Flink的Type Hint和env.getConfig().enableForceAvro()後,性能獲得進一步提高。這些方法一直沿用到了1.3.x版本。
在升級至1.7.x時,若是使用env.getConfig().enableForceAvro()這個配置,咱們的代碼會引發校驗空字段的異常。所以咱們取消了這個配置,並嘗試使用Kyro進行序列化,而且註冊咱們的類型的全部子類到Flink的ExecutionEnvironment中,目前看性能尚可,並優於舊版本使用Avro的性能。可是最佳實踐還須要通過比較和壓測KryoSerializerAvroUtils.getAvroUtils().createAvroSerializerPojoSerializer才能總結出來,你們仍是應該根據本身的業務場景和數據類型來合理挑選適合本身的serializer。
結合咱們以前的使用經驗,Flink的standalone cluster在發佈具體的job時,會有必定的隨機性。舉個例子,若是當前集羣總共有2臺8核的機器用以部署TaskManager,每臺機器上一個TaskManager實例,每一個TaskManager的TaskSlot爲8,而咱們的job的並行度爲12,那麼就有可能會出現下圖的現象:
第一個TaskManager的slot全被佔滿,而第二個TaskManager只使用了一半的資源!資源嚴重不平衡,隨着job處理的流量加大,必定會形成TM1上的task消費速度慢,而TM2上的task消費速度遠高於TM1的task的狀況。假設業務量的增加迫使咱們不得不擴大job的並行度爲24,而且擴容2臺性能更高的機器(12核),在新的機器上,咱們分別部署slot數爲12的TaskManager。通過擴容後,集羣的TaskSlot的佔用可能會造成下圖:
新擴容的配置高的機器並無去承擔更多的Task,老機器的負擔仍然比較嚴重,資源本質上仍是不均勻!
除了standalone cluster模式下job的發佈策略形成不均衡的狀況外,還有資源隔離差的問題。由於咱們在一個cluster中每每會部署不止一個job,而這些job在每臺機器上都共用JVM,天然會形成資源的競爭。起初,咱們爲了解決這些問題,採用了以下的解決方法:
這些解決方法增長了實例數和集羣數,進而增長了維護成本。所以咱們決定要遷移到on yarn上,目前看Flink on yarn的資源分配和資源隔離確實比standalone模式要優秀一些。
Flink在2016年時僅爲星星之火,而只用短短兩年的時間就成長爲了當前最爲煊赫一時的流處理平臺,並且大有統一批與流之勢。通過兩年的實踐,Flink已經證實了它可以承接TalkingData的App Analytics和Game Analytics兩個產品的流處理需求。接下來咱們會將更復雜的業務和批處理遷移到Flink上,完成集羣部署和技術棧的統一,最終實現圖5 中Flink on yarn cluster 的規劃,以更少的成原本支撐更大的業務量。
參考資料:
[1] cwiki.apache.org/confluence/…
[2] flink.apache.org/2019/06/05/…
[3] ci.apache.org/projects/fl…
[4] mp.weixin.qq.com/s/XROoLEu38…
做者簡介:肖強:TalkingData資深工程師,TalkingData統計分析產品App Analytics和Game Analytics技術負責人。碩士畢業於北京航空航天大學,主要從事大數據平臺開發,對流式計算和分佈式存儲有必定研究。