MySQL Binlog 解析工具 Maxwell 詳解

maxwell 簡介

Maxwell是一個能實時讀取MySQL二進制日誌binlog,並生成 JSON 格式的消息,做爲生產者發送給 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平臺的應用程序。它的常見應用場景有ETL、維護緩存、收集表級別的dml指標、增量到搜索引擎、數據分區遷移、切庫binlog回滾方案等。官網(http://maxwells-daemon.io)、GitHub(https://github.com/zendesk/maxwell)html

Maxwell主要提供了下列功能:java

  • 支持 SELECT * FROM table 的方式進行全量數據初始化
  • 支持在主庫發生failover後,自動恢復binlog位置(GTID)
  • 能夠對數據進行分區,解決數據傾斜問題,發送到kafka的數據支持database、table、column等級別的數據分區
  • 工做方式是假裝爲Slave,接收binlog events,而後根據schemas信息拼裝,能夠接受ddl、xid、row等各類event

除了Maxwell外,目前經常使用的MySQL Binlog解析工具主要有阿里的canal、mysql_streamer,三個工具對好比下:node

canal、maxwell、mysql_streamer對比

canal 由Java開發,分爲服務端和客戶端,擁有衆多的衍生應用,性能穩定,功能強大;canal 須要本身編寫客戶端來消費canal解析到的數據。python

maxwell相對於canal的優點是使用簡單,它直接將數據變動輸出爲json字符串,不須要再編寫客戶端。mysql

快速開始

首先MySQL須要先啓用binlog,關於什麼是MySQL binlog,能夠參考文章《MySQL Binlog 介紹linux

$ vi my.cnf

[mysqld]
server_id=1
log-bin=master
binlog_format=row

建立Maxwell用戶,並賦予 maxwell 庫的一些權限git

CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';

使用 maxwell 以前須要先啓動 kafkagithub

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0
# 啓動Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

單機啓動 kafka 以前,須要修改一下配置文件,打開配置文件 vi config/server.properties,在文件最後加入 advertised.host.name 的配置,值爲 kafka 所在機器的IP正則表達式

advertised.host.name=10.100.97.246

否則後面經過 docker 啓動 maxwell 將會報異常(其中的 hadoop2 是個人主機名)sql

17:45:21,446 DEBUG NetworkClient - [Producer clientId=producer-1] Error connecting to node hadoop2:9092 (id: 0 rack: null)
java.io.IOException: Can't resolve address: hadoop2:9092
        at org.apache.kafka.common.network.Selector.connect(Selector.java:217) ~[kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:793) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:230) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:263) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:176) [kafka-clients-1.0.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:101) ~[?:1.8.0_181]
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[?:1.8.0_181]
        at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-1.0.0.jar:?]
        ... 6 more

接着能夠啓動 kafka

bin/kafka-server-start.sh config/server.properties

測試 kafka

# 建立一個 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# 列出全部 topic
bin/kafka-topics.sh --list --zookeeper localhost:2181

# 啓動一個生產者,而後隨意發送一些消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

# 在另外一個終端啓動一下消費者,觀察所消費的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

經過 docker 快速安裝並使用 Maxwell (固然以前須要自行安裝 docker)

# 拉取鏡像 
docker pull zendesk/maxwell

# 啓動maxwell,並將解析出的binlog輸出到控制檯
docker run -ti --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' --host='10.100.97.246' --producer=stdout

測試Maxwell,首先建立一張簡單的表,而後增改刪數據

CREATE TABLE `test` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `age` int(11) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 
insert into test values(1,22,"小旋鋒");
update test set name='whirly' where id=1;
delete from test where id=1;

觀察docker控制檯的輸出,從輸出的日誌中能夠看出Maxwell解析出的binlog的JSON字符串的格式

{"database":"test","table":"test","type":"insert","ts":1552153502,"xid":832,"commit":true,"data":{"id":1,"age":22,"name":"小旋鋒"}}
{"database":"test","table":"test","type":"update","ts":1552153502,"xid":833,"commit":true,"data":{"id":1,"age":22,"name":"whirly"},"old":{"name":"小旋鋒"}}
{"database":"test","table":"test","type":"delete","ts":1552153502,"xid":834,"commit":true,"data":{"id":1,"age":22,"name":"whirly"}}

