摘要: 主要介紹如何經過官方 ETL 工具 Exchange 將業務線上數據從 Neo4j 直接導入到 Nebula Graph 以及在導入過程當中遇到的問題和優化方法。html
本文首發於 Nebula 論壇:discuss.nebula-graph.com.cn/t/topic/204…java
隨着業務數據量不斷增加,業務對圖數據庫在線數據實時更新寫入和查詢的效率要求也不斷增長。Neo4j 存在明顯性能不足,Neo4j 社區開源版本只支持單機部署,擴展能力存在比較大的問題,沒法知足讀寫性能的線性擴展以及讀寫分離的業務需求,而且開源版本 Neo4j 對點和邊的總數據量也有限制;而 Neo4j 企業版因果集羣也存在單機主節點 Cypher 實時寫入的性能瓶頸。git
相比於 Neo4j,Nebula Graph 最大的特點即是採用 shared-nothing 分佈式的架構,無單主寫入瓶頸問題,讀寫支持線性擴展,擅長處理千億節點、萬億條邊的超大規模數據集。github
本文主要介紹如何經過官方 ETL 工具 Exchange 將業務線上數據從 Neo4j 直接導入到 Nebula Graph 以及在導入過程當中遇到的問題和優化方法。其中絕大部分問題都已經經過論壇發帖的方式獲得社區的支持和解決,本文會結合問題進行逐一列舉。數據庫
系統環境:緩存
軟件環境:服務器
注意:單臺機器部署 Nebula 多節點的端口分配:每一個 storage 還會將用戶配置的端口號 + 1的端口做爲內部使用。請參考論壇帖子 nebula從neo4j導入數據出現Get UUID Failed錯誤markdown
根據 Neo4j 點和邊的屬性信息建立 Nebula Graph 的 Tag 和 Edge 結構,這裏須要注意一點,業務可能會根據不一樣需求只在部分點和邊上增長 Neo4j 點和邊的屬性信息,其餘點和邊對應的屬性爲 NULL
,因此須要先跟業務明確一下點和邊的所有屬性信息,避免遺漏屬性。Nebula Graph 的 Schema 信息相似 MySQL,支持 Create 和 Alter 添加屬性,而且全部的 Tag 和 Edge 的元數據信息是一致的。架構
一、Nebula Graph 建立 Tag 和 Edge併發
# 示例
# 建立圖空間,10 個分區,3 個 storage 副本。
CREATE SPACE test(partition_num=10,replica_factor=3);
# 選擇圖空間 test
USE test;
# 建立標籤 tagA
CREATE TAG tagA(vid string, field-a0 string, field-a1 bool, field-a2 double);
# 建立標籤 tagB
CREATE TAG tagB(vid string, field-b0 string, field-b1 bool, field-b2 double);
# 建立邊類型 edgeAB
CREATE EDGE edgeAB(vid string, field-e0 string, field-e1 bool, field-e2 double);
複製代碼
二、Exchange 導入配置文件
bolt+routing
的方式鏈接neo4j,若是是因果集羣,能夠選擇一個從節點進行 bolt
方式直連讀取數據,減小集羣壓力。{
# Spark relation config
spark: {
app: {
name: Spark Writer
}
driver: {
cores: 1
maxResultSize: 1G
}
cores {
max: 16
}
}
# Nebula Graph relation config
nebula: {
address:{
graph:["xxx.xxx.xxx.xx:3699"]
meta:["xxx.xxx.xxx.xx:45500"]
}
user: user
pswd: password
space: test
connection {
timeout: 3000
retry: 3
}
execution {
retry: 3
}
error: {
max: 32
output: /tmp/errors
}
rate: {
limit: 1024
timeout: 1000
}
}
# Processing tags
tags: [
# Loading tag from neo4j
{
name: tagA
type: {
source: neo4j
sink: client
}
server: "bolt://xxx.xxx.xxx.xxx:7687"
user: neo4j
password: neo4j
exec: "match (n:tagA) where id(n) < 300000000 return n.vid as vid, n.field-a0 as field-a0, n.field-a1 as field-a1, n.field-a2 as field-a2 order by id(n)"
fields: [vid, field-a0, field-a1, field-a2]
nebula.fields: [vid, field-a0, field-a1, field-a2]
vertex: {
field: vid
policy: "uuid"
}
partition: 10
batch: 1000
check_point_path: /tmp/test
}
# Loading tag from neo4j
{
name: tagB
type: {
source: neo4j
sink: client
}
server: "bolt://xxx.xxx.xxx.xxx:7687"
user: neo4j
password: neo4j
exec: "match (n:tagB) where id(n) < 300000000 return n.vid as vid, n.field-b0 as field-b0, n.field-b1 as field-b1, n.field-b2 as field-b2 order by id(n)"
fields: [vid, field-b0, field-b1, field-b2]
nebula.fields: [vid, field-b0, field-b1, field-b2]
vertex: {
field: vid
policy: "uuid"
}
partition: 10
batch: 1000
check_point_path: /tmp/test
}
]
# Processing edges
edges: [
# Loading edges from neo4j
{
name: edgeAB
type: {
source: neo4j
sink: client
}
server: "bolt://xxx.xxx.xxx.xxx:7687"
user: neo4j
password: neo4j
exec: "match (a:tagA)-[r:edgeAB]->(b:tagB) where id(r) < 300000000 return n.vid as vid, n.field-e0 as field-e0, n.field-e1 as field-e1, n.field-e2 as field-e2 order by id(r)"
fields: [vid, field-e0, field-e1, field-e2]
nebula.fields: [vid, field-e0, field-e1, field-e2]
source: {
field: a.vid
policy: "uuid"
}
target: {
field: b.vid
policy: "uuid"
}
partition: 10
batch: 1000
check_point_path: /tmp/test
}
]
}
複製代碼
三、執行導入命令
nohup spark-submit --class com.vesoft.nebula.tools.importer.Exchange --master "local" exchange-1.1.0.jar -c test.conf > test.log &
複製代碼
四、查看導入 Nebula Graph 的數據量
./bin/db_dump --space=test --db_path=./data/storage/nebula/ --meta_server=127.0.0.1:45500 -limit 0 --mode=stat --tags=tagA,tagB --edges=edgeAB
複製代碼
注意:Nebula 1.x 版本目前還只能用 db_dump 統計,2.0 會支持 nGQL 命令的方式統計數量。
增量數據導入主要是經過 Neo4j 內部點和邊的自增 id()
進行切割,在導入配置文件 exec 項執行 Neo4j Cypher 語句時增長 id()
範圍限制,但前提是須要業務停掉刪數據操做,由於增量導入時,若是以前的數據被刪除後 Neo4j 會複用 id()
,這會致使複用 id()
的增量數據導入時查詢不到形成數據丟失。固然業務若是有條件支持 Neo4j Nebula 雙寫的話,增量導入就不會出現這種問題。
exec: "match (n:user) where id(n) >= 300000000 and id(n) < 400000000 return xxx order by id(n)"
複製代碼
請參考論壇帖子 neo4j到nebula如何作增量導入
使用 Exchange 導入過程當中遇到兩個問題,及時的獲得官方 @nicole 的支持和解決,具體請參考下面兩個帖子:
問題 1:Exchange 不支持「換行回車」等特殊字符的轉義。以下 string 數據中帶有回車,在拼接 insert
語句插入時會由於換行致使插入失敗。
PR:github.com/vesoft-inc/… 已經合入 exchange v1.0 分支
問題 2:Exchange 不支持屬性爲 NULL
的數據導入。前文 3.1 中提到,業務可能會根據不一樣需求爲某些點和邊增長屬性,這時其餘點和邊屬性則是 NULL
,這樣在使用 Exchange 導入時會報錯。
參考帖子 2 給出的修改建議解決:修改 com.vesoft.nebula.tools.importer.processor.Processor#extraValue
,增長 NULL
類型的轉化值。
case NullType => {
fieldTypeMap(field) match {
case StringType => ""
case IntegerType => 0
case LongType => 0L
case DoubleType => 0.0
case BooleanType => false
}
}
複製代碼
關於導入效率的優化,請參考下面兩個帖子:
優化 1:經過適當增長導入配置中的 partition 和 batch 值,提高導入效率。 優化 2:若是是 string 類型作 vid 的話,1.x 版本儘可能使用 hash() 函數轉化,2.0 版本會支持 string id 類型;若是是int類型作vid的話,能夠直接使用,不用轉化效率更高。 優化 3:官方建議 spark-submit 提交命令 master 配置改成 yarn-cluster
, 若不使用 yarn,可配置成 spark://ip:port
;咱們是經過 spark-submit --master "local[16]"
的方式增長 spark 併發,導入效率比使用 "local"
提高 4 倍+,測試環境單機三節點 HDD 盤 IO 峯值能到 200-300 MB/s。但在指定 --master "local[16]"
併發導入時遇到 hadoop 緩存問題,採用增長 hdfs 配置 fs.hdfs.impl.disable.cache=true
後重啓 hadoop 解決。具體請參考第二個帖子。
使用 Exchange 從 Neo4j 導入 Nebula Graph 過程當中遇到一些問題,經過積極與社區進行溝通獲得了官方 @nicole 及其餘小夥伴的快速響應和大力支持,這一點在 Neo4j 導入 Nebula Graph 的實踐過程當中起到了十分關鍵的做用,感謝社區的大力支持。期待支持 openCypher 的 Nebula Graph 2.0。