數據同步組件otter環境搭建

1、otter介紹

    部分說明引用阿里otter項目的wiki 。阿里otter工具地址:https://github.com/alibaba/otter/wikijava

    otter爲阿里的一款增量數據同步工具,基於數據庫增量日誌解析,準實時同步到本機房或異地機房的mysql/oracle數據庫. 一個分佈式數據庫同步系統。node

    公司最近須要把線下的數據同步到雲倉,選用了otter這款工具作mysql的增量數據同步,因此花了幾周的時間來趟坑。固然otter也能夠作全量數據同步,不過太過於麻煩,能夠考慮其餘方式先作全量後,再作增量。mysql

                                            otter工做原理git

    

    其餘的不作介紹了,阿里wiki中有更詳細的介紹,這裏主要些搭建步驟和躺過的坑。github

2、環境搭建

一、mysql

    源庫mysql須要開啓binlog,由於otter是基於canal的,而canal是基於binlog的,so,第一步須要MySQL開啓binlog。web

開啓binlog的方法:Linux修改my.cnf文件,Windows修改MySQL的my.ini文件。具體先百度吧spring

二、zookeeper搭建

    zk能夠單擊版也能夠搭建集羣,我搭建的是僞分佈式,沒那麼多機子。具體步驟見個人另外一篇博文。sql

三、otter配置

  a)執行SQL文件:爲otter新建一個名稱爲otter庫(能夠取其餘名字),在下載包中(地址:https://github.com/alibaba/otter/releases)找到otter-manager-schema.sql,執行,而後otter中就有數據了。這個庫主要是又來存儲配置ottermanager的信息和otter運行時的一些數據。數據庫

    b) 修改otter的配置文件:conf/manager.propertiesapache

## otter manager domain name
otter.domainName = 127.0.0.1 # otter提供對外訪問的IP
## otter manager http port
otter.port = 8080   # otter提供的web界面訪問的端口
## jetty web config xml
otter.jetty = jetty.xml

## otter manager database config
otter.database.driver.class.name = com.mysql.jdbc.Driver
otter.database.driver.url = jdbc:mysql://127.0.0.1:3306/otter   #數據庫鏈接
otter.database.driver.username = root  # 用戶名
otter.database.driver.password = Geekplus@2017  密碼

## otter communication port
otter.communication.manager.port = 1099   #管理節點端口

## otter communication pool size
otter.communication.pool.size = 10

## default zookeeper address
otter.zookeeper.cluster.default = 127.0.0.1:2181  # zk地址
## default zookeeper sesstion timeout = 60s
otter.zookeeper.sessionTimeout = 60000

## otter arbitrate connect manager config
otter.manager.address = ${otter.domainName}:${otter.communication.manager.port}

## should run in product mode , true/false
otter.manager.productionMode = true

## self-monitor enable or disable
otter.manager.monitor.self.enable = true
## self-montir interval , default 120s
otter.manager.monitor.self.interval = 120
## auto-recovery paused enable or disable
otter.manager.monitor.recovery.paused = true
# manager email user config 執行中報警信息發送地址的一些配置
otter.manager.monitor.email.host = xxx 
otter.manager.monitor.email.username = xxxx
otter.manager.monitor.email.password = xxx
otter.manager.monitor.email.stmp.port = 465

    配置完成後,等10來秒就能夠在htt://127.0.0.1:8080訪問了,若是要作具體操做須要在右上角登陸

默認用戶名/祕密:admin/admin

四、node管理

    node管理在使用管理中進行配置,應爲node的啓動須要manager配置後才能進行啓動。

3、使用管理

一、機器管理

首先在【機器管理】選項中配置【zookeeper管理】

而後添加一個zk集羣

在【node管理】中添加node信息

保存後剛剛配置的node節點就會出如今【node】管理頁面

列表中的序號就是manager給node節點分配的序號,在配置node節點須要用到。

node節點配置:

    1》配置文件在node包的/conf/otter.properties

# otter node root dir
otter.nodeHome = ${user.dir}/../

