canal 修改mysql數據後Java客戶端無反應的問題解決方案

canal client例子:java

package com.study.canal;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.Header;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

public class App {
	public static void main(String[] args) throws InterruptedException {

		// 第一步:與canal進行鏈接
		CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("xxx.xxx.xxx.xxx", 11111),
				"example", "", "");
		connector.connect();

		// 第二步:開啓訂閱
		connector.subscribe();

		connector.rollback();

		// 第三步:循環訂閱
		while (true) {
			try {
				// 每次讀取 1000 條
				Message message = connector.getWithoutAck(1000);

				long batchID = message.getId();

				int size = message.getEntries().size();

				if (batchID == -1 || size == 0) {
					System.out.println("當前暫時沒有數據");
					Thread.sleep(1000); // 沒有數據
				} else {
					System.out.println("-------------------------- 有數據啦 -----------------------");
					PrintEntry(message.getEntries());
				}

				// position id ack (方便處理下一條)
				connector.ack(batchID);

			} catch (Exception e) {
				e.printStackTrace();

			} finally {
				Thread.sleep(1000);
			}
		}
	}

	// 獲取每條打印的記錄
	@SuppressWarnings("static-access")
	public static void PrintEntry(List<Entry> entrys) {

		for (Entry entry : entrys) {

			// 第一步:拆解entry 實體
			Header header = entry.getHeader();
			EntryType entryType = entry.getEntryType();

			// 第二步: 若是當前是RowData,那就是我須要的數據
			if (entryType == EntryType.ROWDATA) {

				String tableName = header.getTableName();
				String schemaName = header.getSchemaName();

				RowChange rowChange = null;

				try {
					rowChange = RowChange.parseFrom(entry.getStoreValue());
				} catch (InvalidProtocolBufferException e) {
					e.printStackTrace();
				}

				EventType eventType = rowChange.getEventType();

				System.out.println(String.format("當前正在操做 %s.%s, Action= %s", schemaName, tableName, eventType));

				// 若是是‘查詢’ 或者 是 ‘DDL’ 操做,那麼sql直接打出來
				if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
					System.out.println("rowchange sql ----->" + rowChange.getSql());
					return;
				}

				// 第三步:追蹤到 columns 級別
				rowChange.getRowDatasList().forEach((rowData) -> {

					// 獲取更新以前的column狀況
					List<Column> beforeColumns = rowData.getBeforeColumnsList();

					// 獲取更新以後的 column 狀況
					List<Column> afterColumns = rowData.getAfterColumnsList();

					// 當前執行的是 刪除操做
					if (eventType == EventType.DELETE) {
						PrintColumn(beforeColumns);
					}

					// 當前執行的是 插入操做
					if (eventType == eventType.INSERT) {
						PrintColumn(afterColumns);
					}

					// 當前執行的是 更新操做
					if (eventType == eventType.UPDATE) {
						PrintColumn(afterColumns);
					}
				});
			}
		}
	}

	// 每一個row上面的每個column 的更改狀況
	public static void PrintColumn(List<Column> columns) {

		columns.forEach((column) -> {

			String columnName = column.getName();
			String columnValue = column.getValue();
			String columnType = column.getMysqlType();
			boolean isUpdated = column.getUpdated(); // 判斷 該字段是否更新

			System.out.println(String.format("columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName,
					columnValue, columnType, isUpdated));

		});
	}
}

pom.xmlmysql

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.2</version>
</dependency>

修改數據庫數據發現客戶端沒有反應,可能狀況以下:git

canal官方文檔能夠點擊這裏查看
按照官網的教程完成配置後會發現,在修改mysql時java客戶端仍是沒有反應。暫時發現有如下兩種緣由:github

1.須要修改canal.properties配置,可是官網沒有講解。(大機率)
進入canal解壓文件 ,編輯conf/canal.properties文件sql

vim conf/canal.properties

有這麼一行數據庫

canal.instance.parser.parallelThreadSize = 16

默認是被註釋掉的,須要打開註釋,而後重啓canalvim

cd bin
./restart.sh

這種是第一次配置時,大機率碰到的狀況。google

2.修改conf/example/instance.properties.net

第一個紅框所有註釋掉,第二個紅框值修改成登陸mysql的帳號和密碼,第三個框註釋掉(註釋掉意味着監聽整個庫),而後重啓canalrest

相關文章
相關標籤/搜索