一文帶你快速入門Canal,看這篇就夠了!


前言

          咱們在作實時數倉時數據每每都是保存到數據庫中例如MySQL,當有一條數據新增或修改須要立刻將數據同步到kafka中或其餘的數據庫中,這時候咱們須要藉助阿里開源出來的Canal,來實現咱們功能。82828a4bfcd0a438592d87112ea52b0e.jpgjava

1、什麼是Canal

咱們看下官網的描述:node

canal [kə'næl],譯意爲水道/管道/溝渠,主要用途是基於 MySQL 數據庫增量日誌解析,提供增量數據訂閱和消費mysql

根據官網的描述咱們大約能夠理解爲Canal主要是基於MySQL作增量數據同步的例如:將數據實時同步到kafka、HBase、ES等,能夠理解一個數據同步工具96e1e7e56efa33882b9db9c7214550c7.jpggit

2、Canal能幹什麼

  • 數據庫鏡像
  • 數據庫實時備份
  • 索引構建和實時維護(拆分異構索引、倒排索引等)
  • 業務 cache 刷新
  • 帶業務邏輯的增量數據處理

注意: 當前Canal支持的MySQL版本有 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x程序員

3、Canal工做原理

549480d752ef2378093a6b7643b325a4.jpgMySQL slave 工做原理github

  • MySQL master 將數據變動寫入二進制日誌( binary log, 其中記錄叫作二進制日誌事件binary log events,能夠經過 show binlog events 進行查看)
  • MySQL slave 將 master 的 binary log events 拷貝到它的中繼日誌(relay log)
  • MySQL slave 重放 relay log 中事件,將數據變動反映它本身的數據

canal 工做原理面試

  • canal 模擬 MySQL slave 的交互協議,假裝本身爲 MySQL slave ,向 MySQL master 發送dump 協議
  • MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
  • canal 解析 binary log 對象(原始爲 byte 流)

4、部署Canal

4.1 安裝MySQL

          我以前發過如何部署MySQL我在這就不在寫一遍了,若是你的機器中沒有安裝MySQL那能夠去看這篇—> https://blog.csdn.net/qq_43791724/article/details/108196454spring

開啓MySQL的 binary log 日誌sql

         當咱們在安裝成功MySQL成功後會有一個my.cnf文件須要添加一下內容數據庫

[mysqld]
log-bin=/var/lib/mysql/mysql-bin # 開啓 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 須要定義,不要和 canal 的 slaveId 重複

         注意: 當咱們在開啓了binary log日誌模式後會在咱們log-bin目錄下建立 mysql-bin.* 的文件。當咱們數據庫中的數據發生改變時就會mysql-bin.*文件中生成記錄。

4.2 安裝Canal

去官下載須要的版本 https://github.com/alibaba/canal/releases 我在這裏使用的版本爲:1.0.24

  1. 將下載好的gz包上傳到指定的目錄下
  2. 建立個文件夾
mkdir canal
  1. 解壓gz包
tar -zxvf canal.deployer-1.0.24.tar.gz  -C ../servers/canal/
  1. 配置 canal.properties

common 屬性前四個配置項:

canal.id= 1
canal.ip=
canal.port= 11111
canal.zkServers=

canal.id是canal的編號,在集羣環境下,不一樣canal的id不一樣,注意它和mysql的server_id不一樣。ip這裏不指定,默認爲本機,好比上面是192.168.100.201,端口號是11111。zk用於canal cluster。5. 再看下canal.propertiesdestinations相關的配置:

#################################################
#########       destinations        ############# 
#################################################
canal.destinations = example
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.global.mode = spring 
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/file-instance.xml

這裏的canal.destinations = example能夠設置多個,好比example1,example2,則須要建立對應的兩個文件夾,而且每一個文件夾下都有一個instance.properties文件。全局的canal實例管理用spring,這裏的file-instance.xml最終會實例化全部的destinations instances:\

  1. 全局的canal實例管理用spring,這裏的file-instance.xml最終會實例化全部的destinations instances:
<!-- properties -->
<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
 <property name="ignoreResourceNotFound" value="true" />
    <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 容許system覆蓋 -->
    <property name="locationNames">
     <list>
         <value>classpath:canal.properties</value>                     <value>classpath:${canal.instance.destination:}/instance.properties</value>
         </list>
    </property>
</bean>

<bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"
   <property name="propertyEditorRegistrars">
    <list>
      <ref bean="socketAddressEditor" />
       </list>
   </property>
</bean>
<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
 <property name="destination" value="${canal.instance.destination}" />
    <property name="eventParser">
     <ref local="eventParser" />
    </property>
    <property name="eventSink">
        <ref local="eventSink" />
    </property>
    <property name="eventStore">
        <ref local="eventStore" />
    </property>
    <property name="metaManager">
        <ref local="metaManager" />
    </property>
    <property name="alarmHandler">
        <ref local="alarmHandler" />
    </property>
</bean>

好比canal.instance.destination等於example,就會加載example/instance.properties配置文件 7. 修改instance 配置文件

## mysql serverId,這裏的slaveId不能和myql集羣中已有的server_id同樣
canal.instance.mysql.slaveId = 1234

#  按需修改爲本身的數據庫信息
#################################################
...
canal.instance.master.address=192.168.1.120:3306
# username/password,數據庫的用戶名和密碼
...
canal.instance.dbUsername = root
canal.instance.dbPassword = 123456
#################################################
  1. 啓動
sh bin/startup.sh
  1. 關閉
sh bin/stop.sh
  1. 經過jps 查詢服務狀態
[root@node01 ~]# jps
2133 CanalLauncher
4184 Jps

到這裏說明咱們的服務就配好了,這時候咱們可使用java代碼建立一個客戶端來進行測試

5、經過Java編寫Canal客戶端

5.1 導入依賴

 <dependencies>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.0.24</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
    </dependencies>

5.2 編寫測試類

package com.canal.Test;

/**
 * @author 大數據老哥
 * @version V1.0
 * @Package com.canal.Test
 * @File :CanalTest.java
 * @date 2021/1/11 21:54
 */


import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * 測試canal配置是否成功
 */

public class CanalTest {

    public static void main(String[] args) {
        //1.建立鏈接
        CanalConnector connect = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.100.201"11111),
                "example""""");        //指定一次性讀取的條數
        int bachChSize = 1000;
        // 設置轉態
        boolean running = true;
        while (running) {
            //2.創建鏈接
            connect.connect();
            //回滾上次請求的信息放置防止數據丟失
            connect.rollback();
            // 訂閱匹配日誌
            connect.subscribe();
            while (running) {
                Message message = connect.getWithoutAck(bachChSize);
                // 獲取batchId
                long batchId = message.getId();
                // 獲取binlog數據的條數
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {

                } else {
                    printSummary(message);
                }
                // 確認指定的batchId已經消費成功
                connect.ack(batchId);
            }
        }
    }

    private static void printSummary(Message message) {
        // 遍歷整個batch中的每一個binlog實體
        for (CanalEntry.Entry entry : message.getEntries()) {
            // 事務開始
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            // 獲取binlog文件名
            String logfileName = entry.getHeader().getLogfileName();
            // 獲取logfile的偏移量
            long logfileOffset = entry.getHeader().getLogfileOffset();
            // 獲取sql語句執行時間戳
            long executeTime = entry.getHeader().getExecuteTime();
            // 獲取數據庫名
            String schemaName = entry.getHeader().getSchemaName();
            // 獲取表名
            String tableName = entry.getHeader().getTableName();
            // 獲取事件類型 insert/update/delete
            String eventTypeName = entry.getHeader().getEventType().toString().toLowerCase();

            System.out.println("logfileName" + ":" + logfileName);
            System.out.println("logfileOffset" + ":" + logfileOffset);
            System.out.println("executeTime" + ":" + executeTime);
            System.out.println("schemaName" + ":" + schemaName);
            System.out.println("tableName" + ":" + tableName);
            System.out.println("eventTypeName" + ":" + eventTypeName);

            CanalEntry.RowChange rowChange = null;
            try {
                // 獲取存儲數據,並將二進制字節數據解析爲RowChange實體
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }

            // 迭代每一條變動數據
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                // 判斷是否爲刪除事件
                if (entry.getHeader().getEventType() == CanalEntry.EventType.DELETE) {
                    System.out.println("---delete---");
                    printColumnList(rowData.getBeforeColumnsList());
                    System.out.println("---");
                }
                // 判斷是否爲更新事件
                else if (entry.getHeader().getEventType() == CanalEntry.EventType.UPDATE) {
                    System.out.println("---update---");
                    printColumnList(rowData.getBeforeColumnsList());
                    System.out.println("---");
                    printColumnList(rowData.getAfterColumnsList());
                }
                // 判斷是否爲插入事件
                else if (entry.getHeader().getEventType() == CanalEntry.EventType.INSERT) {
                    System.out.println("---insert---");
                    printColumnList(rowData.getAfterColumnsList());
                    System.out.println("---");
                }
            }
        }
    }
    // 打印全部列名和列值
    private static void printColumnList(List<CanalEntry.Column> columnList) {
        for (CanalEntry.Column column : columnList) {
            System.out.println(column.getName() + "\t" + column.getValue());
        }
    }
}

5.3 啓動測試

         在數據庫中隨便修改一條數據看看能不能使用Canal客戶端能不能消費到1da235fe27a0439a7220bf6929eaa63e.jpg

小結

          今天給你們分享了Canle它的主要的功能作增量數據同步,後面會使用Canle進行作實時數倉。我在這裏爲你們提供大數據的資源須要的朋友能夠去下面GitHub去下載,信本身,努力和汗水總會能獲得回報的。我是大數據老哥,咱們下期見~~~

資源獲取 獲取Flink面試題,Spark面試題,程序員必備軟件,hive面試題,Hadoop面試題,Docker面試題,簡歷模板等資源請去 GitHub自行下載 https://github.com/lhh2002/Framework-Of-BigData Gitee 自行下載  https://gitee.com/li_hey_hey/dashboard/projects

相關文章
相關標籤/搜索