最近在工做中須要處理一些大數據量同步的場景,正好運用到了canal這款數據庫中間件,所以特地花了點時間來進行該中間件的的學習和總結。前端
早期,阿里巴巴B2B公司由於存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基於trigger的方式獲取增量變動,不過從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日誌解析,獲取增量變動進行同步,由此衍生出了增量訂閱&消費的業務,今後開啓了一段新紀元。java
支持mysql5.7及如下版本mysql
master將數據記錄到了binlog日誌裏面,而後slave會經過一個io線程去讀取master那邊指定位置點開始的binlog日誌內容,並將相應的信息寫會到slave這邊的relay日誌裏面,最後slave會有單獨的sql線程來讀取這些master那邊執行的sql語句記錄,達成兩端的數據同步。git
傳統的mysql主從同步實現的原理圖以下所示:github
基於純java語言開發,能夠用於作增量數據訂閱和消費功能。spring
相比於傳統的數據同步,咱們一般須要進行先搭建主從架構,而後使用binlog日誌進行讀取,而後指定須要同步的數據庫,數據庫表等信息。可是隨着咱們業務的不斷複雜,這種傳統的數據同步方式以及開始變得較爲繁瑣,不夠靈活。sql
canal模擬mysql slave的交互協議,假裝本身爲mysql slave,向mysql master發送dump協議mysql master收到dump請求,開始推送binary log給slave(也就是canal),canal解析binary log對象(原始爲byte流),經過對binlog數據進行解析便可獲取須要同步的數據,在進行同步數據的過程當中還能夠加入開發人員的一些額外邏輯處理,比較開放。數據庫
Binlog的三種基本類型分別爲:apache
STATEMENT模式只記錄了sql語句,可是沒有記錄上下文信息,在進行數據恢復的時候可能會致使數據的丟失狀況api
ROW模式除了記錄sql語句以外,還會記錄每一個字段的變化狀況,可以清楚的記錄每行數據的變化歷史,可是會佔用較多的空間,須要使用mysqlbinlog工具進行查看。
MIX模式比較靈活的記錄,例如說當遇到了表結構變動的時候,就會記錄爲statement模式。當遇到了數據更新或者刪除狀況下就會變爲row模式
須要先登陸mysql數據庫,檢查binlog功能是否有開啓。
mysql> show variables like 'log_bin'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin | OFF | +---------------+-------+ 1 row in set (0.00 sec)
若是顯示狀態爲OFF表示該功能未開啓,那麼這個時候就須要到my.ini裏面進行相關配置了,在原來的my.ini配置底部插入如下內容:
server-id=192 log-bin=mysql-bin binlog_format = ROW
當再次經過客戶端查看log_bin狀態爲ON的時候,就表示binlog已經開啓:
mysql> show variables like 'log_bin'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin | ON | +---------------+-------+ 1 row in set (0.00 sec)
而後在mysql裏面添加如下的相關用戶和權限:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
開啓以後,咱們能夠前往canal的官方地址進行相應版本的安裝包進行下載:
https://github.com/alibaba/canal/releases
下載好指定的版本以後,找到裏面的bin目錄底下的startup腳本,啓動。
啓動以後會發現黑窗中止在這樣一行的內容上,而後就不動了
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0 Listening for transport dt_socket at address: 9099
這時候須要前往日誌文件夾底下canallogs,查看canal日誌文件是否已經開啓,若是顯示如下內容,就表示啓動已經成功
2019-05-06 10:41:56.116 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler 2019-05-06 10:41:56.144 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations 2019-05-06 10:41:56.145 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server. 2019-05-06 10:41:56.233 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.164.1:11111] 2019-05-06 10:41:58.179 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now .....
canal server的默認端口號爲:11111,若是須要調整的話,能夠去到conf目錄底下的canal.properties文件中進行修改。
啓動了canal的server以後,即是基於java的客戶端搭建了。
首先在canalconf目錄底下建立一個獨立的文件夾(文件命名 idea_user_data),用於作額外的數據源配置:
而後建立一份特定的properties文件:(名稱最好爲:instance.properties),這裏面只須要建立properties文件便可,其他幾份文件會自動生成,instance.properties能夠直接從example文件夾裏面進行copy。
首先是導入相應的依賴文件:
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency>
單機版本的環境比較好搭建,相應的代碼以下:
首先是canal客戶端的配置類
/** * @author idea * @date 2019/5/6 * @Version V1.0 */ public class CanalConfig { public static String CANAL_ADDRESS="127.0.0.1"; public static int PORT=11111; public static String DESTINATION="idea_user_data"; public static String FILTER=".*\..*"; }
客戶端代碼:
package com.sise.client; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import static com.sise.config.CanalConfig.*; /** * @author idea * @date 2019/5/6 * @Version V1.0 */ public class CanalClient { private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>(); public static void main(String args[]) { CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CANAL_ADDRESS, PORT), DESTINATION, "", ""); int batchSize = 1000; try { connector.connect(); connector.subscribe(FILTER); connector.rollback(); try { while (true) { //嘗試從master那邊拉去數據batchSize條記錄,有多少取多少 Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { Thread.sleep(1000); } else { dataHandle(message.getEntries()); } connector.ack(batchId); //當隊列裏面堆積的sql大於必定數值的時候就模擬執行 if (SQL_QUEUE.size() >= 10) { executeQueueSql(); } } } catch (InterruptedException e) { e.printStackTrace(); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } finally { connector.disconnect(); } } /** * 模擬執行隊列裏面的sql語句 */ public static void executeQueueSql() { int size = SQL_QUEUE.size(); for (int i = 0; i < size; i++) { String sql = SQL_QUEUE.poll(); System.out.println("[sql]----> " + sql); } } /** * 數據處理 * * @param entrys */ private static void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException { for (Entry entry : entrys) { if (EntryType.ROWDATA == entry.getEntryType()) { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); EventType eventType = rowChange.getEventType(); if (eventType == EventType.DELETE) { saveDeleteSql(entry); } else if (eventType == EventType.UPDATE) { saveUpdateSql(entry); } else if (eventType == EventType.INSERT) { saveInsertSql(entry); } } } } /** * 保存更新語句 * * @param entry */ private static void saveUpdateSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> newColumnList = rowData.getAfterColumnsList(); StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set "); for (int i = 0; i < newColumnList.size(); i++) { sql.append(" " + newColumnList.get(i).getName() + " = '" + newColumnList.get(i).getValue() + "'"); if (i != newColumnList.size() - 1) { sql.append(","); } } sql.append(" where "); List<Column> oldColumnList = rowData.getBeforeColumnsList(); for (Column column : oldColumnList) { if (column.getIsKey()) { //暫時只支持單一主鍵 sql.append(column.getName() + "=" + column.getValue()); break; } } SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 保存刪除語句 * * @param entry */ private static void saveDeleteSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> columnList = rowData.getBeforeColumnsList(); StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where "); for (Column column : columnList) { if (column.getIsKey()) { //暫時只支持單一主鍵 sql.append(column.getName() + "=" + column.getValue()); break; } } SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 保存插入語句 * * @param entry */ private static void saveInsertSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> columnList = rowData.getAfterColumnsList(); StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " ("); for (int i = 0; i < columnList.size(); i++) { sql.append(columnList.get(i).getName()); if (i != columnList.size() - 1) { sql.append(","); } } sql.append(") VALUES ("); for (int i = 0; i < columnList.size(); i++) { sql.append("'" + columnList.get(i).getValue() + "'"); if (i != columnList.size() - 1) { sql.append(","); } } sql.append(")"); SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } }
啓動程序以後,咱們對數據庫表進行10次左右的修改操做以後,即可以從控制檯中看到sql的打印信息。
在實際開發中,若是隻有一臺canal機器做爲server,當該臺機器掛掉以後,服務就會終止,那麼這個時候咱們便須要引入集羣部署的方式了。
搭建canal集羣的環境須要先搭建好相應的zk集羣模式。zk的集羣搭建網上資料不少,這裏就不進行講解了。
canal搭建集羣的一些資料能夠參考如下連接:
https://github.com/alibaba/canal/wiki/AdminGuide
canal在搭建HA模式的時候有幾個容易掉坑的步驟:
canal.properties配置裏面須要添加zk的地址,同時canal.instance.global.spring.xml
須要修改成classpath:spring/default-instance.xml
每臺機子的canal裏面的具體instance所在目錄的名稱須要統一,每一個實例都有對應的slaveId,他們的id須要保證不重複。搭建好了canal集羣環境以後,而後代碼部分須要在連接的那個模塊進行稍微的調整:
CanalConnector connector = CanalConnectors.newClusterConnector(CLUSTER_ADDRESS, DESTINATION, "", "");
爲了保證master在某些特殊場景下掛掉,mysql須要搭建爲雙M模式,那麼咱們這個時候能夠在每一個canal機器的instance配置文件中加入master的地址和standby的地址:
canal.instance.master.address=******
canal.instance.standby.address = ******
同時對於detecing也須要進行配置修改
canal.instance.detecting.enable = true ## 須要開啓心跳檢查 canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() ##心跳檢查sql canal.instance.detecting.interval.time = 3 ##心跳檢查頻率 canal.instance.detecting.retry.threshold = 3 ## 心跳檢查失敗次數閥值,當超過這個次數以後,就會自動切換到standby上邊的機器進行binlog的訂閱讀取 canal.instance.detecting.heartbeatHaEnable = true ## 是否開啓master和standby的主動切換
ps: master和standby進行切換機器的時候可能會有時間延遲。
啓動2臺canal機器,能夠在zk裏面查看到canal註冊的節點信息:
經過模擬測試,關閉當前端口爲11111的canal機器,節點信息會自動更換爲第二臺canal進行替換:
ClusterCanalConnector和SimpleCanalConnector類發現了username和password的參數,可是彷佛具體配置中並無作具體的設置,這是爲何呢?
後來也在github上邊查看到了一些網友的相關討論:
pom依賴:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.0.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.1</version> </dependency>
kafka的配置類:
public class KafkaProperties { public final static String ZK_CONNECTION = "XXX.XXX.XXX.XXX:2181"; public final static String BROKER_LIST_ADDRESS = "XXX.XXX.XXX.XXX:9092"; public final static String GROUP_ID = "group1"; public final static String TOPIC = "USER-DATA"; }
關於kafka的環境搭建步驟比較簡單,網上有不少的資料,這裏就很少一一介紹了。
首先是kafka的producer部分代碼:
import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.log4j.Logger; import java.util.Properties; import static com.sise.kafka.KafkaProperties.TOPIC; /** * @author idea * @date 2019/5/7 * @Version V1.0 */ public class KafkaProducerDemo extends Thread { public static Logger log = Logger.getLogger(KafkaProducerDemo.class); //kafka的連接地址要使用hostname 默認9092端口 private static final String BROKER_LIST = BROKER_LIST_ADDRESS; private static KafkaProducer<String, String> producer = null; static { Properties configs = initConfig(); producer = new KafkaProducer<String, String>(configs); } /* 初始化配置 */ private static Properties initConfig() { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); return properties; } public static void sendMsg(String msg) { ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, msg); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (null != e) { log.info("send error" + e.getMessage()); } else { System.out.println("send success"); } } }); } }
接着是consumer部分的代碼:
import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; /** * @author idea * @date 2019/5/7 * @Version V1.0 */ public class KafkaConsumerDemo extends Thread { private final ConsumerConnector consumer; private final String topic; public KafkaConsumerDemo(String topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig()); this.topic = topic; } private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); props.put("zookeeper.connect", KafkaProperties.ZK_CONNECTION); props.put("group.id", KafkaProperties.GROUP_ID); props.put("zookeeper.session.timeout.ms", "40000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } @Override public void run() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { System.out.println("【receive】" + new String(it.next().message())); } } }
而後須要在CanalClient 的executeQueueSql函數出進行部分功能的修改:
/** * 給kafka發送sql語句 */ public static void executeQueueSql() { int size = SQL_QUEUE.size(); for (int i = 0; i < size; i++) { String sql = SQL_QUEUE.poll(); //發送sql給kafka KafkaProducerDemo.sendMsg(sql); } }
爲了驗證程序是否正常,啓動canal和kafka以後,對canal監聽的數據庫裏面的表進行數據信息的修改,而後canal會將修改的binlog裏面的sql放入隊列中,當隊列滿了以後便向kafka中進行發送:
consumer端接受到數據以後控制檯便打印出相應內容: