《阿里巴巴mysql數據庫binlog的增量訂閱&消費組件》 https://github.com/alibaba/canal java
早期,阿里巴巴B2B公司由於存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基於trigger的方式獲取增量變動,不過從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日誌解析,獲取增量變動進行同步,由此衍生出了增量訂閱&消費的業務,今後開啓了一段新紀元。mysql
名稱:運河[kə'næl]git
譯意:水道/管道/溝渠github
語言:純java開發spring
定位:基於數據庫增量日誌解析,提供增量數據訂閱&消費,目前主要支持了mysqlsql
關鍵詞:mysql binlog解析器/實時/隊列和主題數據庫
基於日誌增量訂閱&消費支持的業務:apache
1.數據庫鏡像
2.數據庫實時備份
3.多級索引 (賣家和買家各自分庫索引)
4.search build
5.業務cache刷新
6.價格變化等重要業務消息segmentfault
從上層來看,複製分紅三步:服務器
master將改變記錄到二進制日誌(binary log)中(這些記錄叫作二進制日誌事件,binary log events,能夠經過show binlog events進行查看);
slave將master的binary log events拷貝到它的中繼日誌(relay log);
slave重作中繼日誌中的事件,將改變反映它本身的數據。
CentOs7.3 搭建 MySQL 5.7.19 主從複製,以及複製實現細節分析
原理相對比較簡單:
1.運河模擬mysql slave的交互協議,假裝本身爲mysql slave,向mysql master發送dump協議
2.mysql master收到dump請求,開始推送二進制日誌給slave(也就是運河)
3.運河解析二進制對象(原始爲字節流)
canal的原理是基於mysql binlog技術,因此這裏必定須要開啓mysql的binlog寫入功能,建議配置binlog模式爲row.
針對阿里雲RDS帳號默認已經有binlog dump權限,不須要任何權限或者binlog設置,能夠直接跳過這一步
修改 etc/my.cnf
$ cat /etc/my.cnf [mysqld] log-bin=mysql-bin #添加這一行就ok binlog-format=ROW #選擇row模式 server_id=1 #配置mysql replaction須要定義,不能和canal的slaveId重複
MySQL 安裝
《CentOs7.3 安裝 MySQL 5.7.19 二進制版本》
直接下載 訪問:https://github.com/alibaba/canal/releases,會列出全部歷史的發佈版本包 下載方式,好比以1.0.24版本爲例子:
$ ca /opt $ wget https://github.com/alibaba/canal/releases/download/canal-1.0.24/canal.deployer-1.0.24.tar.gz
or 本身編譯
$ git clone git@github.com:alibaba/canal.git $ cd canal; $ mvn clean install -Dmaven.test.skip -Denv=release
編譯完成後,會在根目錄下產生target/canal.deployer-$version.tar.gz
$ mkdir /opt/canal $ tar zxvf canal.deployer-$version.tar.gz -C /opt/canal
應用參數:
$ vi conf/example/instance.properties
################################################# ## mysql serverId canal.instance.mysql.slaveId = 1234 canal.instance.master.address 須要改爲本身的數據庫信息 canal.instance.master.address = 192.168.252.124:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = username/password,須要改爲本身的數據庫信息 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF-8
$ sh bin/startup.sh
$ less logs/canal/canal.log
2017-08-28 16:21:08.945 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server. 2017-08-28 16:21:09.102 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.252.125:11111] 2017-08-28 16:21:10.087 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
具體instance的日誌:
$ less logs/example/example.log
2017-08-28 16:21:09.449 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties] 2017-08-28 16:21:09.460 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties] 2017-08-28 16:21:09.555 [main] WARN org.springframework.beans.TypeConverterDelegate - PropertyEditor [com.sun.beans.editors.EnumEditor] found through deprecated global PropertyEditorManager fallback - consider using a more isolated form of registration, e.g. on the BeanWrapper/BeanFactory! 2017-08-28 16:21:09.730 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 2017-08-28 16:21:09.988 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful.... 2017-08-28 16:21:10.061 [destination = example , address = /192.168.252.124:3306 , EventParser] WARN c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just last position {"identity":{"slaveId":-1,"sourceAddress":{"address":"192.168.252.124","port":3306}},"postion":{"included":false,"journalName":"mysql-bin.000002","position":5225,"serverId":1,"timestamp":1503908357000}}
$ sh bin/stop.sh
把項目源碼克隆下
$ clone https://github.com/alibaba/canal.git
建立表
DROP TABLE IF EXISTS `test`; CREATE TABLE `test` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;
導入開發工具: \canal\example
,修改部署Canal 的服務器IP 便可
啓動測試類
package com.alibaba.otter.canal.example; import java.net.InetSocketAddress; import org.apache.commons.lang.exception.ExceptionUtils; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; /** * 單機模式的測試例子 * * @author jianghang 2013-4-15 下午04:19:20 * @version 1.0.4 */ public class SimpleCanalClientTest extends AbstractCanalClientTest { public SimpleCanalClientTest(String destination){ super(destination); } public static void main(String args[]) { // 根據ip,直接建立連接,無HA的功能 String destination = "example"; // String ip = AddressUtils.getHostIp(); CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.252.125", 11111), destination, "", ""); final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination); clientTest.setConnector(connector); clientTest.start(); Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { try { logger.info("## stop the canal client"); clientTest.stop(); } catch (Throwable e) { logger.warn("##something goes wrong when stopping canal:\n{}", ExceptionUtils.getFullStackTrace(e)); } finally { logger.info("## canal client is down."); } } }); } }
在數據庫。表裏插入,測試訂閱數據,等事物提價了,這邊看是否訂閱成功
響應
**************************************************** * Batch Id: [5] ,count : [3] , memsize : [160] , Time : 2017-08-28 16:53:42 * Start : [mysql-bin.000002:6201:1503910428000(2017-08-28 16:53:48)] * End : [mysql-bin.000002:6383:1503910428000(2017-08-28 16:53:48)] **************************************************** ================> binlog[mysql-bin.000002:6201] , executeTime : 1503910428000 , delay : -5311ms BEGIN ----> Thread id: 9 ----------------> binlog[mysql-bin.000002:6329] , name[penglei,test] , eventType : INSERT , executeTime : 1503910428000 , delay : -5311ms id : 8 type=int(11) update=true name : 測試訂閱 type=varchar(255) update=true ---------------- END ----> transaction id: 106 ================> binlog[mysql-bin.000002:6383] , executeTime : 1503910428000 , delay : -5311ms