20180705關於mysql binlog的解析方式

來自:https://blog.csdn.net/u012985132/article/details/74964366/mysql

關係型數據庫和Hadoop生態的溝通愈來愈密集,時效要求也愈來愈高。本篇就來調研下實時抓取MySQL更新數據到HDFS。正則表達式

本篇僅做爲調研報告。spring

初步調研了canal(Ali)+kafka connect+kafka、maxwell(Zendesk)+kafka和mysql_streamer(Yelp)+kafka。這幾個工具抓取MySQL的方式都是經過掃描binlog,模擬MySQL master和slave(Mysql Replication架構–解決了:數據多點備份,提升數據可用性;讀寫分流,提升集羣的併發能力。(並不是是負載均衡);讓一些非實時的數據操做,轉移到slaves上進行。)之間的協議來實現實時更新的sql


先科普下Canal數據庫

Canal簡介

原理

Canal原理圖

Canal原理圖json


原理相對比較簡單:bootstrap

 

  1. canal模擬mysql slave的交互協議,假裝本身爲mysql slave,向mysql master發送dump協議
  2. mysql master收到dump請求,開始推送(slave拉取,不是master主動push給slaves)binary log給slave(也就是canal)
  3. canal解析binary log對象(原始爲byte流)

架構

Canal架構圖

Canal架構圖api

 

組件說明:緩存

  1. server表明一個canal運行實例,對應於一個jvm
  2. instance對應於一個數據隊列(1個server對應1..n個instance)

而instance模塊又由eventParser(數據源接入,模擬slave協議和master進行交互,協議解析)、eventSink(Parser和Store鏈接器,進行數據過濾,加工,分發的工做)、eventStore(數據存儲)和metaManager(增量訂閱&消費信息管理器)組成。服務器

  • EventParser在向mysql發送dump命令以前會先從Log Position中獲取上次解析成功的位置(若是是第一次啓動,則獲取初始指定位置或者當前數據段binlog位點)。mysql接受到dump命令後,由EventParser從mysql上pull binlog數據進行解析並傳遞給EventSink(傳遞給EventSink模塊進行數據存儲,是一個阻塞操做,直到存儲成功),傳送成功以後更新Log Position。流程圖以下:
    EventParser流程圖

    EventParser流程圖

     

  • EventSink起到一個相似channel的功能,能夠對數據進行過濾、分發/路由(1:n)、歸併(n:1)和加工。EventSink是鏈接EventParser和EventStore的橋樑。

  • EventStore實現模式是內存模式,內存結構爲環形隊列,由三個指針(Put、Get和Ack)標識數據存儲和讀取的位置。

  • MetaManager是增量訂閱&消費信息管理器,增量訂閱和消費之間的協議包括get/ack/rollback,分別爲:

Message getWithoutAck(int batchSize),容許指定batchSize,一次能夠獲取多條,每次返回的對象爲Message,包含的內容爲:batch id[惟一標識]和entries[具體的數據對象]
void rollback(long batchId),顧命思議,回滾上次的get請求,從新獲取數據。基於get獲取的batchId進行提交,避免誤操做
void ack(long batchId),顧命思議,確認已經消費成功,通知server刪除數據。基於get獲取的batchId進行提交,避免誤操做

增量訂閱和消費之間的協議交互以下:
增量訂閱和消費協議

增量訂閱和消費協議

 

canal的get/ack/rollback協議和常規的jms協議有所不一樣,容許get/ack異步處理,好比能夠連續調用get屢次,後續異步按順序提交ack/rollback,項目中稱之爲流式api.

流式api設計的好處:

  • get/ack異步化,減小因ack帶來的網絡延遲和操做成本 (99%的狀態都是處於正常狀態,異常的rollback屬於個別狀況,不必爲個別的case犧牲整個性能)
  • get獲取數據後,業務消費存在瓶頸或者須要多進程/多線程消費時,能夠不停的輪詢get數據,不停的日後發送任務,提升並行化. (在實際業務中的一個case:業務數據消費須要跨中美網絡,因此一次操做基本在200ms以上,爲了減小延遲,因此須要實施並行化)

流式api設計示意圖以下:
流式api

流式api

 

  • 每次get操做都會在meta中產生一個mark,mark標記會遞增,保證運行過程當中mark的惟一性
  • 每次的get操做,都會在上一次的mark操做記錄的cursor繼續日後取,若是mark不存在,則在last ack cursor繼續日後取
  • 進行ack時,須要按照mark的順序進行數序ack,不能跳躍ack. ack會刪除當前的mark標記,並將對應的mark位置更新爲last ack cusor
  • 一旦出現異常狀況,客戶端可發起rollback狀況,從新置位:刪除全部的mark, 清理get請求位置,下次請求會從last ack cursor繼續日後取

