Flink Doris Connector設計方案

Flink Doris Connector設計方案

該方案首先感謝社區Spark Doris Connector的做者html

從Doris角度看,將其數據引入Flink,可使用Flink一系列豐富的生態產品,拓寬了產品的想象力,也使得Doris和其餘數據源的聯合查詢成爲可能java

從咱們業務架構出發和業務需求,咱們選擇了Flink做爲咱們架構的一部分,用於數據的ETL及實時計算框架,社區目前支持Spark doris connector,所以咱們參照Spark doris connector 設計開發了Flink doris Connector。node

技術選型

一開始咱們選型的時候,也是和Spark Doris Connector 同樣,開始考慮的是JDBC的方式,可是這種方式就像Spark doris connector那篇文章中說的,有優勢,可是缺點更明顯。後來咱們閱讀及測試了Spark的代碼,決定站在巨人的肩上來實現(備註:直接拷貝代碼修改)。git

如下內容來自Spark Doris Connector博客的,直接拷貝了github

因而咱們開發了針對Doris的新的Datasource,Spark-Doris-Connector。這種方案下,Doris能夠暴露Doris數據分佈給Spark。Spark的Driver訪問Doris的FE獲取Doris表的Schema和底層數據分佈。以後,依據此數據分佈,合理分配數據查詢任務給Executors。最後,Spark的Executors分別訪問不一樣的BE進行查詢。大大提高了查詢的效率

使用方法

在Doris的代碼庫的 extension/flink-doris-connector/ 目錄下編譯生成doris-flink-1.0.0-SNAPSHOT.jar,將這個jar包加入flink的ClassPath中,便可使用Flink-on-Doris功能了sql

SQL方式

CREATE TABLE flink_doris_source (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',
      'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
      'username' = '$YOUR_DORIS_USERNAME',
      'password' = '$YOUR_DORIS_PASSWORD'
);

CREATE TABLE flink_doris_sink (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',
      'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
      'username' = '$YOUR_DORIS_USERNAME',
      'password' = '$YOUR_DORIS_PASSWORD'
);

INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source

DataStream方式

DorisOptions.Builder options = DorisOptions.builder()
                .setFenodes("$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
                .setUsername("$YOUR_DORIS_USERNAME")
                .setPassword("$YOUR_DORIS_PASSWORD")
                .setTableIdentifier("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME");
env.addSource(new DorisSourceFunction<>(options.build(),new SimpleListDeserializationSchema())).print();

適用場景

1616987965864

1.使用Flink對Doris中的數據和其餘數據源進行聯合分析

不少業務部門會將本身的數據放在不一樣的存儲系統上,好比一些在線分析、報表的數據放在Doris中,一些結構化檢索數據放在Elasticsearch中、一些須要事物的數據放在MySQL中,等等。業務每每須要跨多個存儲源進行分析,經過Flink Doris Connector打通Flink和Doris後,業務能夠直接使用Flink,將Doris中的數據與多個外部數據源作聯合查詢計算。數據庫

2.實時數據接入

Flink Doris Connector以前:針對業務不規則數據,常常須要針對消息作規範處理,空值過濾等寫入新的topic,而後再啓動Routine load寫入Doris。apache

1616988281677

Flink Doris Connector以後:flink讀取kafka,直接寫入doris。json

1616988514873

技術實現

架構圖

1616997396610

Doris對外提供更多能力

Doris FE

對外開放了獲取內部表的元數據信息、單表查詢規劃和部分統計信息的接口。api

全部的Rest API接口都須要進行HttpBasic認證,用戶名和密碼是登陸數據庫的用戶名和密碼,須要注意權限的正確分配。

// 獲取表schema元信息
GET api/{database}/{table}/_schema

// 獲取對單表的查詢規劃模版
POST api/{database}/{table}/_query_plan
{
"sql": "select k1, k2 from {database}.{table}"
}

// 獲取表大小
GET api/{database}/{table}/_count

Doris BE

經過Thrift協議,直接對外提供數據的過濾、掃描和裁剪能力。

service TDorisExternalService {
    // 初始化查詢執行器
TScanOpenResult open_scanner(1: TScanOpenParams params);

// 流式batch獲取數據,Apache Arrow數據格式
    TScanBatchResult get_next(1: TScanNextBatchParams params);

// 結束掃描
    TScanCloseResult close_scanner(1: TScanCloseParams params);
}

Thrift相關結構體定義可參考:

https://github.com/apache/incubator-doris/blob/master/gensrc/thrift/DorisExternalService.thrift

實現DataStream

繼承 org.apache.flink.streaming.api.functions.source.RichSourceFunction ,自定義DorisSourceFunction,初始化時,獲取相關表的執行計劃,獲取對應的分區。

重寫run方法,循環從分區中讀取數據。

public void run(SourceContext sourceContext){
       //循環讀取各分區
        for(PartitionDefinition partitions : dorisPartitions){
            scalaValueReader = new ScalaValueReader(partitions, settings);
            while (scalaValueReader.hasNext()){
                Object next = scalaValueReader.next();
                sourceContext.collect(next);
            }
        }
}

實現Flink SQL on Doris

參考了Flink自定義Source&Sink 和 Flink-jdbc-connector,實現了下面的效果,能夠實現用Flink SQL直接操做Doris的表,包括讀和寫。

實現細節

1.實現DynamicTableSourceFactory , DynamicTableSinkFactory 註冊 doris connector

2.自定義DynamicTableSource和DynamicTableSink 生成邏輯計劃

3.DorisRowDataInputFormat和DorisDynamicOutputFormat獲取到邏輯計劃後開始執行。

1616747472136

實現中最主要的是基於RichInputFormat和RichOutputFormat 定製的DorisRowDataInputFormat和DorisDynamicOutputFormat。

在DorisRowDataInputFormat中,將獲取到的dorisPartitions 在createInputSplits中 切分紅多個分片,用於並行計算。

public DorisTableInputSplit[] createInputSplits(int minNumSplits) {
		List<DorisTableInputSplit> dorisSplits = new ArrayList<>();
		int splitNum = 0;
		for (PartitionDefinition partition : dorisPartitions) {
			dorisSplits.add(new DorisTableInputSplit(splitNum++,partition));
		}
		return dorisSplits.toArray(new DorisTableInputSplit[0]);
}
 

public RowData nextRecord(RowData reuse)  {
		if (!hasNext) {
            //已經讀完數據,返回null
			return null;
		}
		List next = (List)scalaValueReader.next();
		GenericRowData genericRowData = new GenericRowData(next.size());
		for(int i =0;i<next.size();i++){
			genericRowData.setField(i, next.get(i));
		}
		//判斷是否還有數據
		hasNext = scalaValueReader.hasNext();
		return genericRowData;
}

在DorisRowDataOutputFormat中,經過streamload的方式向doris中寫數據。streamload程序參考org.apache.doris.plugin.audit.DorisStreamLoader

public  void writeRecord(RowData row) throws IOException {
       //streamload 默認分隔符 \t
        StringJoiner value = new StringJoiner("\t");
        GenericRowData rowData = (GenericRowData) row;
        for(int i = 0; i < row.getArity(); ++i) {
            value.add(rowData.getField(i).toString());
        }
        //streamload 寫數據
        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value.toString());
        System.out.println(loadResponse);
}

後續Flink Doris Connector計劃

1.doris sink 批量寫入

2.doris sink 支持json 數據寫入

相關文章
相關標籤/搜索