本文主要講述如何使用數據導入工具 Nebula Graph Exchange 將數據從 Neo4j 導入到 Nebula Graph Database。在講述如何實操數據導入以前,咱們先來了解下 Nebula Graph 內部是如何實現這個導入功能的。java
Nebula Graph Exchange 的數據處理原理
咱們這個導入工具名字是 Nebula Graph Exchange,採用 Spark 做爲導入平臺,來支持海量數據的導入和保障性能。Spark 自己提供了不錯的抽象——DataFrame,使得能夠輕鬆支持多種數據源。在 DataFrame 的支持下,添加新的數據源只需提供配置文件讀取的代碼和返回 DataFrame 的 Reader 類,便可支持新的數據源。git
DataFrame 能夠視爲一種分佈式存表格。DataFrame 能夠存儲在多個節點的不一樣分區中,多個分區能夠存儲在不一樣的機器上,從而支持並行操做。Spark 還提供了一套簡潔的 API 使用戶輕鬆操做 DataFrame 如同操做本地數據集通常。如今大多數數據庫提供直接將數據導出成 DataFrame 功能,即便某個數據庫並未提供此功能也能夠經過數據庫 driver 手動構建 DataFrame。github
Nebula Graph Exchange 將數據源的數據處理成 DataFrame 以後,會遍歷它的每一行,根據配置文件中 fields 的映射關係,按列名獲取對應的值。在遍歷 batchSize
個行以後,Exchange 會將獲取的數據一次性寫入到 Nebula Graph 中。目前,Exchange 是經過生成 nGQL 語句再由 Nebula Client 異步寫入數據,下一步會支持直接導出 Nebula Graph 底層存儲的 sst 文件,以獲取更好的性能。接下來介紹一下 Neo4j 數據源導入的具體實現。sql
Neo4j 數據導入具體實現
雖然 Neo4j 官方提供了可將數據直接導出爲 DataFrame 的庫,但使用它讀取數據難以知足斷點續傳的需求,咱們未直接使用這個庫,而是使用 Neo4j 官方的 driver 實現數據讀取。Exchange 經過在不一樣分區調取 Neo4j driver 執行不一樣 skip
和 limit
的 Cypher 語句,將數據分佈在不一樣的分區,來獲取更好的性能。這個分區數量由配置項 partition
指定。docker
Exchange 中的 Neo4jReader 類會先將用戶配置中的 exec
Cypher 語句,return
後邊的語句替換成 count(*)
執行獲取數據總量,再根據分區數計算每一個分區的起始偏移量和大小。這裏若是用戶配置了 check_point_path
目錄,會讀取目錄中的文件,若是處於續傳的狀態,Exchange 會計算出每一個分區應該的偏移量和大小。而後每一個分區在 Cypher 語句後邊添加不一樣的 skip
和 limit
,調用 driver 執行。最後將返回的數據處理成 DataFrame 就完成了 Neo4j 的數據導入。shell
過程以下圖所示:數據庫
Neo4j 數據導入實踐
咱們這裏導入演示的系統環境以下:微信
- cpu name:Intel(R) Xeon(R) CPU E5-2697 v3 @ 2.60GHz
- cpu cores:14
- memory size:251G
軟件環境以下:架構
- Neo4j:3.5.20 社區版
- Nebula graph:docker-compose 部署,默認配置
- Spark:單機版,版本爲 2.4.6 pre-build for hadoop2.7
因爲 Nebula Graph 是強 schema 數據庫,數據導入前需先進行建立 Space,建 Tag 和 Edge 的 schema,具體的語法能夠參考這裏。併發
這裏建了名爲 test 的 Space,副本數爲 1。這裏建立了兩種 Tag 分別爲 tagA 和 tagB,均含有 4 個屬性的點類型,此外,還建立一種名爲 edgeAB 的邊類型,一樣含有 4 個屬性。具體的 nGQL 語句以下所示:
# 建立圖空間 CREATE SPACE test(replica_factor=1); # 選擇圖空間 test USE test; # 建立標籤 tagA CREATE TAG tagA(idInt int, idString string, tboolean bool, tdouble double); # 建立標籤 tagB CREATE TAG tagB(idInt int, idString string, tboolean bool, tdouble double); # 建立邊類型 edgeAB CREATE EDGE edgeAB(idInt int, idString string, tboolean bool, tdouble double);
同時向 Neo4j 導入 Mock 數據——標籤爲 tagA 和 tagB 的點,數量總共爲 100 萬,而且導入了鏈接 tagA 和 tagB 類型點邊類型爲 edgeAB 的邊,共 1000 萬個。另外須要注意的是,從 Neo4j 導出的數據在 Nebula Graph 中必須存在屬性,且數據對應的類型要同 Nebula Graph 一致。
最後爲了提高向 Neo4j 導入 Mock 數據的效率和 Mock 數據在 Neo4j 中的讀取效率,這裏爲 tagA 和 tagB 的 idInt
屬性建了索引。關於索引須要注意 Exchange 並不會將 Neo4j 中的索引、約束等信息導入到 Nebula Graph 中,因此須要用戶在執行數據寫入在 Nebula Graph 以後,自行建立索引和 REBUILD 索引(爲已有數據創建索引)。
接下來就能夠將 Neo4j 數據導入到 Nebula Graph 中了,首先咱們須要下載和編譯打包項目,項目在 nebula-java 這個倉庫下 tools/exchange 文件夾中。可執行以下命令:
git clone https://github.com/vesoft-inc/nebula-java.git cd nebula-java/tools/exchange mvn package -DskipTests
而後就能夠看到 target/exchange-1.0.1.jar
這個文件。
接下來編寫配置文件,配置文件的格式爲:HOCON(Human-Optimized Config Object Notation),能夠基於 src/main/resources/server_application.conf
文件的基礎上進行更改。首先對 nebula 配置項下的 address、user、pswd 和 space 進行配置,測試環境均爲默認配置,因此這裏不須要額外的修改。而後進行 tags 配置,須要 tagA 和 tagB 的配置,這裏僅展現 tagA 配置,tagB 和 tagA 配置相同。
{ # ======neo4j鏈接設置======= name: tagA # 必須和 Nebula Graph 的中 tag 名字一致,須要在 Nebula Graph 中事先建好 tag server: "bolt://127.0.0.1:7687" # neo4j 的地址配置 user: neo4j # neo4j 的用戶名 password: neo4j # neo4j 的密碼 encryption: false # (可選): 傳輸是否加密,默認值爲 false database: graph.db # (可選): neo4j database 名稱,社區版不支持 # ======導入設置============ type: { source: neo4j # 還支持 PARQUET、ORC、JSON、CSV、HIVE、MYSQL、PULSAR、KAFKA... sink: client # 寫入 Nebula Graph 的方式,目前僅支持 client,將來會支持直接導出 Nebula Graph 底層數據庫文件 } nebula.fields: [idInt, idString, tdouble, tboolean] fields : [idInt, idString, tdouble, tboolean] # 映射關係 fields,上方爲 nebula 的屬性名,下方爲 neo4j 的屬性名,一一對應 # 映射關係的配置是 List 而不是 Map,是爲了保持 fields 的順序,將來直接導出 nebula 底層存儲文件時須要 vertex: idInt # 做爲 nebula vid 的 neo4j field,類型須要是整數(long or int)。 partition: 10 # 分區數 batch: 2000 # 一次寫入 nebula 多少數據 check_point_path: "file:///tmp/test" # (可選): 保存導入進度信息的目錄,用於斷點續傳 exec: "match (n:tagA) return n.idInt as idInt, n.idString as idString, n.tdouble as tdouble, n.tboolean as tboolean order by n.idInt" }
邊的設置大部分與點的設置無異,但因爲邊在 Nebula Graph 中有起點的 vid 和終點的 vid 標識,因此這裏須要指定做爲邊起點 vid 的域和做爲邊終點 vid 的域。
下面給出邊的特別配置。
source: { field: a.idInt # policy: "hash" } # 起點的 vid 設置 target: { field: b.idInt # policy: "uuid" } # 終點的 vid 設置 ranking: idInt # (可選): 做爲 rank 的 field partition: 1 # 這裏分區數設置爲 1,緣由在後邊 exec: "match (a:tagA)-[r:edgeAB]->(b:tagB) return a.idInt, b.idInt, r.idInt as idInt, r.idString as idString, r.tdouble as tdouble, r.tboolean as tboolean order by id(r)"
點的 vertex 和邊的 source、target 配置項下均可以設置 policy hash/uuid,它能夠將類型爲字符串的域做爲點的 vid,經過 hash/uuid 函數將字符串映射成整數。
上面的例子因爲做爲點的 vid 爲整數,因此並不須要 policy 的設置。hash/uuid的 區別請看這裏。
Cypher 標準中若是沒有 order by
約束的話就不能保證每次查詢結果的排序一致,雖然看起來即使不加 order by
Neo4j 返回的結果順序也是不變的,但爲了防止可能形成的導入時數據丟失,仍是強烈建議在 Cypher 語句中加入 order by
,雖然這會增長導入的時間。爲了提高導入效率, order by
語句最好選取有索引的屬性做爲排序的屬性。若是沒有索引,也可觀察默認的排序,選擇合適的排序屬性以提升效率。若是默認的排序找不到規律,可使用點/關係的 ID 做爲排序屬性,而且將 partition
的值儘可能設小,減小 Neo4j 的排序壓力,本文中邊 edgeAB
的 partition
就設置爲 1。
另外 Nebula Graph 在建立點和邊時會將 ID 做爲惟一主鍵,若是主鍵已存在則會覆蓋該主鍵中的數據。因此假如將某個 Neo4j 屬性值做爲 Nebula Graph 的 ID,而這個屬性值在 Neo4j 中是有重複的,就會致使「重複 ID」對應的數據有且只有一條會存入 Nebula Graph 中,其它的則會被覆蓋掉。因爲數據導入過程是併發地往 Nebula Graph 中寫數據,最終保存的數據並不能保證是 Neo4j 中最新的數據。
這裏還要留意下斷點續傳功能,在斷點和續傳之間,數據庫不該該改變狀態,如添加數據或刪除數據,且 partition
數量也不能更改,不然可能會有數據丟失。
最後因爲 Exchange 須要在不一樣分區執行不一樣 skip
和 limit
的 Cypher 語句,因此用戶提供的 Cypher 語句不能含有 skip
和 limit
語句。
接下來就能夠運行 Exchange 程序導數據了,執行以下命令:
$SPARK_HOME/bin/spark-submit --class com.vesoft.nebula.tools.importer.Exchange --master "local[10]" target/exchange-1.0.1.jar -c /path/to/conf/neo4j_application.conf
在上述這些配置下,導入 100 萬個點用時 13s,導入 1000 萬條邊用時 213s,總用時是 226s。
附:Neo4j 3.5 Community 和 Nebula Graph 1.0.1的一些比較
Neo4j 和 Nebula Graph 在系統架構、數據模型和訪問方式上都有一些差別,下表列舉了常見的異同
做者有話說:Hi,我是李夢捷,圖數據庫 Nebula Graph 的研發工程師,若是你對此文有疑問,歡迎來咱們的 Nebula Graph 論壇交流下心得~~
喜歡這篇文章?來來來,給咱們的 GitHub 點個 star 表鼓勵啦~~ 🙇♂️🙇♀️ [手動跪謝]
交流圖數據庫技術?交個朋友,Nebula Graph 官方小助手微信:NebulaGraphbot 拉你進交流羣~~