## otter node dir
otter.htdocs.dir = ${otter.nodeHome}/htdocs
otter.download.dir = ${otter.nodeHome}/download
otter.extend.dir= ${otter.nodeHome}/extend

## default zookeeper sesstion timeout = 60s
otter.zookeeper.sessionTimeout = 60000

## otter communication pool size
otter.communication.pool.size = 10

## otter arbitrate & node connect manager config
otter.manager.address = 127.0.0.1:1099  #node節點鏈接到manager的地址 IP地址爲manner地址,
# 端口爲在manager配置文件中的otter.communication.manager.port = 1099配置

    2》在conf目錄中增長一個文件 nid,在文件中寫入剛剛manager分配的序號,好比 3。

    作完 步驟一二後node就能夠進行啓動了。

二、配置管理

    在配置管理中進行【數據源配置】【數據表配置】【canal配置】

    【數據源配置】和【數據表配置】比較簡單

    【canal配置】一個canal對應一個數據庫實例

    

三、同步管理

    1)添加一個channel

    

    說明:

    a. 同步一致性

  1. 基於數據庫反查 (簡單點說,就是強制反查數據庫,從binlog中拿到pk,直接反查對應數據庫記錄進行同步,回退到幾天前binlog進行消費時避免同步老版本的數據時可採用)
  2. 基於當前日誌變動 (基於binlog/redolog解析出來的字段變動值進行同步,不作數據庫反查,推薦使用)

    2)爲channel添加pipeline 

    

    

    爲pipeline添加監控

    

    說明:

  • 延遲時間 = 數據庫同步到目標庫成功時間 - 數據庫源庫產生變動時間, 單位秒. (由對應node節點定時推送配置)
  • 最後同步時間 = 數據庫同步到目標庫最近一次的成功時間 (當前同步關注的相關表,同步到目標庫的最後一次成功時間)
  • 最後位點時間 = 數據binlog消費最後一次更新位點的時間 (和同步時間區別:一個數據庫可能存在別的表的變動,不會觸發同步時間變動,但會觸發位點時間變動)

    

    3)配置映射關係

    

    自定義數據處理在後面講

    3)字段映射配置

    

    

4、自定義數據處理

   Extract模塊:

  • EventProcessor : 自定義數據處理,能夠改變一條變動數據的任意內容
  • FileResolver : 解決數據和文件的關聯關係

    目前二者都只支持java語言編寫,但都支持運行時動態編譯&lib包載入的功能。

  1. 經過Otter Manager直接發佈source文件代碼,而後推送到node節點上即時生效,不須要重啓任何java進程,有點動態語言的味道
  2. 能夠將class文件放置到extend目錄或者打成jar包,放置在node啓動classpath中,也能夠經過Otter Manager指定類名的方式進行加載,這樣容許業務徹底自定義。(但有個缺點,若是使用了一些外部包加入到node classpath中,好比遠程接口調用,目前EventProcessor的調用是串行處理,針對串行進行遠程調用執行,效率會比較差. )

        