輸出到 Kafka,關閉 docker,從新設置啓動參數

docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
    --password='123456' --host='10.100.97.246' --producer=kafka \
    --kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug

而後啓動一個消費者來消費 maxwell topic的消息,觀察其輸出;再一次執行增改刪數據的SQL,仍然能夠獲得相同的輸出

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell

輸出JSON字符串的格式

  • data 最新的數據,修改後的數據
  • old 舊數據,修改前的數據
  • type 操做類型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知類型)
  • xid 事務id
  • commit 同一個xid表明同一個事務,事務的最後一條語句會有commit,能夠利用這個重現事務
  • server_id
  • thread_id
  • 運行程序時添加參數--output_ddl,能夠捕捉到ddl語句
  • datetime列會輸出爲"YYYY-MM-DD hh:mm:ss",若是遇到"0000-00-00 00:00:00"會原樣輸出
  • maxwell支持多種編碼,但僅輸出utf8編碼
  • maxwell的TIMESTAMP老是做爲UTC處理,若是要調整爲本身的時區,須要在後端邏輯上進行處理

與輸出格式相關的配置以下

選項 參數值 描述 默認值
output_binlog_position BOOLEAN 是否包含 binlog position false
output_gtid_position BOOLEAN 是否包含 gtid position false
output_commit_info BOOLEAN 是否包含 commit and xid true
output_xoffset BOOLEAN 是否包含 virtual tx-row offset false
output_nulls BOOLEAN 是否包含值爲NULL的字段 true
output_server_id BOOLEAN 是否包含 server_id false
output_thread_id BOOLEAN 是否包含 thread_id false
output_schema_id BOOLEAN 是否包含 schema_id false
output_row_query BOOLEAN 是否包含 INSERT/UPDATE/DELETE 語句. Mysql須要開啓 binlog_rows_query_log_events false
output_ddl BOOLEAN 是否包含 DDL (table-alter, table-create, etc) events false
output_null_zerodates BOOLEAN 是否將 '0000-00-00' 轉換爲 null? false

進階使用

基本的配置

選項 參數值 描述 默認值
config 配置文件 config.properties 的路徑
log_level [debug | info | warn | error] 日誌級別 info
daemon 指定Maxwell實例做爲守護進程到後臺運行
env_config_prefix STRING 匹配該前綴的環境變量將被視爲配置值

能夠把Maxwell的啓動參數寫到一個配置文件 config.properties 中,而後經過 config 選項指定,bin/maxwell --config config.properties

user=maxwell
password=123456
host=10.100.97.246
producer=kafka
kafka.bootstrap.servers=10.100.97.246:9092
kafka_topic=maxwell

mysql 配置選項

Maxwell 根據用途將 MySQL 劃分爲3種角色:

  • host:主機,建maxwell庫表,存儲捕獲到的schema等信息
    • 主要有六張表,bootstrap用於數據初始化,schemas記錄全部的binlog文件信息,databases記錄了全部的數據庫信息,tables記錄了全部的表信息,columns記錄了全部的字段信息,positions記錄了讀取binlog的位移信息,heartbeats記錄了心跳信息
  • replication_host:複製主機,Event監聽,讀取該主機binlog
    • hostreplication_host分開,能夠避免 replication_user 往生產庫裏寫數據
  • schema_host:schema主機,捕獲表結構schema的主機
    • binlog裏面沒有字段信息,因此maxwell須要從數據庫查出schema,存起來。
    • schema_host通常用不到,但在binlog-proxy場景下就很實用。好比要將已經離線的binlog經過maxwell生成json流,因而自建一個mysql server裏面沒有結構,只用於發送binlog,此時表機構就能夠制動從 schema_host 獲取。

一般,這三個主機都是同一個,schema_host 只在有 replication_host 的時候使用。

與MySQL相關的有下列配置

