基於 Docker 結合 Canal 實現 MySQL 實時增量數據傳輸

著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。

canal的介紹

canal的歷史由來

在早期的時候,阿里巴巴公司由於杭州和美國兩個地方的機房都部署了數據庫實例,但由於跨機房同步數據的業務需求 ,便孕育而生出了canal,主要是基於trigger(觸發器)的方式獲取增量變動。從 2010 年開始,阿里巴巴公司開始逐步嘗試數據庫日誌解析,獲取增量變動的數據進行同步,由此衍生出了增量訂閱和消費業務。html

當前的 canal 支持的數據源端Mysql版本包括( 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x)java

canal的應用場景

目前廣泛基於日誌增量訂閱和消費的業務,主要包括node

  • 基於數據庫增量日誌解析,提供增量數據訂閱和消費
  • 數據庫鏡像
  • 數據庫實時備份
  • 索引構建和實時維護(拆分異構索引、倒排索引等)
  • 業務 cache 刷新
  • 帶業務邏輯的增量數據處理

canal的工做原理

在介紹canal的原理以前,咱們先來了解下MySQL主從複製的原理mysql

MySQL主從複製原理 git

  • MySQL master 將數據變動的操做寫入二進制日誌binary log中, 其中記錄的內容叫作二進制日誌事件binary log events,能夠經過show binlog events命令進行查看
  • MySQL slave 會將 master 的binary log中的binary log events 拷貝到它的中繼日誌relay log
  • MySQL slave 重讀並執行relay log 中的事件,將數據變動映射到它本身的數據庫表中

瞭解了MySQL的工做原理,咱們能夠大體猜測到Canal應該也是採用相似的邏輯去實現增量數據訂閱的功能,那麼接下來咱們看看實際上Canal的工做原理是怎樣的?github

canal工做原理 spring

  • canal 模擬 MySQL slave 的交互協議,假裝本身爲 MySQL slave ,向MySQL master 發送dump 協議
  • MySQL master 收到 dump 請求,開始推送 binary log 給 slave (也就是 canal )
  • canal 解析 binary log 對象(數據爲byte流)

基於這樣的原理與方式,即可以完成數據庫增量日誌的獲取解析,提供增量數據訂閱和消費,實現mysql實時增量數據傳輸的功能。sql

既然canal是這樣的一個框架,又是純Java語言編寫而成,那麼咱們接下來就開始學習怎麼使用它並把它用到咱們的實際工做中。docker

canal的Docker環境準備

由於目前容器化技術的火熱,本文經過使用Docker來快速搭建開發環境,而傳統方式的環境搭建,在咱們學會了Docker容器環境搭建後,也能自行依葫蘆畫瓢搭建成功。因爲本篇主要講解canal,因此關於Docker的內容不會涉及太多,主要會介紹Docker的基本概念和命令使用。數據庫

什麼是Docker

相信絕大多數人都使用過虛擬機Vmware,在使用Vmware進行環境搭建的時候,只需提供了一個普通的系統鏡像併成功安裝,剩下的軟件環境與應用配置仍是如咱們在本機操做同樣在虛擬機裏也操做一遍,並且Vmware佔用宿主機的資源較多,容易形成宿主機卡頓,並且系統鏡像自己也佔用過多空間。

爲了便於你們快速理解Docker,便與Vmware作對比來作介紹,docker 提供了一個開始,打包,運行app的平臺,把app(應用)和底層infrastructure(基礎設施)隔離開來。Docker中最主要的兩個概念就是鏡像(相似Vmware的系統鏡像)與容器(相似Vmware裏安裝的系統)

什麼是Image(鏡像)

  • 文件和meta data的集合(root filesystem)
  • 分層的,而且每一層均可以添加改變刪除文件,成爲一個新的image
  • 不一樣的image能夠共享相同的layer
  • Image自己是read-only的

什麼是Container(容器)

  • 經過Image建立(copy)
  • 在Image layer 之上創建一個container layer(可讀寫)
  • 類比面向對象:類和實例
  • Image負責app的存儲和分發,Container負責運行app

Docker的網絡介紹

Docker的網絡類型有三種:
bridge:橋接網絡

默認狀況下啓動的Docker容器,都是使用 bridge,Docker安裝時建立的橋接網絡,
每次Docker容器重啓時,會按照順序獲取對應的IP地址,
這個就致使重啓下,Docker的IP地址就變了
複製代碼

none:無指定網絡

使用 --network=none ,docker 容器就不會分配局域網的IP
複製代碼

