It's time to change how your data warehouse does incremental sync

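After some time evolving, spark-binlog and delta-plus are gradually maturing. spark-binlog lets you use the MySQL binlog as a standard Spark data source and currently captures three kinds of events: insert, update and delete. delta-plus is an enhancement library for Delta Lake. For example, delta-plus implements replaying a binlog into a Delta table, which keeps the Delta table in near-real-time sync with the source database table. delta-plus also integrates techniques such as Bloom filters to speed up data updates. For more features, see my Zhihu column: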

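Data Lake: An In-Depth Look at Delta Lake (zhuanlan.zhihu.com)

With these two libraries plus Spark, we can sync a database table with two statements of code: one to read and one to write.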
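The Bloom filters mentioned above help because an upsert first has to find which existing data files contain the keys being updated. Below is a minimal sketch, assuming one filter per data file built over primary-key strings; the class `KeyBloomFilter` and the helper `BloomPruning.candidateFiles` are hypothetical and are not delta-plus's API or implementation.

```scala
import scala.util.hashing.MurmurHash3

// Hypothetical sketch, not delta-plus code: a small Bloom filter over
// primary-key strings, intended to be kept one per data file.
final class KeyBloomFilter(numBits: Int, numHashes: Int) {
  require(numBits > 0 && numHashes > 0, "numBits and numHashes must be positive")

  private val bits = new java.util.BitSet(numBits)

  // Derive numHashes bit positions by double hashing: (h1 + i * h2) mod numBits.
  private def positions(key: String): Seq[Int] = {
    val h1 = MurmurHash3.stringHash(key, 0)
    val h2 = MurmurHash3.stringHash(key, 1)
    (0 until numHashes).map(i => Math.floorMod(h1 + i * h2, numBits))
  }

  def add(key: String): Unit = positions(key).foreach(i => bits.set(i))

  // false means "definitely absent"; true means "possibly present".
  def mightContain(key: String): Boolean = positions(key).forall(i => bits.get(i))
}

object BloomPruning {
  // Keep only the files whose filter says they might hold at least one incoming key;
  // the remaining files can be skipped when applying the upsert.
  def candidateFiles(filters: Map[String, KeyBloomFilter], keys: Seq[String]): Seq[String] =
    filters.collect { case (file, f) if keys.exists(k => f.mightContain(k)) => file }.toSeq.sorted
}
```

A Bloom filter can return false positives but never false negatives, so pruning with it is safe: a file may occasionally be read for nothing, but a file that does contain an updated key is never skipped.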

Previously, incremental data sync required a fairly involved pipeline.

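The problems are obvious: the pipeline is long, it involves many technologies, the intermediate dumps are a real hassle, and it is hard to make the whole thing real time. We want something simpler, ideally just MySQL binlog → Spark → Delta Lake.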

Then all I would need to write is the following code:

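import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

val spark: SparkSession = ??? // supply your own SparkSession here

// Read: MySQL binlog events as a streaming DataFrame
val df = spark.readStream.
  format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource").
  option("host", "127.0.0.1").
  option("port", "3306").
  option("userName", "xxxxx").
  option("password", "xxxxx").
  option("databaseNamePattern", "mlsql_console"). // database(s) to capture
  option("tableNamePattern", "script_file").      // table(s) to capture
  option("binlogIndex", "4").                     // binlog file to start from
  option("binlogFileOffset", "4").                // position within that file
  load()

// Write: replay the events into Delta tables, matching rows on idCols
df.writeStream.
  format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").
  option("path", "/tmp/sync/tables").
  option("mode", "Append").
  option("idCols", "id").                         // primary-key column(s)
  option("duration", "5").
  option("syncType", "binlog").
  option("checkpointLocation", "/tmp/cpl-binlog2").
  outputMode(OutputMode.Append()).
  // In Spark 3.1+, passing a path to start() while the "path" option is also
  // set raises an error by default; check the delta-plus docs for how the
  // root directory and the {db}/{table} template should be supplied.
  start("{db}/{table}")

That's all there is to reading and writing. For reading, you supply the MySQL binlog information; for writing, you specify the primary key and the storage path of the tables.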
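To make the `idCols` option concrete: replaying a binlog means applying each insert, update and delete, in binlog order, to the row identified by the primary-key columns. The sketch below shows that idea on an in-memory map. The event classes and the `BinlogReplay.replay` function are hypothetical; they do not reflect spark-binlog's output schema or how delta-plus actually writes to Delta tables in micro-batches.

```scala
object BinlogReplay {
  type Row = Map[String, Any]
  type Key = Seq[Any]

  // Hypothetical event model for illustration; not spark-binlog's output schema.
  sealed trait Event
  final case class Insert(row: Row) extends Event
  final case class Update(before: Row, after: Row) extends Event
  final case class Delete(row: Row) extends Event

  // A row's key is the sequence of its idCols values, e.g. idCols = Seq("id").
  private def keyOf(row: Row, idCols: Seq[String]): Key = idCols.map(c => row(c))

  // Apply events in order to a snapshot keyed by primary key.
  def replay(table: Map[Key, Row], events: Seq[Event], idCols: Seq[String]): Map[Key, Row] =
    events.foldLeft(table) {
      case (t, Insert(row)) => t + (keyOf(row, idCols) -> row)
      case (t, Update(before, after)) =>
        // Remove the old key first, in case the primary key itself changed.
        (t - keyOf(before, idCols)) + (keyOf(after, idCols) -> after)
      case (t, Delete(row)) => t - keyOf(row, idCols)
    }
}
```

Folding over the events in order is what makes the replay correct: a later update or delete for the same key always wins over an earlier event, which is why the sink needs to know the primary key.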

If you use MLSQL, it is even simpler. Here is a complete streaming sync script:

set streamName="binlog";

load binlog.`` where
  host="127.0.0.1"
  and port="3306"
  and userName="xxxx"
  and password="xxxxxx"
  and bingLogNamePrefix="mysql-bin"
  and binlogIndex="4"
  and binlogFileOffset="4"
  and databaseNamePattern="mlsql_console"
  and tableNamePattern="script_file"
as table1;

save append table1
as rate.mysql_{db}.{table}
options mode="Append"
  and idCols="id"
  and duration="5"
  and syncType="binlog"
  and checkpointLocation="/tmp/cpl-binlog2";
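Two of the option values in this script follow conventions worth spelling out. MySQL names binlog files `<base name>.<six-digit sequence number>`, and the first event in every binlog file starts at byte 4, right after the file's 4-byte magic header. The target `mysql_{db}.{table}` is a template filled in per source table, which is why the full-sync script that follows writes to `mysql_mlsql_console.script_file`. A minimal sketch, with hypothetical helper names; that `bingLogNamePrefix` plus `binlogIndex` resolve to a file name exactly this way is an assumption made for illustration, not something taken from the spark-binlog docs.

```scala
object SyncNaming {
  // Assumption: "mysql-bin" + index 4 refers to the file mysql-bin.000004,
  // following MySQL's six-digit, zero-padded binlog file numbering.
  def binlogFileName(prefix: String, index: Int): String = f"$prefix.$index%06d"

  // Every binlog file begins with a 4-byte magic header, so the first event
  // sits at byte 4; binlogFileOffset="4" therefore means "start of the file".
  val FirstEventOffset: Long = 4L

  // Hypothetical helper: fill the {db} and {table} placeholders of a target
  // name such as "mysql_{db}.{table}" with the schema and table of an event.
  def resolveTarget(template: String, db: String, table: String): String =
    template.replace("{db}", db).replace("{table}", table)
}
```

With this reading, one streaming job can fan events from many source tables out to one target table each, because the database and table names come from each event rather than from the job's configuration.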

Because this is incremental sync, the first run needs one full sync beforehand. That is also simple with MLSQL:

connect jdbc where
  url="jdbc:mysql://127.0.0.1:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false"
  and driver="com.mysql.jdbc.Driver"
  and user="xxxxx"
  and password="xxxx"
as db_cool;

load jdbc.db_cool.script_file as script_file;
save overwrite script_file as delta.mysql_mlsql_console.script_file;

load delta.mysql_mlsql_console.script_file as output;

If you use the MLSQL Console, you can run this script directly in its editor.

If you have installed the binlog2delta plugin, you can also use its wizard for convenience.