1.基於EventProcessor實現原理

    目前我使用的是EventProcessor,源碼鏈接:https://github.com/alibaba/otter。

    該模塊位於源碼包的的extract模塊下,全路徑:com.alibaba.otter.node.etl.extract。在項目的配置文件/spring/otter-node-extract.xml中能夠看到以下配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd	   http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd	   http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd"
	default-autowire="byName" default-dependency-check="none">

	<bean id="otterExtractorFactory" class="com.alibaba.otter.node.etl.extract.extractor.OtterExtractorFactory" scope="singleton">
		<property name="dbBatchExtractor">
			<list>
				<value>freedomExtractor</value>
				<value>groupExtractor</value>
				<value>databaseExtractor</value>
				<value>processorExtractor</value>
				<value>fileExtractor</value>
				<value>viewExtractor</value>
			</list>
		</property>
	</bean> 
	
	<!-- 池化配置 -->
	<bean id="databaseExtractor" class="org.springframework.aop.framework.ProxyFactoryBean">
		<property name="optimize" value="false"/>
   		<property name="proxyTargetClass" value="true" />
		<property name="targetSource" ref="databaseExtractorTargetSource" />
	</bean>
	<bean id="databaseExtractorTargetSource" class="org.springframework.aop.target.CommonsPoolTargetSource" >
		<property name="minIdle" value="1" />
		<property name="maxSize" value="-1" />
		<property name="timeBetweenEvictionRunsMillis" value="60000" /><!-- 1分鐘進行一次回收 -->
		<property name="minEvictableIdleTimeMillis" value="600000" /><!-- 10分鐘回收空閒的 -->
		<property name="targetBeanName" value="databaseExtractorTarget" />
	</bean>
	<bean id="databaseExtractorTarget" class="com.alibaba.otter.node.etl.extract.extractor.DatabaseExtractor" scope="prototype" >
		<property name="poolSize" value="5" />
	</bean>
	
	<bean id="fileExtractor" class="com.alibaba.otter.node.etl.extract.extractor.FileExtractor" scope="singleton" >
	</bean>
	
	<bean id="freedomExtractor" class="com.alibaba.otter.node.etl.extract.extractor.FreedomExtractor" scope="singleton" >
	</bean>
	
	<bean id="viewExtractor" class="com.alibaba.otter.node.etl.extract.extractor.ViewExtractor" scope="singleton" >
	</bean>
	
	<bean id="groupExtractor" class="com.alibaba.otter.node.etl.extract.extractor.GroupExtractor" scope="singleton" >
	</bean>
	
	<bean id="processorExtractor" class="com.alibaba.otter.node.etl.extract.extractor.ProcessorExtractor" scope="singleton" >
	</bean>
</beans>

    而後轉到OtterExtractorFactory工廠類中,咱們能夠看到工廠類遍歷了dbBatchExtractor,而後移除執行otterExtractor.extract(dbBatch)方法來處理一批數據

public void extract(DbBatch dbBatch) {
        Assert.notNull(dbBatch);
        for (Object extractor : dbBatchExtractor) {
            OtterExtractor otterExtractor = null;
            if (extractor instanceof java.lang.String) {
                // 每次從容器中取一次,有作池化處理
                otterExtractor = (OtterExtractor) beanFactory.getBean((String) extractor, OtterExtractor.class);
            } else {
                otterExtractor = (OtterExtractor) extractor;
            }

            otterExtractor.extract(dbBatch);
        }
    }

    轉到XML的配置中

<bean id="otterExtractorFactory" class="com.alibaba.otter.node.etl.extract.extractor.OtterExtractorFactory" scope="singleton">
		<property name="dbBatchExtractor">
			<list>
				<value>freedomExtractor</value>  <!--自由門提取器-->
				<value>groupExtractor</value>    <!--這個不知道是什麼,感受是數據表配置中的那個group-->
				<value>databaseExtractor</value>    <!--執行數據反查,包括了自由門數據和配置了基於表反查的數據須要走這裏-->
				<value>processorExtractor</value>    <!--自定義提取器,這是咱們自定義數據處理的入口-->
				<value>fileExtractor</value>    <!--文件提取器,這個我永不着-->
				<value>viewExtractor</value>    <!--這個不知道是幹嗎的-->
			</list>
		</property>
	</bean>

這裏定義了全部的Extractor,咱們的自定義數據處理在processorExtractor中執行。

2.數據自定義的實現

    步驟:添加依賴包-》繼承類AbstractEventProcessor-》編寫業務代碼-》部署(我是)

    列舉一個實現:

導入添加maven依賴的兩個包:

<!-- https://mvnrepository.com/artifact/com.alibaba.otter/shared.etl -->
		<dependency>
			<groupId>com.alibaba.otter</groupId>
			<artifactId>shared.etl</artifactId>
			<version>4.2.15</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/com.alibaba.otter/node.extend -->
		<dependency>
			<groupId>com.alibaba.otter</groupId>
			<artifactId>node.extend</artifactId>
			<version>4.2.15</version>
		</dependency>

實現:

package com.geekplus.db.yugong.foshan;

