The producer has to serialize the data it sends into a byte array before it can be transmitted to Kafka over the network. For simple data types, Kafka ships with built-in serialization tools.
// Create a producer instance
private static Producer<String, String> createProducer() {
    Properties properties = new Properties();
    properties.put("metadata.broker.list", GlobalConfigUtil.kafkaBootstrap);
    properties.put("zookeeper.connect", GlobalConfigUtil.kafkaZookeeper);
    properties.put("serializer.class", StringEncoder.class.getName());
    return new Producer<String, String>(new ProducerConfig(properties));
}
In a typical microservice setup, services constantly need to exchange all kinds of complex data structures, but Kafka only supports simple types such as String and Integer out of the box. So we pass a JSONObject between services: JSON converts easily to a String, and String serialization and deserialization are already supported.
JSONObject jsonObject = new JSONObject();
jsonObject.put("logFileName", logFileName);
jsonObject.put("logFileOffset", logFileOffset);
jsonObject.put("dbName", dbName);
jsonObject.put("tableName", tableName);
jsonObject.put("eventType", eventType);
jsonObject.put("columnValueList", columnValueList);
jsonObject.put("emptyCount", emptyCount);
jsonObject.put("timestamp", timestamp);

// Assemble all fields parsed from the binlog
String data = JSON.toJSONString(jsonObject);
// Send the parsed data to Kafka
KafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, JSON.toJSONString(key), data);
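On the consuming side, the same String can be turned back into a structured object with fastjson. A minimal sketch (this helper class is illustrative, not part of the original project; the field names match those written by the producer above):

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

// Hypothetical helper: turn a binlog message received from Kafka back into a JSONObject
public class BinlogMessageParser {
    public static JSONObject parse(String data) {
        JSONObject jsonObject = JSON.parseObject(data);
        // Access the fields the producer put into the message
        System.out.println(jsonObject.getString("dbName") + "."
                + jsonObject.getString("tableName") + " -> "
                + jsonObject.getString("eventType"));
        return jsonObject;
    }
}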
The ResourceBundle class reads .properties resource files. It can load every configuration entry once at initialization and keep them in static member variables, instead of re-reading the configuration file each time a value is needed; that repeated file I/O is slow and can easily become a performance bottleneck.
// Read the application.properties file
private static ResourceBundle resourceBundle = ResourceBundle.getBundle("application");

public static String canalHost = resourceBundle.getString("canal.host");
public static String canalPort = resourceBundle.getString("canal.port");
public static String canalInstance = resourceBundle.getString("canal.instance");
public static String mysqlUsername = resourceBundle.getString("mysql.username");
public static String mysqlPassword = resourceBundle.getString("mysql.password");
public static String kafkaBootstrap = resourceBundle.getString("kafka.bootstrap.servers");
public static String kafkaZookeeper = resourceBundle.getString("kafka.zookeeper.connect");
public static String kafkaInput = resourceBundle.getString("kafka.input.topic");
Complete code
<!-- pom.xml dependencies -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.24</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Converts between objects and JSON -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.44</version>
</dependency>
import java.util.ResourceBundle;

/**
 * Common utility class for configuration
 */
public class GlobalConfigUtil {
    // Read the application.properties file
    private static ResourceBundle resourceBundle = ResourceBundle.getBundle("application");

    public static String canalHost = resourceBundle.getString("canal.host");
    public static String canalPort = resourceBundle.getString("canal.port");
    public static String canalInstance = resourceBundle.getString("canal.instance");
    public static String mysqlUsername = resourceBundle.getString("mysql.username");
    public static String mysqlPassword = resourceBundle.getString("mysql.password");
    public static String kafkaBootstrap = resourceBundle.getString("kafka.bootstrap.servers");
    public static String kafkaZookeeper = resourceBundle.getString("kafka.zookeeper.connect");
    public static String kafkaInput = resourceBundle.getString("kafka.input.topic");

    public static void main(String[] args) {
        System.out.println(canalHost);
    }
}
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

import java.util.Properties;

/**
 * Utility class for producing Kafka messages
 */
public class KafkaSender {
    private String topic;

    public KafkaSender(String topic) {
        super();
        this.topic = topic;
    }

    /**
     * Send a message to the given Kafka topic
     *
     * @param topic topic name
     * @param key   message key
     * @param data  payload
     */
    public static void sendMessage(String topic, String key, String data) {
        Producer<String, String> producer = createProducer();
        producer.send(new KeyedMessage<String, String>(topic, key, data));
    }

    /**
     * Create a producer instance
     */
    private static Producer<String, String> createProducer() {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", GlobalConfigUtil.kafkaBootstrap);
        properties.put("zookeeper.connect", GlobalConfigUtil.kafkaZookeeper);
        properties.put("serializer.class", StringEncoder.class.getName());
        return new Producer<String, String>(new ProducerConfig(properties));
    }
}
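A quick usage example (topic name and payload invented for illustration):

// Send one test message; createProducer() reads the broker address from GlobalConfigUtil
KafkaSender.sendMessage("test", "key-1", "{\"hello\":\"world\"}");

Note that sendMessage creates a new Producer on every call; for a high-volume binlog stream you would typically reuse a single producer instance instead.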
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * Utility class that parses binlog entries delivered by Canal
 */
public class CanalClient {

    static class ColumnValuePair {
        private String columnName;
        private String columnValue;
        private Boolean isValid;

        public ColumnValuePair(String columnName, String columnValue, Boolean isValid) {
            this.columnName = columnName;
            this.columnValue = columnValue;
            this.isValid = isValid;
        }

        public String getColumnName() { return columnName; }
        public void setColumnName(String columnName) { this.columnName = columnName; }
        public String getColumnValue() { return columnValue; }
        public void setColumnValue(String columnValue) { this.columnValue = columnValue; }
        public Boolean getIsValid() { return isValid; }
        public void setIsValid(Boolean isValid) { this.isValid = isValid; }
    }

    /**
     * Obtain a Canal connection
     *
     * @param host     host name
     * @param port     port
     * @param instance Canal instance name
     * @param username user name
     * @param password password
     * @return Canal connector
     */
    public static CanalConnector getConn(String host, int port, String instance, String username, String password) {
        return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
    }

    /**
     * Parse binlog entries
     *
     * @param entries    binlog message entities
     * @param emptyCount sequence number of the operation
     */
    public static void analysis(List<CanalEntry.Entry> entries, int emptyCount) {
        for (CanalEntry.Entry entry : entries) {
            // Skip transaction begin/end markers; only row operations are parsed
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                    entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            // Parse the binlog entry
            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                e.printStackTrace();
            }

            // Operation type (insert / delete / update)
            CanalEntry.EventType eventType = rowChange.getEventType();
            // Binlog file name
            String logfileName = entry.getHeader().getLogfileName();
            // Offset of this operation within the binlog file
            long logfileOffset = entry.getHeader().getLogfileOffset();
            // Database this operation belongs to
            String dbName = entry.getHeader().getSchemaName();
            // Table this operation belongs to
            String tableName = entry.getHeader().getTableName();
            // Execution time
            long timestamp = entry.getHeader().getExecuteTime();

            // Parse the row data of the operation
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                // Delete: capture all column values from before the delete
                if (eventType == CanalEntry.EventType.DELETE) {
                    dataDetails(rowData.getBeforeColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount, timestamp);
                }
                // Insert: capture all column values from after the insert
                else if (eventType == CanalEntry.EventType.INSERT) {
                    dataDetails(rowData.getAfterColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount, timestamp);
                }
                // Update: capture all column values from after the update
                else {
                    dataDetails(rowData.getAfterColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount, timestamp);
                }
            }
        }
    }

    /**
     * Parse the data of a single binlog message
     *
     * @param columns       all column data of the current row
     * @param logFileName   binlog file name
     * @param logFileOffset offset of this operation in the binlog
     * @param dbName        database name of this operation
     * @param tableName     table name of this operation
     * @param eventType     operation type (insert, update, delete)
     * @param emptyCount    sequence number of the operation
     */
    private static void dataDetails(List<CanalEntry.Column> columns, String logFileName, Long logFileOffset, String dbName, String tableName, CanalEntry.EventType eventType, int emptyCount, long timestamp) {
        // Collect which columns changed and their values
        List<ColumnValuePair> columnValueList = new ArrayList<ColumnValuePair>();
        for (CanalEntry.Column column : columns) {
            ColumnValuePair columnValuePair = new ColumnValuePair(column.getName(), column.getValue(), column.getUpdated());
            columnValueList.add(columnValuePair);
        }

        String key = UUID.randomUUID().toString();

        JSONObject jsonObject = new JSONObject();
        // jsonObject.put("logFileName", logFileName);
        // jsonObject.put("logFileOffset", logFileOffset);
        jsonObject.put("dbName", dbName);
        jsonObject.put("tableName", tableName);
        jsonObject.put("eventType", eventType);
        jsonObject.put("columnValueList", columnValueList);
        // jsonObject.put("emptyCount", emptyCount);
        // jsonObject.put("timestamp", timestamp);

        // Assemble all fields parsed from the binlog
        String data = JSON.toJSONString(jsonObject);
        System.out.println("【JSON】" + data);

        // Send the parsed data to Kafka
        KafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, JSON.toJSONString(key), data);
    }

    /**
     * Client entry point
     */
    public static void main(String[] args) {
        // Load configuration
        String host = GlobalConfigUtil.canalHost;
        int port = Integer.parseInt(GlobalConfigUtil.canalPort);
        String instance = GlobalConfigUtil.canalInstance;
        String username = GlobalConfigUtil.mysqlUsername;
        String password = GlobalConfigUtil.mysqlPassword;

        // Obtain the Canal connection
        CanalConnector conn = getConn(host, port, instance, username, password);

        // Read data from the binlog
        int batchSize = 100;
        int emptyCount = 1;
        try {
            conn.connect();
            // Subscribe to all databases and tables
            conn.subscribe(".*\\..*");
            conn.rollback();
            int totalCount = 120; // number of iterations
            while (emptyCount < totalCount) {
                // Fetch a batch of data
                Message message = conn.getWithoutAck(batchSize);
                long id = message.getId();
                int size = message.getEntries().size();
                if (id == -1 || size == 0) {
                    emptyCount = 0; // nothing was read
                    System.out.println("No data read yet...");
                } else {
                    // Data available: parse the binlog entries
                    analysis(message.getEntries(), emptyCount);
                    emptyCount++;
                }
                // Acknowledge the message
                conn.ack(message.getId());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            conn.disconnect();
        }
    }
}
# application.properties: replace with your own connection details
canal.host=xxx.xx.xxx.xxx
canal.port=11111
canal.instance=example
mysql.username=root
mysql.password=xxxxxx
kafka.bootstrap.servers=xxx.xx.xxx.xxx:9092
kafka.zookeeper.connect=xxx.xx.xxx.xxx:2182
kafka.input.topic=test
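To check that parsed binlog records actually reach the topic, a minimal consumer sketch using the new consumer API shipped in the same kafka_2.11 0.9.0.1 dependency can help (the group id and class name here are assumptions; adjust to your environment):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

// Hypothetical verification consumer: prints every message arriving on the input topic
public class KafkaInputChecker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", GlobalConfigUtil.kafkaBootstrap);
        props.put("group.id", "binlog-checker"); // assumed group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList(GlobalConfigUtil.kafkaInput));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("[received] " + record.value());
            }
        }
    }
}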
For the full code, see: SimpleMysqlCanalKafkaSample