選項 參數值 描述 默認值
host STRING mysql 地址 localhost
user STRING mysql 用戶名
password STRING mysql 密碼 (no password)
port INT mysql 端口 3306
jdbc_options STRING mysql jdbc connection options DEFAULT_JDBC_OPTS
ssl SSL_OPT SSL behavior for mysql cx DISABLED
schema_database STRING Maxwell用於維護的schema和position將使用的數據庫 maxwell
client_id STRING 用於標識Maxwell實例的惟一字符串 maxwell
replica_server_id LONG 用於標識Maxwell實例的惟一數字 6379 (see notes)
master_recovery BOOLEAN enable experimental master recovery code false
gtid_mode BOOLEAN 是否開啓基於GTID的複製 false
recapture_schema BOOLEAN 從新捕獲最新的表結構(schema),不可在 config.properties中配置 false
replication_host STRING server to replicate from. See split server roles schema-store host
replication_password STRING password on replication server (none)
replication_port INT port on replication server 3306
replication_user STRING user on replication server
replication_ssl SSL_OPT SSL behavior for replication cx cx DISABLED
schema_host STRING server to capture schema from. See split server roles schema-store host
schema_password STRING password on schema-capture server (none)
schema_port INT port on schema-capture server 3306
schema_user STRING user on schema-capture server
schema_ssl SSL_OPT SSL behavior for schema-capture server DISABLED

生產者的配置

僅介紹kafka,其餘的生產者的配置詳見官方文檔。

kafka是maxwell支持最完善的一個生產者,而且內置了多個版本的kafka客戶端(0.8.2.2, 0.9.0.1, 0.10.0.1, 0.10.2.1 or 0.11.0.1, 1.0.0.),默認 kafka_version=1.0.0(當前Maxwell版本1.20.0)

Maxwell 會將消息投遞到Kafka的Topic中,該Topic由 kafka_topic 選項指定,默認值爲 maxwell,除了指定爲靜態的Topic,還能夠指定爲動態的,譬如 namespace_%{database}_%{table}%{database}%{table} 將被具體的消息的 database 和 table 替換。

Maxwell 讀取配置時,若是配置項是以 kafka. 開頭,那麼該配置將設置到 Kafka Producer 客戶端的鏈接參數中去,譬如

kafka.acks = 1
kafka.compression.type = snappy
kafka.retries=5

下面是Maxwell通用生產者和Kafka生產者的配置參數

選項 參數值 描述 默認值
producer PRODUCER_TYPE 生產者類型 stdout
custom_producer.factory CLASS_NAME 自定義消費者的工廠類
producer_ack_timeout PRODUCER_ACK_TIMEOUT 異步消費認爲消息丟失的超時時間(毫秒ms)
producer_partition_by PARTITION_BY 輸入到kafka/kinesis的分區函數 database
producer_partition_columns STRING 若按列分區,以逗號分隔的列名稱
producer_partition_by_fallback PARTITION_BY_FALLBACK producer_partition_by=column時須要,當列不存在是使用
ignore_producer_error BOOLEAN 爲false時,在kafka/kinesis發生錯誤時退出程序;爲true時,僅記錄日誌 See also dead_letter_topic true
kafka.bootstrap.servers STRING kafka 集羣列表,HOST:PORT[,HOST:PORT]
kafka_topic STRING kafka topic maxwell
dead_letter_topic STRING 詳見官方文檔
kafka_version KAFKA_VERSION 指定maxwell的 kafka 生產者客戶端版本,不可在config.properties中配置 0.11.0.1
kafka_partition_hash [default | murmur3] 選擇kafka分區時使用的hash方法 default
kafka_key_format [array | hash] how maxwell outputs kafka keys, either a hash or an array of hashes hash
ddl_kafka_topic STRING output_ddl爲true時, 全部DDL的消息都將投遞到該topic kafka_topic

過濾器配置

Maxwell 能夠經過 --filter 配置項來指定過濾規則,經過 exclude 排除,經過 include 包含,值能夠爲具體的數據庫、數據表、數據列,甚至用 Javascript 來定義複雜的過濾規則;能夠用正則表達式描述,有幾個來自官網的例子

# 僅匹配foodb數據庫的tbl表和全部table_數字的表
--filter='exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
# 排除全部庫全部表,僅匹配db1數據庫
--filter = 'exclude: *.*, include: db1.*'
# 排除含db.tbl.col列值爲reject的全部更新
--filter = 'exclude: db.tbl.col = reject'
# 排除任何包含col_a列的更新
--filter = 'exclude: *.*.col_a = *'
# blacklist 黑名單,徹底排除bad_db數據庫,若要恢復,必須刪除maxwell庫
--filter = 'blacklist: bad_db.*'

