Canal monitors MySQL in real time: it parses the binlog and sends the captured change data to Kafka.

A producer must serialize outgoing data into byte arrays before it can be sent to Kafka over the network. For simple data types, Kafka ships with built-in serializers.

// Create a producer instance
private static Producer<String, String> createProducer() {
    Properties properties = new Properties();

    properties.put("metadata.broker.list", GlobalConfigUtil.kafkaBootstrap);
    properties.put("zookeeper.connect", GlobalConfigUtil.kafkaZookeeper);
    properties.put("serializer.class", StringEncoder.class.getName());

    return new Producer<String, String>(new ProducerConfig(properties));
}
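Note that this sample uses the legacy Scala producer API (kafka.javaapi.producer). The kafka 0.9 artifact pulled in below also ships the newer Java client; a minimal sketch of the equivalent producer with that API, assuming the same GlobalConfigUtil settings (the class name here is ours, not part of the sample):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

// Sketch only: the new-client equivalent of createProducer() above
public class NewClientProducerSketch {
    public static Producer<String, String> create() {
        Properties props = new Properties();
        // "bootstrap.servers" replaces "metadata.broker.list";
        // the new producer no longer needs a ZooKeeper address
        props.put("bootstrap.servers", GlobalConfigUtil.kafkaBootstrap);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(props);
    }
}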

In a typical microservice setup, services frequently need to pass various complex data structures to one another, but Kafka only supports simple types such as String and Integer out of the box. So we pass a JSONObject between services: JSON converts easily to a String, and String serialization and deserialization are already supported.

JSONObject jsonObject = new JSONObject();
jsonObject.put("logFileName", logFileName);
jsonObject.put("logFileOffset", logFileOffset);
jsonObject.put("dbName", dbName);
jsonObject.put("tableName", tableName);
jsonObject.put("eventType", eventType);
jsonObject.put("columnValueList", columnValueList);
jsonObject.put("emptyCount", emptyCount);
jsonObject.put("timestamp", timestamp);

// Assemble all fields parsed from the binlog
String data = JSON.toJSONString(jsonObject);

// Send the parsed data to Kafka
KafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, JSON.toJSONString(key), data);
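On the consuming side the reverse step is just as simple; a minimal sketch, assuming the record value is the JSON string produced above (the helper class name is ours, not part of the sample):

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

// Sketch only: turn a consumed message value back into its binlog fields
public class BinlogEventDecoder {
    public static void printEvent(String data) {
        JSONObject event = JSON.parseObject(data);
        System.out.println(event.getString("dbName") + "."
                + event.getString("tableName") + " -> "
                + event.getString("eventType"));
    }
}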

The ResourceBundle class reads .properties resource files; ResourceBundle.getBundle("application") resolves application.properties from the classpath. It lets us load all configuration entries once at initialization and keep them in static member variables, instead of re-reading the configuration file every time a value is needed, where the repeated file I/O is slow and easily becomes a performance bottleneck.

// Read the application.properties file
private static ResourceBundle resourceBundle = ResourceBundle.getBundle("application");

public static String canalHost = resourceBundle.getString("canal.host");
public static String canalPort = resourceBundle.getString("canal.port");
public static String canalInstance = resourceBundle.getString("canal.instance");
public static String mysqlUsername = resourceBundle.getString("mysql.username");
public static String mysqlPassword = resourceBundle.getString("mysql.password");
public static String kafkaBootstrap = resourceBundle.getString("kafka.bootstrap.servers");
public static String kafkaZookeeper = resourceBundle.getString("kafka.zookeeper.connect");
public static String kafkaInput = resourceBundle.getString("kafka.input.topic");

The complete code:

# pom.xml dependencies
<dependency>  
    <groupId>com.alibaba.otter</groupId>  
    <artifactId>canal.client</artifactId>  
    <version>1.0.24</version>  
</dependency>  
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->  
<dependency>  
    <groupId>org.apache.kafka</groupId>  
    <artifactId>kafka_2.11</artifactId>  
    <version>0.9.0.1</version>  
    <exclusions>  
        <exclusion>  
            <groupId>org.slf4j</groupId>  
            <artifactId>slf4j-log4j12</artifactId>  
        </exclusion>  
    </exclusions>  
</dependency>  
  
<!-- Conversion between objects and JSON -->  
<dependency>  
    <groupId>com.alibaba</groupId>  
    <artifactId>fastjson</artifactId>  
    <version>1.2.44</version>  
</dependency>
import java.util.Locale;  
import java.util.ResourceBundle;  
  
/**
 * Utility class for reading configuration
 */
public class GlobalConfigUtil {

    // Read the application.properties file
    private static ResourceBundle resourceBundle = ResourceBundle.getBundle("application");
  
    public static String canalHost = resourceBundle.getString("canal.host");
    public static String canalPort = resourceBundle.getString("canal.port");
    public static String canalInstance = resourceBundle.getString("canal.instance");
    public static String mysqlUsername = resourceBundle.getString("mysql.username");
    public static String mysqlPassword = resourceBundle.getString("mysql.password");
    public static String kafkaBootstrap = resourceBundle.getString("kafka.bootstrap.servers");
    public static String kafkaZookeeper = resourceBundle.getString("kafka.zookeeper.connect");
    public static String kafkaInput = resourceBundle.getString("kafka.input.topic");
  
    public static void main(String[] args) {  
        System.out.println(canalHost);  
    }  
}
import kafka.javaapi.producer.Producer;  
import kafka.producer.KeyedMessage;  
import kafka.producer.ProducerConfig;  
import kafka.serializer.StringEncoder;  
  
import java.util.Properties;  
  
/**
 * Utility class for producing messages to Kafka
 */
public class KafkaSender {  
    private String topic;  
  
    public KafkaSender(String topic){  
        super();  
        this.topic = topic;  
    }  
  
    /**
     * Send a message to the specified Kafka topic
     *
     * @param topic topic name
     * @param key   message key
     * @param data  message payload
     */
    public static void sendMessage(String topic, String key, String data) {
        Producer<String, String> producer = createProducer();
        producer.send(new KeyedMessage<String, String>(topic, key, data));
    }
  
    /**
     * Create a producer instance
     * @return a configured Producer
     */
    private static Producer<String, String> createProducer() {
        Properties properties = new Properties();

        properties.put("metadata.broker.list", GlobalConfigUtil.kafkaBootstrap);
        properties.put("zookeeper.connect", GlobalConfigUtil.kafkaZookeeper);
        properties.put("serializer.class", StringEncoder.class.getName());

        return new Producer<String, String>(new ProducerConfig(properties));
    }
}
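One caveat with the class above: sendMessage creates (and never closes) a fresh Producer for every call, which is expensive. A minimal improvement, sketched here rather than taken from the original sample, is to lazily cache a single instance inside KafkaSender (reusing the imports already present in that class):

// Sketch only: reuse one Producer instead of building one per message
private static Producer<String, String> cachedProducer;

private static synchronized Producer<String, String> getProducer() {
    if (cachedProducer == null) {
        cachedProducer = createProducer();
    }
    return cachedProducer;
}

public static void sendMessage(String topic, String key, String data) {
    getProducer().send(new KeyedMessage<String, String>(topic, key, data));
}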
import com.alibaba.fastjson.JSON;  
import com.alibaba.fastjson.JSONObject;  
import com.alibaba.otter.canal.client.CanalConnector;  
import com.alibaba.otter.canal.client.CanalConnectors;  
import com.alibaba.otter.canal.protocol.CanalEntry;  
import com.alibaba.otter.canal.protocol.Message;  
  
import java.net.InetSocketAddress;  
import java.util.ArrayList;  
import java.util.List;  
import java.util.UUID;  
  
/**
 * Utility class for parsing binlog entries delivered by Canal
 */
public class CanalClient {  
  
    static class ColumnValuePair {  
        private String columnName;  
        private String columnValue;  
        private Boolean isValid;  
  
        public ColumnValuePair(String columnName, String columnValue, Boolean isValid) {  
            this.columnName = columnName;  
            this.columnValue = columnValue;  
            this.isValid = isValid;  
        }  
  
        public String getColumnName() { return columnName; }  
        public void setColumnName(String columnName) { this.columnName = columnName; }  
        public String getColumnValue() { return columnValue; }  
        public void setColumnValue(String columnValue) { this.columnValue = columnValue; }  
        public Boolean getIsValid() { return isValid; }  
        public void setIsValid(Boolean isValid) { this.isValid = isValid; }  
    }  
  
    /**
     * Obtain a Canal connection
     *
     * @param host     host name
     * @param port     port number
     * @param instance Canal instance name
     * @param username user name
     * @param password password
     * @return Canal connector
     */
    public static CanalConnector getConn(String host, int port, String instance, String username, String password) {
        return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
    }
  
    /**
     * Parse binlog entries
     *
     * @param entries    binlog message entities
     * @param emptyCount sequence number of the operation
     */
    public static void analysis(List<CanalEntry.Entry> entries, int emptyCount) {
        for (CanalEntry.Entry entry : entries) {
            // Skip transaction begin/end markers; only row-level changes are parsed
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                    entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            // Parse the binlog entry
            CanalEntry.RowChange rowChange;

            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                e.printStackTrace();
                continue; // skip entries that cannot be parsed instead of risking an NPE below
            }

            // Operation type (insert, delete, update)
            CanalEntry.EventType eventType = rowChange.getEventType();
            // Name of the binlog file
            String logfileName = entry.getHeader().getLogfileName();
            // Offset of this operation within the binlog file
            long logfileOffset = entry.getHeader().getLogfileOffset();
            // Database the operation belongs to
            String dbName = entry.getHeader().getSchemaName();
            // Table the operation belongs to
            String tableName = entry.getHeader().getTableName();
            // Execution time of the operation
            long timestamp = entry.getHeader().getExecuteTime();

            // Parse the affected row data
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                // DELETE: take the column values from before the change
                if (eventType == CanalEntry.EventType.DELETE) {
                    dataDetails(rowData.getBeforeColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount, timestamp);
                }
                // INSERT: take the column values from after the change
                else if (eventType == CanalEntry.EventType.INSERT) {
                    dataDetails(rowData.getAfterColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount, timestamp);
                }
                // UPDATE: take the column values from after the change
                else {
                    dataDetails(rowData.getAfterColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount, timestamp);
                }
            }
        }
    }
  
    /**
     * Parse the data of a single binlog message
     *
     * @param columns       all column data of the current row
     * @param logFileName   binlog file name
     * @param logFileOffset offset of this operation within the binlog
     * @param dbName        database the operation belongs to
     * @param tableName     table the operation belongs to
     * @param eventType     operation type (insert, update, delete)
     * @param emptyCount    sequence number of the operation
     */
    private static void dataDetails(List<CanalEntry.Column> columns,
                                    String logFileName,
                                    Long logFileOffset,
                                    String dbName,
                                    String tableName,
                                    CanalEntry.EventType eventType,
                                    int emptyCount,
                                    long timestamp) {

        // Collect which columns changed, together with their values
        List<ColumnValuePair> columnValueList = new ArrayList<ColumnValuePair>();

        for (CanalEntry.Column column : columns) {
            ColumnValuePair columnValuePair = new ColumnValuePair(column.getName(), column.getValue(), column.getUpdated());
            columnValueList.add(columnValuePair);
        }

        String key = UUID.randomUUID().toString();
        JSONObject jsonObject = new JSONObject();
//        jsonObject.put("logFileName", logFileName);
//        jsonObject.put("logFileOffset", logFileOffset);
        jsonObject.put("dbName", dbName);
        jsonObject.put("tableName", tableName);
        jsonObject.put("eventType", eventType);
        jsonObject.put("columnValueList", columnValueList);
//        jsonObject.put("emptyCount", emptyCount);
//        jsonObject.put("timestamp", timestamp);

        // Assemble all fields parsed from the binlog
        String data = JSON.toJSONString(jsonObject);

        System.out.println("【JSON】" + data);

        // Send the parsed data to Kafka
        KafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, JSON.toJSONString(key), data);
    }
  
    /**
     * Client entry point
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Load configuration
        String host = GlobalConfigUtil.canalHost;
        int port = Integer.parseInt(GlobalConfigUtil.canalPort);
        String instance = GlobalConfigUtil.canalInstance;
        String username = GlobalConfigUtil.mysqlUsername;
        String password = GlobalConfigUtil.mysqlPassword;

        // Obtain a Canal connection
        CanalConnector conn = getConn(host, port, instance, username, password);

        // Read data from the binlog
        int batchSize = 100;
        int emptyCount = 1;

        try {
            conn.connect();
            // Subscribe to every table of every schema (filter format: schema.table regex)
            conn.subscribe(".*\\..*");
            conn.rollback();

            int totalCount = 120; // number of loop iterations

            while (emptyCount < totalCount) {
                // Fetch a batch without auto-acknowledging
                Message message = conn.getWithoutAck(batchSize);

                long id = message.getId();
                int size = message.getEntries().size();
                if (id == -1 || size == 0) {
                    emptyCount = 0;
                    // Nothing was read from the binlog
                    System.out.println("No data read yet...");
                    Thread.sleep(1000); // avoid busy-waiting while the binlog is idle
                } else {
                    // Data available: parse the binlog entries
                    analysis(message.getEntries(), emptyCount);
                    emptyCount++;
                }
                // Acknowledge the batch
                conn.ack(message.getId());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            conn.disconnect();
        }
    }
}
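For reference, with the fields left uncommented in dataDetails, an INSERT into a hypothetical user table prints a message shaped roughly like this (table and values are illustrative, and fastjson does not guarantee key order):

【JSON】{"dbName":"test","tableName":"user","eventType":"INSERT","columnValueList":[{"columnName":"id","columnValue":"1","isValid":true},{"columnName":"name","columnValue":"alice","isValid":true}]}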
# application.properties: change the values below to your own database and Kafka settings  
canal.host=xxx.xx.xxx.xxx  
canal.port=11111  
canal.instance=example  
mysql.username=root  
mysql.password=xxxxxx  
kafka.bootstrap.servers = xxx.xx.xxx.xxx:9092  
kafka.zookeeper.connect = xxx.xx.xxx.xxx:2182  
kafka.input.topic=test

For the complete runnable code, see: SimpleMysqlCanalKafkaSample
