Mysql數據庫監聽binlog

前言

咱們常常須要根據用戶對本身數據的一些操做來作一些事情.java

好比若是用戶刪除了本身的帳號,咱們就給他發短信罵他,去發短信求他回來.mysql

相似於這種功能,固然能夠在業務邏輯層實現,在收到用戶的刪除請求以後執行這一操做,可是數據庫的binlog爲咱們提供了另一種操做方法.git

要監聽binlog,須要兩步,第一步固然是你的mysql須要開啓這一個功能,第二個是要寫程序來對日誌進行讀取.github

mysql開啓binlog.

首先mysql的binlog平常是不打開的,所以咱們須要:sql

  1. 找到mysql的配置文件my.cnf,這個因操做系統不同,位置也不必定同樣,能夠本身找一下,
  2. 在其中加入如下內容:
[mysqld]
server_id = 1
log-bin = mysql-bin
binlog-format = ROW


複製代碼
  1. 以後重啓mysql.
/ ubuntu
service mysql restart
// mac
mysql.server restart
複製代碼
  1. 監測是否開啓成功

進入mysql命令行,執行:shell

show variables like '%log_bin%' ;

複製代碼

若是結果以下圖,則說明成功了:數據庫

2019-04-29-00-31-29

  1. 查看正在寫入的binlog狀態:

2019-04-29-00-32-14

代碼讀取binlog

引入依賴

咱們使用開源的一些實現,這裏由於一些奇怪的緣由,我選用了mysql-binlog-connector-java這個包,(官方github倉庫)[github.com/shyiko/mysq…]具體依賴以下:ubuntu

<!-- https://mvnrepository.com/artifact/com.github.shyiko/mysql-binlog-connector-java -->
    <dependency>
      <groupId>com.github.shyiko</groupId>
      <artifactId>mysql-binlog-connector-java</artifactId>
      <version>0.17.0</version>
    </dependency>
複製代碼

固然,對binlog的處理有不少開源實現,阿里的cancl就是一個,也可使用它.bash

寫個demo

根據官方倉庫中readme裏面,來簡單的寫個demo.數據結構

public static void main(String[] args) {
        BinaryLogClient client = new BinaryLogClient("hostname", 3306, "username", "passwd");
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
        client.setEventDeserializer(eventDeserializer);
        client.registerEventListener(new BinaryLogClient.EventListener() {

            @Override
            public void onEvent(Event event) {
                // TODO
                dosomething();
                logger.info(event.toString());
            }
        });
        client.connect();
    }
複製代碼

這個徹底是根據官方教程裏面寫的,在onEvent裏面能夠寫本身的業務邏輯,因爲我只是測試,因此我在裏面將每個event都打印了出來.

以後我手動登陸到mysql,分別進行了增長,修改,刪除操做,監聽到的log以下:

00:23:13.331 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=0, eventType=ROTATE, serverId=1, headerLength=19, dataLength=28, nextPosition=0, flags=32}, data=RotateEventData{binlogFilename='mysql-bin.000001', binlogPosition=886}}
00:23:13.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468403000, eventType=FORMAT_DESCRIPTION, serverId=1, headerLength=19, dataLength=100, nextPosition=0, flags=0}, data=FormatDescriptionEventData{binlogVersion=4, serverVersion='5.7.23-0ubuntu0.16.04.1-log', headerLength=19, dataLength=95}}
00:23:23.715 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=951, flags=0}, data=null}
00:23:23.716 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1021, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:23.721 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1072, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:23.724 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1114, flags=0}, data=WriteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
    [[B@546a03af, 2]
]}}
00:23:23.725 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1145, flags=0}, data=XidEventData{xid=28}}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1210, flags=0}, data=null}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1280, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:55.873 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1331, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=1381, flags=0}, data=UpdateRowsEventData{tableId=108, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
    {before=[[B@6833ce2c, 1], after=[[B@725bef66, 3]}
]}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1412, flags=0}, data=XidEventData{xid=41}}
00:24:22.333 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1477, flags=0}, data=null}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1547, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1598, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1640, flags=0}, data=DeleteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
    [[B@1888ff2c, 3]
]}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1671, flags=0}, data=XidEventData{xid=42}}


複製代碼

根據本身的業務,封裝一個更好使,更定製的工具類

開始的時候打算貼代碼的,,,可是代碼越寫越多,索性傳在github上了,這裏只貼部分的實現.代碼傳送門

實現思路

  1. 支持對單個表的監聽,由於咱們不想真的對全部數據庫中的全部數據表進行監聽.
  2. 能夠多線程消費.
  3. 把監聽到的內容轉換成咱們喜聞樂見的形式(文中的數據結構不必定很好,我沒想到更加合適的了).