host:主機網絡

使用 --network=host,此時,Docker 容器的網絡會附屬在主機上,二者是互通的。
例如,在容器中運行一個Web服務,監聽8080端口,則主機的8080端口就會自動映射到容器中。
複製代碼

建立自定義網絡:(設置固定IP)

docker network create --subnet=172.18.0.0/16 mynetwork
複製代碼

查看存在的網絡類型docker network ls

搭建canal環境

附上Docker的下載安裝地址==> Docker Download

下載canal鏡像docker pull canal/canal-server

下載mysql鏡像 docker pull mysql,下載過的則以下圖
查看已經下載好的鏡像 docker images

接下來經過鏡像生成mysql容器與canal-server容器

##生成mysql容器
docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root mysql
##生成canal-server容器
docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server

## 命令介紹
--net mynetwork #使用自定義網絡
--ip   #指定分配ip
複製代碼

查看docker中運行的容器docker ps

MySQL的配置修改

以上只是初步準備好了基礎的環境,可是怎麼讓canal假裝成salve並正確獲取mysql中的binary log呢?

對於自建 MySQL , 須要先開啓 Binlog 寫入功能,配置 binlog-format 爲 ROW 模式,經過修改mysql配置文件來開啓bin_log,使用 find / -name my.cnf 查找my.cnf, 修改文件內容以下

[mysqld]
log-bin=mysql-bin # 開啓 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 須要定義,不要和 canal 的 slaveId 重複
複製代碼

進入mysql容器docker exec -it mysql bash
建立連接MySQL的帳號canal並授予做爲 MySQL slave 的權限, 若是已有帳戶可直接 GRANT

mysql -uroot -proot
# 建立帳號
CREATE USER canal IDENTIFIED BY 'canal'; 
# 授予權限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
# 刷新並應用
FLUSH PRIVILEGES;
複製代碼

數據庫重啓後, 簡單測試 my.cnf 配置是否生效

show variables like 'log_bin';
show variables like 'log_bin';
show master status;
複製代碼

canal-server的配置修改

進入canal-server容器docker exec -it canal-server bash
編輯canal-server的配置 vi canal-server/conf/example/instance.properties

更多配置請參考==> canal配置說明
重啓canal-server容器 docker restart canal-server 進入容器查看啓動日誌

docker exec -it canal-server bash
tail -100f canal-server/logs/example/example.log
複製代碼

至此,咱們的環境工做準備完成!!!

拉取數據並同步保存到ElasticSearch

本文的ElasticSearch也是基於Docker環境搭建,因此讀者可執行以下命令

# 下載對鏡像
docker pull elasticsearch:7.1.1
docker pull mobz/elasticsearch-head:5-alpine
# 建立容器並運行
docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1

docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine
複製代碼

環境已經準備好了,如今就要開始咱們的編碼實戰部分了,怎麼經過應用程序去獲取canal解析後的binlog數據。首先咱們基於spring boot搭建一個canal demo應用。結構以下圖所示

Student.java

package com.example.canal.study.pojo;

import lombok.Data;

import java.io.Serializable;

/**
 * 普通的實體domain對象
 * @Data 用戶生產getter、setter方法
  */
@Data
public class Student implements Serializable {
    private String id;
    private String name;
    private int age;
    private String sex;
    private String city;
}

複製代碼

CanalConfig.java

package com.example.canal.study.common;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;

/**
 * 配置一些跟canal相關到配置與公共bean
 * @author haha
 */
@Configuration
public class CanalConfig {
    // @Value 獲取 application.properties配置中端內容
    @Value("${canal.server.ip}")
    private String canalIp;
    @Value("${canal.server.port}")
    private Integer canalPort;
    @Value("${canal.destination}")
    private String destination;
    @Value("${elasticSearch.server.ip}")
    private String elasticSearchIp;
    @Value("${elasticSearch.server.port}")
    private Integer elasticSearchPort;
    @Value("${zookeeper.server.ip}")
    private String zkServerIp;

    /**
     * 獲取簡單canal-server鏈接
      */
    @Bean
    public CanalConnector canalSimpleConnector() {
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp, canalPort), destination, "", "");
        return canalConnector;
    }
    /**
     * 經過鏈接zookeeper獲取canal-server鏈接
      */
    @Bean
    public CanalConnector canalHaConnector() {
        CanalConnector canalConnector = CanalConnectors.newClusterConnector(zkServerIp, destination, "", "");
        return canalConnector;
    }

    /**
     * elasticsearch 7.x客戶端
      */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost(elasticSearchIp, elasticSearchPort))
        );
        return client;
    }
}
複製代碼

