阿里巴巴 MySQL 數據庫 binlog 的增量訂閱&消費組件

阿里巴巴 MySQL 數據庫 binlog 的增量訂閱&消費組件

背景

《阿里巴巴mysql數據庫binlog的增量訂閱&消費組件》 https://github.com/alibaba/canal java

早期,阿里巴巴B2B公司由於存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基於trigger的方式獲取增量變動,不過從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日誌解析,獲取增量變動進行同步,由此衍生出了增量訂閱&消費的業務,今後開啓了一段新紀元。mysql

項目介紹

名稱:運河[kə'næl]git

譯意:水道/管道/溝渠github

語言:純java開發spring

定位:基於數據庫增量日誌解析,提供增量數據訂閱&消費,目前主要支持了mysqlsql

關鍵詞:mysql binlog解析器/實時/隊列和主題數據庫

基於日誌增量訂閱&消費支持的業務:apache

1.數據庫鏡像
2.數據庫實時備份
3.多級索引 (賣家和買家各自分庫索引)
4.search build
5.業務cache刷新
6.價格變化等重要業務消息segmentfault

工做原理

mysql主備複製實現

圖片描述

從上層來看,複製分紅三步:服務器

master將改變記錄到二進制日誌(binary log)中(這些記錄叫作二進制日誌事件,binary log events,能夠經過show binlog events進行查看);
slave將master的binary log events拷貝到它的中繼日誌(relay log);
slave重作中繼日誌中的事件,將改變反映它本身的數據。

CentOs7.3 搭建 MySQL 5.7.19 主從複製,以及複製實現細節分析

canal的工做原理

圖片描述

原理相對比較簡單:

1.運河模擬mysql slave的交互協議,假裝本身爲mysql slave,向mysql master發送dump協議
2.mysql master收到dump請求,開始推送二進制日誌給slave(也就是運河)
3.運河解析二進制對象(原始爲字節流)

canal的原理是基於mysql binlog技術,因此這裏必定須要開啓mysql的binlog寫入功能,建議配置binlog模式爲row.

針對阿里雲RDS帳號默認已經有binlog dump權限,不須要任何權限或者binlog設置,能夠直接跳過這一步

修改 etc/my.cnf

$ cat /etc/my.cnf
[mysqld]
log-bin=mysql-bin     #添加這一行就ok
binlog-format=ROW     #選擇row模式
server_id=1           #配置mysql replaction須要定義,不能和canal的slaveId重複

配置步驟:

MySQL 安裝

《CentOs7.3 安裝 MySQL 5.7.19 二進制版本》

1.下載canal

直接下載 訪問:https://github.com/alibaba/canal/releases,會列出全部歷史的發佈版本包 下載方式,好比以1.0.24版本爲例子:

$ ca /opt
$ wget https://github.com/alibaba/canal/releases/download/canal-1.0.24/canal.deployer-1.0.24.tar.gz

or 本身編譯

$ git clone git@github.com:alibaba/canal.git
$ cd canal; 
$ mvn clean install -Dmaven.test.skip -Denv=release

編譯完成後,會在根目錄下產生target/canal.deployer-$version.tar.gz

2. 解壓縮

$ mkdir /opt/canal
$ tar zxvf canal.deployer-$version.tar.gz  -C /opt/canal

3. 配置修改

應用參數:

$ vi conf/example/instance.properties
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234

canal.instance.master.address 須要改爲本身的數據庫信息

canal.instance.master.address = 192.168.252.124:3306 
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =

username/password,須要改爲本身的數據庫信息

canal.instance.dbUsername = canal

canal.instance.dbPassword = canal

canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8

4. 啓動

$ sh bin/startup.sh

5. 查看日誌

$ less logs/canal/canal.log
2017-08-28 16:21:08.945 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2017-08-28 16:21:09.102 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.252.125:11111]
2017-08-28 16:21:10.087 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

具體instance的日誌:

$ less logs/example/example.log
2017-08-28 16:21:09.449 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2017-08-28 16:21:09.460 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2017-08-28 16:21:09.555 [main] WARN  org.springframework.beans.TypeConverterDelegate - PropertyEditor [com.sun.beans.editors.EnumEditor] found through deprecated global PropertyEditorManager fallback - consider using a more isolated form of registration, e.g. on the BeanWrapper/BeanFactory!
2017-08-28 16:21:09.730 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2017-08-28 16:21:09.988 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2017-08-28 16:21:10.061 [destination = example , address = /192.168.252.124:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just last position
 {"identity":{"slaveId":-1,"sourceAddress":{"address":"192.168.252.124","port":3306}},"postion":{"included":false,"journalName":"mysql-bin.000002","position":5225,"serverId":1,"timestamp":1503908357000}}

6. 中止

$ sh bin/stop.sh

ClientExample

阿里巴巴 / canal /github 項目首頁

把項目源碼克隆下

$ clone https://github.com/alibaba/canal.git

測試訂閱功能

建立表

DROP TABLE IF EXISTS `test`;
CREATE TABLE `test` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

導入開發工具: \canal\example ,修改部署Canal 的服務器IP 便可

啓動測試類

package com.alibaba.otter.canal.example;

import java.net.InetSocketAddress;

import org.apache.commons.lang.exception.ExceptionUtils;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;

/**
 * 單機模式的測試例子
 * 
 * @author jianghang 2013-4-15 下午04:19:20
 * @version 1.0.4
 */
public class SimpleCanalClientTest extends AbstractCanalClientTest {

    public SimpleCanalClientTest(String destination){
        super(destination);
    }

    public static void main(String args[]) {
        // 根據ip,直接建立連接,無HA的功能
        String destination = "example";
       // String ip = AddressUtils.getHostIp();

        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.252.125", 11111),
            destination,
            "",
            "");

        final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);
        clientTest.setConnector(connector);
        clientTest.start();
        Runtime.getRuntime().addShutdownHook(new Thread() {

            public void run() {
                try {
                    logger.info("## stop the canal client");
                    clientTest.stop();
                } catch (Throwable e) {
                    logger.warn("##something goes wrong when stopping canal:\n{}", ExceptionUtils.getFullStackTrace(e));
                } finally {
                    logger.info("## canal client is down.");
                }
            }

        });
    }

}

在數據庫。表裏插入,測試訂閱數據,等事物提價了,這邊看是否訂閱成功

響應

****************************************************
* Batch Id: [5] ,count : [3] , memsize : [160] , Time : 2017-08-28 16:53:42
* Start : [mysql-bin.000002:6201:1503910428000(2017-08-28 16:53:48)] 
* End : [mysql-bin.000002:6383:1503910428000(2017-08-28 16:53:48)] 
****************************************************

================> binlog[mysql-bin.000002:6201] , executeTime : 1503910428000 , delay : -5311ms
 BEGIN ----> Thread id: 9
----------------> binlog[mysql-bin.000002:6329] , name[penglei,test] , eventType : INSERT , executeTime : 1503910428000 , delay : -5311ms
id : 8    type=int(11)    update=true
name : 測試訂閱    type=varchar(255)    update=true
----------------
 END ----> transaction id: 106
================> binlog[mysql-bin.000002:6383] , executeTime : 1503910428000 , delay : -5311ms

Contact

  • 做者:鵬磊
  • 出處:http://www.ymq.io
  • Email:admin@souyunku.com
  • 版權歸做者全部,轉載請註明出處
  • Wechat:關注公衆號,搜雲庫,專一於開發技術的研究與知識分享

關注公衆號-搜雲庫

相關文章
相關標籤/搜索