另闢蹊徑,MySQL主從同步延遲,這樣解決也挺好

WX搜索【程序員內點事】,回覆【666】妙趣橫生。java

1、canal是個啥?

canal是阿里開發的一款基於數據庫增量日誌解析,提供增量數據訂閱與消費的框架,整個框架純JAVA開發,目前僅支持MysqlMariaDB(和mysql相似)。mysql

那什麼是數據庫增量日誌?git

MySQL的日誌種類是比較多的,主要包含:錯誤日誌、查詢日誌、慢查詢日誌、事務日誌、二進制日誌。而MySQL數據庫所發生的數據變動(DML(data manipulation language)數據操縱語言,也就是咱們熟悉的增刪改),都會以二進制日誌(binary log)形式存儲。程序員

2、canal原理

在介紹canal原理以前,咱們先來回顧一下MySQL主從同步的原理,這或許會讓你更好的理解canal的工做機制。github

一、MySQL主從同步原理:web

MySQL主從同步也叫讀寫分離,能夠提高數據庫的負載和容錯能力,實現數據庫的高可用sql

先來分析一張MySQL主從同步原理圖: 在這裏插入圖片描述數據庫

以上圖片源自網絡,若有侵權聯繫刪除vim

master節點操做過程:服務器

master節點數據發生更改後(delete、update、insert,仍是建立函數、存儲過程等操做),向binary log中寫入記錄日誌,這些記錄又叫作二進制日誌事件(binary log events)。

show binlog events 
複製代碼

在這裏插入圖片描述   這些事件會按照順序寫入bin log中。當slave節點啓動鏈接到master節點的時候,master節點會爲slave節點開啓binlog dump線程(負責傳輸binlog數據)。

一旦master節點的bin log發生變化時,bin logdump線程會通知slave節點有能夠傳輸的binlog,並將相應的bin log內容發送給slave節點。

slave節點操做過程:

slave節點上會建立兩個線程:一個I/O線程,一個SQL線程。I/O線程鏈接到master節點,master節點上的binlog dump 線程會將binlog的內容發送給該I\O線程。

該I/O線程接收到binlog內容後,再將內容寫入到本地的relay log。而sql線程讀取到I/O線程寫入的ralay log,將relay log中的內容寫入slave數據庫。


二、canal原理

懂了上邊MySQL的主從同步原理,canal的工做機制就很好理解了。其實canal是模擬了MySQL數據庫中,slave節點與master節點的交互協議,假裝本身爲MySQL slave節點,向MySQL master節點發送dump協議,MySQL master節點收到dump請求,開始推送binary log給slave節點(也就是canal)。 在這裏插入圖片描述

以上圖片源自網絡,若有侵權聯繫刪除

光說不練假把式,開幹!

3、canal實現「監控」MySQL

在寫代碼前咱們先對MySQL進行一下改造,安裝MySQL就再也不細說了,基本操做。

一、查看一下MySQL是否開啓了binary log功能

show binary logs 
複製代碼

若是沒有開啓是圖中的狀態,通常用戶是沒有這個命令權限的,不過我有,嘖嘖嘖! 在這裏插入圖片描述 若是沒有須要手動開啓,而且在my.cnf文件中配置binlog-formatRow模式

log-bin=mysq-bin
binlog-format=Row 複製代碼

log-binbinlog文件存放位置 binlog-format 設置MySQL複製log-bin的方式

MySQL的三種複製方式:

基於SQL語句的複製(statement-based replication, SBR)

  • 優勢:將修改數據的sql保存在binlog,不須要記錄每一條sql和數據變化,binlog體量會很小,IO開銷少,性能好
  • 缺點:會致使master-slave中的數據不一致

基於行的複製(row-based replication, RBR)

  • 優勢:不記錄每條sql語句的上下文信息,僅需記錄哪條數據被修改了,修改爲什麼樣了
  • 缺點:binlog體積很大,尤爲是在alter table屬性時,會產生大量binlog數據

