摘要:本文所介紹 Nebula Graph 鏈接器 Nebula Flink Connector,採用相似 Flink 提供的 Flink Connector 形式,支持 Flink 讀寫分佈式圖數據庫 Nebula Graph。html
文章首發 Nebula Graph 官網博客:https://nebula-graph.com.cn/posts/nebula-flink-connector/java
在關係網絡分析、關係建模、實時推薦等場景中應用圖數據庫做爲後臺數據支撐已相對普及,且部分應用場景對圖數據的實時性要求較高,如推薦系統、搜索引擎。爲了提高數據的實時性,業界普遍應用流式計算對更新的數據進行增量實時處理。爲了支持對圖數據的流式計算,Nebula Graph 團隊開發了 Nebula Flink Connector,支持利用 Flink 進行 Nebula Graph 圖數據的流式處理和計算。git
Flink 是新一代流批統一的計算引擎,它從不一樣的第三方存儲引擎中讀取數據,並進行處理,再寫入另外的存儲引擎中。Flink Connector 的做用就至關於一個鏈接器,鏈接 Flink 計算引擎跟外界存儲系統。github
與外界進行數據交換時,Flink 支持如下 4 種方式:數據庫
- Flink 源碼內部預約義 Source 和 Sink 的 API;
- Flink 內部提供了 Bundled Connectors,如 JDBC Connector。
- Apache Bahir 項目中提供鏈接器 Apache Bahir 最初是從 Apache Spark 中獨立出來的項目,以提供不限於 Spark 相關的擴展/插件、鏈接器和其餘可插入組件的實現。
- 經過異步 I/O 方式。
流計算中常常須要與外部存儲系統交互,好比須要關聯 MySQL 中的某個表。通常來講,若是用同步 I/O 的方式,會形成系統中出現大的等待時間,影響吞吐和延遲。異步 I/O 則能夠併發處理多個請求,提升吞吐,減小延遲。apache
本文所介紹 Nebula Graph 鏈接器 Nebula Flink Connector,採用相似 Flink 提供的 Flink Connector 形式,支持 Flink 讀寫分佈式圖數據庫 Nebula Graph。緩存
1、Connector Source
Flink 做爲一款流式計算框架,它可處理有界數據,也可處理無界數據。所謂無界,即源源不斷的數據,不會有終止,實時流處理所處理的數據即是無界數據;批處理的數據,即有界數據。而 Source 即是 Flink 處理數據的數據來源。安全
Nebula Flink Connector 中的 Source 即是圖數據庫 Nebula Graph。Flink 提供了豐富的 Connector 組件容許用戶自定義數據源來鏈接外部數據存儲系統。微信
1.1 Source 簡介
Flink 的 Source 主要負責外部數據源的接入,Flink 的 Source 能力主要是經過 read 相關的 API 和 addSource 方法這 2 種方式來實現數據源的讀取,使用 addSource 方法對接外部數據源時,可使用 Flink Bundled Connector,也能夠自定義 Source。網絡
Flink Source 的幾種使用方式以下:
本章主要介紹如何經過自定義 Source 方式實現 Nebula Graph Source。
1.2 自定義 Source
在 Flink 中可使用 StreamExecutionEnvironment.addSource(sourceFunction)
和 ExecutionEnvironment.createInput(inputFormat)
兩種方式來爲你的程序添加數據來源。
Flink 已經提供多個內置的 source functions
,開發者能夠經過繼承 RichSourceFunction
來自定義非並行的 source
,經過繼承 RichParallelSourceFunction
來自定義並行的 Source
。RichSourceFunction
和 RichParallelSourceFunction
是 SourceFunction
和 RichFunction
特性的結合。 其中SourceFunction
負責數據的生成, RichFunction
負責資源的管理。固然,也能夠只實現 SourceFunction
接口來定義最簡單的只具有獲取數據功能的 dataSource
。
一般自定義一個完善的 Source 節點是經過實現 RichSourceFunction
類來完成的,該類兼具 RichFunction
和 SourceFunction
的能力,所以自定義 Flink 的 Nebula Graph Source 功能咱們須要實現 RichSourceFunction
中提供的方法。
1.3 自定義 Nebula Graph Source 實現原理
Nebula Flink Connector 中實現的自定義 Nebula Graph Source 數據源提供了兩種使用方式,分別是 addSource 和 createInput 方式。
Nebula Graph Source 實現類圖以下:
(1)addSource
該方式是經過 NebulaSourceFunction 類實現的,該類繼承自 RichSourceFunction 並實現瞭如下方法:
- open 準備 Nebula Graph 鏈接信息,並獲取 Nebula Graph Meta 服務和 Storage 服務的鏈接。
- close 數據讀取完成,釋放資源。關閉 Nebula Graph 服務的鏈接。
- run 開始讀取數據,並將數據填充到 sourceContext。
- cancel 取消 Flink 做業時調用,關閉資源。
(2)createInput
該方式是經過 NebulaInputFormat 類實現的,該類繼承自 RichInputFormat 並實現瞭如下方法:
- openInputFormat 準備 inputFormat,獲取鏈接。
- closeInputFormat 數據讀取完成,釋放資源,關閉 Nebula Graph 服務的鏈接。
- getStatistics 獲取數據源的基本統計信息。
- createInputSplits 基於配置的 partition 參數建立 GenericInputSplit。
- getInputSplitAssigner 返回輸入的 split 分配器,按原始計算的順序返回 Source 的全部 split。
- open 開始 inputFormat 的數據讀取,將讀取的數據轉換 Flink 的數據格式,構造迭代器。
- close 數據讀取完成,打印讀取日誌。
- reachedEnd 是否讀取完成
- nextRecord 經過迭代器獲取下一條數據
經過 addSource 讀取 Source 數據獲得的是 Flink 的 DataStreamSource,表示 DataStream 的起點。
經過 createInput 讀取數據獲得的是 Flink 的 DataSource,DataSource 是一個建立新數據集的 Operator,這個 Operator 可做爲進一步轉換的數據集。DataSource 能夠經過 withParameters 封裝配置參數進行其餘的操做。
1.4 自定義 Nebula Graph Source 應用實踐
使用 Flink 讀取 Nebula Graph 圖數據時,須要構造 NebulaSourceFunction 和 NebulaOutputFormat,並經過 Flink 的 addSource 或 createInput 方法註冊數據源進行 Nebula Graph 數據讀取。
構造 NebulaSourceFunction 和 NebulaOutputFormat 時須要進行客戶端參數的配置和執行參數的配置,說明以下:
配置項說明:
- NebulaClientOptions
- 配置 address,NebulaSource 須要配置 Nebula Graph Metad 服務的地址。
- 配置 username
- 配置 password
- VertexExecutionOptions
- 配置 GraphSpace
- 配置要讀取的 tag
- 配置要讀取的字段集
- 配置是否讀取全部字段,默認爲 false, 若配置爲 true 則字段集配置無效
- 配置每次讀取的數據量 limit,默認 2000
- EdgeExecutionOptions
- 配置 GraphSpace
- 配置要讀取的 edge
- 配置要讀取的字段集
- 配置是否讀取全部字段,默認爲 false, 若配置爲 true 則字段集配置無效
- 配置每次讀取的數據量 limit,默認 2000
// 構造 Nebula Graph 客戶端鏈接須要的參數 NebulaClientOptions nebulaClientOptions = new NebulaClientOptions .NebulaClientOptionsBuilder() .setAddress("127.0.0.1:45500") .build(); // 建立 connectionProvider NebulaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions); // 構造 Nebula Graph 數據讀取須要的參數 List<String> cols = Arrays.asList("name", "age"); VertexExecutionOptions sourceExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder() .setGraphSpace("flinkSource") .setTag(tag) .setFields(cols) .setLimit(100) .builder(); // 構造 NebulaInputFormat NebulaInputFormat inputFormat = new NebulaInputFormat(metaConnectionProvider) .setExecutionOptions(sourceExecutionOptions); // 方式 1 使用 createInput 方式註冊 Nebula Graph 數據源 DataSource<Row> dataSource1 = ExecutionEnvironment.getExecutionEnvironment() .createInput(inputFormat); // 方式 2 使用 addSource 方式註冊 Nebula Graph 數據源 NebulaSourceFunction sourceFunction = new NebulaSourceFunction(metaConnectionProvider) .setExecutionOptions(sourceExecutionOptions); DataStreamSource<Row> dataSource2 = StreamExecutionEnvironment.getExecutionEnvironment() .addSource(sourceFunction);
Nebula Source Demo 編寫完成後能夠打包提交到 Flink 集羣執行。
示例程序讀取 Nebula Graph 的點數據並打印,該做業以 Nebula Graph 做爲 Source,以 print 做爲 Sink,執行結果以下:
Source sent 數據爲 59,671,064 條,Sink received 數據爲 59,671,064 條。
2、Connector Sink
Nebula Flink Connector 中的 Sink 即 Nebula Graph 圖數據庫。Flink 提供了豐富的 Connector 組件容許用戶自定義數據池來接收 Flink 所處理的數據流。
2.1 Sink 簡介
Sink 是 Flink 處理完 Source 後數據的輸出,主要負責實時計算結果的輸出和持久化。好比:將數據流寫入標準輸出、寫入文件、寫入 Sockets、寫入外部系統等。
Flink 的 Sink 能力主要是經過調用數據流的 write 相關 API 和 DataStream.addSink 兩種方式來實現數據流的外部存儲。
相似於 Flink Connector 的 Source,Sink 也容許用戶自定義來支持豐富的外部數據系統做爲 Flink 的數據池。
Flink Sink 的使用方式以下:
本章主要介紹如何經過自定義 Sink 的方式實現 Nebula Graph Sink。
2.2 自定義 Sink
在 Flink 中可使用 DataStream.addSink
和 DataStream.writeUsingOutputFormat
的方式將 Flink 數據流寫入外部自定義數據池。
Flink 已經提供了若干實現好了的 Sink Functions
,也能夠經過實現 SinkFunction
以及繼承 RichOutputFormat
來實現自定義的 Sink。
2.3 自定義 Nebula Graph Sink 實現原理
Nebula Flink Connector 中實現了自定義的 NebulaSinkFunction,開發者經過調用 DataSource.addSink 方法並將 NebulaSinkFunction 對象做爲參數傳入便可實現將 Flink 數據流寫入 Nebula Graph。
Nebula Flink Connector 使用的是 Flink 的 1.11-SNAPSHOT 版本,該版本中已經廢棄了使用 writeUsingOutputFormat 方法來定義輸出端的接口。
源碼以下,因此請注意在使用自定義 Nebula Graph Sink 時請採用 DataStream.addSink 的方式。
/** @deprecated */ @Deprecated @PublicEvolving public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) { return this.addSink(new OutputFormatSinkFunction(format)); }
Nebula Graph Sink 實現類圖以下:
其中最重要的兩個類是 NebulaSinkFunction 和 NebulaBatchOutputFormat。
NebulaSinkFunction 繼承自 AbstractRichFunction 並實現瞭如下方法:
- open 調用 NebulaBatchOutputFormat 的 open 方法,進行資源準備。
- close 調用 NebulaBatchOutputFormat 的 close 方法,進行資源釋放。
- invoke 是 Sink 中的核心方法, 調用 NebulaBatchOutputFormat 中的 write 方法進行數據寫入。
- flush 調用 NebulaBatchOutputFormat 的 flush 方法進行數據的提交。
NebulaBatchOutputFormat 繼承自 AbstractNebulaOutPutFormat,AbstractNebulaOutPutFormat 繼承自 RichOutputFormat,主要實現的方法有:
- open 準備圖數據庫 Nebula Graph 的 Graphd 服務的鏈接,並初始化數據寫入執行器 nebulaBatchExecutor
- close 提交最後批次數據,等待最後提交的回調結果並關閉服務鏈接等資源。
- writeRecord 核心方法,將數據寫入 nebulaBufferedRow 中,並在達到配置的批量寫入 Nebula Graph 上限時提交寫入。Nebula Graph Sink 的寫入操做是異步的,因此須要執行回調來獲取執行結果。
- flush 當 bufferRow 存在數據時,將數據提交到 Nebula Graph 中。
在 AbstractNebulaOutputFormat 中調用了 NebulaBatchExecutor 進行數據的批量管理和批量提交,並經過定義回調函數接收批量提交的結果,代碼以下:
/** * write one record to buffer */ @Override public final synchronized void writeRecord(T row) throws IOException { nebulaBatchExecutor.addToBatch(row); if (numPendingRow.incrementAndGet() >= executionOptions.getBatch()) { commit(); } } /** * put record into buffer * * @param record represent vertex or edge */ void addToBatch(T record) { boolean isVertex = executionOptions.getDataType().isVertex(); NebulaOutputFormatConverter converter; if (isVertex) { converter = new NebulaRowVertexOutputFormatConverter((VertexExecutionOptions) executionOptions); } else { converter = new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) executionOptions); } String value = converter.createValue(record, executionOptions.getPolicy()); if (value == null) { return; } nebulaBufferedRow.putRow(value); } /** * commit batch insert statements */ private synchronized void commit() throws IOException { graphClient.switchSpace(executionOptions.getGraphSpace()); future = nebulaBatchExecutor.executeBatch(graphClient); // clear waiting rows numPendingRow.compareAndSet(executionOptions.getBatch(),0); } /** * execute the insert statement * * @param client Asynchronous graph client */ ListenableFuture executeBatch(AsyncGraphClientImpl client) { String propNames = String.join(NebulaConstant.COMMA, executionOptions.getFields()); String values = String.join(NebulaConstant.COMMA, nebulaBufferedRow.getRows()); // construct insert statement String exec = String.format(NebulaConstant.BATCH_INSERT_TEMPLATE, executionOptions.getDataType(), executionOptions.getLabel(), propNames, values); // execute insert statement ListenableFuture<Optional<Integer>> execResult = client.execute(exec); // define callback function Futures.addCallback(execResult, new FutureCallback<Optional<Integer>>() { @Override public void onSuccess(Optional<Integer> integerOptional) { if (integerOptional.isPresent()) { if (integerOptional.get() == ErrorCode.SUCCEEDED) { LOG.info("batch insert Succeed"); } else { LOG.error(String.format("batch insert Error: %d", integerOptional.get())); } } else { LOG.error("batch insert Error"); } } @Override public void onFailure(Throwable throwable) { LOG.error("batch insert Error"); } }); nebulaBufferedRow.clean(); return execResult; }
因爲 Nebula Graph Sink 的寫入是批量、異步的,因此在最後業務結束 close 資源以前須要將緩存中的批量數據提交且等待寫入操做的完成,以防在寫入提交以前提早把 Nebula Graph Client 關閉,代碼以下:
/** * commit the batch write operator before release connection */ @Override public final synchronized void close() throws IOException { if(numPendingRow.get() > 0){ commit(); } while(!future.isDone()){ try { Thread.sleep(10); } catch (InterruptedException e) { LOG.error("sleep interrupted, ", e); } } super.close(); }
2.4 自定義 Nebula Graph Sink 應用實踐
Flink 將處理完成的數據 Sink 到 Nebula Graph 時,須要將 Flink 數據流進行 map 轉換成 Nebula Graph Sink 可接收的數據格式。自定義 Nebula Graph Sink 的使用方式是經過 addSink 形式,將 NebulaSinkFunction 做爲參數傳給 addSink 方法來實現 Flink 數據流的寫入。
- NebulaClientOptions
- 配置 address,NebulaSource 須要配置 Nebula Graph Graphd 服務的地址。
- 配置 username
- 配置 password
- VertexExecutionOptions
- 配置 GraphSpace
- 配置要寫入的 tag
- 配置要寫入的字段集
- 配置寫入的點 ID 所在 Flink 數據流 Row 中的索引
- 配置批量寫入 Nebula Graph 的數量,默認 2000
- EdgeExecutionOptions
- 配置 GraphSpace
- 配置要寫入的 edge
- 配置要寫入的字段集
- 配置寫入的邊 src-id 所在 Flink 數據流 Row 中的索引
- 配置寫入的邊 dst-id 所在 Flink 數據流 Row 中的索引
- 配置寫入的邊 rank 所在 Flink 數據流 Row 中的索引,不配則無 rank
- 配置批量寫入 Nebula Graph 的數量,默認 2000
/// 構造 Nebula Graphd 客戶端鏈接須要的參數 NebulaClientOptions nebulaClientOptions = new NebulaClientOptions .NebulaClientOptionsBuilder() .setAddress("127.0.0.1:3699") .build(); NebulaConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions); // 構造 Nebula Graph 寫入操做參數 List<String> cols = Arrays.asList("name", "age") ExecutionOptions sinkExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder() .setGraphSpace("flinkSink") .setTag(tag) .setFields(cols) .setIdIndex(0) .setBatch(20) .builder(); // 寫入 Nebula Graph dataSource.addSink(nebulaSinkFunction);
Nebula Graph Sink 的 Demo 程序以 Nebula Graph 的 space:flinkSource 做爲 Source 讀取數據,進行 map 類型轉換後 Sink 入 Nebula Graph 的 space:flinkSink,對應的應用場景爲將 Nebula Graph 中一個 space 的數據流入另外一個 space 中。
3、 Catalog
Flink 1.11.0 以前,用戶若是依賴 Flink 的 Source/Sink 讀寫外部數據源時,必需要手動讀取對應數據系統的 Schema。好比,要讀寫 Nebula Graph,則必須先保證實確地知曉在 Nebula Graph 中的 Schema 信息。可是這樣會有一個問題,當 Nebula Graph 中的 Schema 發生變化時,也須要手動更新對應的 Flink 任務以保持類型匹配,任何不匹配都會形成運行時報錯使做業失敗。這個操做冗餘且繁瑣,體驗極差。
1.11.0 版本後,用戶使用 Flink Connector 時能夠自動獲取表的 Schema。能夠在不瞭解外部系統數據 Schema 的狀況下進行數據匹配。
目前 Nebula Flink Connector 中已支持數據的讀寫,要實現 Schema 的匹配則須要爲 Flink Connector 實現 Catalog 的管理。但爲了確保 Nebula Graph 中數據的安全性,Nebula Flink Connector 只支持 Catalog 的讀操做,不容許進行 Catalog 的修改和寫入。
訪問 Nebula Graph 指定類型的數據時,完整路徑應該是如下格式:<graphSpace>.<VERTEX.tag>
或者 <graphSpace>.<EDGE.edge>
具體使用方式以下:
String catalogName = "testCatalog"; String defaultSpace = "flinkSink"; String username = "root"; String password = "nebula"; String address = "127.0.0.1:45500"; String table = "VERTEX.player" // define Nebula catalog Catalog catalog = NebulaCatalogUtils.createNebulaCatalog(catalogName,defaultSpace, address, username, password); // define Flink table environment StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); tEnv = StreamTableEnvironment.create(bsEnv); // register customed nebula catalog tEnv.registerCatalog(catalogName, catalog); // use customed nebula catalog tEnv.useCatalog(catalogName); // show graph spaces of Nebula Graph String[] spaces = tEnv.listDatabases(); // show tags and edges of Nebula Graph tEnv.useDatabase(defaultSpace); String[] tables = tEnv.listTables(); // check tag player exist in defaultSpace ObjectPath path = new ObjectPath(defaultSpace, table); assert catalog.tableExists(path) == true // get nebula tag schema CatalogBaseTable table = catalog.getTable(new ObjectPath(defaultSpace, table)); table.getSchema();
Nebula Flink Connector 支持的其餘 Catalog 接口請查看 GitHub 代碼 NebulaCatalog.java。
4、 Exactly-once
Flink Connector 的 Exactly-once 是指 Flink 藉助於 checkpoint 機制保證每一個輸入事件只對最終結果影響一次,在數據處理過程當中即便出現故障,也不會存在數據重複和丟失的狀況。
爲了提供端到端的 Exactly-once 語義,Flink 的外部數據系統也必須提供提交或回滾的方法,而後經過 Flink 的 checkpoint 機制協調。Flink 提供了實現端到端的 Exactly-once 的抽象,即實現二階段提交的抽象類 TwoPhaseCommitSinkFunction。
想爲數據輸出端實現 Exactly-once,則須要實現四個函數:
- beginTransaction 在事務開始前,在目標文件系統的臨時目錄建立一個臨時文件,隨後能夠在數據處理時將數據寫入此文件。
- preCommit 在預提交階段,關閉文件再也不寫入。爲下一個 checkpoint 的任何後續文件寫入啓動一個新事務。
- commit 在提交階段,將預提交階段的文件原子地移動到真正的目標目錄。二階段提交過程會增長輸出數據可見性的延遲。
- abort 在終止階段,刪除臨時文件。
根據上述函數可看出,Flink 的二階段提交對外部數據源有要求,即 Source 數據源必須具有重發功能,Sink 數據池必須支持事務提交和冪等寫。
Nebula Graph v1.1.0 雖然不支持事務,但其寫入操做是冪等的,即同一條數據的屢次寫入結果是一致的。所以能夠經過 checkpoint 機制實現 Nebula Flink Connector 的 At-least-Once 機制,根據屢次寫入的冪等性能夠間接實現 Sink 的 Exactly-once。
要使用 Nebula Graph Sink 的容錯性,請確保在 Flink 的執行環境中開啓了 checkpoint 配置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(10000) // checkpoint every 10000 msecs .getCheckpointConfig() .setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
Reference
- Nebula Source Demo [testNebulaSource]:https://github.com/vesoft-inc/nebula-java/blob/master/examples/src/main/java/org/apache/flink/FlinkDemo.java
- Nebula Sink Demo [testSourceSink]:https://github.com/vesoft-inc/nebula-java/blob/master/examples/src/main/java/org/apache/flink/FlinkDemo.java
- Apache Flink 源碼:https://github.com/apache/flink
- ApacheFlink 零基礎入門:https://www.infoq.cn/theme/28
- Flink 文檔:https://flink.apache.org/flink-architecture.html
- Flink 實踐文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.12/
- flink-connector-jdbc 源碼:https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-jdbc
- Flink JDBC Catalog 詳解:https://cloud.tencent.com/developer/article/1697913
喜歡這篇文章?來來來,給咱們的 GitHub 點個 star 表鼓勵啦~~ 🙇♂️🙇♀️ [手動跪謝]
交流圖數據庫技術?交個朋友,Nebula Graph 官方小助手微信:NebulaGraphbot 拉你進交流羣~~