CanalDataParser.java
因爲這個類的代碼較多,文中則摘出其中比較重要的部分,其它部分代碼可從github上獲取

/**
     * 元祖類型的對象定義
     * @param <A>
     * @param <B>
     */
    public static class TwoTuple<A, B> {
        public final A eventType;
        public final B columnMap;

        public TwoTuple(A a, B b) {
            eventType = a;
            columnMap = b;
        }
    }

    /**
     * 解析canal中的message對象內容
     * @param entrys
     * @return
     */
    public static List<TwoTuple<EventType, Map>> printEntry(List<Entry> entrys) {
        List<TwoTuple<EventType, Map>> rows = new ArrayList<>();

        for (Entry entry : entrys) {
            // binlog event的事件事件
            long executeTime = entry.getHeader().getExecuteTime();
            // 當前應用獲取到該binlog鎖延遲的時間
            long delayTime = System.currentTimeMillis() - executeTime;

            Date date = new Date(entry.getHeader().getExecuteTime());
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            // 當前的entry(binary log event)的條目類型屬於事務
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                    TransactionBegin begin = null;
                    try {
                        begin = TransactionBegin.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // 打印事務頭信息,執行的線程id,事務耗時
                    logger.info(transaction_format,
                            new Object[]{entry.getHeader().getLogfileName(),
                                    String.valueOf(entry.getHeader().getLogfileOffset()),
                                    String.valueOf(entry.getHeader().getExecuteTime()),
                                    simpleDateFormat.format(date),
                                    entry.getHeader().getGtid(),
                                    String.valueOf(delayTime)});
                    logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
                    printXAInfo(begin.getPropsList());
                } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    TransactionEnd end = null;
                    try {
                        end = TransactionEnd.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // 打印事務提交信息,事務id
                    logger.info("----------------\n");
                    logger.info(" END ----> transaction id: {}", end.getTransactionId());
                    printXAInfo(end.getPropsList());
                    logger.info(transaction_format,
                            new Object[]{entry.getHeader().getLogfileName(),
                                    String.valueOf(entry.getHeader().getLogfileOffset()),
                                    String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                    entry.getHeader().getGtid(), String.valueOf(delayTime)});
                }

                continue;
            }
            // 當前entry(binary log event)的條目類型屬於原始數據
            if (entry.getEntryType() == EntryType.ROWDATA) {
                RowChange rowChage = null;
                try {
                    // 獲取儲存的內容
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }
                // 獲取當前內容的事件類型
                EventType eventType = rowChage.getEventType();

                logger.info(row_format,
                        new Object[]{entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                                entry.getHeader().getTableName(), eventType,
                                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                entry.getHeader().getGtid(), String.valueOf(delayTime)});
                // 事件類型是query或數據定義語言DDL直接打印sql語句,跳出繼續下一次循環
                if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
                    logger.info(" sql ----> " + rowChage.getSql() + SEP);
                    continue;
                }
                printXAInfo(rowChage.getPropsList());
                // 循環當前內容條目的具體數據
                for (RowData rowData : rowChage.getRowDatasList()) {
                    List<CanalEntry.Column> columns;
                    // 事件類型是delete返回刪除前的列內容,不然返回改變後列的內容
                    if (eventType == CanalEntry.EventType.DELETE) {
                        columns = rowData.getBeforeColumnsList();
                    } else {
                        columns = rowData.getAfterColumnsList();
                    }
                    HashMap<String, Object> map = new HashMap<>(16);
                    // 循環把列的name與value放入map中
                    for (Column column: columns){
                        map.put(column.getName(), column.getValue());
                    }
                    rows.add(new TwoTuple<>(eventType, map));
                }
            }
        }
        return rows;
    }
複製代碼

ElasticUtils.java

package com.example.canal.study.common;

import com.alibaba.fastjson.JSON;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.Map;

/**
 * es的crud工具類
 * @author haha
 */
@Slf4j
@Component
public class ElasticUtils {
    @Autowired
    private  RestHighLevelClient restHighLevelClient;

    /**
     * 新增
     * @param student
     * @param index 索引
     */
    public  void saveEs(Student student, String index) {
        IndexRequest indexRequest = new IndexRequest(index)
                .id(student.getId())
                .source(JSON.toJSONString(student), XContentType.JSON)
                .opType(DocWriteRequest.OpType.CREATE);

        try {
            IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
            log.info("保存數據至ElasticSearch成功:{}", response.getId());
        } catch (Exception e) {
            log.error("保存數據至elasticSearch失敗: {}", e);
        }
    }