混合模式複製(mixed-based replication, MBR)

  • 對應的,binlog的格式也有三種:STATEMENT,ROW,MIXED。

二、爲canal 建立一個有權限操做MySQL的用戶

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES; 複製代碼

三、安裝canal

下載地址:https://github.com/alibaba/canal/releases

下載後選擇版本例如:canal.deployer-xxx.tar.gz

四、配置canal

修改instance.properties文件,須要添加監聽數據庫和表的規則,canal能夠全量監聽數據庫,也能夠針對某個表進行監聽,比較靈活。

vim conf/example/instance.properties
複製代碼
#################################################
## mysql serverId canal.instance.mysql.slaveId = 2020  # position info 修改本身的數據庫(canal要監聽的數據庫 地址 ) canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp =  #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp =  # username/password 修改爲本身 數據庫信息的帳號 (單獨開一個 準備階段建立的帳號) canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF-8  # table regex 表的監聽規則 # canal.instance.filter.regex = blogs\.blog_info canal.instance.filter.regex = .\*\\\\..\* # table black regex canal.instance.filter.black.regex = 複製代碼

啓動canal

sh bin/startup.sh
複製代碼

看一下server日誌,確認一下canal是否正常啓動

vi logs/canal/canal.log
複製代碼

顯示canal server is running now即爲成功

2020-01-08 15:25:33.361 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ##    start the canal server.
2020-01-08 15:25:33.468 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.12.245:11111] 2020-01-08 15:25:34.061 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ...... 複製代碼

五、編寫Java客戶端代碼,實現canal監聽

引入依賴包

<dependency>
 <groupId>com.alibaba.otter</groupId>  <artifactId>canal.client</artifactId>  <version>1.1.0</version> </dependency> 複製代碼

這裏只是簡單實現

public class MainApp {
  public static void main(String... args) throws Exception {   /**  * 建立與  */  CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),  11111), "example", "", "");   int batchSize = 1000;  int emptyCount = 0;  try {  connector.connect();  /**  * 監控數據庫中全部表  */  connector.subscribe(".*\\..*");  /**  * 指定要監控的表,庫名.表名  */  //connector.subscribe("xin-master.jk_order");  connector.rollback();   //120次心跳事後未檢測到,跳出  int totalEmptyCount = 120;  while (emptyCount < totalEmptyCount) {  Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據  long batchId = message.getId();  int size = message.getEntries().size();  if (batchId == -1 || size == 0) {  emptyCount++;  System.out.println("empty count : " + emptyCount);  try {  Thread.sleep(1000);  } catch (InterruptedException e) {  }  } else {  emptyCount = 0;  // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);  printEntry(message.getEntries());  }  /**  * 提交確認  */  connector.ack(batchId);  /**  * 處理失敗, 回滾數據  */  connector.rollback(batchId);  }   System.out.println("empty too many times, exit");  } finally {  connector.disconnect();  /**  * 手動開啓事務回滾  */  //TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();  }  }   private static void printEntry(List<CanalEntry.Entry> entrys) {   for (CanalEntry.Entry entry : entrys) {   if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry  .EntryType  .TRANSACTIONEND) {  continue;  }   CanalEntry.RowChange rowChage = null;  try {  rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());  } catch (Exception e) {  throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),  e);  }   CanalEntry.EventType eventType = rowChage.getEventType();  System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",  entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  eventType));   for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {  if (eventType == CanalEntry.EventType.DELETE) {  printColumn(rowData.getBeforeColumnsList());  } else if (eventType == CanalEntry.EventType.INSERT) {  printColumn(rowData.getAfterColumnsList());  } else {  System.out.println("-------> before");  printColumn(rowData.getBeforeColumnsList());  System.out.println("-------> after");  printColumn(rowData.getAfterColumnsList());  }  }  }  }   private static void printColumn(List<CanalEntry.Column> columns) {  for (CanalEntry.Column column : columns) {  System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());  }  } } 複製代碼

代碼到這就編寫完成了,咱們啓動服務看下是什麼效果,因爲並無操做數據庫,因此監聽的結果都是空的。 在這裏插入圖片描述 接下來咱們在數據庫執行一條update語句試試

update jk_orderset order_no = '1111'  where id = 40
複製代碼

控制檯檢測到了數據庫的修改,並生成binlog 日誌文件mysql-bin.000009:3830 在這裏插入圖片描述 那麼生成的binlog 文件該怎麼用,如何解析成SQl語句呢?

<!-- mysql binlog解析 -->
 <dependency>  <groupId>com.github.shyiko</groupId>  <artifactId>mysql-binlog-connector-java</artifactId>  <version>0.13.0</version> </dependency> 複製代碼

將剛纔的binlog文件下載本地測試一下

public static void main(String[] args) throws IOException {
 String filePath = "C:\\ProgramData\\MySQL\\MySQL Server 5.7\\Data\\mysql-bin.000009:3830";  File binlogFile = new File(filePath);  EventDeserializer eventDeserializer = new EventDeserializer();  eventDeserializer.setChecksumType(ChecksumType.CRC32);  BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);  try {  for (Event event; (event = reader.readEvent()) != null; ) {  System.out.println(event.toString());  }  } finally {  reader.close();  }  } 複製代碼

