本文主要研究一下rocketmq-mysql的EventProcessor
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
public class EventProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class); private Replicator replicator; private Config config; private DataSource dataSource; private BinlogPositionManager binlogPositionManager; private BlockingQueue<Event> queue = new LinkedBlockingQueue<>(100); private BinaryLogClient binaryLogClient; private EventListener eventListener; private Schema schema; private Map<Long, Table> tableMap = new HashMap<>(); private Transaction transaction; public EventProcessor(Replicator replicator) { this.replicator = replicator; this.config = replicator.getConfig(); } public void start() throws Exception { initDataSource(); binlogPositionManager = new BinlogPositionManager(config, dataSource); binlogPositionManager.initBeginPosition(); schema = new Schema(dataSource); schema.load(); eventListener = new EventListener(queue); binaryLogClient = new BinaryLogClient(config.mysqlAddr, config.mysqlPort, config.mysqlUsername, config.mysqlPassword); binaryLogClient.setBlocking(true); binaryLogClient.setServerId(1001); EventDeserializer eventDeserializer = new EventDeserializer(); eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG, EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY); binaryLogClient.setEventDeserializer(eventDeserializer); binaryLogClient.registerEventListener(eventListener); binaryLogClient.setBinlogFilename(binlogPositionManager.getBinlogFilename()); binaryLogClient.setBinlogPosition(binlogPositionManager.getPosition()); binaryLogClient.connect(3000); LOGGER.info("Started."); doProcess(); } //...... }
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
public class EventProcessor { //...... private void initDataSource() throws Exception { Map<String, String> map = new HashMap<>(); map.put("driverClassName", "com.mysql.jdbc.Driver"); map.put("url", "jdbc:mysql://" + config.mysqlAddr + ":" + config.mysqlPort + "?useSSL=true&verifyServerCertificate=false"); map.put("username", config.mysqlUsername); map.put("password", config.mysqlPassword); map.put("initialSize", "2"); map.put("maxActive", "2"); map.put("maxWait", "60000"); map.put("timeBetweenEvictionRunsMillis", "60000"); map.put("minEvictableIdleTimeMillis", "300000"); map.put("validationQuery", "SELECT 1 FROM DUAL"); map.put("testWhileIdle", "true"); dataSource = DruidDataSourceFactory.createDataSource(map); } //...... }
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
public class EventProcessor { //...... private void doProcess() { while (true) { try { Event event = queue.poll(1000, TimeUnit.MILLISECONDS); if (event == null) { checkConnection(); continue; } switch (event.getHeader().getEventType()) { case TABLE_MAP: processTableMapEvent(event); break; case WRITE_ROWS: case EXT_WRITE_ROWS: processWriteEvent(event); break; case UPDATE_ROWS: case EXT_UPDATE_ROWS: processUpdateEvent(event); break; case DELETE_ROWS: case EXT_DELETE_ROWS: processDeleteEvent(event); break; case QUERY: processQueryEvent(event); break; case XID: processXidEvent(event); break; } } catch (Exception e) { LOGGER.error("Binlog process error.", e); } } } //...... }
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
public class EventProcessor { //...... private void processTableMapEvent(Event event) { TableMapEventData data = event.getData(); String dbName = data.getDatabase(); String tableName = data.getTable(); Long tableId = data.getTableId(); Table table = schema.getTable(dbName, tableName); tableMap.put(tableId, table); } private void processWriteEvent(Event event) { WriteRowsEventData data = event.getData(); Long tableId = data.getTableId(); List<Serializable[]> list = data.getRows(); for (Serializable[] row : list) { addRow("WRITE", tableId, row); } } private void processUpdateEvent(Event event) { UpdateRowsEventData data = event.getData(); Long tableId = data.getTableId(); List<Map.Entry<Serializable[], Serializable[]>> list = data.getRows(); for (Map.Entry<Serializable[], Serializable[]> entry : list) { addRow("UPDATE", tableId, entry.getValue()); } } private void processDeleteEvent(Event event) { DeleteRowsEventData data = event.getData(); Long tableId = data.getTableId(); List<Serializable[]> list = data.getRows(); for (Serializable[] row : list) { addRow("DELETE", tableId, row); } } private void processQueryEvent(Event event) { QueryEventData data = event.getData(); String sql = data.getSql(); if (createTablePattern.matcher(sql).find()) { schema.reset(); } } private void processXidEvent(Event event) { EventHeaderV4 header = event.getHeader(); XidEventData data = event.getData(); String binlogFilename = binaryLogClient.getBinlogFilename(); Long position = header.getNextPosition(); Long xid = data.getXid(); BinlogPosition binlogPosition = new BinlogPosition(binlogFilename, position); transaction.setNextBinlogPosition(binlogPosition); transaction.setXid(xid); replicator.commit(transaction, true); transaction = new Transaction(config); } private void addRow(String type, Long tableId, Serializable[] row) { if (transaction == null) { transaction = new Transaction(config); } Table t = tableMap.get(tableId); if (t != null) { while (true) { if (transaction.addRow(type, t, 
row)) { break; } else { transaction.setNextBinlogPosition(replicator.getNextBinlogPosition()); replicator.commit(transaction, false); transaction = new Transaction(config); } } } } //...... }
EventProcessor提供了start方法,該方法首先執行initDataSource;之後建立BinlogPositionManager並執行binlogPositionManager.initBeginPosition();接著建立Schema並執行schema.load();而後建立EventListener及BinaryLogClient並執行binaryLogClient.connect(3000);最後執行doProcess方法。