數據初始化

Maxwell 啓動後將從maxwell庫中獲取上一次中止時position,從該斷點處開始讀取binlog。若是binlog已經清除了,那麼怎樣能夠經過maxwell把整張表都複製出來呢?也就是數據初始化該怎麼作?

對整張表進行操做,人爲地產生binlog?譬如找一個不影響業務的字段譬如update_time,而後加一秒,再減一秒?

update test set update_time = DATE_ADD(update_time,intever 1 second);
update test set update_time = DATE_ADD(update_time,intever -1 second);

這樣明顯存在幾個大問題:

  • 不存在一個不重要的字段怎麼辦?每一個字段都很重要,不能隨便地修改!
  • 若是整張表很大,修改的過程耗時很長,影響了業務!
  • 將產生大量非業務的binlog!

針對數據初始化的問題,Maxwell 提供了一個命令工具 maxwell-bootstrap 幫助咱們完成數據初始化,maxwell-bootstrap 是基於 SELECT * FROM table 的方式進行全量數據初始化,不會產生多餘的binlog!

這個工具備下面這些參數:

參數 說明
--log_level LOG_LEVEL 日誌級別(DEBUG, INFO, WARN or ERROR)
--user USER mysql 用戶名
--password PASSWORD mysql 密碼
--host HOST mysql 地址
--port PORT mysql 端口
--database DATABASE 要bootstrap的表所在的數據庫
--table TABLE 要引導的表
--where WHERE_CLAUSE 設置過濾條件
--client_id CLIENT_ID 指定執行引導操做的Maxwell實例

實驗一番,下面將引導 test 數據庫中 test 表,首先是準備幾條測試用的數據

INSERT INTO `test` VALUES (1, 1, '1');
INSERT INTO `test` VALUES (2, 2, '2');
INSERT INTO `test` VALUES (3, 3, '3');
INSERT INTO `test` VALUES (4, 4, '4');

而後 reset master; 清空binlog,刪除 maxwell 庫中的表。接着使用快速開始中的命令,啓動Kafka、Maxwell和Kafka消費者,而後啓動 maxwell-bootstrap

docker run -it --rm zendesk/maxwell bin/maxwell-bootstrap --user maxwell  \
    --password 123456 --host 10.100.97.246  --database test --table test --client_id maxwell

注意:--bootstrapper=sync 時,在處理bootstrap時,會阻塞正常的binlog解析;--bootstrapper=async 時,不會阻塞。

也能夠執行下面的SQL,在 maxwell.bootstrap 表中插入記錄,手動觸發

insert into maxwell.bootstrap (database_name, table_name) values ('test', 'test');

就能夠在 kafka 消費者端看見引導過來的數據了

