ErosaConnection java
| mysql
|-------------------------------------- sql
| | 數據庫
MysqlConnection LocalBinLogConnection 服務器
ErosaConnection是一個鏈接的接口,定義了一些通用的方法。目前它有兩個實現類,MysqlConnection是與MySQL服務器鏈接的實現類,LocalBinLogConnection是與本地的binlog文件進行鏈接的實現類。從類中能夠看出,目前canal還不支持oracle的實現。 session
package; import; /** * 通用的Erosa的連接接口, 用於通常化處理mysql/oracle的解析過程 * * @author: yuanzu Date: 12-9-20 Time: 下午2:47 */ public interface ErosaConnection { /** * 創建鏈接 * @throws IOException */ public void connect() throws IOException; /** * 從新創建鏈接,會斷開已有鏈接 * @throws IOException */ public void reconnect() throws IOException; /** * 斷開鏈接。 * @throws IOException */ public void disconnect() throws IOException; /** * 是否創建鏈接。 * @return */ public boolean isConnected(); /** * 用於快速數據查找,和dump的區別在於,seek會只給出部分的數據 * @param binlogfilename biglog文件名 * @param binlogPosition binlog起始位置。 * @param func 事件解析處理器。 */ public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException; /** * 獲取binlog事件,若是沒有數據會阻塞,等待數據的到達。 * @param binlogfilename biglog文件名 * @param binlogPosition binlog起始位置。 * @param func 事件解析處理器。 */ public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException; /** * 獲取binlog事件,若是沒有數據會阻塞,等待數據的到達。 * @param timestamp 起始時間,只同步該時間以後的產生的新事件。 * @param func 事件解析處理器。 */ public void dump(long timestamp, SinkFunction func) throws IOException; /** * 產生一個新的鏈接。 */ ErosaConnection fork(); }
其中實現類MysqlConnection是咱們常常會使用到的一個類,先看看這個類是如何實現的。 oracle
private MysqlConnector connector; private long slaveId; private Charset charset = Charset.forName("UTF-8"); private BinlogFormat binlogFormat; private BinlogImage binlogImage; public MysqlConnection(){ } public MysqlConnection(InetSocketAddress address, String username, String password){ connector = new MysqlConnector(address, username, password); } public MysqlConnection(InetSocketAddress address, String username, String password, byte charsetNumber, String defaultSchema){ connector = new MysqlConnector(address, username, password, charsetNumber, defaultSchema); }
從代碼能夠看出,它大部分依賴一個MysqlConnector組件來實現與MySQL的鏈接。咱們稍後看看該代碼的實現。 函數
構造函數須要的是MySql服務器的地址,用戶名和密碼,該用戶必須具有了replication slave權限才能夠。slaveId是當前解析器的slaveId,它不能與其它的slaveId衝突。 fetch
public void connect() throws IOException { connector.connect(); } public void reconnect() throws IOException { connector.reconnect(); } public void disconnect() throws IOException { connector.disconnect(); } public boolean isConnected() { return connector.isConnected(); }
基本上都是調用了MysqlConnection的方法實現的,還須要進入該類查看實現。 編碼
public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException { updateSettings(); sendBinlogDump(binlogfilename, binlogPosition); DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize()); fetcher.start(connector.getChannel()); LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT); LogContext context = new LogContext(); while (fetcher.fetch()) { LogEvent event = null; event = decoder.decode(fetcher, context); if (event == null) { throw new CanalParseException("parse failed"); } if (!func.sink(event)) { break; } } }
/** * the settings that will need to be checked or set:<br> * <ol> * <li>wait_timeout</li> * <li>net_write_timeout</li> * <li>net_read_timeout</li> * </ol> * * @param channel * @throws IOException */ private void updateSettings() throws IOException { try { update("set wait_timeout=9999999"); } catch (Exception e) { logger.warn(ExceptionUtils.getFullStackTrace(e)); } try { update("set net_write_timeout=1800"); } catch (Exception e) { logger.warn(ExceptionUtils.getFullStackTrace(e)); } try { update("set net_read_timeout=1800"); } catch (Exception e) { logger.warn(ExceptionUtils.getFullStackTrace(e)); } try { // 設置服務端返回結果時不作編碼轉化,直接按照數據庫的二進制編碼進行發送,由客戶端本身根據需求進行編碼轉化 update("set names 'binary'"); } catch (Exception e) { logger.warn(ExceptionUtils.getFullStackTrace(e)); } try { // mysql5.6針對checksum支持須要設置session變量 // 若是不設置會出現錯誤: Slave can not handle replication events with the // checksum that master is configured to log // 但也不能亂設置,須要和mysql server的checksum配置一致,否則RotateLogEvent會出現亂碼 update("set @master_binlog_checksum= '@@global.binlog_checksum'"); } catch (Exception e) { logger.warn(ExceptionUtils.getFullStackTrace(e)); } try { // mariadb針對特殊的類型,須要設置session變量 update("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'"); } catch (Exception e) { logger.warn(ExceptionUtils.getFullStackTrace(e)); } }
1. 更新MySQL配置信息。調用方法updateSettings();主要包括設置超時時間、設置數據庫直接發送二進制數據,設置master_binlog_checksum和mariadb_slave_capability等變量值。