使用canal+kafka監聽MySQL binlog小實踐

前言

最近,想對MySQL有進一步的認識,看如何保證緩存與數據庫一致性,在負責業務上也須要這方面的優化,有些文章提到使用監聽MySQL binlog實現,想試下,本文純屬好奇心驅使。html

  • 所用工具:MySQL + Canal + Kafka

一、kafka:kafka.apache.org/quickstart(… 二、Canal:github.com/alibaba/can… 三、MySQL版本以下:java

+-------------------------+-----------------------+
| Variable_name           | Value                 |
+-------------------------+-----------------------+
| innodb_version          | 8.0.12                |
| protocol_version        | 10                    |
| slave_type_conversions  |                       |
| tls_version             | TLSv1,TLSv1.1,TLSv1.2 |
| version                 | 8.0.12                |
| version_comment         | Homebrew              |
| version_compile_machine | x86_64                |
| version_compile_os      | osx10.14              |
| version_compile_zlib    | 1.2.11                |
+-------------------------+-----------------------+
複製代碼

MySQL binlog簡介

  • binlog是MySQL server層維護的一種二進制日誌,與innodb等存儲引擎中的redo/undo log是徹底不一樣的日誌;主要是用來記錄對MySQL數據更新或潛在發生更新的SQL語句,並以「事務」的形式保存在磁盤中。 那麼,binlog日誌的做用以下:

主從複製 數據恢復 增量備份mysql

  • 查看binlog相關配置: show variables like '%log_bin%';
+---------------------------------+-----------------------------------+
| Variable_name                   | Value                             |
+---------------------------------+-----------------------------------+
| log_bin                         | ON                                |
| log_bin_basename                | /usr/local/var/mysql/binlog       |
| log_bin_index                   | /usr/local/var/mysql/binlog.index |
| log_bin_trust_function_creators | OFF                               |
| log_bin_use_v1_row_events       | OFF                               |
| sql_log_bin                     | ON                                |
+---------------------------------+-----------------------------------+
複製代碼
  • 查看binlog目錄:show binary logs;
+---------------+-----------+
| Log_name      | File_size |
+---------------+-----------+
| binlog.000036 |       155 |
| binlog.000037 |      1066 |
| binlog.000038 |      3075 |
+---------------+-----------+
複製代碼
  • 查看binlog的狀態:show master status;可查看當前二進制日誌文件的狀態信息,顯示正在寫入的二進制文件,以及當前的position。
| binlog.000038 |     3075 |              |                  |                 |
複製代碼
  • 那麼,再來查看binlog.000038日誌內容,./mysqlbinlog /usr/local/var/mysql/binlog.000038
  • ROW級別下,SQL語句須要解碼,須要加解碼選項,./mysqlbinlog --base64-output=decode-rows -v /usr/local/var/mysql/binlog.000038
# at 3311
#200410 17:29:47 server id 1  end_log_pos 3386 CRC32 0xac866698 	Write_rows: table id 65 flags: STMT_END_F
### INSERT INTO `zacblog`.`t_zb_article`
### SET
###   @1=5
###   @2=22121
###   @3='dada'
###   @4='dadwad'
###   @5=0
###   @6='2020-04-10 17:29:31'
###   @7=1586510981
複製代碼
  • 固然,binlog的格式也是能夠設定的,分別有ROWSTATEMENTMIXED選項。

緩存與數據庫一致性

  • 首先,根據CAP理論,一個系統同時知足C、A、P是不可能的,放棄C也不是說放棄一致性,而是放棄強一致性,追求最終一致性
  • 此前,項目也遇到過,緩存與數據庫如何保持一致性的問題,網上形形色色的答案,但我看到有些大神每每引用Cache-Aside pattern這篇文章,文章中講的模式就是先更新數據庫,再刪除緩存

The order of the steps is important. Update the data store before removing the item from the cache. If you remove the cached item first, there is a small window of time when a client might fetch the item before the data store is updated. That will result in a cache miss (because the item was removed from the cache), causing the earlier version of the item to be fetched from the data store and added back into the cache. The result will be stale cache data. 譯文:若是先刪除緩存,會有短暫的時間窗口,客戶端訪問數據庫的舊值,而且致使該key下的請求所有打到數據庫,而後舊值也會從新保存到緩存。git

enter image description here

  • 那麼,更新完數據庫,刪除緩存失敗(機率不能說沒有)時,仍是會有問題。。。

一、可使用MQ重試機制,當remove拋出異常,咱們能夠利用MQ異步重試刪除。 二、利用監控捕捉重試異常。github

  • 寫到這兒,我都快以爲我跑題了,確實這部分網上各抒己見,上面純屬我的見解。下面就是知足我的好奇心了,有文章說使用MQ監聽MySQL binlog的方式異步刪除緩存,刪除失敗繼續放入MQ重試。

Canal簡介

  • 爲了監聽MySQL binlog我搜到美團DB數據同步到數據倉庫的架構與實踐 文章中寫到使用Canal實現binlogKafka的鏈接。
  • Canal譯意爲水道/管道/溝渠,主要用途是基於 MySQL 數據庫增量日誌解析,提供增量數據訂閱和消費。
  • 基於日誌增量訂閱和消費的業務包括

一、數據庫鏡像 二、數據庫實時備份 三、索引構建和實時維護(拆分異構索引、倒排索引等) 四、業務 cache 刷新 五、帶業務邏輯的增量數據處理sql

  • 而後,咱們就要開始根據QuickStart的命令去啓動了,大部分都是根據官網操做的,只是遇到了幾個坑。
  • 具體命令,我就再也不贅述了,具體順序爲

一、先啓動kafka 二、Canal instance.properties和canal.properties,包含db、用戶信息、Kafka集羣及topic信息 三、根據Canal配置信息,在Kafka建立相應的topic和消費者數據庫

踩坑記錄

  • canal.properties關鍵配置
...
canal.serverMode = kafka
...
canal.mq.servers = localhost:9092,localhost:9093,localhost:9094
複製代碼
  • instance.properties關鍵配置
canal.instance.master.address=192.168.1.20:3306
# username/password,數據庫的用戶名和密碼
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=example
複製代碼
  • caching_sha2_password Auth failed
Caused by: java.io.IOException: connect /127.0.0.1:3306 failure
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:83) ~[canal.parse.driver-1.1.4.jar:na]
	at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:89) ~[canal.parse-1.1.4.jar:na]
	at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86) ~[canal.parse-1.1.4.jar:na]
	at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:183) ~[canal.parse-1.1.4.jar:na]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
