使用canal經過mysql的binlog日誌對mysql進行監控(同步服務)

1、環境介紹mysql

canal是阿里開源的中間件,主要用於同步mysql數據庫變動。具體參見:https://github.com/alibaba/canal/releaseslinux

搭建環境:git

vmware centos7 部署mysql和canalgithub

windows開發canal client,自動捕獲mysql數據庫變動web

2、Centos安裝Mysqlsql

一、嘗試用yum安裝mysql數據庫

wget http://repo.mysql.com/mysql57-community-release-el7-10.noarch.rpm
sudo rpm -Uvh mysql57-community-release-el7-10.noarch.rpm
sudo yum install -y mysql-community-server

若是執行順利,安裝mysql server 成功。windows

2.改用阿里源安裝centos

但是官方的yum源在國內訪問效果不佳,我下載mysql server的速度太慢了,決定改用阿里源緩存

#下載wget
yum install wget -y
#備份當前的yum源
mv /etc/yum.repos.d /etc/yum.repos.d.backup4comex
#新建空的yum源設置目錄
mkdir /etc/yum.repos.d
#下載阿里雲的yum源配置
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
#最後重建緩存
yum clean all
yum makecache

3.安裝MariaDB

MariaDB數據庫管理系統是MySQL的一個分支,主要由開源社區在維護,採用GPL受權許可。開發這個分支的緣由之一是:甲骨文公司收購了MySQL後,有將MySQL閉源的潛在風險,所以社區採用分支的方式來避開這個風險。MariaDB的目的是徹底兼容MySQL,包括API和命令行,使之能輕鬆成爲MySQL的代替品。

安裝mariadb,大小59 M。

[root@yl-web yl]# yum install mariadb-server mariadb

其它幾條經常使用的mariadb命令:

systemctl start mariadb #啓動MariaDB

systemctl stop mariadb #中止MariaDB

systemctl restart mariadb #重啓MariaDB

systemctl enable mariadb #設置開機啓動

運行systemctl start mariadb,而後就能夠正常使用mysql了

4.設置數據庫密碼:

set password for 'root'@'localhost' =password('root');

5.遇到的幾個問題

①從windows訪問centos mysql失敗

解決方案:設置mysql容許遠程鏈接

mysql -u root;
//賦予任何主機訪問數據的權限
mysql>GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION;
//使修改生效
mysql>FLUSH PRIVILEGES;

②進行上述操做以後,發現仍然鏈接失敗,返回錯誤

Can't connect to MySQL server on '10.168.12.43' (10060)

解決方案:從windows鏈接vmware裏面的mysql失敗,關閉windows防火牆後成功。

3、部署canal server

(參考:https://github.com/alibaba/canal/wiki/QuickStart)

1.下載canal server

https://github.com/alibaba/canal/releases

使用canal經過mysql的binlog日誌對mysql進行監控(同步服務)

 

我下載的是canal.exaple-1.0.24.gar.gz,下載完成後解壓縮:

mkdir /tmp/canal
tar zxvf canal.deployer-1.0.24.tar.gz -C /tmp/canal

2.查看binlog相關數據庫命令:

使用canal經過mysql的binlog日誌對mysql進行監控(同步服務)

 

是否啓用了日誌
mysql>show variables like 'log_bin';
怎樣知道當前的日誌
mysql> show master status;
查看mysql binlog模式
show variables like 'binlog_format';
獲取binlog文件列表
show binary logs;
查看當前正在寫入的binlog文件
show master status\G
查看指定binlog文件的內容
show binlog events in 'mysql-bin.000002';

3.開啓binlog

若是log_bin關閉,須要在etc下面找到my.cnf,開啓binlog:

server-id=1
log-bin=/var/lib/mysql/mysql-bin

而後重啓mysql服務。

4.添加canal mysql數據庫帳號

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

5.配置canal實例,設置本地數據庫信息

vi conf/example/instance.properties

## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info
canal.instance.master.address = 10.168.12.43:3306
canal.instance.master.journal.name =mysql-bin.000003
canal.instance.master.position =
canal.instance.master.timestamp =
……
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =testcanal
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*

6.啓動canal

sh bin/startup.sh

7.查看日誌

vi logs/canal/canal.log
vi logs/example/example.log

4、canal lient demo

1.引入pom依賴

<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.24</version>
</dependency>

2.客戶端代碼

public class ClientTest {
 public static void main(String args[]) {
 // 建立連接
 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.168.12.43",
 11111), "example", "", "");
 int batchSize = 1000;
 int emptyCount = 0;
 try {
 connector.connect();
 connector.subscribe(".*\\..*");
 connector.rollback();
 int totalEmptyCount = 120;
 while (emptyCount < totalEmptyCount) {
 Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據
 long batchId = message.getId();
 int size = message.getEntries().size();
 if (batchId == -1 || size == 0) {
 emptyCount++;
 System.out.println("empty count : " + emptyCount);
 try {
 Thread.sleep(1000);
 } catch (InterruptedException e) {
 }
 } else {
 emptyCount = 0;
 printEntry(message.getEntries());
 }
 connector.ack(batchId); // 提交確認
 }
 System.out.println("empty too many times, exit");
 } finally {
 connector.disconnect();
 }
 }
 private static void printEntry(List<Entry> entrys) {
 for (Entry entry : entrys) {
 if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
 continue;
 }
 RowChange rowChage = null;
 try {
 rowChage = RowChange.parseFrom(entry.getStoreValue());
 } catch (Exception e) {
 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
 e);
 }
 EventType eventType = rowChage.getEventType();
 System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
 entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
 entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
 eventType));
 for (RowData rowData : rowChage.getRowDatasList()) {
 if (eventType == EventType.DELETE) {
 printColumn(rowData.getBeforeColumnsList());
 } else if (eventType == EventType.INSERT) {
 printColumn(rowData.getAfterColumnsList());
 } else {
 System.out.println("-------> before");
 printColumn(rowData.getBeforeColumnsList());
 System.out.println("-------> after");
 printColumn(rowData.getAfterColumnsList());
 }
 }
 }
 }
 private static void printColumn(List<Column> columns) {
 for (Column column : columns) {
 System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
 }
 }
}

3.創建數據庫鏈接,進行insert,delete等數據庫操做

使用canal經過mysql的binlog日誌對mysql進行監控(同步服務)

 

使用canal經過mysql的binlog日誌對mysql進行監控(同步服務)

 

5、遇到的問題

1.canal創建鏈接失敗

解決方案:用telnet命令測試創建鏈接仍然失敗,關閉linux防火牆。

systemctl stop firewalld.service

其餘centos7防火牆相關命令:

firewall-cmd --list-ports#查看已經開放的端口:

firewall-cmd --reload #重啓firewall

systemctl stop firewalld.service #中止firewall

systemctl disable firewalld.service #禁止firewall開機啓動

firewall-cmd --state #查看默認防火牆狀態(關閉後顯示notrunning,開啓後顯示running)

相關文章
相關標籤/搜索