canal client例子:java
package com.study.canal; import java.net.InetSocketAddress; import java.util.List; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.Header; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; public class App { public static void main(String[] args) throws InterruptedException { // 第一步:與canal進行鏈接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("xxx.xxx.xxx.xxx", 11111), "example", "", ""); connector.connect(); // 第二步:開啓訂閱 connector.subscribe(); connector.rollback(); // 第三步:循環訂閱 while (true) { try { // 每次讀取 1000 條 Message message = connector.getWithoutAck(1000); long batchID = message.getId(); int size = message.getEntries().size(); if (batchID == -1 || size == 0) { System.out.println("當前暫時沒有數據"); Thread.sleep(1000); // 沒有數據 } else { System.out.println("-------------------------- 有數據啦 -----------------------"); PrintEntry(message.getEntries()); } // position id ack (方便處理下一條) connector.ack(batchID); } catch (Exception e) { e.printStackTrace(); } finally { Thread.sleep(1000); } } } // 獲取每條打印的記錄 @SuppressWarnings("static-access") public static void PrintEntry(List<Entry> entrys) { for (Entry entry : entrys) { // 第一步:拆解entry 實體 Header header = entry.getHeader(); EntryType entryType = entry.getEntryType(); // 第二步: 若是當前是RowData,那就是我須要的數據 if (entryType == EntryType.ROWDATA) { String tableName = header.getTableName(); String schemaName = header.getSchemaName(); RowChange rowChange = null; try { rowChange = RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } EventType eventType = rowChange.getEventType(); System.out.println(String.format("當前正在操做 %s.%s, Action= %s", schemaName, tableName, eventType)); // 若是是‘查詢’ 或者 是 ‘DDL’ 操做,那麼sql直接打出來 if (eventType == EventType.QUERY || rowChange.getIsDdl()) { System.out.println("rowchange sql ----->" + rowChange.getSql()); return; } // 第三步:追蹤到 columns 級別 rowChange.getRowDatasList().forEach((rowData) -> { // 獲取更新以前的column狀況 List<Column> beforeColumns = rowData.getBeforeColumnsList(); // 獲取更新以後的 column 狀況 List<Column> afterColumns = rowData.getAfterColumnsList(); // 當前執行的是 刪除操做 if (eventType == EventType.DELETE) { PrintColumn(beforeColumns); } // 當前執行的是 插入操做 if (eventType == eventType.INSERT) { PrintColumn(afterColumns); } // 當前執行的是 更新操做 if (eventType == eventType.UPDATE) { PrintColumn(afterColumns); } }); } } } // 每一個row上面的每個column 的更改狀況 public static void PrintColumn(List<Column> columns) { columns.forEach((column) -> { String columnName = column.getName(); String columnValue = column.getValue(); String columnType = column.getMysqlType(); boolean isUpdated = column.getUpdated(); // 判斷 該字段是否更新 System.out.println(String.format("columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName, columnValue, columnType, isUpdated)); }); } }
pom.xmlmysql
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.2</version> </dependency>
修改數據庫數據發現客戶端沒有反應,可能狀況以下:git
canal官方文檔能夠點擊這裏查看。
按照官網的教程完成配置後會發現,在修改mysql時java客戶端仍是沒有反應。暫時發現有如下兩種緣由:github
1.須要修改canal.properties配置,可是官網沒有講解。(大機率)
進入canal解壓文件 ,編輯conf/canal.properties文件sql
vim conf/canal.properties
有這麼一行數據庫
canal.instance.parser.parallelThreadSize = 16
默認是被註釋掉的,須要打開註釋,而後重啓canalvim
cd bin ./restart.sh
這種是第一次配置時,大機率碰到的狀況。google
2.修改conf/example/instance.properties.net
第一個紅框所有註釋掉,第二個紅框值修改成登陸mysql的帳號和密碼,第三個框註釋掉(註釋掉意味着監聽整個庫),而後重啓canalrest