Caused by: java.io.IOException: caching_sha2_password Auth failed
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:257) ~[canal.parse.driver-1.1.4.jar:na]
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:80) ~[canal.parse.driver-1.1.4.jar:na]
	... 4 common frames omitted
複製代碼
  • mysql -hlocalhost -P3306 -ucanal -p123 -Dzacblog登陸數據庫,重啓canal便可。

最後效果

  • 根據對數據庫表,進行修改,Canal會經過Kafka發出消息,在Consumer接收到的消息格式以下。
  • INSERT語句,data爲關鍵字段
{
    "data":[
        {
            "article_id":"3",
            "author_id":"1121212",
            "article_title":"dadad",
            "article_content":"dadad",
            "status":"0",
            "create_time":"2020-04-10 16:30:53",
            "update_time":"2020-04-10 16:31:00"
        }
    ],
    "database":"zacblog",
    "es":1586507462000,
    "id":1,
    "isDdl":false,
    "mysqlType":{
        "article_id":"bigint(20)",
        "author_id":"bigint(20)",
        "article_title":"varchar(50)",
        "article_content":"text",
        "status":"tinyint(3)",
        "create_time":"datetime",
        "update_time":"timestamp"
    },
    "old":null,
    "pkNames":[
        "article_id"
    ],
    "sql":"",
    "sqlType":{
        "article_id":-5,
        "author_id":-5,
        "article_title":12,
        "article_content":2005,
        "status":-6,
        "create_time":93,
        "update_time":93
    },
    "table":"t_zb_article",
    "ts":1586507463069,
    "type":"INSERT"
}
複製代碼
  • UPDATE語句,更新字段用old的list來表示
{
	...
    "old":[
        {
            "article_title":"dadad",
            "update_time":"2020-04-10 16:31:00"
        }
    ],
	...
    "type":"UPDATE"
}
複製代碼
  • DELETE語句,data爲關鍵字段
{
	...
    "data":[
        {
            "article_id":"5",
            "author_id":"22121",
            "article_title":"dada",
            "article_content":"dadwad",
            "status":"0",
            "create_time":"2020-04-10 17:29:31",
            "update_time":"2020-04-10 17:29:41",
            "ext":null
        }
    ],
    "database":"zacblog",
    "es":1586517722000,
	...
    "type":"DELETE"
}
複製代碼
  • ALTER TABLE,直接是語句
{
    "data":null,
    "database":"zacblog",
    "es":1586511807000,
    "id":6,
    "isDdl":true,
    "mysqlType":null,
    "old":null,
    "pkNames":null,
    "sql":"ALTER TABLE `zacblog`.`t_zb_article` 
ADD COLUMN `ext` varchar(255) NULL AFTER `update_time`",
    "sqlType":null,
    "table":"t_zb_article",
    "ts":1586511808016,
    "type":"ALTER"
}
複製代碼

小結

  • 本文從MySQL binlog出發,途徑緩存-數據庫一致性的討論,最後爲了知足好奇心,簡單動手監聽了下binlog改動的Kafka消息。
  • 其實,在實際開發當中,咱們也能夠參照這個消息格式,對數據庫的增刪改,發出對應的業務消息,這個消息格式,我以爲仍是有借鑑意義的。

參考文章

相關文章
相關標籤/搜索