    /**
     * 查看
     * @param index 索引
     * @param id _id
     * @throws Exception
     */
    public  void getEs(String index, String id) {
        GetRequest getRequest = new GetRequest(index, id);
        GetResponse response = null;
        try {
            response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
            Map<String, Object> fields = response.getSource();
            for (Map.Entry<String, Object> entry : fields.entrySet()) {
                System.out.println(entry.getKey() + ":" + entry.getValue());
            }
        } catch (Exception e) {
            log.error("從elasticSearch獲取數據失敗: {}", e);
        }
    }

    /**
     * 更新
     * @param student
     * @param index 索引
     * @throws Exception
     */
    public  void updateEs(Student student, String index)  {
        UpdateRequest updateRequest = new UpdateRequest(index, student.getId());
        updateRequest.doc(JSON.toJSONString(student), XContentType.JSON);
        UpdateResponse response = null;
        try {
            response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            log.info("更新數據至ElasticSearch成功:{}", response.getId());
        } catch (Exception e) {
            log.error("更新數據至elasticSearch失敗: {}", e);
        }
    }

    /**
     * 根據id刪除數據
     * @param index 索引
     * @param id _id
     * @throws Exception
     */
    public  void DeleteEs(String index, String id) {
        DeleteRequest deleteRequest = new DeleteRequest(index, id);
        DeleteResponse response = null;
        try {
            response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
            log.info("從elasticSearch刪除數據成功:{}", response.getId());
        } catch (Exception e) {
            log.error("從elasticSearch刪除數據失敗: {}", e);
        }
    }
}

複製代碼

BinLogElasticSearch.java

package com.example.canal.study.action;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.canal.study.common.CanalDataParser;
import com.example.canal.study.common.ElasticUtils;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * 獲取binlog數據併發送到es中
 *
 * @author haha
 */
@Slf4j
@Component
public class BinLogElasticSearch {
    @Autowired
    private CanalConnector canalSimpleConnector;
    @Autowired
    private ElasticUtils elasticUtils;
    //@Qualifier("canalHaConnector")使用名爲canalHaConnector的bean
    @Autowired
    @Qualifier("canalHaConnector")
    private CanalConnector canalHaConnector;

    public void binLogToElasticSearch() throws IOException {
        openCanalConnector(canalSimpleConnector);
        // 輪詢拉取數據
        Integer batchSize = 5 * 1024;
        while (true) {
//            Message message = canalHaConnector.getWithoutAck(batchSize);
            Message message = canalSimpleConnector.getWithoutAck(batchSize);
            long id = message.getId();
            int size = message.getEntries().size();
            log.info("當前監控到binLog消息數量{}", size);
            if (id == -1 || size == 0) {
                try {
                    // 等待4秒
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                //1. 解析message對象
                List<CanalEntry.Entry> entries = message.getEntries();
                List<CanalDataParser.TwoTuple<CanalEntry.EventType, Map>> rows = CanalDataParser.printEntry(entries);

                for (CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple : rows) {
                    if (tuple.eventType == CanalEntry.EventType.INSERT) {
                        Student student = createStudent(tuple);
                        // 2。將解析出的對象同步到elasticSearch中
                        elasticUtils.saveEs(student, "student_index");
                        // 3.消息確認已處理
                        canalSimpleConnector.ack(id);
//                        canalHaConnector.ack(id);
                    }
                    if (tuple.eventType == CanalEntry.EventType.UPDATE) {
                        Student student = createStudent(tuple);
                        elasticUtils.updateEs(student, "student_index");
                        // 3.消息確認已處理
                        canalSimpleConnector.ack(id);
//                        canalHaConnector.ack(id);
                    }
                    if (tuple.eventType == CanalEntry.EventType.DELETE) {
                        elasticUtils.DeleteEs("student_index", tuple.columnMap.get("id").toString());
                        canalSimpleConnector.ack(id);
//                        canalHaConnector.ack(id);
                    }
                }
            }
        }
    }

    /**
     * 封裝數據至Student對象中
     *
     * @param tuple
     * @return
     */
    private Student createStudent(CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple) {
        Student student = new Student();
        student.setId(tuple.columnMap.get("id").toString());
        student.setAge(Integer.parseInt(tuple.columnMap.get("age").toString()));
        student.setName(tuple.columnMap.get("name").toString());
        student.setSex(tuple.columnMap.get("sex").toString());
        student.setCity(tuple.columnMap.get("city").toString());
        return student;
    }

    /**
     * 打開canal鏈接
     *
     * @param canalConnector
     */
    private void openCanalConnector(CanalConnector canalConnector) {
        //鏈接CanalServer
        canalConnector.connect();
        // 訂閱destination
        canalConnector.subscribe();
    }

    /**
     * 關閉canal鏈接
     *
     * @param canalConnector
     */
    private void closeCanalConnector(CanalConnector canalConnector) {
        //關閉鏈接CanalServer
        canalConnector.disconnect();
        // 註銷訂閱destination
        canalConnector.unsubscribe();
    }
}
複製代碼

CanalDemoApplication.java(spring boot 啓動類)

package com.example.canal.study;

import com.example.canal.study.action.BinLogElasticSearch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 應用的啓動類
 * @author haha
 */
@SpringBootApplication
public class CanalDemoApplication implements ApplicationRunner {
    @Autowired
    private BinLogElasticSearch binLogElasticSearch;