import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.otter.node.extend.processor.AbstractEventProcessor;
import com.alibaba.otter.shared.etl.model.EventColumn;
import com.alibaba.otter.shared.etl.model.EventData;
import com.alibaba.otter.shared.etl.model.EventType;

public class PrimaryKeyTransfer extends AbstractEventProcessor {
    private final static Logger logger = LoggerFactory.getLogger(PrimaryKeyTransfer.class);

    /***
     * 描述:每次處理一行數據
     */
    public boolean process(EventData eventData) {
        boolean isHandle = true;
        if (eventData == null)
            return isHandle;
        logger.info("eventdata ,tableName:{},tableID:{},db:{},ddlTable:{}", eventData.getTableId(),
                eventData.getTableName(), eventData.getSchemaName(), eventData.getDdlSchemaName());
        if (StringUtils.equals("out_order_back", eventData.getTableName())
                || StringUtils.equals("out_order_back", eventData.getTableName())) {
            return parseOrder(eventData);
        } else if (StringUtils.equals("out_order_pkg", eventData.getTableName())
                || StringUtils.equals("out_order_pkg", eventData.getTableName())) {
            return parsePkg(eventData);
        } else if (StringUtils.equals("t_base_warehouse", eventData.getTableName())
                || StringUtils.equals("t_base_warehouse", eventData.getTableName())) {
            return parseWarehouse(eventData);
        } else if (StringUtils.equals("t_base_customer", eventData.getTableName())
                || StringUtils.equals("t_base_owner", eventData.getTableName())) {
            return parseCustomer(eventData);
        } else if (StringUtils.equals("t_base_carrier_info", eventData.getTableName())
                || StringUtils.equals("t_base_carrier_info", eventData.getTableName())) {
            return parseCarrier(eventData);
        }
        return isHandle;
    }

    private EventColumn findColumn(List<EventColumn> columns, String columnName) {
        for (EventColumn column : columns) {
            if (StringUtils.equals(columnName, column.getColumnName())) {
                return column;
            }
        }
        return null;
    }

    private boolean parseOrder(EventData eventData) {
        EventColumn normalWarehouseCl = findColumn(eventData.getColumns(), "warehouse_code");
        if (normalWarehouseCl == null)
            return false;
        eventData.getKeys().add(normalWarehouseCl);
        eventData.getColumns().remove(normalWarehouseCl);
        //處理Insert
        if (EventType.INSERT.equals(eventData.getEventType())) {

        }
        //處理update
        if (EventType.UPDATE.equals(eventData.getEventType())) {
            // 若是存在主鍵的變動
            if (CollectionUtils.isNotEmpty(eventData.getOldKeys())) {
                eventData.getOldKeys().add(normalWarehouseCl.clone());
            }
        }
        //處理delete
        if (EventType.DELETE.equals(eventData.getEventType())) {

        }
        return true;
    }

    private boolean parsePkg(EventData eventData) {
        if (EventType.INSERT.equals(eventData.getEventType())) {
            return true;
        }
        if (EventType.UPDATE.equals(eventData.getEventType())) {
            EventColumn normalOrderCodeCl = findColumn(eventData.getKeys(), "out_order_code");
            EventColumn normalWarehouseCl = findColumn(eventData.getKeys(), "warehouse_code");
            if (normalOrderCodeCl == null || normalWarehouseCl == null)
                return false;
            eventData.getColumns().clear();
            moveDate(eventData.getKeys(), eventData.getColumns());
            eventData.getKeys().clear();
            eventData.getOldKeys().clear();
            // keys 和 oldkeys必須有順序
            eventData.getKeys().add(normalOrderCodeCl);
            eventData.getKeys().add(normalWarehouseCl);
            eventData.getOldKeys().add(normalOrderCodeCl);
            eventData.getOldKeys().add(normalWarehouseCl);
            return true;
        }
        if (EventType.DELETE.equals(eventData.getEventType())) {
            return true;
        }
        return false;
    }

