隨着互聯網的飛速發展,互聯網的業務量呈爆發性增加,對於的數據量也迅速激增。傳統的單機數據庫在存儲空間及性能的瓶頸,致使其將沒法支撐企業業務的高速發展。伴隨着海量數據對系統性能,成本以及擴展性的新需求,分佈式數據庫系統應運而生。sequoiadb做爲是一款優秀的分佈式文檔型數據庫,其底層基於分佈式,高可用,高性能與動態數據類型設計的,可以應對海量數據的存儲,及提供高效檢索。java
傳統數據庫能夠利用分佈式數據庫的優點來緩解其自身的瓶頸。好比,將歷史數據遷移到sequoiadb,由sequoiadb提供存儲及業務服務,以緩解傳統數據庫自身的壓力。數據遷移分爲全量遷移和增量遷移,本文主要對mysql到sequoiadb的增量數據遷移過程進行分析。mysql
本文將經過一個小案例來分析數據從mysql數據庫抽取,並經由spark對數據進行清洗轉換,最後裝載到sequoiadb的遷移過程。sql
源數據mysql中有兩張待遷移的數據表分別爲student表和grade表,咱們須要對grade表進行增量遷移,每次只遷移更新的數據;此外,咱們須要一張由student和grade整合的大表,方便提供查詢服務。數據庫
具體方案爲先經過select ...... into outfile ......方式從mysql數據庫中導出數據爲csv格式文件,指定導出編碼爲gb18030(此處模擬銀行數據編碼),經由java將文件編碼由gb18030轉爲utf8編碼類型;而後,針對不一樣的數據,選擇不一樣的導入的方式。對於增量的數據能夠調用sdbimprt和sdbupsert工具結合的方式來導入到SDB集羣中,對於須要額外加工處理的數據能夠利用spark的RDD特性進行處理並加裝到sequoiadb集羣中。json
數據遷移流程多線程
mysql導出數據可使用mysqldump工具,也可用select into outfile的方式,此處使用select into outfile方式進行數據抽取,執行如下語句便可實現帶條件抽取指定數據到指定目錄下。其中參數character 指定爲gb18030 編碼,fields 分隔符爲",",lines 分隔符爲'\n'。併發
select * from student where id < 51 into outfile '/data/hbh/student.csv' character set gb18030 fields terminated by ',' lines terminated by '\n'; select * from grade into outfile '/tmp/grade.csv' character set gb18030 fields terminated by ',' lines terminated by '\n';
在執行select into outfile語句時,應確保具備MySQL的FILE權限,不然會遇到權限問題;而且輸出文件必須不存在。這能夠防止MySQL破壞重要的文件。app
咱們遷移的目標數據庫爲sequoiadb,此案例中咱們根據sequoiadb分佈式的優點,結合它的數據分區特性,採用多維分區的方式將海量數據分散到多個數據分區組上進行存儲。該方式經過結合了Hash分佈方式和Partition分佈方式的優勢,讓集合中的數據以更小的顆粒度分佈到數據庫多個數據分區組上,使得數據庫的性能獲得極大提高。dom
1) 對抽取出來的數據進行轉碼分佈式
利用Linux自帶的轉碼工具iconv進行轉碼,將編碼由gb18030 轉爲utf8
iconv -t utf-8 -f gb2312 -c filename > newFilename
-f 原編碼
-t 目標編碼
-c 忽略沒法轉換的字符
2) 建立目標表(主子表模式)
//建立主表 sdb 'db.createCS("test")' db.test.createCL("clname",{ShardingKey:{_id:1},ShardingType:"range",IsMainCL:true}) //建立子表 db.createCS("clname1",{Domain:"test_domain"}).createCL("clname1",{ShardingKey:{"_id:1"},ShardingType:"hash",Compressed:true,CompressionType:"lzw",AutoSplit:true}) //掛載 db.test.clname.attachCL("clname1.clname1",{LowBound:{_id:MinKey()},UpBound:{_id:MaxKey()}})' //對主鍵創建惟一索引 db.test.clname.createIndex("id_Idx",{id:1},true,true)
增量的數據分爲新增數據和更新數據,對於新增數據能夠根據import工具直接導入,對於更新數據能夠利用sdbupsert工具進行更新插入。sdbimprt工具導入數據時,對於更新後的數據而不是新插入的數據,在有惟一性約束的狀況下就會出現鍵值重複(Duplicate Key)的異常,而且這些異常數據會以Json格式記錄在.rec結尾的文件中(與導入的數據文件在同一目錄下)。sdbupsert的功能就是將這些鍵值重複的數據upsert到SDB數據庫中。而且支持多線程併發導入。
增量遷移
1) 使用sdbimprt導入工具進行導入遷移
sdbimprt --hosts="server1:11810,server2:11810,server3:11810" --delfield="\44" --type=csv --file=clname.csv -c test -l clname --fields='id string, user_id string,project string, score double, create_time string' -n 1000 -j 100
2) 使用sdbupsert進行更新遷移
對於鍵值重複的數據sdbimprt會記錄在.rec結尾的文件,使用sdbupert進行更新導入。
java -jar sdbupsert.jar sdb.properties clname.rec
參數說明:
sdb.properties: 配置文件,含有調控參數,及鏈接參數配置
clname.rec:需導入的數據文件,json格式類型
在數據庫的常見模型中(好比星型模型或者雪花模型),表通常分爲兩種:事實表和維度表。維度表通常指固定的、變更較少的表,例如聯繫人、物品種類等,通常數據有限。而事實表通常記錄流水,好比銷售清單等,一般隨着時間的增加不斷膨脹。這裏的student表相似於維度表,grade表相似於事實表。對於這種大小表關聯的join操做,sparkSQL有兩種實現分爲Broadcast Join和Shuffle Hash Join。在SparkSQL中,對兩個表作Join最直接的方式是先根據key分區,再在每一個分區中把key值相同的記錄拿出來作鏈接操做。但這樣就不可避免地涉及到shuffle,而shuffle在Spark中是比較耗時的操做,Broadcast Join爲了不shuffle,咱們能夠將大小有限的維度表的所有數據分發到每一個節點上,供事實表使用。executor存儲維度表的所有數據,必定程度上犧牲了空間,換取shuffle操做大量的耗時。可是這種方式只能用於廣播較小的表,不然數據的冗餘傳輸就遠大於shuffle的開銷。當維度表較大時,可使用Shuffle Hash Join方式,利用key相同必然分區相同的這個原理,SparkSQL將較大表的join分而治之,先將表劃分紅n個分區,再對兩個表中相對應分區的數據分別進行Hash Join,這樣即在必定程度上減小了driver廣播一側表的壓力,也減小了executor端取整張被廣播表的內存消耗。
Spark join同步數據
Spark做爲一個分佈式的計算引擎,能夠經過分區的形式將大批量的數據劃分紅n份較小的數據集進行並行計算。咱們利用其這種特性能夠進行大批量的數據的join關聯操做,並落地到sequoiadb數據庫中。
將須要作join的兩張表數據分別加載出來作成dataframe,而後建立爲臨時視圖進行join操做,以後將沒份結果集插入到sequoiadb中。
val db= new Sequoiadb("192.168.5.188:11810","","") val stu_grade=db.getCollectionSpace("test").getCollection("stu_grade") import spark.implicits._ val spark = SparkSession .builder() .appName("SparkSQL_join") .config("spark.some.config.option", "some-value") .getOrCreate() val user_array = Array(StructField("id", StringType, nullable = true),StructField("name", StringType, nullable = true),StructField("age", IntegerType, nullable = true)) val user_schema = (StructType(user_array)) val user_df = spark.read.schema(user_schema).csv("user.csv") user_df.createTempView("user_tbl"); val grade_array = Array(StructField("id", StringType, nullable = true),StructField("user_id", StringType, nullable = true),StructField("project", StringType, nullable = true),StructField("grade", IntegerType, nullable = true),StructField("create_time", IntegerType, nullable = true)) val grade_schema = (StructType(grade_array)) val grade_df = spark.read.schema(grade_schema).csv("grade.csv") grade_df.createTempView("grade_tbl") val user_grade = spark.sql("select g.id,u.name,g.project,g.grade,g.create_time from user_tbl u right join grade_tbl g on u.id=g.user_id") user_grade.foreachPartition(partitionOfRecords => { partitionOfRecords.foreach(pair => { val id = pair.get(0) val name = pair.get(1) val project = pair.get(2) val grade = pair.get(3) val create_time = pair.get(4); stu_grade.insert("{'id':'"+id+"','name':'"+name+"','project':'"+project+"','grade':'"+grade+"','create_time':"+create_time+ "}") })
數據遷移是系統開發常常涉及到的一項工做。在企業級應用系統中,新系統的開發,新舊系統的升級換代,以及正常的系統維護,不可避免地涉及到大量的遷移工做。數據遷移看視簡單卻蘊含許多技術實現細節。本文粗劣的介紹了數據遷移的整個流程,及一些遷移的方式,還有許多遷移的細節,包括數據的檢查,遷移先後數據一致性等還沒有涉及。