1、Synchronize via canal-client
一、Install the canal server
[root@localhost software]# mkdir canal
[root@localhost software]# cd canal/
[root@localhost canal]# rz
rz waiting to receive.
Starting zmodem transfer.  Press Ctrl+C to cancel.
Transferring canal.deployer-1.1.5-SNAPSHOT.tar.gz...
  100%   51102 KB    8517 KB/s   00:00:06       0 Errors
[root@localhost canal]# tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz
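If lrzsz (rz) is not installed on the server, the deployer package can also be downloaded directly. The URL below is an assumption based on the canal GitHub releases page and points at the 1.1.5 GA build rather than the SNAPSHOT used above:

[root@localhost canal]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
[root@localhost canal]# tar -zxvf canal.deployer-1.1.5.tar.gz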
二、Modify the configuration and start the server
# Configure the MySQL address and the account/password in conf/example/instance.properties:
# canal.instance.master.address=127.0.0.1:3306
# canal.instance.dbUsername=root
# canal.instance.dbPassword=root
[root@localhost canal]# vi conf/example/instance.properties
[root@localhost canal]# ./bin/startup.sh
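canal reads the MySQL binlog as if it were a replica, so the source MySQL must have row-based binlog enabled and the configured account needs replication privileges. A minimal sketch of the prerequisites, assuming a local MySQL that this tutorial connects to as root/root (restart MySQL after changing my.cnf):

# /etc/my.cnf on the source MySQL: canal requires the binlog in ROW format
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

canal's documentation recommends a dedicated replication account instead of root/root; if you go that way:

-- optional dedicated account for canal
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';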
# Seeing "start successful" in the log means canal started correctly
[root@localhost canal]# cat logs/example/example.log
2020-03-15 22:45:07.762 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-03-15 22:45:07.765 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-03-15 22:45:07.905 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2020-03-15 22:45:07.934 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-03-15 22:45:07.935 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-03-15 22:45:08.270 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2020-03-15 22:45:08.276 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-03-15 22:45:08.276 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2020-03-15 22:45:08.282 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-03-15 22:45:08.345 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-03-15 22:45:08.345 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2020-03-15 22:45:08.937 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=4,serverId=1,gtid=<null>,timestamp=1584282800000] cost : 582ms , the next step is binlog dump
三、Set up the canal-client and add the dependencies
<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>
</dependencies>
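The client code below also uses com.alibaba.fastjson.JSONObject, which is not pulled in by the two dependencies above, so a fastjson dependency is needed as well (the version here is only illustrative; use whichever your project already standardizes on):

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <!-- illustrative version, not from the original article -->
    <version>1.2.83</version>
</dependency>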
四、The synchronization code
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {

    public static void main(String args[]) {
        // Connect to the canal server started above (default port 11111, destination "example")
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        int batchSize = 100;
        try {
            connector.connect();
            // Only subscribe to changes on the user.users table
            connector.subscribe("user.users");
            connector.rollback();
            while (true) {
                // Fetch up to batchSize entries without auto-ack
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                System.out.println("batchId = " + batchId);
                System.out.println("size = " + size);
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    printEntry(message.getEntries());
                }
                // Acknowledge the batch
                connector.ack(batchId);
                // connector.rollback(batchId); // roll back instead if processing failed
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            // Skip transaction begin/end events; only row changes matter here
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }

    private static void redisInsert(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            // The value of the first column (assumed to be the primary key) is used as the Redis key
            RedisUtil.stringSet(columns.get(0).getValue(), json.toJSONString());
        }
    }

    private static void redisUpdate(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtil.stringSet(columns.get(0).getValue(), json.toJSONString());
        }
    }

    private static void redisDelete(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtil.delKey(columns.get(0).getValue());
        }
    }
}
import redis.clients.jedis.Jedis;

public class RedisUtil {

    // Lazily created, shared Jedis connection to the local Redis instance
    private static Jedis jedis = null;

    public static synchronized Jedis getJedis() {
        if (jedis == null) {
            jedis = new Jedis("127.0.0.1", 6379);
        }
        return jedis;
    }

    public static boolean existKey(String key) {
        return getJedis().exists(key);
    }

    public static void delKey(String key) {
        getJedis().del(key);
    }

    public static String stringGet(String key) {
        return getJedis().get(key);
    }

    public static String stringSet(String key, String value) {
        return getJedis().set(key, value);
    }

    public static void hashSet(String key, String field, String value) {
        getJedis().hset(key, field, value);
    }
}
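With the canal server and CanalClient both running, any insert into user.users should appear in Redis keyed by the value of the row's first column. A minimal hand-check, assuming a users row whose first column (the primary key) is 1 was just inserted:

public class SyncCheck {
    public static void main(String[] args) {
        // Expected to print a JSON string such as {"id":"1","name":"..."} once canal has replayed the insert
        System.out.println(RedisUtil.stringGet("1"));
    }
}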
2、Synchronize via MQ