Maxwell是一個能實時讀取MySQL二進制日誌binlog,並生成 JSON 格式的消息,做爲生產者發送給 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平臺的應用程序。它的常見應用場景有ETL、維護緩存、收集表級別的dml指標、增量到搜索引擎、數據分區遷移、切庫binlog回滾方案等。官網(http://maxwells-daemon.io)、GitHub(https://github.com/zendesk/maxwell)java
Maxwell主要提供了下列功能:node
SELECT * FROM table
的方式進行全量數據初始化除了Maxwell外,目前經常使用的MySQL Binlog解析工具主要有阿里的canal、mysql_streamer,三個工具對好比下:python
canal 由Java開發,分爲服務端和客戶端,擁有衆多的衍生應用,性能穩定,功能強大;canal 須要本身編寫客戶端來消費canal解析到的數據。mysql
maxwell相對於canal的優點是使用簡單,它直接將數據變動輸出爲json字符串,不須要再編寫客戶端。linux
首先MySQL須要先啓用binlog,關於什麼是MySQL binlog,能夠參考文章《MySQL Binlog 介紹》git
$ vi my.cnf
[mysqld]
server_id=1
log-bin=master
binlog_format=row
複製代碼
建立Maxwell用戶,並賦予 maxwell 庫的一些權限github
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
複製代碼
使用 maxwell 以前須要先啓動 kafka正則表達式
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0
# 啓動Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
複製代碼
單機啓動 kafka 以前,須要修改一下配置文件,打開配置文件 vi config/server.properties
,在文件最後加入 advertised.host.name
的配置,值爲 kafka 所在機器的IPsql
advertised.host.name=10.100.97.246
複製代碼
否則後面經過 docker 啓動 maxwell 將會報異常(其中的 hadoop2 是個人主機名)
17:45:21,446 DEBUG NetworkClient - [Producer clientId=producer-1] Error connecting to node hadoop2:9092 (id: 0 rack: null)
java.io.IOException: Can't resolve address: hadoop2:9092 at org.apache.kafka.common.network.Selector.connect(Selector.java:217) ~[kafka-clients-1.0.0.jar:?] at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:793) [kafka-clients-1.0.0.jar:?] at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:230) [kafka-clients-1.0.0.jar:?] at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:263) [kafka-clients-1.0.0.jar:?] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) [kafka-clients-1.0.0.jar:?] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:176) [kafka-clients-1.0.0.jar:?] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181] Caused by: java.nio.channels.UnresolvedAddressException at sun.nio.ch.Net.checkAddress(Net.java:101) ~[?:1.8.0_181] at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[?:1.8.0_181] at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-1.0.0.jar:?] ... 6 more 複製代碼
接着能夠啓動 kafka
bin/kafka-server-start.sh config/server.properties
複製代碼
測試 kafka
# 建立一個 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 列出全部 topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
# 啓動一個生產者,而後隨意發送一些消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
# 在另外一個終端啓動一下消費者,觀察所消費的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
複製代碼
經過 docker 快速安裝並使用 Maxwell (固然以前須要自行安裝 docker)
# 拉取鏡像
docker pull zendesk/maxwell
# 啓動maxwell,並將解析出的binlog輸出到控制檯
docker run -ti --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' --host='10.100.97.246' --producer=stdout
複製代碼
測試Maxwell,首先建立一張簡單的表,而後增改刪數據
CREATE TABLE `test` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`age` int(11) DEFAULT NULL,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into test values(1,22,"小旋鋒");
update test set name='whirly' where id=1;
delete from test where id=1;
複製代碼
觀察docker控制檯的輸出,從輸出的日誌中能夠看出Maxwell解析出的binlog的JSON字符串的格式
{"database":"test","table":"test","type":"insert","ts":1552153502,"xid":832,"commit":true,"data":{"id":1,"age":22,"name":"小旋鋒"}}
{"database":"test","table":"test","type":"update","ts":1552153502,"xid":833,"commit":true,"data":{"id":1,"age":22,"name":"whirly"},"old":{"name":"小旋鋒"}}
{"database":"test","table":"test","type":"delete","ts":1552153502,"xid":834,"commit":true,"data":{"id":1,"age":22,"name":"whirly"}}
複製代碼
輸出到 Kafka,關閉 docker,從新設置啓動參數
docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
--password='123456' --host='10.100.97.246' --producer=kafka \
--kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug
複製代碼
而後啓動一個消費者來消費 maxwell topic的消息,觀察其輸出;再一次執行增改刪數據的SQL,仍然能夠獲得相同的輸出
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell
複製代碼
與輸出格式相關的配置以下
選項 | 參數值 | 描述 | 默認值 |
---|---|---|---|
output_binlog_position |
BOOLEAN | 是否包含 binlog position | false |
output_gtid_position |
BOOLEAN | 是否包含 gtid position | false |
output_commit_info |
BOOLEAN | 是否包含 commit and xid | true |
output_xoffset |
BOOLEAN | 是否包含 virtual tx-row offset | false |
output_nulls |
BOOLEAN | 是否包含值爲NULL的字段 | true |
output_server_id |
BOOLEAN | 是否包含 server_id | false |
output_thread_id |
BOOLEAN | 是否包含 thread_id | false |
output_schema_id |
BOOLEAN | 是否包含 schema_id | false |
output_row_query |
BOOLEAN | 是否包含 INSERT/UPDATE/DELETE 語句. Mysql須要開啓 binlog_rows_query_log_events |
false |
output_ddl |
BOOLEAN | 是否包含 DDL (table-alter, table-create, etc) events | false |
output_null_zerodates |
BOOLEAN | 是否將 '0000-00-00' 轉換爲 null? | false |
選項 | 參數值 | 描述 | 默認值 |
---|---|---|---|
config |
配置文件 config.properties 的路徑 |
||
log_level |
[debug | info | warn | error] |
日誌級別 | info |
daemon |
指定Maxwell實例做爲守護進程到後臺運行 | ||
env_config_prefix |
STRING | 匹配該前綴的環境變量將被視爲配置值 |
能夠把Maxwell的啓動參數寫到一個配置文件 config.properties
中,而後經過 config 選項指定,bin/maxwell --config config.properties
user=maxwell
password=123456
host=10.100.97.246
producer=kafka
kafka.bootstrap.servers=10.100.97.246:9092
kafka_topic=maxwell
複製代碼
Maxwell 根據用途將 MySQL 劃分爲3種角色:
host
:主機,建maxwell庫表,存儲捕獲到的schema等信息
replication_host
:複製主機,Event監聽,讀取該主機binlog
host
和replication_host
分開,能夠避免 replication_user
往生產庫裏寫數據schema_host
:schema主機,捕獲表結構schema的主機
schema_host
通常用不到,但在binlog-proxy
場景下就很實用。好比要將已經離線的binlog經過maxwell生成json流,因而自建一個mysql server裏面沒有結構,只用於發送binlog,此時表機構就能夠制動從 schema_host 獲取。一般,這三個主機都是同一個,schema_host
只在有 replication_host
的時候使用。
與MySQL相關的有下列配置
選項 | 參數值 | 描述 | 默認值 |
---|---|---|---|
host |
STRING | mysql 地址 | localhost |
user |
STRING | mysql 用戶名 | |
password |
STRING | mysql 密碼 | (no password) |
port |
INT | mysql 端口 3306 | |
jdbc_options |
STRING | mysql jdbc connection options | DEFAULT_JDBC_OPTS |
ssl |
SSL_OPT | SSL behavior for mysql cx | DISABLED |
schema_database |
STRING | Maxwell用於維護的schema和position將使用的數據庫 | maxwell |
client_id |
STRING | 用於標識Maxwell實例的惟一字符串 | maxwell |
replica_server_id |
LONG | 用於標識Maxwell實例的惟一數字 | 6379 (see notes) |
master_recovery |
BOOLEAN | enable experimental master recovery code | false |
gtid_mode |
BOOLEAN | 是否開啓基於GTID的複製 | false |
recapture_schema |
BOOLEAN | 從新捕獲最新的表結構(schema),不可在 config.properties中配置 | false |
replication_host |
STRING | server to replicate from. See split server roles | schema-store host |
replication_password |
STRING | password on replication server | (none) |
replication_port |
INT | port on replication server | 3306 |
replication_user |
STRING | user on replication server | |
replication_ssl |
SSL_OPT | SSL behavior for replication cx cx | DISABLED |
schema_host |
STRING | server to capture schema from. See split server roles | schema-store host |
schema_password |
STRING | password on schema-capture server | (none) |
schema_port |
INT | port on schema-capture server | 3306 |
schema_user |
STRING | user on schema-capture server | |
schema_ssl |
SSL_OPT | SSL behavior for schema-capture server | DISABLED |
僅介紹kafka,其餘的生產者的配置詳見官方文檔。
kafka是maxwell支持最完善的一個生產者,而且內置了多個版本的kafka客戶端(0.8.2.2, 0.9.0.1, 0.10.0.1, 0.10.2.1 or 0.11.0.1, 1.0.0.),默認 kafka_version=1.0.0(當前Maxwell版本1.20.0)
Maxwell 會將消息投遞到Kafka的Topic中,該Topic由 kafka_topic
選項指定,默認值爲 maxwell
,除了指定爲靜態的Topic,還能夠指定爲動態的,譬如 namespace_%{database}_%{table}
,%{database}
和 %{table}
將被具體的消息的 database 和 table 替換。
Maxwell 讀取配置時,若是配置項是以 kafka.
開頭,那麼該配置將設置到 Kafka Producer 客戶端的鏈接參數中去,譬如
kafka.acks = 1
kafka.compression.type = snappy
kafka.retries=5
複製代碼
下面是Maxwell通用生產者和Kafka生產者的配置參數
選項 | 參數值 | 描述 | 默認值 |
---|---|---|---|
producer |
PRODUCER_TYPE | 生產者類型 | stdout |
custom_producer.factory |
CLASS_NAME | 自定義消費者的工廠類 | |
producer_ack_timeout |
PRODUCER_ACK_TIMEOUT | 異步消費認爲消息丟失的超時時間(毫秒ms) | |
producer_partition_by |
PARTITION_BY | 輸入到kafka/kinesis的分區函數 | database |
producer_partition_columns |
STRING | 若按列分區,以逗號分隔的列名稱 | |
producer_partition_by_fallback |
PARTITION_BY_FALLBACK | producer_partition_by=column 時須要,當列不存在是使用 |
|
ignore_producer_error |
BOOLEAN | 爲false時,在kafka/kinesis發生錯誤時退出程序;爲true時,僅記錄日誌 See also dead_letter_topic |
true |
kafka.bootstrap.servers |
STRING | kafka 集羣列表,HOST:PORT[,HOST:PORT] |
|
kafka_topic |
STRING | kafka topic | maxwell |
dead_letter_topic |
STRING | 詳見官方文檔 | |
kafka_version |
KAFKA_VERSION | 指定maxwell的 kafka 生產者客戶端版本,不可在config.properties中配置 | 0.11.0.1 |
kafka_partition_hash |
[default | murmur3] |
選擇kafka分區時使用的hash方法 | default |
kafka_key_format |
[array | hash] |
how maxwell outputs kafka keys, either a hash or an array of hashes | hash |
ddl_kafka_topic |
STRING | 當output_ddl 爲true時, 全部DDL的消息都將投遞到該topic |
kafka_topic |
Maxwell 能夠經過 --filter
配置項來指定過濾規則,經過 exclude
排除,經過 include
包含,值能夠爲具體的數據庫、數據表、數據列,甚至用 Javascript 來定義複雜的過濾規則;能夠用正則表達式描述,有幾個來自官網的例子
# 僅匹配foodb數據庫的tbl表和全部table_數字的表
--filter='exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
# 排除全部庫全部表,僅匹配db1數據庫
--filter = 'exclude: *.*, include: db1.*'
# 排除含db.tbl.col列值爲reject的全部更新
--filter = 'exclude: db.tbl.col = reject'
# 排除任何包含col_a列的更新
--filter = 'exclude: *.*.col_a = *'
# blacklist 黑名單,徹底排除bad_db數據庫,若要恢復,必須刪除maxwell庫
--filter = 'blacklist: bad_db.*'
複製代碼
Maxwell 啓動後將從maxwell庫中獲取上一次中止時position,從該斷點處開始讀取binlog。若是binlog已經清除了,那麼怎樣能夠經過maxwell把整張表都複製出來呢?也就是數據初始化該怎麼作?
對整張表進行操做,人爲地產生binlog?譬如找一個不影響業務的字段譬如update_time,而後加一秒,再減一秒?
update test set update_time = DATE_ADD(update_time,intever 1 second);
update test set update_time = DATE_ADD(update_time,intever -1 second);
複製代碼
這樣明顯存在幾個大問題:
針對數據初始化的問題,Maxwell 提供了一個命令工具 maxwell-bootstrap
幫助咱們完成數據初始化,maxwell-bootstrap
是基於 SELECT * FROM table
的方式進行全量數據初始化,不會產生多餘的binlog!
這個工具備下面這些參數:
參數 | 說明 |
---|---|
--log_level LOG_LEVEL |
日誌級別(DEBUG, INFO, WARN or ERROR) |
--user USER |
mysql 用戶名 |
--password PASSWORD |
mysql 密碼 |
--host HOST |
mysql 地址 |
--port PORT |
mysql 端口 |
--database DATABASE |
要bootstrap的表所在的數據庫 |
--table TABLE |
要引導的表 |
--where WHERE_CLAUSE |
設置過濾條件 |
--client_id CLIENT_ID |
指定執行引導操做的Maxwell實例 |
實驗一番,下面將引導 test
數據庫中 test
表,首先是準備幾條測試用的數據
INSERT INTO `test` VALUES (1, 1, '1');
INSERT INTO `test` VALUES (2, 2, '2');
INSERT INTO `test` VALUES (3, 3, '3');
INSERT INTO `test` VALUES (4, 4, '4');
複製代碼
而後 reset master;
清空binlog,刪除 maxwell 庫中的表。接着使用快速開始中的命令,啓動Kafka、Maxwell和Kafka消費者,而後啓動 maxwell-bootstrap
docker run -it --rm zendesk/maxwell bin/maxwell-bootstrap --user maxwell \
--password 123456 --host 10.100.97.246 --database test --table test --client_id maxwell
複製代碼
注意:--bootstrapper=sync
時,在處理bootstrap時,會阻塞正常的binlog解析;--bootstrapper=async
時,不會阻塞。
也能夠執行下面的SQL,在 maxwell.bootstrap
表中插入記錄,手動觸發
insert into maxwell.bootstrap (database_name, table_name) values ('test', 'test');
複製代碼
就能夠在 kafka 消費者端看見引導過來的數據了
{"database":"maxwell","table":"bootstrap","type":"insert","ts":1552199115,"xid":36738,"commit":true,"data":{"id":3,"database_name":"test","table_name":"test","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":0,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell"}}
{"database":"test","table":"test","type":"bootstrap-start","ts":1552199115,"data":{}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":1,"age":1,"name":"1"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":2,"age":2,"name":"2"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":3,"age":3,"name":"3"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":4,"age":4,"name":"4"}}
{"database":"maxwell","table":"bootstrap","type":"update","ts":1552199115,"xid":36756,"commit":true,"data":{"id":3,"database_name":"test","table_name":"test","where_clause":null,"is_complete":1,"inserted_rows":4,"total_rows":0,"created_at":null,"started_at":"2019-03-10 14:25:15","completed_at":"2019-03-10 14:25:15","binlog_file":"mysql-bin.000001","binlog_position":64446,"client_id":"maxwell"},"old":{"is_complete":0,"inserted_rows":1,"completed_at":null}}
{"database":"test","table":"test","type":"bootstrap-complete","ts":1552199115,"data":{}}
複製代碼
中間的4條即是 test.test
的binlog數據了,注意這裏的 type 是 bootstrap-insert
,而不是 insert
。
而後再一次查看binlog,show binlog events;
,會發現只有與 maxwell
相關的binlog,並無 test.test
相關的binlog,因此 maxwell-bootstrap
命令並不會產生多餘的 binlog,當數據表的數量很大時,這個好處會更加明顯
Bootstrap 的過程是 bootstrap-start -> bootstrap-insert -> bootstrap-complete
,其中,start和complete的data字段爲空,不攜帶數據。
在進行bootstrap過程當中,若是maxwell崩潰,重啓時,bootstrap會徹底從新開始,無論以前進行到多少,若不但願這樣,能夠到數據庫中設置 is_complete
字段值爲1(表示完成),或者刪除該行
Maxwell 提供了 base logging mechanism, JMX, HTTP or by push to Datadog
這四種監控方式,與監控相關的配置項有下列這些:
選項 | 參數值 | 描述 | 默認值 |
---|---|---|---|
metrics_prefix |
STRING | 指標的前綴 | MaxwellMetrics |
metrics_type |
[slf4j | jmx | http | datadog] |
發佈指標的方式 | |
metrics_jvm |
BOOLEAN | 是否收集JVM信息 | false |
metrics_slf4j_interval |
SECONDS | 將指標記錄到日誌的頻率,metrics_type 須配置爲slf4j |
60 |
http_port |
INT | metrics_type 爲http時,發佈指標綁定的端口 |
8080 |
http_path_prefix |
STRING | http的路徑前綴 | / |
http_bind_address |
STRING | http發佈指標綁定的地址 | all addresses |
http_diagnostic |
BOOLEAN | http是否開啓diagnostic後綴 | false |
http_diagnostic_timeout |
MILLISECONDS | http diagnostic 響應超時時間 | 10000 |
metrics_datadog_type |
[udp | http] |
metrics_type 爲datadog時發佈指標的方式 |
udp |
metrics_datadog_tags |
STRING | 提供給 datadog 的標籤,如 tag1:value1,tag2:value2 | |
metrics_datadog_interval |
INT | 推指標到datadog的頻率,單位秒 | 60 |
metrics_datadog_apikey |
STRING | 當 metrics_datadog_type=http 時datadog用的api key |
|
metrics_datadog_host |
STRING | 當metrics_datadog_type=udp 時推指標的目標地址 |
localhost |
metrics_datadog_port |
INT | 當metrics_datadog_type=udp 時推指標的端口 |
8125 |
具體能夠獲得哪些監控指標呢?有以下,注意全部指標都預先配置了指標前綴 metrics_prefix
指標 | 類型 | 說明 |
---|---|---|
messages.succeeded |
Counters | 成功發送到kafka的消息數量 |
messages.failed |
Counters | 發送失敗的消息數量 |
row.count |
Counters | 已處理的binlog行數,注意並不是全部binlog都發往kafka |
messages.succeeded.meter |
Meters | 消息成功發送到Kafka的速率 |
messages.failed.meter |
Meters | 消息發送失敗到kafka的速率 |
row.meter |
Meters | 行(row)從binlog鏈接器到達maxwell的速率 |
replication.lag |
Gauges | 從數據庫事務提交到Maxwell處理該事務之間所用的時間(毫秒) |
inflightmessages.count |
Gauges | 當前正在處理的消息數(等待來自目的地的確認,或在消息以前) |
message.publish.time |
Timers | 向kafka發送record所用的時間(毫秒) |
message.publish.age |
Timers | 從數據庫產生事件到發送到Kafka之間的時間(毫秒),精確度爲+/-500ms |
replication.queue.time |
Timers | 將一個binlog事件送處處理隊列所用的時間(毫秒) |
上述有些指標爲kafka特有的,並不支持全部的生產者。
實驗一番,經過 http 方式獲取監控指標
docker run -p 8080:8080 -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
--password='123456' --host='10.100.97.246' --producer=kafka \
--kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug \
--metrics_type=http --metrics_jvm=true --http_port=8080
複製代碼
上面的配置大部分與前面的相同,不一樣的有 -p 8080:8080
docker端口映射,以及 --metrics_type=http --metrics_jvm=true --http_port=8080
,配置了經過http方式發佈指標,啓用收集JVM信息,端口爲8080,以後能夠經過 http://10.100.97.246:8080/metrics
即可獲取全部的指標
http 方式有四種後綴,分別對應四種不一樣的格式
endpoint | 說明 |
---|---|
/metrics |
全部指標以JSON格式返回 |
/prometheus |
全部指標以Prometheus格式返回(Prometheus是一套開源的監控&報警&時間序列數據庫的組合) |
/healthcheck |
返回Maxwell過去15分鐘是否健康 |
/ping |
簡單的測試,返回 pong |
若是是經過 JMX 的方式收集Maxwell監控指標,能夠 JAVA_OPTS
環境變量配置JMX訪問權限
export JAVA_OPTS="-Dcom.sun.management.jmxremote \ -Dcom.sun.management.jmxremote.port=9010 \ -Dcom.sun.management.jmxremote.local.only=false \ -Dcom.sun.management.jmxremote.authenticate=false \ -Dcom.sun.management.jmxremote.ssl=false \ -Djava.rmi.server.hostname=10.100.97.246"
複製代碼
在不一樣的配置下,Maxwell能夠在同一個主服務器上運行多個實例。若是但願讓生產者以不一樣的配置運行,例如未來自不一樣組的表(table)的事件投遞到不一樣的Topic中,這將很是有用。Maxwell的每一個實例都必須配置一個惟一的client_id,以便區分的binlog位置。
Maxwell 從1.8.0版本開始支持基於GTID的複製(GTID-based replication),在GTID模式下,Maxwell將在主機更改後透明地選擇新的複製位置。
什麼是GTID Replication?
GTID (Global Transaction ID) 是對於一個已提交事務的編號,而且是一個全局惟一的編號。
從 MySQL 5.6.5 開始新增了一種基於 GTID 的複製方式。經過 GTID 保證了每一個在主庫上提交的事務在集羣中有一個惟一的ID。這種方式強化了數據庫的主備一致性,故障恢復以及容錯能力。
在原來基於二進制日誌的複製中,從庫須要告知主庫要從哪一個偏移量進行增量同步,若是指定錯誤會形成數據的遺漏,從而形成數據的不一致。藉助GTID,在發生主備切換的狀況下,MySQL的其它從庫能夠自動在新主庫上找到正確的複製位置,這大大簡化了複雜複製拓撲下集羣的維護,也減小了人爲設置複製位置發生誤操做的風險。另外,基於GTID的複製能夠忽略已經執行過的事務,減小了數據發生不一致的風險。
maxwell對時間類型(datetime, timestamp, date)都是當作字符串處理的,這也是爲了保證數據一致(好比0000-00-00 00:00:00
這樣的時間在timestamp裏是非法的,但mysql卻認,解析成java或者python類型就是null/None)。
若是MySQL表上的字段是 timestamp 類型,是有時區的概念,binlog解析出來的是標準UTC時間,但用戶看到的是本地時間。好比 f_create_time timestamp
建立時間是北京時間 2018-01-05 21:01:01
,那麼mysql實際存儲的是 2018-01-05 13:01:01
,binlog裏面也是這個時間字符串。若是不作消費者不作時區轉換,會少8個小時。
與其每一個客戶端都要考慮這個問題,我以爲更合理的作法是提供時區參數,而後maxwell自動處理時區問題,不然要麼客戶端先須要知道哪些列是timestamp類型,或者鏈接上原庫緩存上這些類型。
maxwell能夠處理binary類型的列,如blob、varbinary,它的作法就是對二進制列使用 base64_encode
,當作字符串輸出到json。消費者拿到這個列數據後,不能直接拼裝,須要 base64_decode
。
若是是拿比較老的binlog,放到新的mysql server上去用maxwell拉去,有可能表結構已經發生了變化,好比binlog裏面字段比 schema_host
裏面的字段多一個。目前這種狀況沒有發現異常,好比阿里RDS默認會爲 無主鍵無惟一索引的表,增長一個__##alibaba_rds_rowid##__
,在 show create table
和 schema
裏面都看不到這個隱藏主鍵,但binlog裏面會有,同步到從庫。
另外咱們有經過git去管理結構版本,若是真有這種場景,也能夠應對。
當一個事物產生的binlog量很是大的時候,好比遷移日表數據,maxwell爲了控制內存使用,會自動將處理不過來的binlog放到文件系統
Using kafka version: 0.11.0.1
21:16:07,109 WARN MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
21:16:07,380 INFO SchemaStoreSchema - Creating maxwell database
21:16:07,540 INFO Maxwell - Maxwell v?? is booting (RabbitmqProducer), starting at Position[BinlogPosition[mysql-bin.006235:24980714],
lastHeartbeat=0]
21:16:07,649 INFO AbstractSchemaStore - Maxwell is capturing initial schema
21:16:08,267 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.006235:24980714
21:16:08,324 INFO BinaryLogClient - Connected to rm-xxxxxxxxxxx.mysql.rds.aliyuncs.com:3306 at mysql-bin.006235/24980714 (sid:637
9, cid:9182598)
21:16:08,325 INFO BinlogConnectorLifecycleListener - Binlog connected.
03:15:36,104 INFO ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell7935334910787514257events
03:17:14,880 INFO ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell3143086481692829045events
複製代碼
可是遇到另一個問題,overflow隨後就出現異常 EventDataDeserializationException: Failed to deserialize data of EventHeaderV4
,當我另起一個maxwell指點以前的binlog postion開始解析,卻有沒有拋異常。過後的數據也代表並無數據丟失。
問題產生的緣由還不明,Caused by: java.net.SocketException: Connection reset
,感受像讀取 binlog 流的時候還沒讀取到完整的event,異常關閉了鏈接。這個問題比較頑固,github上面相似問題都沒有達到明確的解決。(這也從側面告訴咱們,大表數據遷移,也要批量進行,不要一個insert into .. select
搞定)
03:18:20,586 INFO ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell5229190074667071141events
03:19:31,289 WARN BinlogConnectorLifecycleListener - Communication failure.
com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{time
stamp=1514920657000, eventType=WRITE_ROWS, serverId=2115082720, headerLength=19, dataLength=8155, nextPosition=520539918, flags=0}
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:216) ~[mys
ql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:184) ~[mysql-binlog-c
onnector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:890) [mysql-binlog-connector-java-0
.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559) [mysql-binlog-connector-java-0.13.0.jar:0.13
.0]
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793) [mysql-binlog-connector-java-0.13.0.jar:0.13.0
]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210) ~[?:1.8.0_121]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_121]
at com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:51) ~[mysql-binlog-connector-
java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:202) ~[mysql-binlo
g-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:184) ~[mysql-binlog-connector-java-0.13
.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46) ~[mysql-binlog-connector-jav
a-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeLong(AbstractRowsEventDataD
eserializer.java:212) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeCell(AbstractRowsEventDataD
eserializer.java:150) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:132) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserializeRows(WriteRowsEventDataDeserializer.java:64) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:56) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:32) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:210) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
... 5 more
03:19:31,514 INFO BinlogConnectorLifecycleListener - Binlog disconnected.
03:19:31,590 WARN BinlogConnectorReplicator - replicator stopped at position: mysql-bin.006236:520531744 -- restarting
03:19:31,595 INFO BinaryLogClient - Connected to rm-xxxxxx.mysql.rds.aliyuncs.com:3306 at mysql-bin.006236/520531744 (sid:6379, cid:9220521)
複製代碼
前面講過,若是我只想獲取某幾個表的binlog變動,須要用 include_tables 來過濾,但若是mysql server上如今刪了一個表t1,但個人binlog是從昨天開始讀取,被刪的那個表t1在maxwell啓動的時候是拉取不到表結構的。而後昨天的binlog裏面有 t1 的變動,由於找不到表結構給來組裝成json,會拋異常。
手動在 maxwell.tables/columns
裏面插入記錄是可行的。但這個問題的根本是,maxwell在binlog過濾的時候,只在處理row_event的時候,而對 tableMapCache 要求binlog裏面的全部表都要有。
本身(seanlook)提交了一個commit,能夠在作 tableMapCache 的時候也僅要求緩存 include_dbs/tables 這些表: https://github.com/seanlook/maxwell/commit/2618b70303078bf910a1981b69943cca75ee04fb
在用rabbitmq時,routing_key
是 %db%.%table%
,但某些表產生的binlog增量很是大,就會致使各隊列消息量很不平均,目前由於還沒作到事務xid或者thread_id級別的併發回放,因此最小隊列粒度也是表,儘可能單獨放一個隊列,其它數據量小的合在一塊兒。
Maxwell 在 maxwell 庫中維護了 binlog 的位移等信息,因爲一些緣由譬如 reset master;
,致使 maxwell 庫中的記錄與實際的binlog對不上,這時將報異常,這是能夠手動修正binlog位移或者直接清空/刪除 maxwell 庫重建
com.github.shyiko.mysql.binlog.network.ServerException: Could not find first log file name in binary log index file
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:885)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:564)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:796)
at java.lang.Thread.run(Thread.java:748)
複製代碼
以及
com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event 'mysql-bin.000001' at 760357, the last event read from './mysql-bin.000001' at 1888540, the last byte read from './mysql-bin.000001' at 1888540.
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:885)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:564)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:796)
at java.lang.Thread.run(Thread.java:748)
複製代碼