Parts of this write-up are quoted from the Alibaba otter project's wiki: https://github.com/alibaba/otter/wiki
otter is Alibaba's incremental data synchronization tool: a distributed database synchronization system that parses database incremental logs and replicates changes in near real time to MySQL/Oracle databases in the same or a remote data center.
My company recently needed to sync on-premises data to a cloud warehouse and chose otter for MySQL incremental synchronization, so I spent a few weeks stepping through the pitfalls. otter can also do full synchronization, but that is rather cumbersome; consider doing the full load some other way first and then switching to incremental.
How otter works
I will not repeat the rest here — the Alibaba wiki covers it in far more detail. This post mainly records the setup steps and the pitfalls I hit.
The source MySQL must have binlog enabled: otter is built on canal, and canal is built on binlog, so step one is enabling binlog on the source MySQL.
To enable binlog, edit my.cnf on Linux or MySQL's my.ini on Windows.
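A minimal sketch of the relevant my.cnf section — the server-id value and log base name below are placeholders to adapt to your environment; canal needs row-based logging:

```ini
[mysqld]
# unique id within the replication topology (placeholder value)
server-id     = 1
# turn on the binary log; the base file name is arbitrary
log-bin       = mysql-bin
# canal reconstructs full row images, so row-based logging is required
binlog-format = ROW
```

Restart MySQL afterwards and verify with `show variables like 'log_bin';`.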
Next, ZooKeeper. It can run standalone or as a cluster; I set up a pseudo-distributed cluster since I did not have that many machines. See my other post for the detailed steps.
Now the manager. a) Run the SQL file: create a database named otter for the manager (any name works), find otter-manager-schema.sql in the release package (https://github.com/alibaba/otter/releases), and execute it against that database. This database stores the manager's configuration and some of otter's runtime data.
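For example, with the mysql command-line client (the database name and file path here are just illustrative choices):

```sql
-- create the schema for the manager and load the shipped DDL
CREATE DATABASE otter DEFAULT CHARACTER SET utf8;
USE otter;
SOURCE /path/to/otter-manager-schema.sql;
```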
b) Edit the manager's config file conf/manager.properties:
```properties
## otter manager domain name (the IP otter exposes for external access)
otter.domainName = 127.0.0.1
## otter manager http port (port of the otter web UI)
otter.port = 8080
## jetty web config xml
otter.jetty = jetty.xml
## otter manager database config (connection, username, password for the manager's own database)
otter.database.driver.class.name = com.mysql.jdbc.Driver
otter.database.driver.url = jdbc:mysql://127.0.0.1:3306/otter
otter.database.driver.username = root
otter.database.driver.password = Geekplus@2017
## otter communication port (port of the manager node)
otter.communication.manager.port = 1099
## otter communication pool size
otter.communication.pool.size = 10
## default zookeeper address
otter.zookeeper.cluster.default = 127.0.0.1:2181
## default zookeeper session timeout = 60s
otter.zookeeper.sessionTimeout = 60000
## otter arbitrate connect manager config
otter.manager.address = ${otter.domainName}:${otter.communication.manager.port}
## should run in product mode, true/false
otter.manager.productionMode = true
## self-monitor enable or disable
otter.manager.monitor.self.enable = true
## self-monitor interval, default 120s
otter.manager.monitor.self.interval = 120
## auto-recovery paused enable or disable
otter.manager.monitor.recovery.paused = true
## manager email user config (where alert mails are sent)
otter.manager.monitor.email.host = xxx
otter.manager.monitor.email.username = xxxx
otter.manager.monitor.email.password = xxx
otter.manager.monitor.email.stmp.port = 465
```
Once configured, start the manager; after ten seconds or so the web UI is reachable at http://127.0.0.1:8080. To perform any operations you need to log in at the top right.
Default username/password: admin/admin
Nodes are configured through the manager UI, because a node can only be started after it has been set up in the manager.
First, under 【機器管理】 (machine management), open 【zookeeper管理】 (ZooKeeper management).
Then add a zk cluster.
Add the node's information under 【node管理】 (node management).
After saving, the node you just configured shows up on the node management page.
The sequence number in that list is the id the manager assigned to the node; you will need it when configuring the node itself.
Node configuration:
1) The config file is /conf/otter.properties inside the node package:
```properties
# otter node root dir
otter.nodeHome = ${user.dir}/../
## otter node dirs
otter.htdocs.dir = ${otter.nodeHome}/htdocs
otter.download.dir = ${otter.nodeHome}/download
otter.extend.dir = ${otter.nodeHome}/extend
## default zookeeper session timeout = 60s
otter.zookeeper.sessionTimeout = 60000
## otter communication pool size
otter.communication.pool.size = 10
## otter arbitrate & node connect manager config:
## the IP is the manager's address, and the port is the value of
## otter.communication.manager.port = 1099 from manager.properties
otter.manager.address = 127.0.0.1:1099
```
2) Create a file named nid in the conf directory containing just the id the manager assigned — e.g. the single line 3.
With steps 1) and 2) done, the node can be started.
Under configuration management, set up 【數據源配置】 (data sources), 【數據表配置】 (data tables), and 【canal配置】 (canals).
【數據源配置】 and 【數據表配置】 are straightforward.
For 【canal配置】, one canal corresponds to one database instance.
1) Add a channel.
Note: pay attention to the 同步一致性 (sync consistency) option when adding the channel.
2) Add a pipeline to the channel.
Add monitoring to the pipeline.
3) Configure the mapping relationships.
Custom data processing is covered further below.
4) Column mapping configuration.
The Extract module:
Currently both mechanisms (EventProcessor and FileResolver) can only be written in Java, but both support runtime dynamic compilation and loading from lib packages.
I used EventProcessor; the source is at https://github.com/alibaba/otter.
It lives under the extract module of the source tree, full path com.alibaba.otter.node.etl.extract. In the project config file /spring/otter-node-extract.xml you can see the following configuration:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
                           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
                           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd"
       default-autowire="byName" default-dependency-check="none">

    <bean id="otterExtractorFactory" class="com.alibaba.otter.node.etl.extract.extractor.OtterExtractorFactory" scope="singleton">
        <property name="dbBatchExtractor">
            <list>
                <value>freedomExtractor</value>
                <value>groupExtractor</value>
                <value>databaseExtractor</value>
                <value>processorExtractor</value>
                <value>fileExtractor</value>
                <value>viewExtractor</value>
            </list>
        </property>
    </bean>

    <!-- pooling configuration -->
    <bean id="databaseExtractor" class="org.springframework.aop.framework.ProxyFactoryBean">
        <property name="optimize" value="false" />
        <property name="proxyTargetClass" value="true" />
        <property name="targetSource" ref="databaseExtractorTargetSource" />
    </bean>
    <bean id="databaseExtractorTargetSource" class="org.springframework.aop.target.CommonsPoolTargetSource">
        <property name="minIdle" value="1" />
        <property name="maxSize" value="-1" />
        <property name="timeBetweenEvictionRunsMillis" value="60000" /><!-- run eviction once a minute -->
        <property name="minEvictableIdleTimeMillis" value="600000" /><!-- reclaim instances idle for 10 minutes -->
        <property name="targetBeanName" value="databaseExtractorTarget" />
    </bean>
    <bean id="databaseExtractorTarget" class="com.alibaba.otter.node.etl.extract.extractor.DatabaseExtractor" scope="prototype">
        <property name="poolSize" value="5" />
    </bean>

    <bean id="fileExtractor" class="com.alibaba.otter.node.etl.extract.extractor.FileExtractor" scope="singleton" />
    <bean id="freedomExtractor" class="com.alibaba.otter.node.etl.extract.extractor.FreedomExtractor" scope="singleton" />
    <bean id="viewExtractor" class="com.alibaba.otter.node.etl.extract.extractor.ViewExtractor" scope="singleton" />
    <bean id="groupExtractor" class="com.alibaba.otter.node.etl.extract.extractor.GroupExtractor" scope="singleton" />
    <bean id="processorExtractor" class="com.alibaba.otter.node.etl.extract.extractor.ProcessorExtractor" scope="singleton" />
</beans>
```
Turning to the OtterExtractorFactory class, we can see that it iterates over dbBatchExtractor and calls otterExtractor.extract(dbBatch) on each extractor in turn to process a batch of data:
```java
public void extract(DbBatch dbBatch) {
    Assert.notNull(dbBatch);
    for (Object extractor : dbBatchExtractor) {
        OtterExtractor otterExtractor = null;
        if (extractor instanceof java.lang.String) {
            // looked up from the container on each pass; pooling is applied here
            otterExtractor = (OtterExtractor) beanFactory.getBean((String) extractor, OtterExtractor.class);
        } else {
            otterExtractor = (OtterExtractor) extractor;
        }
        otterExtractor.extract(dbBatch);
    }
}
```
Back to the XML configuration:
<bean id="otterExtractorFactory" class="com.alibaba.otter.node.etl.extract.extractor.OtterExtractorFactory" scope="singleton"> <property name="dbBatchExtractor"> <list> <value>freedomExtractor</value> <!--自由門提取器--> <value>groupExtractor</value> <!--這個不知道是什麼,感受是數據表配置中的那個group--> <value>databaseExtractor</value> <!--執行數據反查,包括了自由門數據和配置了基於表反查的數據須要走這裏--> <value>processorExtractor</value> <!--自定義提取器,這是咱們自定義數據處理的入口--> <value>fileExtractor</value> <!--文件提取器,這個我永不着--> <value>viewExtractor</value> <!--這個不知道是幹嗎的--> </list> </property> </bean>
This defines all the extractors; our custom data processing is executed by processorExtractor.
Steps: add the dependencies → extend AbstractEventProcessor → write the business logic → deploy.
Here is one implementation.
Add the two Maven dependencies:
```xml
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/shared.etl -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>shared.etl</artifactId>
    <version>4.2.15</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/node.extend -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>node.extend</artifactId>
    <version>4.2.15</version>
</dependency>
```
The implementation:
```java
package com.geekplus.db.yugong.foshan;

import java.util.List;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.node.extend.processor.AbstractEventProcessor;
import com.alibaba.otter.shared.etl.model.EventColumn;
import com.alibaba.otter.shared.etl.model.EventData;
import com.alibaba.otter.shared.etl.model.EventType;

public class PrimaryKeyTransfer extends AbstractEventProcessor {

    private final static Logger logger = LoggerFactory.getLogger(PrimaryKeyTransfer.class);

    /**
     * Processes one row of data per call.
     */
    public boolean process(EventData eventData) {
        boolean isHandle = true;
        if (eventData == null) return isHandle;
        logger.info("eventdata, tableId:{}, tableName:{}, schema:{}, ddlSchema:{}",
                    eventData.getTableId(), eventData.getTableName(),
                    eventData.getSchemaName(), eventData.getDdlSchemaName());
        if (StringUtils.equals("out_order_back", eventData.getTableName())) {
            return parseOrder(eventData);
        } else if (StringUtils.equals("out_order_pkg", eventData.getTableName())) {
            return parsePkg(eventData);
        } else if (StringUtils.equals("t_base_warehouse", eventData.getTableName())) {
            return parseWarehouse(eventData);
        } else if (StringUtils.equals("t_base_customer", eventData.getTableName())
                   || StringUtils.equals("t_base_owner", eventData.getTableName())) {
            return parseCustomer(eventData);
        } else if (StringUtils.equals("t_base_carrier_info", eventData.getTableName())) {
            return parseCarrier(eventData);
        }
        return isHandle;
    }

    private EventColumn findColumn(List<EventColumn> columns, String columnName) {
        for (EventColumn column : columns) {
            if (StringUtils.equals(columnName, column.getColumnName())) {
                return column;
            }
        }
        return null;
    }

    private boolean parseOrder(EventData eventData) {
        // promote warehouse_code from an ordinary column to part of the primary key
        EventColumn normalWarehouseCl = findColumn(eventData.getColumns(), "warehouse_code");
        if (normalWarehouseCl == null) return false;
        eventData.getKeys().add(normalWarehouseCl);
        eventData.getColumns().remove(normalWarehouseCl);

        // inserts and deletes need no extra handling
        if (EventType.UPDATE.equals(eventData.getEventType())) {
            // if the primary key itself changed, mirror the column into the old keys
            if (CollectionUtils.isNotEmpty(eventData.getOldKeys())) {
                eventData.getOldKeys().add(normalWarehouseCl.clone());
            }
        }
        return true;
    }

    private boolean parsePkg(EventData eventData) {
        if (EventType.INSERT.equals(eventData.getEventType())) {
            return true;
        }
        if (EventType.UPDATE.equals(eventData.getEventType())) {
            EventColumn normalOrderCodeCl = findColumn(eventData.getKeys(), "out_order_code");
            EventColumn normalWarehouseCl = findColumn(eventData.getKeys(), "warehouse_code");
            if (normalOrderCodeCl == null || normalWarehouseCl == null) return false;
            eventData.getColumns().clear();
            moveData(eventData.getKeys(), eventData.getColumns());
            eventData.getKeys().clear();
            eventData.getOldKeys().clear();
            // keys and oldKeys must keep the same column order
            eventData.getKeys().add(normalOrderCodeCl);
            eventData.getKeys().add(normalWarehouseCl);
            eventData.getOldKeys().add(normalOrderCodeCl);
            eventData.getOldKeys().add(normalWarehouseCl);
            return true;
        }
        if (EventType.DELETE.equals(eventData.getEventType())) {
            return true;
        }
        return false;
    }

    // copy columns from source to dest, skipping names already present in dest
    private void moveData(List<EventColumn> source, List<EventColumn> dest) {
        for (EventColumn srcC : source) {
            boolean exist = false;
            for (EventColumn destC : dest) {
                if (StringUtils.equals(destC.getColumnName(), srcC.getColumnName())) {
                    exist = true;
                    break;
                }
            }
            if (!exist) {
                dest.add(srcC);
            }
        }
    }

    private boolean parseWarehouse(EventData eventData) {
        EventColumn normalWarehouseCl = findColumn(eventData.getColumns(), "warehouse_code");
        if (normalWarehouseCl == null) return false;
        eventData.getColumns().remove(normalWarehouseCl);
        eventData.getKeys().add(normalWarehouseCl);
        if (EventType.UPDATE.equals(eventData.getEventType())) {
            if (CollectionUtils.isNotEmpty(eventData.getOldKeys())) {
                eventData.getOldKeys().add(normalWarehouseCl.clone());
            }
        }
        return true;
    }

    private boolean parseCustomer(EventData eventData) {
        if (EventType.UPDATE.equals(eventData.getEventType())) {
            EventColumn pk = findColumn(eventData.getKeys(), "pk_t_base_customer_code");
            if (pk != null) { // guard against a missing key column
                eventData.getOldKeys().add(pk);
            }
        }
        return true;
    }

    private boolean parseCarrier(EventData eventData) {
        EventColumn carrierCodeCl = findColumn(eventData.getColumns(), "carrier_code");
        if (carrierCodeCl == null) return false;
        eventData.getColumns().remove(carrierCodeCl);
        eventData.getKeys().add(carrierCodeCl);
        if (EventType.UPDATE.equals(eventData.getEventType())) {
            if (CollectionUtils.isNotEmpty(eventData.getOldKeys())) {
                eventData.getOldKeys().add(carrierCodeCl.clone());
            }
        }
        return true;
    }
}
```
Afterwards, put the built jar into the lib directory of every node; the change takes effect when the nodes are restarted.
Now the key part: otter's 自由門 (FreeDoor) mechanism.
Its main purpose is to trigger synchronization of rows in a table without modifying the original table data.
It can be used for things such as data correction and full-table synchronization.
How it works:
a. It is based on the otter system table retl_buffer: you insert specially formed rows containing the name of the table to sync and the pk values.
b. When otter picks these rows up, it back-queries the source by table name and pk to fetch the full row, and ships it to the target together with the normal incremental stream.
otter picks FreeDoor data up through the binlog itself: the inserts into retl.retl_buffer flow through canal like any other change, and the freedomExtractor seen in the extractor list above turns them into back-query requests for the databaseExtractor.
Note that FreeDoor is invasive to the source database: you must add a retl database and a retl user with password retl. The SQL to run is:
```sql
/* For otter's use: otter needs read/write on retl.* plus read/write on the business tables.
   1. Create the retl database */
CREATE DATABASE retl;

/* 2. Grant privileges to the sync user */
CREATE USER retl@'%' IDENTIFIED BY 'retl';
GRANT USAGE ON *.* TO `retl`@'%';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `retl`@'%';
GRANT SELECT, INSERT, UPDATE, DELETE, EXECUTE ON `retl`.* TO `retl`@'%';
/* business table grants; you can restrict this to just the tables being synced */
GRANT SELECT, INSERT, UPDATE, DELETE ON *.* TO `retl`@'%';

/* 3. Create the system tables */
USE retl;
DROP TABLE IF EXISTS retl.retl_buffer;
DROP TABLE IF EXISTS retl.retl_mark;
DROP TABLE IF EXISTS retl.xdual;

CREATE TABLE retl_buffer (
    ID BIGINT(20) AUTO_INCREMENT,
    TABLE_ID INT(11) NOT NULL,
    FULL_NAME varchar(512),
    TYPE CHAR(1) NOT NULL,
    PK_DATA VARCHAR(256) NOT NULL,
    GMT_CREATE TIMESTAMP NOT NULL,
    GMT_MODIFIED TIMESTAMP NOT NULL,
    CONSTRAINT RETL_BUFFER_ID PRIMARY KEY (ID)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE retl_mark (
    ID BIGINT AUTO_INCREMENT,
    CHANNEL_ID INT(11),
    CHANNEL_INFO varchar(128),
    CONSTRAINT RETL_MARK_ID PRIMARY KEY (ID)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

CREATE TABLE xdual (
    ID BIGINT(20) NOT NULL AUTO_INCREMENT,
    X timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (ID)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

/* 4. Insert the bootstrap row */
INSERT INTO retl.xdual(id, x) VALUES (1, now()) ON DUPLICATE KEY UPDATE x = now();
```
Structure of retl_buffer:
```sql
CREATE TABLE retl_buffer (
    ID BIGINT AUTO_INCREMENT,          -- no meaning; auto-increment
    TABLE_ID INT(11) NOT NULL,         -- the tableId, i.e. the sequence-number column in the manager's
                                       -- data table list (http://otter.alibaba-inc.com/data_media_list.htm);
                                       -- if the table is configured with a regex, specify full_name and
                                       -- set table_id to 0
    FULL_NAME varchar(512),            -- schemaName + '.' + tableName (optional when table_id is explicit)
    TYPE CHAR(1) NOT NULL,             -- I/U/D, for insert/update/delete respectively
    PK_DATA VARCHAR(256) NOT NULL,     -- multiple pk values are separated with char(1)
    GMT_CREATE TIMESTAMP NOT NULL,     -- no meaning; current time
    GMT_MODIFIED TIMESTAMP NOT NULL,   -- no meaning; current time
    CONSTRAINT RETL_BUFFER_ID PRIMARY KEY (ID)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```
全量同步操做示例:
```sql
insert into retl.retl_buffer(ID, TABLE_ID, FULL_NAME, TYPE, PK_DATA, GMT_CREATE, GMT_MODIFIED)
  (select null, 0, '$schema.table$', 'I', id, now(), now() from $schema.table$);
```
When the table has a composite primary key, PK_DATA must be the key values concatenated with char(1) as the separator, as in the sketch below.
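A sketch for a hypothetical two-column key — pk_col1 and pk_col2 are placeholders for your actual key columns:

```sql
insert into retl.retl_buffer(ID, TABLE_ID, FULL_NAME, TYPE, PK_DATA, GMT_CREATE, GMT_MODIFIED)
  (select null, 0, '$schema.table$', 'I',
          concat(pk_col1, char(1), pk_col2), -- key values joined by char(1)
          now(), now()
     from $schema.table$);
```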
Here is an actual example:
```sql
insert into `retl`.`retl_buffer` (`TABLE_ID`, `FULL_NAME`, `TYPE`, `PK_DATA`, `GMT_CREATE`, `GMT_MODIFIED`)
values ('0', 'test.t_base_warehouse', 'I', '20', '2018-01-19 15:00:57', '2018-01-19 10:34:00');
```
| TABLE_ID | FULL_NAME | TYPE | PK_DATA | GMT_CREATE | GMT_MODIFIED |
|----------|-----------|------|---------|------------|--------------|
| table id; use 0 when FULL_NAME is given | source schema.table name | I/U/D for insert/update/delete | the primary key *values*, not the column names; add one retl_buffer row per record to sync | current time, as in the example | current time, as in the example |
That's it for now. When I find time, I will follow up with a look at the L (Load) stage and how otter assembles the SQL it runs against the target database.