在《Spark Connector Reader 原理與實踐》中咱們提過 Spark Connector 是一個 Spark 的數據鏈接器,能夠經過該鏈接器進行外部數據系統的讀寫操做,Spark Connector 包含兩部分,分別是 Reader 和 Writer,而本文主要講述如何利用 Spark Connector 進行 Nebula Graph 數據的寫入。html
Spark SQL 容許用戶自定義數據源,支持對外部數據源進行擴展。java
Nebula 的 Spark Connector 單條數據寫入是基於 DatasourceV2 實現的,須要如下幾個步驟:git
WriteSupport
並重寫 createWriter
,建立自定義的 DataSourceWriter
。DataSourceWriter
建立 NebulaDataSourceVertexWriter
類和 NebulaDataSourceEdgeWriter
類,重寫 createWriterFactory
方法並返回自定義的 DataWriterFactory
,重寫 commit
方法,用來提交整個事務。重寫 abort
方法,用來作事務回滾。Nebula Graph 1.x 不支持事務操做,故該實現中 commit
和 abort
無實質性操做。DataWriterFactory
建立 NebulaVertexWriterFactory
類和 NebulaEdgeWriterFactory
類,重寫 createWriter
方法返回自定義的 DataWriter
。DataWriter
建立 NebulaVertexWriter
類和 NebulaEdgeWriter
類,重寫 write
方法,用來將數據寫出,重寫 commit
方法用來提交事務,重寫 abort
方法用來作事務回滾 ,一樣 DataWriter
中的 commit
方法和 abort
方法無實質性操做。Nebula 的 Spark Connector Writer 的實現類圖以下:github
具體寫入邏輯在 NebulaVertexWriter
和 NebulaEdgeWriter
的 write
方法中,一次寫入的邏輯以下:sql
Nebula 的 Spark Connector 的批量數據寫入與 Exchange 工具相似,是經過對 DataFrame 進行 map
操做批量數據累計提交實現的。shell
Spark Connector 的 Writer 功能提供了兩類接口供用戶編程進行數據寫入。寫入的數據源爲 DataFrame,Spark Writer 提供了單條寫入和批量寫入兩類接口。數據庫
拉取 GitHub 上 Spark Connector 代碼:apache
git clone -b v1.0 https://github.com/vesoft-inc/nebula-java.git
cd nebula-java/tools/nebula-spark
mvn clean compile package install -Dgpg.skip -Dmaven.javadoc.skip=true
複製代碼
將編譯打成的包 copy 到本地 maven 庫。編程
應用示例以下:json
nebula-spark
依賴<dependency>
<groupId>com.vesoft</groupId>
<artifactId>nebula-spark</artifactId>
<version>1.0.1</version>
</dependency>
複製代碼
// 構造點和邊數據的 DataFrame ,示例數據在 nebula-java/examples/src/main/resources 目錄下
val vertexDF = spark.read.json("examples/src/main/resources/vertex")
vertexDF.show()
val edgeDF = spark.read.json("examples/src/main/resources/edge")
edgeDF.show()
// 寫入點
vertexDF.write
.nebula("127.0.0.1:3699", "nb", "100")
.writeVertices("player", "vertexId", "hash")
// 寫入邊
edgeDF.write
.nebula("127.0.0.1:3699", "nb", "100")
.wirteEdges("follow", "source", "target")
複製代碼
配置說明:
// 構造點和邊數據的 DataFrame ,示例數據在 nebula-java/examples/src/main/resources 目錄下
val vertexDF = spark.read.json("examples/src/main/resources/vertex")
vertexDF.show()
val edgeDF = spark.read.json("examples/src/main/resources/edge")
edgeDF.show()
// 批量寫入點
new NebulaBatchWriterUtils()
.batchInsert("127.0.0.1:3699", "nb", 2000)
.batchToNebulaVertex(vertexDF, "player", "vertexId")
// 批量寫入邊
new NebulaBatchWriterUtils()
.batchInsert("127.0.0.1:3699", "nb", 2000)
.batchToNebulaEdge(edgeDF, "follow", "source", "target")
複製代碼
配置說明:
至此,Nebula Spark Connector Writer 講解完畢,歡迎前往 GitHub:github.com/vesoft-inc/… 試用。
喜歡這篇文章?來來來,給咱們的 GitHub 點個 star 表鼓勵啦~~ 🙇♂️🙇♀️ [手動跪謝]
交流圖數據庫技術?交個朋友,Nebula Graph 官方小助手微信:NebulaGraphbot 拉你進交流羣~~