{"database":"maxwell","table":"bootstrap","type":"insert","ts":1552199115,"xid":36738,"commit":true,"data":{"id":3,"database_name":"test","table_name":"test","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":0,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell"}}
{"database":"test","table":"test","type":"bootstrap-start","ts":1552199115,"data":{}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":1,"age":1,"name":"1"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":2,"age":2,"name":"2"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":3,"age":3,"name":"3"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":4,"age":4,"name":"4"}}
{"database":"maxwell","table":"bootstrap","type":"update","ts":1552199115,"xid":36756,"commit":true,"data":{"id":3,"database_name":"test","table_name":"test","where_clause":null,"is_complete":1,"inserted_rows":4,"total_rows":0,"created_at":null,"started_at":"2019-03-10 14:25:15","completed_at":"2019-03-10 14:25:15","binlog_file":"mysql-bin.000001","binlog_position":64446,"client_id":"maxwell"},"old":{"is_complete":0,"inserted_rows":1,"completed_at":null}}
{"database":"test","table":"test","type":"bootstrap-complete","ts":1552199115,"data":{}}

中間的4條即是 test.test 的binlog數據了,注意這裏的 type 是 bootstrap-insert,而不是 insert

而後再一次查看binlog,show binlog events;,會發現只有與 maxwell 相關的binlog,並無 test.test 相關的binlog,因此 maxwell-bootstrap 命令並不會產生多餘的 binlog,當數據表的數量很大時,這個好處會更加明顯

Bootstrap 的過程是 bootstrap-start -> bootstrap-insert -> bootstrap-complete,其中,start和complete的data字段爲空,不攜帶數據。

在進行bootstrap過程當中,若是maxwell崩潰,重啓時,bootstrap會徹底從新開始,無論以前進行到多少,若不但願這樣,能夠到數據庫中設置 is_complete 字段值爲1(表示完成),或者刪除該行

Maxwell監控

Maxwell 提供了 base logging mechanism, JMX, HTTP or by push to Datadog 這四種監控方式,與監控相關的配置項有下列這些:

選項 參數值 描述 默認值
metrics_prefix STRING 指標的前綴 MaxwellMetrics
metrics_type [slf4j | jmx | http | datadog] 發佈指標的方式
metrics_jvm BOOLEAN 是否收集JVM信息 false
metrics_slf4j_interval SECONDS 將指標記錄到日誌的頻率,metrics_type須配置爲slf4j 60
http_port INT metrics_type爲http時,發佈指標綁定的端口 8080
http_path_prefix STRING http的路徑前綴 /
http_bind_address STRING http發佈指標綁定的地址 all addresses
http_diagnostic BOOLEAN http是否開啓diagnostic後綴 false
http_diagnostic_timeout MILLISECONDS http diagnostic 響應超時時間 10000
metrics_datadog_type [udp | http] metrics_type爲datadog時發佈指標的方式 udp
metrics_datadog_tags STRING 提供給 datadog 的標籤,如 tag1:value1,tag2:value2
metrics_datadog_interval INT 推指標到datadog的頻率,單位秒 60
metrics_datadog_apikey STRING metrics_datadog_type=http 時datadog用的api key
metrics_datadog_host STRING metrics_datadog_type=udp時推指標的目標地址 localhost
metrics_datadog_port INT metrics_datadog_type=udp 時推指標的端口 8125

具體能夠獲得哪些監控指標呢?有以下,注意全部指標都預先配置了指標前綴 metrics_prefix

指標 類型 說明
messages.succeeded Counters 成功發送到kafka的消息數量
messages.failed Counters 發送失敗的消息數量
row.count Counters 已處理的binlog行數,注意並不是全部binlog都發往kafka
messages.succeeded.meter Meters 消息成功發送到Kafka的速率
messages.failed.meter Meters 消息發送失敗到kafka的速率
row.meter Meters 行(row)從binlog鏈接器到達maxwell的速率
replication.lag Gauges 從數據庫事務提交到Maxwell處理該事務之間所用的時間(毫秒)
inflightmessages.count Gauges 當前正在處理的消息數(等待來自目的地的確認,或在消息以前)
message.publish.time Timers 向kafka發送record所用的時間(毫秒)
message.publish.age Timers 從數據庫產生事件到發送到Kafka之間的時間(毫秒),精確度爲+/-500ms
replication.queue.time Timers 將一個binlog事件送處處理隊列所用的時間(毫秒)

上述有些指標爲kafka特有的,並不支持全部的生產者。

實驗一番,經過 http 方式獲取監控指標

docker run -p 8080:8080 -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
    --password='123456' --host='10.100.97.246' --producer=kafka \
    --kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug \
    --metrics_type=http  --metrics_jvm=true --http_port=8080

上面的配置大部分與前面的相同,不一樣的有 -p 8080:8080 docker端口映射,以及 --metrics_type=http --metrics_jvm=true --http_port=8080,配置了經過http方式發佈指標,啓用收集JVM信息,端口爲8080,以後能夠經過 http://10.100.97.246:8080/metrics 即可獲取全部的指標

Maxwell監控

http 方式有四種後綴,分別對應四種不一樣的格式

endpoint 說明
/metrics 全部指標以JSON格式返回
/prometheus 全部指標以Prometheus格式返回(Prometheus是一套開源的監控&報警&時間序列數據庫的組合)
/healthcheck 返回Maxwell過去15分鐘是否健康
/ping 簡單的測試,返回 pong

若是是經過 JMX 的方式收集Maxwell監控指標,能夠 JAVA_OPTS 環境變量配置JMX訪問權限

export JAVA_OPTS="-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.port=9010 \
-Dcom.sun.management.jmxremote.local.only=false \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=10.100.97.246"

多個Maxwell實例

在不一樣的配置下,Maxwell能夠在同一個主服務器上運行多個實例。若是但願讓生產者以不一樣的配置運行,例如未來自不一樣組的表(table)的事件投遞到不一樣的Topic中,這將很是有用。Maxwell的每一個實例都必須配置一個惟一的client_id,以便區分的binlog位置。

GTID 支持

Maxwell 從1.8.0版本開始支持基於GTID的複製(GTID-based replication),在GTID模式下,Maxwell將在主機更改後透明地選擇新的複製位置。

什麼是GTID Replication?

GTID (Global Transaction ID) 是對於一個已提交事務的編號,而且是一個全局惟一的編號。

從 MySQL 5.6.5 開始新增了一種基於 GTID 的複製方式。經過 GTID 保證了每一個在主庫上提交的事務在集羣中有一個惟一的ID。這種方式強化了數據庫的主備一致性,故障恢復以及容錯能力。

在原來基於二進制日誌的複製中,從庫須要告知主庫要從哪一個偏移量進行增量同步,若是指定錯誤會形成數據的遺漏,從而形成數據的不一致。藉助GTID,在發生主備切換的狀況下,MySQL的其它從庫能夠自動在新主庫上找到正確的複製位置,這大大簡化了複雜複製拓撲下集羣的維護,也減小了人爲設置複製位置發生誤操做的風險。另外,基於GTID的複製能夠忽略已經執行過的事務,減小了數據發生不一致的風險。

注意事項

timestamp column

maxwell對時間類型(datetime, timestamp, date)都是當作字符串處理的,這也是爲了保證數據一致(好比0000-00-00 00:00:00這樣的時間在timestamp裏是非法的,但mysql卻認,解析成java或者python類型就是null/None)。

若是MySQL表上的字段是 timestamp 類型,是有時區的概念,binlog解析出來的是標準UTC時間,但用戶看到的是本地時間。好比 f_create_time timestamp 建立時間是北京時間 2018-01-05 21:01:01,那麼mysql實際存儲的是 2018-01-05 13:01:01,binlog裏面也是這個時間字符串。若是不作消費者不作時區轉換,會少8個小時。

與其每一個客戶端都要考慮這個問題,我以爲更合理的作法是提供時區參數,而後maxwell自動處理時區問題,不然要麼客戶端先須要知道哪些列是timestamp類型,或者鏈接上原庫緩存上這些類型。

binary column

maxwell能夠處理binary類型的列,如blob、varbinary,它的作法就是對二進制列使用 base64_encode,當作字符串輸出到json。消費者拿到這個列數據後,不能直接拼裝,須要 base64_decode

表結構不一樣步

若是是拿比較老的binlog,放到新的mysql server上去用maxwell拉去,有可能表結構已經發生了變化,好比binlog裏面字段比 schema_host 裏面的字段多一個。目前這種狀況沒有發現異常,好比阿里RDS默認會爲 無主鍵無惟一索引的表,增長一個__##alibaba_rds_rowid##__,在 show create tableschema 裏面都看不到這個隱藏主鍵,但binlog裏面會有,同步到從庫。

另外咱們有經過git去管理結構版本,若是真有這種場景,也能夠應對。

大事務binlog

當一個事物產生的binlog量很是大的時候,好比遷移日表數據,maxwell爲了控制內存使用,會自動將處理不過來的binlog放到文件系統

Using kafka version: 0.11.0.1
21:16:07,109 WARN  MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
21:16:07,380 INFO  SchemaStoreSchema - Creating maxwell database
21:16:07,540 INFO  Maxwell - Maxwell v?? is booting (RabbitmqProducer), starting at Position[BinlogPosition[mysql-bin.006235:24980714],
lastHeartbeat=0]
21:16:07,649 INFO  AbstractSchemaStore - Maxwell is capturing initial schema
21:16:08,267 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.006235:24980714
21:16:08,324 INFO  BinaryLogClient - Connected to rm-xxxxxxxxxxx.mysql.rds.aliyuncs.com:3306 at mysql-bin.006235/24980714 (sid:637
9, cid:9182598)
21:16:08,325 INFO  BinlogConnectorLifecycleListener - Binlog connected.
03:15:36,104 INFO  ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell7935334910787514257events
03:17:14,880 INFO  ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell3143086481692829045events

可是遇到另一個問題,overflow隨後就出現異常 EventDataDeserializationException: Failed to deserialize data of EventHeaderV4,當我另起一個maxwell指點以前的binlog postion開始解析,卻有沒有拋異常。過後的數據也代表並無數據丟失。

問題產生的緣由還不明,Caused by: java.net.SocketException: Connection reset,感受像讀取 binlog 流的時候還沒讀取到完整的event,異常關閉了鏈接。這個問題比較頑固,github上面相似問題都沒有達到明確的解決。(這也從側面告訴咱們,大表數據遷移,也要批量進行,不要一個insert into .. select 搞定)

03:18:20,586 INFO  ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell5229190074667071141events
03:19:31,289 WARN  BinlogConnectorLifecycleListener - Communication failure.
com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{time
stamp=1514920657000, eventType=WRITE_ROWS, serverId=2115082720, headerLength=19, dataLength=8155, nextPosition=520539918, flags=0}
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:216) ~[mys
ql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:184) ~[mysql-binlog-c
onnector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:890) [mysql-binlog-connector-java-0
.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559) [mysql-binlog-connector-java-0.13.0.jar:0.13
.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793) [mysql-binlog-connector-java-0.13.0.jar:0.13.0
]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:210) ~[?:1.8.0_121]
        at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_121]
        at com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:51) ~[mysql-binlog-connector-
java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:202) ~[mysql-binlo
g-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:184) ~[mysql-binlog-connector-java-0.13
.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46) ~[mysql-binlog-connector-jav
a-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeLong(AbstractRowsEventDataD
eserializer.java:212) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeCell(AbstractRowsEventDataD
eserializer.java:150) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:132) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserializeRows(WriteRowsEventDataDeserializer.java:64) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:56) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:32) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:210) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        ... 5 more
03:19:31,514 INFO  BinlogConnectorLifecycleListener - Binlog disconnected.
03:19:31,590 WARN  BinlogConnectorReplicator - replicator stopped at position: mysql-bin.006236:520531744 -- restarting
03:19:31,595 INFO  BinaryLogClient - Connected to rm-xxxxxx.mysql.rds.aliyuncs.com:3306 at mysql-bin.006236/520531744 (sid:6379, cid:9220521)