這個流式api是否是相似hdfs write在pipeline中傳輸packet的形式,先將packet放入dataQueue,而後向下遊傳輸,此時將packet放入ackQueue等到下游返回的ack,這也是異步的。

HA機制

canal是支持HA的,其實現機制也是依賴zookeeper來實現的,用到的特性有watcher和EPHEMERAL節點(和session生命週期綁定),與HDFS的HA相似。

canal的ha分爲兩部分,canal server和canal client分別有對應的ha實現

  • canal server: 爲了減小對mysql dump的請求,不一樣server上的instance(不一樣server上的相同instance)要求同一時間只能有一個處於running,其餘的處於standby狀態(standby是instance的狀態)。
  • canal client: 爲了保證有序性,一份instance同一時間只能由一個canal client進行get/ack/rollback操做,不然客戶端接收沒法保證有序。

server ha的架構圖以下:
ha

ha


大體步驟:

 

  1. canal server要啓動某個canal instance時都先向zookeeper進行一次嘗試啓動判斷(實現:建立EPHEMERAL節點,誰建立成功就容許誰啓動)
  2. 建立zookeeper節點成功後,對應的canal server就啓動對應的canal instance,沒有建立成功的canal instance就會處於standby狀態
  3. 一旦zookeeper發現canal server A建立的instance節點消失後,當即通知其餘的canal server再次進行步驟1的操做,從新選出一個canal server啓動instance。
  4. canal client每次進行connect時,會首先向zookeeper詢問當前是誰啓動了canal instance,而後和其創建連接,一旦連接不可用,會從新嘗試connect。

Canal Client的方式和canal server方式相似,也是利用zookeeper的搶佔EPHEMERAL節點的方式進行控制.

Canal部署及使用

MySQL配置

canal同步數據須要掃描MySQL的binlog日誌,而binlog默認是關閉的,須要開啓,而且爲了保證同步數據的一致性,使用的日誌格式爲row-based replication(RBR),在my.conf中開啓binlog,

 
    
1
2
3
4
 
    
[mysqld]
log-bin=mysql-bin #添加這一行就ok
binlog-format=ROW  #選擇row模式
server_id=1  #配置mysql replaction須要定義,不能和canal的slaveId重複

更改my.conf以後,須要重啓MySQL,重啓的方式有不少找到合適本身的就行。

Canal配置

由上面的介紹得知Canal由ServerInstance組成,而Server中又能夠包含不少個Instance,一個Instance對應一個數據庫實例,則Canal將配置分爲兩類,一類是server的配置,名字爲canal.properties,另外一類是instance的配置,名字爲instance.properties,通常會在conf目錄下新建一個instance同名的目錄,將其放入此目錄中。

先介紹canal.properties中的幾個關鍵屬性

參數名字 參數說明 默認值
canal.destinations 當前server上部署的instance列表
canal.conf.dir conf/目錄所在的路徑 ../conf
canal.instance.global.spring.xml 全局的spring配置方式的組件文件 classpath:spring/file-instance.xml 
(spring目錄相對於canal.conf.dir)
canal.zkServers canal server連接zookeeper集羣的連接信息
canal.zookeeper.flush.period canal持久化數據到zookeeper上的更新頻率,單位毫秒 1000
canal.file.data.dir canal持久化數據到file上的目錄 ../conf (默認和instance.properties爲同一目錄,方便運維和備份)
canal.file.flush.period canal持久化數據到file上的更新頻率,單位毫秒 1000
canal.instance.memory.batch.mode canal內存store中數據緩存模式 
1. ITEMSIZE : 根據buffer.size進行限制,只限制記錄的數量 
2. MEMSIZE : 根據buffer.size * buffer.memunit的大小,限制緩存記錄的大小
MEMSIZE
canal.instance.memory.buffer.size canal內存store中可緩存buffer記錄數,須要爲2的指數 16384
canal.instance.memory.buffer.memunit 內存記錄的單位大小,默認1KB,和buffer.size組合決定最終的內存使用大小 1024

下面看下instance.properties,這裏的屬性較少:

參數名字 參數說明 默認值
canal.instance.mysql.slaveId mysql集羣配置中的serverId概念,須要保證和當前mysql集羣中id惟一 1234
canal.instance.master.address mysql主庫連接地址 127.0.0.1:3306
canal.instance.master.journal.name mysql主庫連接時起始的binlog文件
canal.instance.master.position mysql主庫連接時起始的binlog偏移量
canal.instance.master.timestamp mysql主庫連接時起始的binlog的時間戳
canal.instance.dbUsername mysql數據庫賬號 canal
canal.instance.dbPassword mysql數據庫密碼 canal
canal.instance.defaultDatabaseName mysql連接時默認schema
canal.instance.connectionCharset mysql 數據解析編碼 UTF-8
canal.instance.filter.regex mysql 數據解析關注的表,Perl正則表達式. 
多個正則之間以逗號(,)分隔,轉義符須要雙斜槓
.*\\..*

