Spark Connector Writer 原理與實踐

nebula-spark-connector-reader

《Spark Connector Reader 原理與實踐》中咱們提過 Spark Connector 是一個 Spark 的數據鏈接器,能夠經過該鏈接器進行外部數據系統的讀寫操做,Spark Connector 包含兩部分,分別是 Reader 和 Writer,而本文主要講述如何利用 Spark Connector 進行 Nebula Graph 數據的寫入。html

Spark Connector Writer 原理

Spark SQL 容許用戶自定義數據源,支持對外部數據源進行擴展。java

Nebula 的 Spark Connector 單條數據寫入是基於 DatasourceV2 實現的,須要如下幾個步驟:git

  1. 繼承 WriteSupport 並重寫 createWriter,建立自定義的 DataSourceWriter
  2. 繼承 DataSourceWriter 建立 NebulaDataSourceVertexWriter 類和 NebulaDataSourceEdgeWriter 類,重寫 createWriterFactory 方法並返回自定義的 DataWriterFactory,重寫 commit 方法,用來提交整個事務。重寫 abort 方法,用來作事務回滾。Nebula Graph 1.x 不支持事務操做,故該實現中 commitabort 無實質性操做。
  3. 繼承 DataWriterFactory 建立 NebulaVertexWriterFactory 類和 NebulaEdgeWriterFactory 類,重寫 createWriter 方法返回自定義的 DataWriter
  4. 繼承 DataWriter 建立 NebulaVertexWriter 類和 NebulaEdgeWriter 類,重寫 write 方法,用來將數據寫出,重寫 commit 方法用來提交事務,重寫 abort 方法用來作事務回滾 ,一樣 DataWriter 中的 commit 方法和 abort 方法無實質性操做。

Nebula 的 Spark Connector Writer 的實現類圖以下:github

nebula-spark-connector-writer

具體寫入邏輯在 NebulaVertexWriter 和 NebulaEdgeWriter 的 write 方法中,一次寫入的邏輯以下:sql

  1. 建立客戶端,鏈接 Nebula 的 graphd 服務;
  2. 數據寫入前先指定 graphSpace;
  3. 構造 Nebula 的數據寫入 statement;
  4. 提交 statement,執行寫入操做;
  5. 定義回調函數接收寫入操做執行結果。

Nebula 的 Spark Connector 的批量數據寫入與 Exchange 工具相似,是經過對 DataFrame 進行 map 操做批量數據累計提交實現的。shell

Spark Connector Writer 實踐

Spark Connector 的 Writer 功能提供了兩類接口供用戶編程進行數據寫入。寫入的數據源爲 DataFrame,Spark Writer 提供了單條寫入批量寫入兩類接口。數據庫

拉取 GitHub 上 Spark Connector 代碼:apache

git clone -b v1.0 https://github.com/vesoft-inc/nebula-java.git
cd nebula-java/tools/nebula-spark
mvn clean compile package install -Dgpg.skip -Dmaven.javadoc.skip=true
複製代碼

將編譯打成的包 copy 到本地 maven 庫。編程

應用示例以下:json

  1. 在 mvn 項目的 pom 文件中加入 nebula-spark 依賴
<dependency>
  <groupId>com.vesoft</groupId>
  <artifactId>nebula-spark</artifactId>
  <version>1.0.1</version>
</dependency>
複製代碼
  1. 在 Spark 程序中將 DataFrame 數據寫入 Nebula
  • 2.1 逐條寫入 Nebula:
// 構造點和邊數據的 DataFrame ,示例數據在 nebula-java/examples/src/main/resources 目錄下
val vertexDF = spark.read.json("examples/src/main/resources/vertex")
    vertexDF.show()
val edgeDF = spark.read.json("examples/src/main/resources/edge")
		edgeDF.show()

// 寫入點
vertexDF.write
  .nebula("127.0.0.1:3699", "nb", "100")
  .writeVertices("player", "vertexId", "hash")
  