    private void moveDate(List<EventColumn> source, List<EventColumn> dest) {
        for (EventColumn srcC : source) {
            boolean exist = false;
            for (EventColumn destC : dest) {
                if (StringUtils.equals(destC.getColumnName(), srcC.getColumnName())) {
                    exist = true;
                    break;
                }
            }
            if (!exist) {
                dest.add(srcC);
            }
        }
    }

    private boolean parseWarehouse(EventData eventData) {
        EventColumn normalWarehouseCl = findColumn(eventData.getColumns(), "warehouse_code");
        if (normalWarehouseCl == null)
            return false;
        eventData.getColumns().remove(normalWarehouseCl);
        eventData.getKeys().add(normalWarehouseCl);
        if (EventType.INSERT.equals(eventData.getEventType())) {

        }
        if (EventType.UPDATE.equals(eventData.getEventType())) {
            if (CollectionUtils.isNotEmpty(eventData.getOldKeys())) {
                eventData.getOldKeys().add(normalWarehouseCl.clone());
            }
        }
        if (EventType.DELETE.equals(eventData.getEventType())) {

        }
        return true;
    }

    private boolean parseCustomer(EventData eventData) {
        if (EventType.INSERT.equals(eventData.getEventType())) {

        }
        if (EventType.UPDATE.equals(eventData.getEventType())) {
            EventColumn pk = findColumn(eventData.getKeys(), "pk_t_base_customer_code");
            eventData.getOldKeys().add(pk);
        }
        if (EventType.DELETE.equals(eventData.getEventType())) {

        }
        return true;
    }

    private boolean parseCarrier(EventData eventData) {
        EventColumn carrierCodeCl = findColumn(eventData.getColumns(), "carrier_code");
        if (carrierCodeCl == null)
            return false;
        eventData.getColumns().remove(carrierCodeCl);
        eventData.getKeys().add(carrierCodeCl);
        if (EventType.INSERT.equals(eventData.getEventType())) {

        }
        if (EventType.UPDATE.equals(eventData.getEventType())) {
            if (CollectionUtils.isNotEmpty(eventData.getOldKeys())) {
                eventData.getOldKeys().add(carrierCodeCl.clone());
            }
        }
        if (EventType.DELETE.equals(eventData.getEventType())) {

        }
        return true;
    }
}

以後將打好的jar包放到每一個節點的lib目錄下,而後重啓節點時更改生效。

 

重點類了

5、自定義數據同步(自 由 門)

    主要功能是在不修改原始表數據的前提下,觸發一下數據表中的數據同步。 

可用於: 

  • 同步數據訂正
  • 全量數據同步. (自 由 門觸發全量,同時otter增量同步,須要配置爲行記錄模式,避免update時因目標庫不存在記錄而丟失update操做)

1.原理

主要原理:

a. 基於otter系統表retl_buffer,插入特定的數據,包含須要同步的表名,pk信息。

b. otter系統感知後會根據表名和pk提取對應的數據(整行記錄),和正常的增量同步一塊兒同步到目標庫。

目前otter系統感知的自 由 門數據方式爲:

  • 日誌記錄. (插入表數據的每次變動,須要開啓binlog,otter獲取binlog數據,提取同步的表名,pk信息,而後回表查詢整行記錄)

須要注意的是otter的自由門是對源庫有侵入性的,須要增長一個retl庫和一個密碼爲retl的retl用戶,執行的SQL爲:

/*
供 otter 使用, otter 須要對 retl.* 的讀寫權限,以及對業務表的讀寫權限
1. 建立database retl
*/
CREATE DATABASE retl;

/* 2. 用戶受權 給同步用戶受權 */
CREATE USER retl@'%' IDENTIFIED BY 'retl';
GRANT USAGE ON *.* TO `retl`@'%';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `retl`@'%';
GRANT SELECT, INSERT, UPDATE, DELETE, EXECUTE ON `retl`.* TO `retl`@'%';
/* 業務表受權,這裏能夠限定只受權同步業務的表 */
GRANT SELECT, INSERT, UPDATE, DELETE ON *.* TO `retl`@'%';  

