1、前言html
Binlog是MySQL數據庫的二進制日誌,用於記錄用戶對數據庫操做的SQL語句(除了數據查詢語句)信息。而Binlog格式也有三種,分別爲STATEMENT、ROW、MIXED。STATMENT模式基於SQL語句的複製,每一條會修改數據的SQL語句會記錄。ROW模式除了記錄SQL語句以外,還會記錄每一個字段的變化狀況,可以清楚的記錄每行數據的變化歷史,會佔用較多的空間。MIXED比較靈活的記錄,當遇到表結構變動的時候,就會記錄爲STATMENT模式,當遇到了數據更新或者刪除狀況下就會變爲ROW模式。Binlog三個用途分別爲數據恢復、複製、審計。java
Canal是阿里MySQL數據庫Binlog的增量訂閱&消費組件 ,基於數據庫Binlog能夠監控數據庫數據的變化,進而用於數據同步等業務。分爲Canal Server與Canal Client,前者讀取Binlog解析後存儲,後者鏈接前者消費。
mysql
2、安裝搭建git
一、下載安裝包。並上傳至服務器中。下載地址爲:https://github.com/alibaba/canal/releasesgithub
二、將home文件夾中的壓縮包解壓至安裝路徑(以下圖所示)。spring
1 tar -xzf /home/canal.deployer-1.1.3.tar.gz -C /usr/java/canal
三、進入canal文件夾,修改配置文件(以下圖所示)。sql
1 vi conf/example/instance.properties
1 canal.instance.dbUsername=root #數據庫帳號 2 canal.instance.dbPassword=1234 #數據庫密碼 3 canal.instance.defaultDatabaseName = corporate_genealogy #數據庫 4 canal.instance.connectionCharset = UTF-8 #數據庫編碼
四、配置MySQL數據庫,開啓Binlog,並選擇模式爲ROW(以下圖所示)。數據庫
1 vi /etc/my.cnf
1 #canal 2 log-bin=mysql-bin 3 binlog-format=ROW 4 server_id=1
五、數據庫建立canal用戶,賦予權限,並刷新(以下圖所示)。服務器
ps:這裏遇到一個異常信息,是由於數據庫密碼過於簡單,不符合密碼策略,須要修改一下策略。。。ide
1 mysql -uroot -p1234
1 SHOW VARIABLES LIKE 'validate_password%';
1 set global validate_password_policy=LOW;
1 set global validate_password_length=4;
1 create user canal identified by 'canal';
1 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
1 FLUSH PRIVILEGES;
六、退出並重啓MySQL。
1 exit;
1 sudo service mysqld restart;
七、進入canal的bin文件夾,啓動canal-server。
1 ./startup.sh
八、查看logs文件中日誌是否啓動成功(以下圖所示)。
3、客戶端代碼檢測
ps:須要注意的是服務器防火牆需打開對應端口號,這裏是11111。
一、添加Maven依賴
1 <!-- Canal --> 2 <dependency> 3 <groupId>com.alibaba.otter</groupId> 4 <artifactId>canal.client</artifactId> 5 <version>1.1.3</version> 6 </dependency>
二、測試類代碼
1 import java.net.InetSocketAddress; 2 import java.util.List; 3 4 import com.alibaba.otter.canal.client.CanalConnector; 5 import com.alibaba.otter.canal.client.CanalConnectors; 6 import com.alibaba.otter.canal.protocol.CanalEntry.Column; 7 import com.alibaba.otter.canal.protocol.CanalEntry.Entry; 8 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; 9 import com.alibaba.otter.canal.protocol.CanalEntry.EventType; 10 import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; 11 import com.alibaba.otter.canal.protocol.CanalEntry.RowData; 12 import com.alibaba.otter.canal.protocol.Message; 13 14 public class TestCanal { 15 16 public static void main(String args[]) { 17 // 建立連接 18 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("服務器IP", 11111), 19 "example", "", ""); 20 int batchSize = 1000; 21 int emptyCount = 0; 22 try { 23 connector.connect(); 24 connector.subscribe(".*\\..*"); 25 connector.rollback(); 26 int totalEmtryCount = 1200; 27 while (emptyCount < totalEmtryCount) { 28 // 獲取指定數量的數據 29 Message message = connector.getWithoutAck(batchSize); 30 long batchId = message.getId(); 31 int size = message.getEntries().size(); 32 if (batchId == -1 || size == 0) { 33 emptyCount++; 34 try { 35 Thread.sleep(1000); 36 } catch (InterruptedException e) { 37 e.printStackTrace(); 38 } 39 } else { 40 emptyCount = 0; 41 printEntry(message.getEntries()); 42 } 43 // 提交確認 44 connector.ack(batchId); 45 } 46 System.out.println("empty too many times, exit"); 47 } finally { 48 connector.disconnect(); 49 } 50 } 51 52 private static void printEntry(List<Entry> entrys) { 53 for (Entry entry : entrys) { 54 if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN 55 || entry.getEntryType() == EntryType.TRANSACTIONEND) { 56 continue; 57 } 58 59 RowChange rowChage; 60 try { 61 rowChage = RowChange.parseFrom(entry.getStoreValue()); 62 } catch (Exception e) { 63 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); 64 } 65 66 EventType eventType = rowChage.getEventType(); 67 System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", 68 entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), 69 entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); 70 71 for (RowData rowData : rowChage.getRowDatasList()) { 72 if (eventType == EventType.DELETE) { 73 printColumn(rowData.getBeforeColumnsList()); 74 } else if (eventType == EventType.INSERT) { 75 printColumn(rowData.getAfterColumnsList()); 76 } else { 77 System.out.println("-------> before"); 78 printColumn(rowData.getBeforeColumnsList()); 79 System.out.println("-------> after"); 80 printColumn(rowData.getAfterColumnsList()); 81 } 82 } 83 } 84 } 85 86 private static void printColumn(List<Column> columns) { 87 for (Column column : columns) { 88 System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); 89 } 90 } 91 92 }
三、Navicat 鏈接對應數據庫進行一些添加刪除更新操做,控制檯輸出以下圖所示。
4、總結展望
考慮到Canal的堆積能力並不強。堆積數據到10W+時,速度會變慢,並會出現假死現象。所以介入消息中間件MQ很是有必要,解決堆積能力問題,能夠延後消費,可以方便的獲得積壓數據,進行監控報警。
本文部分學習參考了:https://www.cnblogs.com/java-spring/p/8930740.html
至此是關於介紹在Linux系統中阿里Canal中間件的初步搭建和使用,後續會介紹配合消息中間件等方式處理數據同步及其它業務邏輯。
若有疏漏錯誤之處,還請不吝賜教!