// 寫入邊
edgeDF.write
	.nebula("127.0.0.1:3699", "nb", "100")
  .wirteEdges("follow", "source", "target")

複製代碼

配置說明:

  • nebula(address: String, space: String, partitionNum: String)
    • address:能夠配置多個地址,以英文逗號分割,如「ip1:3699,ip2:3699」
    • space: Nebula 的 graphSpace
    • partitionNum:建立 space 時指定的 Nebula 中的 partitionNum,未指定則默認爲 100
  • writeVertices(tag: String, vertexFiled: String, policy: String = "")
    • tag:Nebula 中點的 tag
    • vertexFiled:Dataframe 中可做爲 Nebula 點 ID 的列,如 DataFrame 的列爲 a,b,c,若是把 a 列做爲點的 ID 列,則該參數設置爲 a
    • policy:若 DataFrame 中 vertexFiled 列的數據類型非數值型,則須要配置 Nebula 中 VID 的映射策略
  • writeEdges(edge: String, srcVertexField: String, dstVertexField: String, policy: String = "")
    • edge:Nebula 中邊的 edge
    • srcVertexField:DataFrame 中可做爲源點的列
    • dstVertexField:DataFrame 中可做爲邊目標點的列
    • policy:若 DataFrame 中 srcVertexField 列或 dstVertexField 列的數據類型非數值型,則須要配置 Nebula 中 edge ID 的映射策略
  • 2.2 批量寫入 Nebula
// 構造點和邊數據的 DataFrame ,示例數據在 nebula-java/examples/src/main/resources 目錄下
val vertexDF = spark.read.json("examples/src/main/resources/vertex")
    vertexDF.show()
val edgeDF = spark.read.json("examples/src/main/resources/edge")
		edgeDF.show()

// 批量寫入點
new NebulaBatchWriterUtils()
      .batchInsert("127.0.0.1:3699", "nb", 2000)
      .batchToNebulaVertex(vertexDF, "player", "vertexId")
  
// 批量寫入邊
new NebulaBatchWriterUtils()
      .batchInsert("127.0.0.1:3699", "nb", 2000)
      .batchToNebulaEdge(edgeDF, "follow", "source", "target")

複製代碼

配置說明:

  • batchInsert(address: String, space: String, batch: Int = 2000)
    • address:能夠配置多個地址,以英文逗號分割,如「ip1:3699,ip2:3699」
    • space:Nebula 的 graphSpace
    • batch:批量寫入時一批次的數據量,可不配置,默認爲 2000
  • batchToNebulaVertex(data: DataFrame, tag: String, vertexField: String, policy: String = "")
    • data:待寫入 Nebula 的 DataFrame 數據
    • tag:Nebula 中點的 tag
    • vertexField:Dataframe 中可做爲 Nebula 點 ID 的列
    • policy:Nebula 中 VID 的映射策略,當 vertexField 列的值爲數值時可不配置
  • batchToNebulaEdge(data: DataFrame,  edge: String, srcVertexField: String, dstVertexField: String, rankField: String = "",  policy: String = "")
    • data:待寫入 Nebula 的 DataFrame 數據
    • edge:Nebula 中邊的 edge
    • srcVertexField:DataFrame 中可做爲源點的列
    • dstVertexField:DataFrame 中可做爲邊目標點的列
    • rankField:DataFrame 中可做爲邊 rank 值的列,可不配置
    • policy:edge 中點的映射策略,當 srcVertexField 和 dstVertexField 列的值爲數值時可不配置

至此,Nebula Spark Connector Writer 講解完畢,歡迎前往 GitHub:github.com/vesoft-inc/… 試用。

喜歡這篇文章?來來來,給咱們的 GitHub 點個 star 表鼓勵啦~~ 🙇‍♂️🙇‍♀️ [手動跪謝]

交流圖數據庫技術?交個朋友,Nebula Graph 官方小助手微信:NebulaGraphbot 拉你進交流羣~~

相關文章
相關標籤/搜索