    public static void main(String[] args) {
        SpringApplication.run(CanalDemoApplication.class, args);
    }
    // 程序啓動則執行run方法
    @Override
    public void run(ApplicationArguments args) throws Exception {
        binLogElasticSearch.binLogToElasticSearch();
    }
}

複製代碼

application.properties

server.port=8081
spring.application.name = canal-demo

canal.server.ip = localhost
canal.server.port = 11111
canal.destination = example

zookeeper.server.ip = localhost:2181
zookeeper.sasl.client = false

elasticSearch.server.ip = localhost
elasticSearch.server.port = 9200
複製代碼

canal集羣高可用的搭建

經過上面的學習,咱們知道了單機直連方式的canala應用。在當今互聯網時代,單實例模式逐漸被集羣高可用模式取代,那麼canal的多實例集羣方式如何搭建呢!

基於zookeeper獲取canal實例

準備zookeeper的docker鏡像與容器

docker pull zookeeper

docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper

docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server
複製代碼

最終效果如圖

  1. 機器準備
    • 運行canal的容器ip: 172.18.0.4 , 172.18.0.8
    • zookeeper容器ip:172.18.0.3:2181
    • mysql容器ip:172.18.0.6:3306
  2. 按照部署和配置,在單臺機器上各自完成配置,演示時instance name爲example
  3. 修改canal.properties,加上zookeeper配置並修改canal端口
    canal.port=11113
    canal.zkServers=172.18.0.3:2181
    canal.instance.global.spring.xml = classpath:spring/default-instance.xml
    複製代碼
  4. 建立example目錄,並修改instance.properties
    canal.instance.mysql.slaveId = 1235 
    #以前的canal slaveId是1234,保證slaveId不重複便可
    canal.instance.master.address = 172.18.0.6:3306
    複製代碼

注意: 兩臺機器上的instance目錄的名字須要保證徹底一致,HA模式是依賴於instance name進行管理,同時必須都選擇default-instance.xml配置

啓動兩個不一樣容器的canal,啓動後,能夠經過tail -100f logs/example/example.log查看啓動日誌,只會看到一臺機器上出現了啓動成功的日誌。

好比我這裏啓動成功的是 172.18.0.4

查看一下zookeeper中的節點信息,也能夠知道當前工做的節點爲172.18.0.4:11111

[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running  
{"active":true,"address":"172.18.0.4:11111","cid":1}

複製代碼

客戶端連接, 消費數據

能夠經過指定zookeeper地址和canal的instance name,canal client會自動從zookeeper中的running節點,獲取當前服務的工做節點,而後與其創建連接:

[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}
複製代碼

對應的客戶端編碼可使用以下形式,上文中的CanalConfig.java中的canalHaConnector就是一個HA鏈接

CanalConnector connector = CanalConnectors.newClusterConnector("172.18.0.3:2181", "example", "", "");
複製代碼

連接成功後,canal server會記錄當前正在工做的canal client信息,好比客戶端ip,連接的端口信息等 (聰明的你,應該也能夠發現,canal client也能夠支持HA功能)

[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.124.5:59887","clientId":1001}
複製代碼

數據消費成功後,canal server會在zookeeper中記錄下當前最後一次消費成功的binlog位點. (下次你重啓client時,會從這最後一個位點繼續進行消費)

[zk: localhost:2181(CONNECTED) 5] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalName":"binlog.000004","position":2169,"timestamp":1562672817000}}
複製代碼

中止正在工做的172.18.0.4的canal server

docker exec -it canal-server bash
cd canal-server/bin
sh stop.sh
複製代碼

這時172.18.0.8會立馬啓動example instance,提供新的數據服務

[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.8:11111","cid":1}
複製代碼

與此同時,客戶端也會隨着canal server的切換,經過獲取zookeeper中的最新地址,與新的canal server創建連接,繼續消費數據,整個過程自動完成

異常與總結

elasticsearch-head沒法訪問elasticsearch

es與es-head是兩個獨立的進程,當es-head訪問es服務時,會存在一個跨域問題。因此咱們須要修改es的配置文件,增長一些配置項來解決這個問題,以下

[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/
[root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml 
# 文件末尾加上以下配置
http.cors.enabled: true
http.cors.allow-origin: "*"
複製代碼

修改完配置文件後需重啓es服務

elasticsearch-head查詢報406 Not Acceptable

解決方法:
一、進入head安裝目錄;
二、cd _site/
三、編輯vendor.js  共有兩處
     #6886行 contentType: "application/x-www-form-urlencoded
    改爲 contentType: "application/json;charset=UTF-8"
     #7574行 var inspectData = s.contentType === "application/x-www-form-urlencoded" &&
    改爲 var inspectData = s.contentType === "application/json;charset=UTF-8" &&
複製代碼

使用elasticsearch-rest-high-level-client報org.elasticsearch.action.index.IndexRequest.ifSeqNo

#pom中除了加入依賴
<dependency>
   <groupId>org.elasticsearch.client</groupId>
   <artifactId>elasticsearch-rest-high-level-client</artifactId>
   <version>7.1.1</version>
</dependency>
#還需加入
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.1.1</version>
</dependency>
複製代碼

相關參考 git hub issues

爲何ElasticSearch要在7.X版本不能使用type?

參考:爲何ElasticSearch要在7.X版本去掉type?

使用spring-data-elasticsearch.jar報org.elasticsearch.client.transport.NoNodeAvailableException

因爲本文使用的是elasticsearch7.x以上的版本,目前spring-data-elasticsearch底層採用es官方TransportClient,而es官方計劃放棄TransportClient,工具以es官方推薦的RestHighLevelClient進行調用請求。 可參考RestHighLevelClient API

設置docker容器開啓啓動

若是建立時未指定 --restart=always ,可經過update 命令
docker update --restart=always [containerID]
複製代碼

docker for Mac network host 模式不生效

host 模式是爲了性能,可是這卻對 docker 的隔離性形成了破壞,致使安全性下降。 在性能場景下,能夠用 --netwokr host 開啓 Host 模式,但須要注意的是,若是你用 Windows 或 Mac 本地啓動容器的話,會遇到 host 模式失效的問題。緣由是 host 模式只支持 Linux 宿主機。

參見官方文檔:docs.docker.com/network/hos…

客戶端鏈接zookeeper報authenticate using SASL(unknow error)

  • zookeeper.jar與dokcer中的zookeeper版本不一致
  • zookeeper.jar 使用了3.4.6以前的版本

出現這個錯的意思是zookeeper做爲外部應用須要向系統申請資源,申請資源的時候須要經過認證,而sasl是一種認證方式,咱們想辦法來繞過sasl認證。避免等待,來提升效率。

在項目代碼中加入System.setProperty("zookeeper.sasl.client", "false");,若是是spring boot 項目能夠在application.properties中加入zookeeper.sasl.client=false

參考:Increased CPU usage by unnecessary SASL checks

若是更換canal.client.jar中依賴的zookeeper.jar的版本

把canal的官方源碼下載到本機git clone https://github.com/alibaba/canal.git,而後修改client模塊下pom.xml文件中關於zookeeper的內容,而後從新mvn install

把本身項目依賴的包替換爲剛剛 mvn install生產的包

zookeeper返回的是docker容器中的ip,而宿主機ip與容器ip不是同一個網段,沒法ping通

修改hosts文件只能夠實現域名到ip的映射(域名重定向),iptables能夠實現端口的重定向,可是這個問題是要經過ip到ip的重定向能夠解決,可是研究了一下沒找到怎麼設置(windows、mac),因此咱們修改canal的官方源碼來達到咱們想要的目的。修改ClusterCanalConnector.java中的connect()方法。

如下是修改後內容對比圖

關於選型的取捨

本文示例項目源代碼==>canal-elasticsearch-sync

相關文章
相關標籤/搜索