tableMapCache

前面講過,若是我只想獲取某幾個表的binlog變動,須要用 include_tables 來過濾,但若是mysql server上如今刪了一個表t1,但個人binlog是從昨天開始讀取,被刪的那個表t1在maxwell啓動的時候是拉取不到表結構的。而後昨天的binlog裏面有 t1 的變動,由於找不到表結構給來組裝成json,會拋異常。

手動在 maxwell.tables/columns 裏面插入記錄是可行的。但這個問題的根本是,maxwell在binlog過濾的時候,只在處理row_event的時候,而對 tableMapCache 要求binlog裏面的全部表都要有。

本身(seanlook)提交了一個commit,能夠在作 tableMapCache 的時候也僅要求緩存 include_dbs/tables 這些表: https://github.com/seanlook/maxwell/commit/2618b70303078bf910a1981b69943cca75ee04fb

提升消費性能

在用rabbitmq時,routing_key%db%.%table%,但某些表產生的binlog增量很是大,就會致使各隊列消息量很不平均,目前由於還沒作到事務xid或者thread_id級別的併發回放,因此最小隊列粒度也是表,儘可能單獨放一個隊列,其它數據量小的合在一塊兒。

binlog

Maxwell 在 maxwell 庫中維護了 binlog 的位移等信息,因爲一些緣由譬如 reset master;,致使 maxwell 庫中的記錄與實際的binlog對不上,這時將報異常,這是能夠手動修正binlog位移或者直接清空/刪除 maxwell 庫重建

com.github.shyiko.mysql.binlog.network.ServerException: Could not find first log file name in binary log index file
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:885)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:564)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:796)
        at java.lang.Thread.run(Thread.java:748)

以及

com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event 'mysql-bin.000001' at 760357, the last event read from './mysql-bin.000001' at 1888540, the last byte read from './mysql-bin.000001' at 1888540.
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:885)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:564)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:796)
        at java.lang.Thread.run(Thread.java:748)

參考文檔

關注_小旋鋒_微信公衆號

相關文章
相關標籤/搜索