/* 3. 建立系統表 */
USE retl;
DROP TABLE IF EXISTS retl.retl_buffer;
DROP TABLE IF EXISTS retl.retl_mark;
DROP TABLE IF EXISTS retl.xdual;

CREATE TABLE retl_buffer
(	
	ID BIGINT(20) AUTO_INCREMENT,
	TABLE_ID INT(11) NOT NULL,
	FULL_NAME varchar(512),
	TYPE CHAR(1) NOT NULL,
	PK_DATA VARCHAR(256) NOT NULL,
	GMT_CREATE TIMESTAMP NOT NULL,
	GMT_MODIFIED TIMESTAMP NOT NULL,
	CONSTRAINT RETL_BUFFER_ID PRIMARY KEY (ID) 
)  ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE retl_mark
(	
	ID BIGINT AUTO_INCREMENT,
	CHANNEL_ID INT(11),
	CHANNEL_INFO varchar(128),
	CONSTRAINT RETL_MARK_ID PRIMARY KEY (ID) 
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

CREATE TABLE xdual (
  ID BIGINT(20) NOT NULL AUTO_INCREMENT,
  X timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (ID)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

/* 4. 插入初始化數據 */
INSERT INTO retl.xdual(id, x) VALUES (1,now()) ON DUPLICATE KEY UPDATE x = now();

retl_buffer表結構:

CREATE TABLE retl_buffer 
   (    
    ID BIGINT AUTO_INCREMENT,   ## 無心義,自增便可
    TABLE_ID INT(11) NOT NULL,   ## tableId, 可經過該連接查詢:http://otter.alibaba-inc.com/data_media_list.htm,即序號這一列,若是配置的是正則,須要指定full_name,當前table_id設置爲0. 
    FULL_NAME varchar(512),  ## schemaName + '.' +  tableName  (若是明確指定了table_id,能夠不用指定full_name)
    TYPE CHAR(1) NOT NULL,   ## I/U/D ,分別對應於insert/update/delete
    PK_DATA VARCHAR(256) NOT NULL, ## 多個pk之間使用char(1)進行分隔
    GMT_CREATE TIMESTAMP NOT NULL, ## 無心義,系統時間便可
    GMT_MODIFIED TIMESTAMP NOT NULL,  ## 無心義,系統時間便可
    CONSTRAINT RETL_BUFFER_ID PRIMARY KEY (ID) 
   )  ENGINE=InnoDB DEFAULT CHARSET=utf8;

2.操做

全量同步操做示例:

insert into retl.retl_buffer(ID,TABLE_ID, FULL_NAME,TYPE,PK_DATA,GMT_CREATE,GMT_MODIFIED) (select null,0,'$schema.table$','I',id,now(),now() from $schema.table$);

若是針對多主鍵時,對應的PK_DATA須要將須要同步表幾個主鍵按照(char)1進行拼接,好比 

insert into `retl`.`retl_buffer` ( `TABLE_ID`, `FULL_NAME`, `TYPE`, `PK_DATA`, `GMT_CREATE`, `GMT_MODIFIED`) values ( '0', 'test.t_base_warehouse', 'I', '20', '2018-01-19 15:00:57', '2018-01-19 10:34:00');

下面舉個實際的列子:

insert into `retl`.`retl_buffer` ( `TABLE_ID`, `FULL_NAME`, `TYPE`, `PK_DATA`, `GMT_CREATE`, `GMT_MODIFIED`) values ( '0', 'test.t_base_warehouse', 'I', '20', '2018-01-19 15:00:57', '2018-01-19 10:34:00');

TABLE_ID FULL_NAME TYPE PK_DATA GMT_CREATE GMT_MODIFIED
自增不用填 源庫名.表名 I/U/D ,分別對應於insert/update/delete 主鍵的值,注意是值而不是主鍵名稱,須要同步幾條數據就須要增長几條記錄 使用列子就可 使用列子就可

完結,後面抽時間在解讀下L階段,otter是如何在目標庫拼裝SQL的。

相關文章
相關標籤/搜索