除了上面兩個配置文件,conf目錄下還有一個目錄須要強調下,那就是spring目錄,裏面存放的是instance.xml配置文件,目前默認支持的instance.xml有memory-instance.xml、file-instance.xml、default-instance.xml和group-instance.xml。這裏主要維護的增量訂閱和消費的關係信息(解析位點和消費位點)。

對應的兩個位點組件,目前都有幾種實現:

  • memory (memory-instance.xml中使用)
  • zookeeper
  • mixed
  • file (file-instance.xml中使用,集合了file+memory模式,先寫內存,定時刷新數據到本地file上)
  • period (default-instance.xml中使用,集合了zookeeper+memory模式,先寫內存,定時刷新數據到zookeeper上)

分別介紹下這幾種配置的功能

  • memory-instance.xml:

全部的組件(parser , sink , store)都選擇了內存版模式,記錄位點的都選擇了memory模式,重啓後又會回到初始位點進行解析

特色:速度最快,依賴最少(不須要zookeeper)

場景:通常應用在quickstart,或者是出現問題後,進行數據分析的場景,不該該將其應用於生產環境

  • file-instance.xml:

全部的組件(parser , sink , store)都選擇了基於file持久化模式(組件內容持久化的file存在哪裏???),注意,不支持HA機制.

特色:支持單機持久化

場景:生產環境,無HA需求,簡單可用.

  • default-instance.xml:

全部的組件(parser , sink , store)都選擇了持久化模式,目前持久化的方式主要是寫入zookeeper,保證數據集羣共享.(全部組件持久化的內容只有位置信息吧???)

特色:支持HA

場景:生產環境,集羣化部署.

  • group-instance.xml:

主要針對須要進行多庫合併時,能夠將多個物理instance合併爲一個邏輯instance,提供客戶端訪問。

場景:分庫業務。 好比產品數據拆分了4個庫,每一個庫會有一個instance,若是不用group,業務上要消費數據時,須要啓動4個客戶端,分別連接4個instance實例。使用group後,能夠在canal server上合併爲一個邏輯instance,只須要啓動1個客戶端,連接這個邏輯instance便可.

canal example 部署

  • 在須要同步的MySQL數據庫中建立一個用戶,用來replica數據,這裏新建的用戶名和密碼都爲canal,命令以下:
 
    
1
2
3
4
 
    
CREATE USER canal IDENTIFIED BY  'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO  'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO  'canal'@'%' ;
FLUSH PRIVILEGES;
  • Mysql建立canal用戶併爲其賦所需權限以後,須要對Canal的配置文件(canal.properties和instance.properties)進行設置。

canal.properties和instance.properties裏採用默認配置便可(這裏只是運行個樣例,生產中能夠參考具體的參數屬性進行設置),

  • Canal配置好以後,啓動Canal client(client的做用是將Canal裏的解析的binlog日誌固化到存儲介質中)。

client組件Canal自己是不提供的,須要根據api進行開發,這裏將官方提供的client代碼打包成jar進行消費Canal信息。

canal HA配置

canal的HA機制是依賴zk來實現的,須要更改canal.properties文件,修改內容以下:

 
    
1
2
3
4
 
    
# zk集羣地址
canal.zkServers=10.20.144.51:2181
# 選擇記錄方式
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

更改兩臺canal機器上instance實例的配置instance.properties,修改內容以下:

 
    
1
2
 
    
canal.instance.mysql.slaveId = 1234  ##另一臺機器改爲1235,保證slaveId不重複便可
canal.instance.master.address = 10.20.144.15:3306

配置好以後啓動canal進程,在兩臺服務器上執行sh bin/startup.sh

client進行消費時,能夠直接指定zookeeper地址和instance name,也可讓canal client會自動從zookeeper中的running節點,獲取當前服務的工做節點,而後與其創建連接。

maxwell簡介

maxwell實時抓取mysql數據的原理也是基於binlog,和canal相比,maxwell更像是canal server + 實時client。(數據抽取 + 數據轉換)

maxwell集成了kafka producer,直接從binlog獲取數據更新並寫入kafka,而canal則須要本身開發實時client將canal讀取的binlog內容寫入kafka中。