查看一下執行結果,發現數據庫最近的一次操做是加了一個idx_index索引

Event{header=EventHeaderV4{timestamp=1551325542000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=8455, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1551325542000, eventType=QUERY, serverId=1, headerLength=19, dataLength=190, nextPosition=8664, flags=0}, data=QueryEventData{threadId=25, executionTime=0, errorCode=0, database='xin-master', sql='ALTER TABLE `jk_order` DROP INDEX `idx_index` , ADD INDEX `idx_index` (`user_id`, `service_id`, `real_price`) USING BTREE'}} Event{header=EventHeaderV4{timestamp=1551438586000, eventType=STOP, serverId=1, headerLength=19, dataLength=4, nextPosition=8687, flags=0}, data=null} 複製代碼

至此咱們就已經實現了監控MySQL,

4、canal應用場景

canal應用場景大體有如下:

  • 解決MySQL主從同步延遲的問題
  • 實現數據庫實時備份
  • 多級索引 (賣家和買家各自分庫索引)
  • 實現業務cache刷新
  • 價格變化等重要業務消息

重點分析一下canal是如何解決MySQL主從同步延遲的問題

生產環境下MySQL的主從同步模式(maser-slave)很常見,但對於跨機房部署的集羣,會出現同步延時的狀況。舉個栗子:

一條訂單狀態是未付款,master節點修改爲已付款,可因爲某些緣由出現延遲數據未能及時同步到slave,這時用戶當即查看訂單狀態(查詢走slave)顯示仍是未付款,哪一個用戶看到這種狀況不得慌啊。

爲何會出現主從同步延遲呢?

當主庫masterTPS併發較高時,master節點併發產生的修改操做,而slave節點的sql線程是單線程處理同步數據,延時天然而言就產生了。

咱們用canal實時監聽maser節點的數據更新(能夠針對某個表監聽),canal捕捉到更改的SQL後當即在slave節點執行,以此來解決主從延遲問題。

不過形成主從同步的緣由不止這些,因爲主從服務器存在跨機器而且跨機房,除了網絡帶寬緣由以外,網絡的穩定性以及機器之間的同步,都是主從同步應該考慮的主要緣由。

總結

本文只是簡單實現canal監聽數據庫的功能,旨在給你們提供一種解決問題的思路,仍是反覆絮叨的那句話,解決問題的技術方法不少,具體如何應用還需結合具體業務。

整理了幾百本各種技術電子書和視頻資料 ,噓~免費 送,公號內回覆【666】自行領取。和小夥伴們建了一個技術交流羣,一塊兒探討技術、分享技術資料,旨在共同窗習進步,感興趣就入咱們吧!

相關文章
相關標籤/搜索