來自:https://blog.csdn.net/u012985132/article/details/74964366/mysql
關係型數據庫和Hadoop生態的溝通愈來愈密集,時效要求也愈來愈高。本篇就來調研下實時抓取MySQL更新數據到HDFS。正則表達式
本篇僅做爲調研報告。spring
初步調研了canal(Ali)+kafka connect+kafka、maxwell(Zendesk)+kafka和mysql_streamer(Yelp)+kafka。這幾個工具抓取MySQL的方式都是經過掃描binlog,模擬MySQL master和slave(Mysql Replication架構–解決了:數據多點備份,提升數據可用性;讀寫分流,提升集羣的併發能力。(並不是是負載均衡);讓一些非實時的數據操做,轉移到slaves上進行。)之間的協議來實現實時更新的。sql
先科普下Canal數據庫
Canal原理圖json
原理相對比較簡單:bootstrap
Canal架構圖api
組件說明:緩存
而instance模塊又由eventParser(數據源接入,模擬slave協議和master進行交互,協議解析)、eventSink(Parser和Store鏈接器,進行數據過濾,加工,分發的工做)、eventStore(數據存儲)和metaManager(增量訂閱&消費信息管理器)組成。服務器
EventParser在向mysql發送dump命令以前會先從Log Position中獲取上次解析成功的位置(若是是第一次啓動,則獲取初始指定位置或者當前數據段binlog位點)。mysql接受到dump命令後,由EventParser從mysql上pull binlog數據進行解析並傳遞給EventSink(傳遞給EventSink模塊進行數據存儲,是一個阻塞操做,直到存儲成功),傳送成功以後更新Log Position。流程圖以下:
EventSink起到一個相似channel的功能,能夠對數據進行過濾、分發/路由(1:n)、歸併(n:1)和加工。EventSink是鏈接EventParser和EventStore的橋樑。
EventStore實現模式是內存模式,內存結構爲環形隊列,由三個指針(Put、Get和Ack)標識數據存儲和讀取的位置。
MetaManager是增量訂閱&消費信息管理器,增量訂閱和消費之間的協議包括get/ack/rollback,分別爲:
Message getWithoutAck(int batchSize),容許指定batchSize,一次能夠獲取多條,每次返回的對象爲Message,包含的內容爲:batch id[惟一標識]和entries[具體的數據對象]
void rollback(long batchId),顧命思議,回滾上次的get請求,從新獲取數據。基於get獲取的batchId進行提交,避免誤操做
void ack(long batchId),顧命思議,確認已經消費成功,通知server刪除數據。基於get獲取的batchId進行提交,避免誤操做
canal的get/ack/rollback協議和常規的jms協議有所不一樣,容許get/ack異步處理,好比能夠連續調用get屢次,後續異步按順序提交ack/rollback,項目中稱之爲流式api.
流式api設計的好處:
這個流式api是否是相似hdfs write在pipeline中傳輸packet的形式,先將packet放入dataQueue,而後向下遊傳輸,此時將packet放入ackQueue等到下游返回的ack,這也是異步的。
canal是支持HA的,其實現機制也是依賴zookeeper來實現的,用到的特性有watcher和EPHEMERAL節點(和session生命週期綁定),與HDFS的HA相似。
canal的ha分爲兩部分,canal server和canal client分別有對應的ha實現
大體步驟:
Canal Client的方式和canal server方式相似,也是利用zookeeper的搶佔EPHEMERAL節點的方式進行控制.
canal同步數據須要掃描MySQL的binlog日誌,而binlog默認是關閉的,須要開啓,而且爲了保證同步數據的一致性,使用的日誌格式爲row-based replication(RBR),在my.conf
中開啓binlog,
1
2
3
4
|
[mysqld]
log-bin=mysql-bin #添加這一行就ok
binlog-format=ROW
#選擇row模式
server_id=1
#配置mysql replaction須要定義,不能和canal的slaveId重複
|
更改my.conf以後,須要重啓MySQL,重啓的方式有不少找到合適本身的就行。
由上面的介紹得知Canal由Server
和Instance
組成,而Server中又能夠包含不少個Instance,一個Instance對應一個數據庫實例,則Canal將配置分爲兩類,一類是server的配置,名字爲canal.properties
,另外一類是instance的配置,名字爲instance.properties
,通常會在conf目錄下新建一個instance同名的目錄,將其放入此目錄中。
先介紹canal.properties中的幾個關鍵屬性
參數名字 | 參數說明 | 默認值 |
---|---|---|
canal.destinations | 當前server上部署的instance列表 | 無 |
canal.conf.dir | conf/目錄所在的路徑 | ../conf |
canal.instance.global.spring.xml | 全局的spring配置方式的組件文件 | classpath:spring/file-instance.xml (spring目錄相對於canal.conf.dir) |
canal.zkServers | canal server連接zookeeper集羣的連接信息 | 無 |
canal.zookeeper.flush.period | canal持久化數據到zookeeper上的更新頻率,單位毫秒 | 1000 |
canal.file.data.dir | canal持久化數據到file上的目錄 | ../conf (默認和instance.properties爲同一目錄,方便運維和備份) |
canal.file.flush.period | canal持久化數據到file上的更新頻率,單位毫秒 | 1000 |
canal.instance.memory.batch.mode | canal內存store中數據緩存模式 1. ITEMSIZE : 根據buffer.size進行限制,只限制記錄的數量 2. MEMSIZE : 根據buffer.size * buffer.memunit的大小,限制緩存記錄的大小 |
MEMSIZE |
canal.instance.memory.buffer.size | canal內存store中可緩存buffer記錄數,須要爲2的指數 | 16384 |
canal.instance.memory.buffer.memunit | 內存記錄的單位大小,默認1KB,和buffer.size組合決定最終的內存使用大小 | 1024 |
下面看下instance.properties,這裏的屬性較少:
參數名字 | 參數說明 | 默認值 |
---|---|---|
canal.instance.mysql.slaveId | mysql集羣配置中的serverId概念,須要保證和當前mysql集羣中id惟一 | 1234 |
canal.instance.master.address | mysql主庫連接地址 | 127.0.0.1:3306 |
canal.instance.master.journal.name | mysql主庫連接時起始的binlog文件 | 無 |
canal.instance.master.position | mysql主庫連接時起始的binlog偏移量 | 無 |
canal.instance.master.timestamp | mysql主庫連接時起始的binlog的時間戳 | 無 |
canal.instance.dbUsername | mysql數據庫賬號 | canal |
canal.instance.dbPassword | mysql數據庫密碼 | canal |
canal.instance.defaultDatabaseName | mysql連接時默認schema | 無 |
canal.instance.connectionCharset | mysql 數據解析編碼 | UTF-8 |
canal.instance.filter.regex | mysql 數據解析關注的表,Perl正則表達式. 多個正則之間以逗號(,)分隔,轉義符須要雙斜槓 |
.*\\..* |
除了上面兩個配置文件,conf目錄下還有一個目錄須要強調下,那就是spring目錄,裏面存放的是instance.xml配置文件,目前默認支持的instance.xml有memory-instance.xml、file-instance.xml、default-instance.xml和group-instance.xml。這裏主要維護的增量訂閱和消費的關係信息(解析位點和消費位點)。
對應的兩個位點組件,目前都有幾種實現:
分別介紹下這幾種配置的功能
全部的組件(parser , sink , store)都選擇了內存版模式,記錄位點的都選擇了memory模式,重啓後又會回到初始位點進行解析
特色:速度最快,依賴最少(不須要zookeeper)
場景:通常應用在quickstart,或者是出現問題後,進行數據分析的場景,不該該將其應用於生產環境
全部的組件(parser , sink , store)都選擇了基於file持久化模式(組件內容持久化的file存在哪裏???),注意,不支持HA機制.
特色:支持單機持久化
場景:生產環境,無HA需求,簡單可用.
全部的組件(parser , sink , store)都選擇了持久化模式,目前持久化的方式主要是寫入zookeeper,保證數據集羣共享.(全部組件持久化的內容只有位置信息吧???)
特色:支持HA
場景:生產環境,集羣化部署.
主要針對須要進行多庫合併時,能夠將多個物理instance合併爲一個邏輯instance,提供客戶端訪問。
場景:分庫業務。 好比產品數據拆分了4個庫,每一個庫會有一個instance,若是不用group,業務上要消費數據時,須要啓動4個客戶端,分別連接4個instance實例。使用group後,能夠在canal server上合併爲一個邏輯instance,只須要啓動1個客戶端,連接這個邏輯instance便可.
canal
,命令以下:
1
2
3
4
|
CREATE USER canal IDENTIFIED BY
'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO
'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO
'canal'@'%' ;
FLUSH PRIVILEGES;
|
canal
用戶併爲其賦所需權限以後,須要對Canal的配置文件(canal.properties和instance.properties)進行設置。canal.properties和instance.properties裏採用默認配置便可(這裏只是運行個樣例,生產中能夠參考具體的參數屬性進行設置),
client組件Canal自己是不提供的,須要根據api進行開發,這裏將官方提供的client代碼打包成jar進行消費Canal信息。
canal的HA機制是依賴zk來實現的,須要更改canal.properties文件,修改內容以下:
1
2
3
4
|
# zk集羣地址
canal.zkServers=10.20.144.51:2181
# 選擇記錄方式
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
|
更改兩臺canal機器上instance實例的配置instance.properties,修改內容以下:
1
2
|
canal.instance.mysql.slaveId = 1234
##另一臺機器改爲1235,保證slaveId不重複便可
canal.instance.master.address = 10.20.144.15:3306
|
配置好以後啓動canal進程,在兩臺服務器上執行sh bin/startup.sh
client進行消費時,能夠直接指定zookeeper地址和instance name,也可讓canal client會自動從zookeeper中的running節點,獲取當前服務的工做節點,而後與其創建連接。
maxwell實時抓取mysql數據的原理也是基於binlog,和canal相比,maxwell更像是canal server + 實時client
。(數據抽取 + 數據轉換)
maxwell集成了kafka producer,直接從binlog獲取數據更新並寫入kafka,而canal則須要本身開發實時client將canal讀取的binlog內容寫入kafka中。
maxwell特點:
缺點:
select *
maxwell的配置文件只有一個config.properties,在home目錄。其中除了須要配置mysql master的地址、kafka地址還須要配置一個用於存放maxwell相關信息的mysql地址,maxwell會把讀取binlog關係的信息,如binlog name、position。
以上是Canal的原理及部署,其他相似maxwell和mysql_streamer對mysql進行實時數據抓取的原理同樣就再也不進行一一介紹,這裏只對他們進行下對比:
特點 | Canal | Maxwell | mysql_streamer |
---|---|---|---|
語言 | Java | Java | Python |
活躍度 | 活躍 | 活躍 | 不活躍 |
HA | 支持 | 定製 | 支持 |
數據落地 | 定製 | 落地到kafka | 落地到kafka |
分區 | 支持 | 不支持 | 不支持 |
bootstrap | 不支持 | 支持 | 支持 |
數據格式 | 格式自由 | json(格式固定) | json(格式固定) |
文檔 | 較詳細 | 較詳細 | 略粗 |
隨機讀 | 支持 | 支持 | 支持 |
以上只是將mysql裏的實時變化數據的binlog以同種形式同步到kafka,但要實時更新到hadoop還須要使用一個實時數據庫來存儲數據,並自定製開發將kafka中數據解析爲nosql數據庫能夠識別的DML進行實時更新Nosql數據庫,使其與MySQL裏的數據實時同步。
虛線框是可選的方案
方案1和方案2的區別只在於kafka以前,當數據緩存到kafka以後,須要一個定製的數據路由組件來將自帶schema的數據解析到目標存儲中。
數據路由組件主要負責將kafka中的數據實時讀出,寫入到目標存儲中。(如將全部日誌數據保存到HDFS中,也能夠將數據落地到全部支持jdbc的數據庫,落地到HBase,Elasticsearch等。)
綜上,
方案1須要開發的功能有:
方案2須要開發的功能有:
數據路由工具是兩個方案都須要開發的,則我比較偏向於第二種方案,由於在初期試水階段能夠短時間出成果,能夠較快的驗證想法,並在嘗試中可以較快的發現問題,好及時的調整方案。即便方案2中maxwell最終不能知足需求,而使用canal的話,咱們也可能將實時數據轉換工具的數據輸出模式與maxwell一致,這樣初始投入人力開發的數據路由工具依然能夠繼續使用,而不須要從新開發。
把增量的Log做爲一切系統的基礎。後續的數據使用方,經過訂閱kafka來消費log。
好比:
大數據的使用方能夠將數據保存到Hive表或者Parquet文件給Hive或Spark查詢;
提供搜索服務的使用方能夠保存到Elasticsearch或HBase 中;
提供緩存服務的使用方能夠將日誌緩存到Redis或alluxio中;
數據同步的使用方能夠將數據保存到本身的數據庫中;
因爲kafka的日誌是能夠重複消費的,而且緩存一段時間,各個使用方能夠經過消費kafka的日誌來達到既能保持與數據庫的一致性,也能保證明時性;
{「database」:」test」,」table」:」e」,」type」:」update」,」ts」:1488857869,」xid」:8924,」commit」:true,」data」:{「id」:1,」m」:5.556666,」torvalds」:null},」old」:{「m」:5.55}}
{「database」:」test」,」table」:」e」,」type」:」insert」,」ts」:1488857922,」xid」:8932,」commit」:true,」data」:{「id」:2,」m」:4.2,」torvalds」:null}}