因此實現思路大體以下:

  1. 封裝個客戶端,對外只提供獲取方法,屏蔽掉初始化的細節代碼.
  2. 提供註冊監聽器(僞)的方法,能夠註冊對某個表的監聽(從新定義一個監聽接口,全部註冊的監聽器實現這個就好).
  3. 真正的監聽器只有客戶端,他將此數據庫實例上的全部操做,所有監聽到並轉換成咱們想要的格式LogItem放進阻塞隊列裏面.
  4. 啓動多個線程,消費阻塞隊列,對某一個LogItem調用對應的數據表的監聽器,作一些業務邏輯.

初始化代碼:

public MysqlBinLogListener(Conf conf) {
        BinaryLogClient client = new BinaryLogClient(conf.host, conf.port, conf.username, conf.passwd);
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
        client.setEventDeserializer(eventDeserializer);
        this.parseClient = client;
        this.queue = new ArrayBlockingQueue<>(1024);
        this.conf = conf;
        listeners = new ConcurrentHashMap<>();
        dbTableCols = new ConcurrentHashMap<>();
        this.consumer = Executors.newFixedThreadPool(consumerThreads);
    }
複製代碼

註冊代碼:

public void regListener(String db, String table, BinLogListener listener) throws Exception {
        String dbTable = getdbTable(db, table);
        Class.forName("com.mysql.jdbc.Driver");
        // 保存當前註冊的表的colum信息
        Connection connection = DriverManager.getConnection("jdbc:mysql://" + conf.host + ":" + conf.port, conf.username, conf.passwd);
        Map<String, Colum> cols = getColMap(connection, db, table);
        dbTableCols.put(dbTable, cols);

        // 保存當前註冊的listener
        List<BinLogListener> list = listeners.getOrDefault(dbTable, new ArrayList<>());
        list.add(listener);
        listeners.put(dbTable, list);
    }
複製代碼

在這個步驟中,咱們在註冊監聽者的同時,得到了該表的schema信息,並保存到map裏面去,方便後續對數據進行處理.

監聽代碼:

@Override
    public void onEvent(Event event) {
        EventType eventType = event.getHeader().getEventType();

        if (eventType == EventType.TABLE_MAP) {
            TableMapEventData tableData = event.getData();
            String db = tableData.getDatabase();
            String table = tableData.getTable();
            dbTable = getdbTable(db, table);
        }

        // 只處理添加刪除更新三種操做
        if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) {
            if (isWrite(eventType)) {
                WriteRowsEventData data = event.getData();
                for (Serializable[] row : data.getRows()) {
                    if (dbTableCols.containsKey(dbTable)) {
                        LogItem e = LogItem.itemFromInsert(row, dbTableCols.get(dbTable));
                        e.setDbTable(dbTable);
                        queue.add(e);
                    }
                }
            }
        }
    }
複製代碼

我偷懶了,,,這裏面只實現了對添加操做的處理,其餘操做沒有寫.

消費代碼:

public void parse() throws IOException {
        parseClient.registerEventListener(this);

        for (int i = 0; i < consumerThreads; i++) {
            consumer.submit(() -> {
                while (true) {
                    if (queue.size() > 0) {
                        try {
                            LogItem item = queue.take();
                            String dbtable = item.getDbTable();
                            listeners.get(dbtable).forEach(l -> {
                                l.onEvent(item);
                            });

                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Thread.sleep(1000);
                }
            });
        }
        parseClient.connect();
    }
複製代碼

消費時,從隊列中獲取item,以後獲取對應的一個或者多個監聽者,分別消費這個item.

測試代碼:

public static void main(String[] args) throws Exception {
        Conf conf = new Conf();
        conf.host = "hostname";
        conf.port = 3306;
        conf.username = conf.passwd = "hhsgsb";

        MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf);
        mysqlBinLogListener.parseArgsAndRun(args);
        mysqlBinLogListener.regListener("pf", "student", item -> {
            System.out.println(new String((byte[])item.getAfter().get("name")));
            logger.info("insert into {}, value = {}", item.getDbTable(), item.getAfter());
        });
        mysqlBinLogListener.regListener("pf", "teacher", item -> System.out.println("teacher ===="));

        mysqlBinLogListener.parse();
    }
複製代碼

在這段不多的代碼裏,註冊了兩個監聽者,分別監聽studentteacher表,並分別進行打印處理,經測試,在teacher表插入數據時,能夠獨立的運行定義的業務邏輯.

注意:這裏的工具類並不能直接投入使用,由於裏面有許多的異常處理沒有作,且功能僅監聽了插入語句,能夠用來作實現的參考.

參考文章

github.com/shyiko/mysq…

cloud.tencent.com/developer/a…





ChangeLog

2019-04-30 完 2019-05-01 使用Multimap替換Map>

以上皆爲我的所思所得,若有錯誤歡迎評論區指正。

歡迎轉載,煩請署名並保留原文連接。

聯繫郵箱:huyanshi2580@gmail.com

更多學習筆記見我的博客------>呼延十

相關文章
相關標籤/搜索