Flink Doris Connector設計方案
該方案首先感謝社區Spark Doris Connector的做者html
從Doris角度看,將其數據引入Flink,可使用Flink一系列豐富的生態產品,拓寬了產品的想象力,也使得Doris和其餘數據源的聯合查詢成爲可能java
從咱們業務架構出發和業務需求,咱們選擇了Flink做爲咱們架構的一部分,用於數據的ETL及實時計算框架,社區目前支持Spark doris connector,所以咱們參照Spark doris connector 設計開發了Flink doris Connector。node
技術選型
一開始咱們選型的時候,也是和Spark Doris Connector 同樣,開始考慮的是JDBC的方式,可是這種方式就像Spark doris connector那篇文章中說的,有優勢,可是缺點更明顯。後來咱們閱讀及測試了Spark的代碼,決定站在巨人的肩上來實現(備註:直接拷貝代碼修改)。git
如下內容來自Spark Doris Connector博客的,直接拷貝了github
因而咱們開發了針對Doris的新的Datasource,Spark-Doris-Connector。這種方案下,Doris能夠暴露Doris數據分佈給Spark。Spark的Driver訪問Doris的FE獲取Doris表的Schema和底層數據分佈。以後,依據此數據分佈,合理分配數據查詢任務給Executors。最後,Spark的Executors分別訪問不一樣的BE進行查詢。大大提高了查詢的效率
使用方法
在Doris的代碼庫的 extension/flink-doris-connector/ 目錄下編譯生成doris-flink-1.0.0-SNAPSHOT.jar,將這個jar包加入flink的ClassPath中,便可使用Flink-on-Doris功能了sql
SQL方式
CREATE TABLE flink_doris_source ( name STRING, age INT, price DECIMAL(5,2), sale DOUBLE ) WITH ( 'connector' = 'doris', 'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT', 'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME', 'username' = '$YOUR_DORIS_USERNAME', 'password' = '$YOUR_DORIS_PASSWORD' ); CREATE TABLE flink_doris_sink ( name STRING, age INT, price DECIMAL(5,2), sale DOUBLE ) WITH ( 'connector' = 'doris', 'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT', 'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME', 'username' = '$YOUR_DORIS_USERNAME', 'password' = '$YOUR_DORIS_PASSWORD' ); INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
DataStream方式
DorisOptions.Builder options = DorisOptions.builder() .setFenodes("$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") .setUsername("$YOUR_DORIS_USERNAME") .setPassword("$YOUR_DORIS_PASSWORD") .setTableIdentifier("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"); env.addSource(new DorisSourceFunction<>(options.build(),new SimpleListDeserializationSchema())).print();
適用場景
1.使用Flink對Doris中的數據和其餘數據源進行聯合分析
不少業務部門會將本身的數據放在不一樣的存儲系統上,好比一些在線分析、報表的數據放在Doris中,一些結構化檢索數據放在Elasticsearch中、一些須要事物的數據放在MySQL中,等等。業務每每須要跨多個存儲源進行分析,經過Flink Doris Connector打通Flink和Doris後,業務能夠直接使用Flink,將Doris中的數據與多個外部數據源作聯合查詢計算。數據庫
2.實時數據接入
Flink Doris Connector以前:針對業務不規則數據,常常須要針對消息作規範處理,空值過濾等寫入新的topic,而後再啓動Routine load寫入Doris。apache
Flink Doris Connector以後:flink讀取kafka,直接寫入doris。json
技術實現
架構圖
Doris對外提供更多能力
Doris FE
對外開放了獲取內部表的元數據信息、單表查詢規劃和部分統計信息的接口。api
全部的Rest API接口都須要進行HttpBasic認證,用戶名和密碼是登陸數據庫的用戶名和密碼,須要注意權限的正確分配。
// 獲取表schema元信息 GET api/{database}/{table}/_schema // 獲取對單表的查詢規劃模版 POST api/{database}/{table}/_query_plan { "sql": "select k1, k2 from {database}.{table}" } // 獲取表大小 GET api/{database}/{table}/_count
Doris BE
經過Thrift協議,直接對外提供數據的過濾、掃描和裁剪能力。
service TDorisExternalService { // 初始化查詢執行器 TScanOpenResult open_scanner(1: TScanOpenParams params); // 流式batch獲取數據,Apache Arrow數據格式 TScanBatchResult get_next(1: TScanNextBatchParams params); // 結束掃描 TScanCloseResult close_scanner(1: TScanCloseParams params); }
Thrift相關結構體定義可參考:
https://github.com/apache/incubator-doris/blob/master/gensrc/thrift/DorisExternalService.thrift
實現DataStream
繼承 org.apache.flink.streaming.api.functions.source.RichSourceFunction ,自定義DorisSourceFunction,初始化時,獲取相關表的執行計劃,獲取對應的分區。
重寫run方法,循環從分區中讀取數據。
public void run(SourceContext sourceContext){ //循環讀取各分區 for(PartitionDefinition partitions : dorisPartitions){ scalaValueReader = new ScalaValueReader(partitions, settings); while (scalaValueReader.hasNext()){ Object next = scalaValueReader.next(); sourceContext.collect(next); } } }
實現Flink SQL on Doris
參考了Flink自定義Source&Sink 和 Flink-jdbc-connector,實現了下面的效果,能夠實現用Flink SQL直接操做Doris的表,包括讀和寫。
實現細節
1.實現DynamicTableSourceFactory , DynamicTableSinkFactory 註冊 doris connector
2.自定義DynamicTableSource和DynamicTableSink 生成邏輯計劃
3.DorisRowDataInputFormat和DorisDynamicOutputFormat獲取到邏輯計劃後開始執行。
實現中最主要的是基於RichInputFormat和RichOutputFormat 定製的DorisRowDataInputFormat和DorisDynamicOutputFormat。
在DorisRowDataInputFormat中,將獲取到的dorisPartitions 在createInputSplits中 切分紅多個分片,用於並行計算。
public DorisTableInputSplit[] createInputSplits(int minNumSplits) { List<DorisTableInputSplit> dorisSplits = new ArrayList<>(); int splitNum = 0; for (PartitionDefinition partition : dorisPartitions) { dorisSplits.add(new DorisTableInputSplit(splitNum++,partition)); } return dorisSplits.toArray(new DorisTableInputSplit[0]); } public RowData nextRecord(RowData reuse) { if (!hasNext) { //已經讀完數據,返回null return null; } List next = (List)scalaValueReader.next(); GenericRowData genericRowData = new GenericRowData(next.size()); for(int i =0;i<next.size();i++){ genericRowData.setField(i, next.get(i)); } //判斷是否還有數據 hasNext = scalaValueReader.hasNext(); return genericRowData; }
在DorisRowDataOutputFormat中,經過streamload的方式向doris中寫數據。streamload程序參考org.apache.doris.plugin.audit.DorisStreamLoader
public void writeRecord(RowData row) throws IOException { //streamload 默認分隔符 \t StringJoiner value = new StringJoiner("\t"); GenericRowData rowData = (GenericRowData) row; for(int i = 0; i < row.getArity(); ++i) { value.add(rowData.getField(i).toString()); } //streamload 寫數據 DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value.toString()); System.out.println(loadResponse); }
後續Flink Doris Connector計劃
1.doris sink 批量寫入
2.doris sink 支持json 數據寫入