本文主要研究一下canal的DirectLogFetcherjava
canal-1.1.4/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcher.javamysql
public final class DirectLogFetcher extends LogFetcher { protected static final Log logger = LogFactory.getLog(DirectLogFetcher.class); /** Command to dump binlog */ public static final byte COM_BINLOG_DUMP = 18; /** Packet header sizes */ public static final int NET_HEADER_SIZE = 4; public static final int SQLSTATE_LENGTH = 5; /** Packet offsets */ public static final int PACKET_LEN_OFFSET = 0; public static final int PACKET_SEQ_OFFSET = 3; /** Maximum packet length */ public static final int MAX_PACKET_LENGTH = (256 * 256 * 256 - 1); /** BINLOG_DUMP options */ public static final int BINLOG_DUMP_NON_BLOCK = 1; public static final int BINLOG_SEND_ANNOTATE_ROWS_EVENT = 2; private Connection conn; private OutputStream mysqlOutput; private InputStream mysqlInput; //...... /** * Connect MySQL master to fetch binlog. */ public void open(Connection conn, String fileName, final long filePosition, final int serverId) throws IOException { open(conn, fileName, filePosition, serverId, false); } /** * Connect MySQL master to fetch binlog. */ public void open(Connection conn, String fileName, long filePosition, final int serverId, boolean nonBlocking) throws IOException { try { this.conn = conn; Class<?> connClazz = Class.forName("com.mysql.jdbc.ConnectionImpl"); Object unwrapConn = unwrapConnection(conn, connClazz); if (unwrapConn == null) { throw new IOException("Unable to unwrap " + conn.getClass().getName() + " to com.mysql.jdbc.ConnectionImpl"); } // Get underlying IO streams for network communications. Object connIo = getDeclaredField(unwrapConn, connClazz, "io"); if (connIo == null) { throw new IOException("Get null field:" + conn.getClass().getName() + "#io"); } mysqlOutput = (OutputStream) getDeclaredField(connIo, connIo.getClass(), "mysqlOutput"); mysqlInput = (InputStream) getDeclaredField(connIo, connIo.getClass(), "mysqlInput"); if (filePosition == 0) filePosition = BIN_LOG_HEADER_SIZE; sendBinlogDump(fileName, filePosition, serverId, nonBlocking); position = 0; } catch (IOException e) { close(); /* Do cleanup */ logger.error("Error on COM_BINLOG_DUMP: file = " + fileName + ", position = " + filePosition); throw e; } catch (ClassNotFoundException e) { close(); /* Do cleanup */ throw new IOException("Unable to load com.mysql.jdbc.ConnectionImpl", e); } } public boolean fetch() throws IOException { try { // Fetching packet header from input. if (!fetch0(0, NET_HEADER_SIZE)) { logger.warn("Reached end of input stream while fetching header"); return false; } // Fetching the first packet(may a multi-packet). int netlen = getUint24(PACKET_LEN_OFFSET); int netnum = getUint8(PACKET_SEQ_OFFSET); if (!fetch0(NET_HEADER_SIZE, netlen)) { logger.warn("Reached end of input stream: packet #" + netnum + ", len = " + netlen); return false; } // Detecting error code. final int mark = getUint8(NET_HEADER_SIZE); if (mark != 0) { if (mark == 255) // error from master { // Indicates an error, for example trying to fetch from // wrong // binlog position. position = NET_HEADER_SIZE + 1; final int errno = getInt16(); String sqlstate = forward(1).getFixString(SQLSTATE_LENGTH); String errmsg = getFixString(limit - position); throw new IOException("Received error packet:" + " errno = " + errno + ", sqlstate = " + sqlstate + " errmsg = " + errmsg); } else if (mark == 254) { // Indicates end of stream. It's not clear when this would // be sent. logger.warn("Received EOF packet from server, apparent" + " master disconnected."); return false; } else { // Should not happen. throw new IOException("Unexpected response " + mark + " while fetching binlog: packet #" + netnum + ", len = " + netlen); } } // The first packet is a multi-packet, concatenate the packets. while (netlen == MAX_PACKET_LENGTH) { if (!fetch0(0, NET_HEADER_SIZE)) { logger.warn("Reached end of input stream while fetching header"); return false; } netlen = getUint24(PACKET_LEN_OFFSET); netnum = getUint8(PACKET_SEQ_OFFSET); if (!fetch0(limit, netlen)) { logger.warn("Reached end of input stream: packet #" + netnum + ", len = " + netlen); return false; } } // Preparing buffer variables to decoding. origin = NET_HEADER_SIZE + 1; position = origin; limit -= origin; return true; } catch (SocketTimeoutException e) { close(); /* Do cleanup */ logger.error("Socket timeout expired, closing connection", e); throw e; } catch (InterruptedIOException e) { close(); /* Do cleanup */ logger.warn("I/O interrupted while reading from client socket", e); throw e; } catch (IOException e) { close(); /* Do cleanup */ logger.error("I/O error while reading from client socket", e); throw e; } } private final boolean fetch0(final int off, final int len) throws IOException { ensureCapacity(off + len); for (int count, n = 0; n < len; n += count) { if (0 > (count = mysqlInput.read(buffer, off + n, len - n))) { // Reached end of input stream return false; } } if (limit < off + len) limit = off + len; return true; } public void close() throws IOException { try { if (conn != null) conn.close(); conn = null; mysqlInput = null; mysqlOutput = null; } catch (SQLException e) { logger.warn("Unable to close connection", e); } } //...... }
canal-1.1.4/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcher.javagit
public final class DirectLogFetcher extends LogFetcher { //...... protected final void sendBinlogDump(String fileName, final long filePosition, final int serverId, boolean nonBlocking) throws IOException { position = NET_HEADER_SIZE; putByte(COM_BINLOG_DUMP); putInt32(filePosition); int binlog_flags = nonBlocking ? BINLOG_DUMP_NON_BLOCK : 0; binlog_flags |= BINLOG_SEND_ANNOTATE_ROWS_EVENT; putInt16(binlog_flags); // binlog_flags putInt32(serverId); // slave's server-id putString(fileName); final byte[] buf = buffer; final int len = position - NET_HEADER_SIZE; buf[0] = (byte) (len & 0xff); buf[1] = (byte) (len >>> 8); buf[2] = (byte) (len >>> 16); mysqlOutput.write(buffer, 0, position); mysqlOutput.flush(); } //...... }
nonBlocking爲BINLOG_DUMP_NON_BLOCK,不然爲0
)canal-1.1.4/dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.javagithub
public class DirectLogFetcherTest extends BaseLogFetcherTest { @Test public void testSimple() { DirectLogFetcher fecther = new DirectLogFetcher(); try { Class.forName("com.mysql.jdbc.Driver"); Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306", "root", "hello"); Statement statement = connection.createStatement(); statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'"); statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'"); fecther.open(connection, "mysql-bin.000007", 89797036L, 2); LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT); LogContext context = new LogContext(); while (fecther.fetch()) { LogEvent event = decoder.decode(fecther, context); int eventType = event.getHeader().getType(); switch (eventType) { case LogEvent.ROTATE_EVENT: binlogFileName = ((RotateLogEvent) event).getFilename(); break; case LogEvent.WRITE_ROWS_EVENT_V1: case LogEvent.WRITE_ROWS_EVENT: parseRowsEvent((WriteRowsLogEvent) event); break; case LogEvent.UPDATE_ROWS_EVENT_V1: case LogEvent.PARTIAL_UPDATE_ROWS_EVENT: case LogEvent.UPDATE_ROWS_EVENT: parseRowsEvent((UpdateRowsLogEvent) event); break; case LogEvent.DELETE_ROWS_EVENT_V1: case LogEvent.DELETE_ROWS_EVENT: parseRowsEvent((DeleteRowsLogEvent) event); break; case LogEvent.QUERY_EVENT: parseQueryEvent((QueryLogEvent) event); break; case LogEvent.ROWS_QUERY_LOG_EVENT: parseRowsQueryEvent((RowsQueryLogEvent) event); break; case LogEvent.ANNOTATE_ROWS_EVENT: parseAnnotateRowsEvent((AnnotateRowsEvent) event); break; case LogEvent.XID_EVENT: parseXidEvent((XidLogEvent) event); break; default: break; } } } catch (Exception e) { e.printStackTrace(); Assert.fail(e.getMessage()); } finally { try { fecther.close(); } catch (IOException e) { Assert.fail(e.getMessage()); } } } }
DirectLogFetcher繼承了LogFetcher;其open方法先unwrapConnection,而後經過反射獲取mysqlOutput、mysqlInput,而後執行sendBinlogDump;其close方法關閉connection;其fetch方法經過fetch0執行mysqlInput.read,讀取到buffer中,若是尚未讀完則返回true,讀完了返回falsesql