maxwell特點:

  • 支持bootstrap啓動,同步歷史數據
  • 集成kafka,直接將數據落地到kafka
  • 已將binlog中的DML和DDL進行了模式匹配,將其解碼爲有schema的json(有利於後期將其重組爲nosql支持的語言)
    {「database」:」test」,」table」:」e」,」type」:」update」,」ts」:1488857869,」xid」:8924,」commit」:true,」data」:{「id」:1,」m」:5.556666,」torvalds」:null},」old」:{「m」:5.55}}

缺點:

  • 一個MySQL實例須要對應一個maxwell進程
  • bootstrap的方案使用的是select *

maxwell的配置文件只有一個config.properties,在home目錄。其中除了須要配置mysql master的地址、kafka地址還須要配置一個用於存放maxwell相關信息的mysql地址,maxwell會把讀取binlog關係的信息,如binlog name、position。

工具對比

以上是Canal的原理及部署,其他相似maxwell和mysql_streamer對mysql進行實時數據抓取的原理同樣就再也不進行一一介紹,這裏只對他們進行下對比:

特點 Canal Maxwell mysql_streamer
語言 Java Java Python
活躍度 活躍 活躍 不活躍
HA 支持 定製 支持
數據落地 定製 落地到kafka 落地到kafka
分區 支持 不支持 不支持
bootstrap 不支持 支持 支持
數據格式 格式自由 json(格式固定) json(格式固定)
文檔 較詳細 較詳細 略粗
隨機讀 支持 支持 支持

以上只是將mysql裏的實時變化數據的binlog以同種形式同步到kafka,但要實時更新到hadoop還須要使用一個實時數據庫來存儲數據,並自定製開發將kafka中數據解析爲nosql數據庫能夠識別的DML進行實時更新Nosql數據庫,使其與MySQL裏的數據實時同步。

基礎架構

架構圖以下:
基礎架構圖

基礎架構圖

 

虛線框是可選的方案

方案對比

  1. 方案1使用阿里開源的Canal進行Mysql binlog數據的抽取,另需開發一個數據轉換工具將從binlog中解析出的數據轉換成自帶schema的json數據並寫入kafka中。而方案2使用maxwell可直接完成對mysql binlog數據的抽取和轉換成自帶schema的json數據寫入到kafka中。
  2. 方案1中不支持表中已存在的歷史數據進行同步,此功能須要開發(若是使用sqoop進行歷史數據同步,不夠靈活,會使結果表與原始表結構相同,有區別於數據交換平臺所需的schema)。方案2提供同步歷史數據的解決方案。
  3. 方案1支持HA部署,而方案2不支持HA

方案1和方案2的區別只在於kafka以前,當數據緩存到kafka以後,須要一個定製的數據路由組件來將自帶schema的數據解析到目標存儲中。
數據路由組件主要負責將kafka中的數據實時讀出,寫入到目標存儲中。(如將全部日誌數據保存到HDFS中,也能夠將數據落地到全部支持jdbc的數據庫,落地到HBase,Elasticsearch等。)

綜上,
方案1須要開發的功能有:

  • bootstrap功能
  • 實時數據轉換工具
  • 數據路由工具

方案2須要開發的功能有:

  • 數據路由工具
  • HA模塊(初期可暫不支持HA,因此開發緊急度不高)

數據路由工具是兩個方案都須要開發的,則我比較偏向於第二種方案,由於在初期試水階段能夠短時間出成果,能夠較快的驗證想法,並在嘗試中可以較快的發現問題,好及時的調整方案。即便方案2中maxwell最終不能知足需求,而使用canal的話,咱們也可能將實時數據轉換工具的數據輸出模式與maxwell一致,這樣初始投入人力開發的數據路由工具依然能夠繼續使用,而不須要從新開發。

把增量的Log做爲一切系統的基礎。後續的數據使用方,經過訂閱kafka來消費log。

好比:
大數據的使用方能夠將數據保存到Hive表或者Parquet文件給Hive或Spark查詢;
提供搜索服務的使用方能夠保存到Elasticsearch或HBase 中;
提供緩存服務的使用方能夠將日誌緩存到Redis或alluxio中;
數據同步的使用方能夠將數據保存到本身的數據庫中;
因爲kafka的日誌是能夠重複消費的,而且緩存一段時間,各個使用方能夠經過消費kafka的日誌來達到既能保持與數據庫的一致性,也能保證明時性;

{「database」:」test」,」table」:」e」,」type」:」update」,」ts」:1488857869,」xid」:8924,」commit」:true,」data」:{「id」:1,」m」:5.556666,」torvalds」:null},」old」:{「m」:5.55}}

{「database」:」test」,」table」:」e」,」type」:」insert」,」ts」:1488857922,」xid」:8932,」commit」:true,」data」:{「id」:2,」m」:4.2,」torvalds」:null}}

相關文章
相關標籤/搜索