隨行付微服務之數據同步Porter

前言

Porter是一款數據同步中間件,主要用於解決同構/異構數據庫之間的表級別數據同步問題。前端

背景

在微服務架構模式下深入的影響了應用和數據庫之間的關係,不像傳統多個服務共享一個數據庫,微服務架構下每一個服務都要有本身的數據庫。若是你想得到微服務帶來的好處,每一個服務獨有一個數據庫是必須的,由於微服務強調的就是鬆耦合。咱們但願數據庫就和服務同樣,要有充分的獨立性、能夠和服務一塊兒部署、一塊兒擴展、一塊兒重構。同時,還須要兼顧數據中心的數據聚合、DBA的多種數據庫備份、報表中心的業務報表等等矛盾問題。所以便產生了「Porter」項目。node

微服務改造過程當中,沒法避免的一個坎,那就是垂直拆庫,根據不一樣的子服務,把過去的「一庫多服」拆分紅「一庫一服」。mysql

一庫一服仍是一庫多服?

不論是否是微服務架構,應用的各個模塊之間都須要頻繁的通訊、協做、共享數據,實現系統的總體價值。區別點在於單體應用是經過本地方法調用來完成;在微服務中是經過遠程API調用完成。git

而共享數據最賤的方式就是採用共享數據庫模式,也就是單體應用中最經常使用的方式,通常只有一個數據庫,如圖一庫多服和一庫一服的方式:github

一庫多服的架構模式一般會被認爲是微服務架構下的反範式,它的問題在於:web

  • 穩定性:單點故障,一個數據庫掛掉,整批服務所有中止。服務獨立性被扼殺?spring

  • 耦合性:數據在一塊兒,會給貪圖方便的開發或者DBA工程師編寫不少數據間高度依賴的程序或者工具;sql

  • 擴展性:沒法針對某一個服務進行精準優化或擴展,服務會大致分爲兩個讀多寫少、寫多讀少,數據庫優化是根據服務而來的,不是一篇而論。數據庫

因此隨行付內部通常推薦的作法:是爲每個微服務準備一個單獨的數據庫,即一庫一服模式。這種模式更加適合微服務架構,它知足每個服務是獨立開發、獨立部署、獨立擴展的特性。當須要對一個服務進行升級或者數據架構改動的時候,無須影響到其餘的服務。須要對某個服務進行擴展的時候,也能夠手術式的對某一個服務進行局部擴容。apache

那麼問題來了,在改造中咱們發現,如下問題,誕生了該項目:

  • 報表中心和前端詳細頁都存在SQL Join方式,經歷咱們一庫一服的拆分後,沒法在繼續使用SQL Join方式了...
  • 數據中心,作得是數據聚合,數據拆分後,給數據中心帶來了很大的麻煩...
  • 微服務以後,各個應用模塊對數據庫的要求出現了分歧,數據庫類型多元化自主選擇仍是統一...
  • 等等...

Proter介紹

Porter是一個集中式的數據處理通道,全部的數據都在這個數據處理平臺匯聚、分發。Porter是一個無中心、插件友好型分佈式數據同步中間件。默認註冊中心插件實現爲zookeeper, 固然,你也能夠基於註冊中心接口實現自定義註冊中心模塊。在Porter的主流程外分佈着集羣插件、源端消費插件、源端消息轉換器插件、目標端寫入插件、告警插件、自定義數據定義插件等插件模塊,除了集羣插件、告警插件是Porter任務節點全局做用域外,其他插件模塊都隨着同步任務的不一樣而相應組合。得益於良好的設計模式,Porter才能爲你們呈現如此靈活的擴展性與易用性。

功能

Porter始於2017年,提供數據同步功能,但並不只僅侷限於數據同步,在隨行付內部普遍使用。主要提供一下功能:

  • 原生支持Oracle|Mysql到Jdbc關係型數據庫最終一致同步
  • 插件友好化,支持自定義源端消費插件、目標端載入插件、告警插件等插件二次開發。
  • 支持自定義源端、目標端表、字段映射
  • 支持節點基於配置文件的同步任務配置。
  • 支持管理後臺同步任務推送,節點、任務管理。提供任務運行指標監控,節點運行日誌、任務異常告警。
  • 支持節點資源限流、分配。
  • 基於Zookeeper集羣插件的分佈式架構。支持自定義集羣插件。

架構設計

Porter節點經過註冊中心實現分佈式集羣,並根據資源需求動態擴縮容。Portert與註冊中心協商了一套任務、節點、統計接口,Porter節點經過監聽註冊中心接口數據的變化實現任務的分配管理。配置管理後臺遵照並實現註冊中心的接口規範,實現對Porter節點遠程管理。註冊中心一樣有一套分佈式鎖機制,用於任務資源的分配。

在這個機制外,Porter節點能夠經過本地配置文件的方式實現任務的定義。

原理介紹:

  • 一、基於Canal開源產品,獲取MySql數據庫增量日誌數據。
  • 二、管理系統架構。管理節點(web manager)管理工做節點任務編排、數據工做節點(TaskWork)彙報工做進度
  • 三、基於Zookeeper集羣插件的分佈式架構。支持自定義集羣插件
  • 四、基於Kafka消息組件,每張表對應一個Topic,數據節點分Topic消費工做

處理流程

爲了保證數據的一致性,源端數據提取與目標端插入採用單線程順序執行,中間階段經過多線程執行提升數據處理速度。對照上圖就是SelectJob與LoadJob單線程執行,ExtractJob、TransformJob線程並行執行,而後在LoadJob階段對數據包進行排序,順序寫入目標端。

正如文章開頭所說,告警插件與註冊中心插件在多個任務間共享,每一個任務根據源端與目標端的類型、源端數據格式選擇與之相匹配的處理插件。也就是說告警插件、註冊中心插件與Porter節點配置相關,數據消費插件、目標端插件、自定義數據處理插件等插件與任務配置相關。

插件化設計

Porter經過SPI規範結合單例、工廠、監聽者模式等設計模式,實現了極大的靈活性與鬆耦合,知足不一樣場景的二次開發。具體涵蓋以下四個方面的插件化設計:

  • 註冊中心插件
  • 源端消費插件
  • 目標端載入插件
  • 自定義數據處理插件

註冊中心插件

在common包META-INF/spring.factories有以下內容:

cn.vbill.middleware.porter.common.cluster.ClusterProvider=
	cn.vbill.middleware.porter.common.cluster.impl.zookeeper.ZookeeperClusterProvider
複製代碼

摘抄ClusterProvider接口定義:

/**
     * 匹配配置文件指定的註冊中心實現
     * @param type
     * @return
     */
    boolean matches(ClusterPlugin type);
複製代碼

porter-boot的配置文件對註冊中心的配置以下:

#集羣配置
	porter.cluster.strategy=ZOOKEEPER
	porter.cluster.client.url=127.0.0.1:2181
	porter.cluster.client.sessionTimeout=100000
複製代碼

看到這裏,有了配置文件和插件定義,咱們還差使配置生效的代碼。代碼在Porter-boot的啓動類NodeBootApplication中:

//初始化集羣提供者中間件,spring spi插件
     try {
        //獲取集羣配置信息
        ClusterProviderProxy.INSTANCE.initialize(config.getCluster());
     } catch (Exception e) {
        ClusterProviderProxy.INSTANCE.stop();
        throw new RuntimeException("集羣配置參數ClusterConfig初始化失敗, 數據同步節點退出!error:" + e.getMessage());
     }
複製代碼

ClusterProviderProxy是一個單例枚舉類,在initialize中根據spring.factories配置的實現類順序經過實現類的matches方法匹配配置文件的配置:

List<ClusterProvider> providers = SpringFactoriesLoader.loadFactories(
			ClusterProvider.class, JavaFileCompiler.getInstance());
			
            for (ClusterProvider tmp : providers) {
                if (tmp.matches(config.getStrategy())) {
                    tmp.start(config);
                    provider = tmp;
                    break;
                }
            }
複製代碼

Porter節點經過註冊ClusterListener監聽感知註冊中心的通知事件,Porter的zookeeper實如今包porter-cluster裏,經過ZookeeperClusterMonitor激活:

cn.vbill.middleware.porter.common.cluster.impl.zookeeper.ZookeeperClusterListener=\
  cn.vbill.middleware.porter.cluster.zookeeper.ZKClusterTaskListener,\
  cn.vbill.middleware.porter.cluster.zookeeper.ZKClusterNodeListener,\
  cn.vbill.middleware.porter.cluster.zookeeper.ZKClusterStatisticListener,\
  cn.vbill.middleware.porter.cluster.zookeeper.ZKClusterConfigListener
複製代碼

源端消費插件

本地配置文件任務配置參數以下,指定了源端消費插件,源端鏈接信息,數據轉換插件,目標端插件等:

node.task[0].taskId=任務ID
node.task[0].consumer.consumerName=KafkaFetch #源端消費插件
node.task[0].consumer.converter=oggJson #數據轉換插件
node.task[0].consumer.source.sourceType=KAFKA
node.task[0].consumer.source.servers=127.0.0.1:9200
node.task[0].consumer.source.topics=kafka主題
node.task[0].consumer.source.group=消費組
node.task[0].consumer.source.oncePollSize=單次消費數據條數
node.task[0].consumer.source.pollTimeOut=100
node.task[0].loader.loaderName=JdbcBatch #目標端插件
node.task[0].loader.source.sourceName=全局目標端數據源名字
複製代碼

kafka消費插件「KafkaFetch」定義在porter-plugin/kafka-consumer包,經過META-INF/spring.factories暴露實現:

cn.vbill.middleware.porter.core.consumer.DataConsumer  =
	cn.vbill.middleware.porter.plugin.consumer.kafka.KafkaConsumer
複製代碼

經過消費器工廠類DataConsumerFactory查找並激活,這裏的consumerName就是在配置文件中配置的「KafkaFetch」

public DataConsumer newConsumer(String consumerName) throws DataConsumerBuildException {
        for (DataConsumer t : consumerTemplate) {
            if (t.isMatch(consumerName)) {
                try {
                    return t.getClass().newInstance();
                } catch (Exception e) {
                    LOGGER.error("%s", e);
                    throw new DataConsumerBuildException(e.getMessage());
                }
            }
        }
        return null;
    }
複製代碼

目標端載入插件

要實現目標端載入插件須要繼承cn.vbill.middleware.porter.core.consumer.AbstractDataConsumer

//爲該插件指定的插件名稱,用於在任務配置中指定目標端插件類型
    protected String getPluginName();
    
    //LoadJob階段執單線程執行,實際的目標端插入邏輯,插入對象經過mouldRow()在TransformJob構造
    public Pair<Boolean, List<SubmitStatObject>> load(ETLBucket bucket) throws TaskStopTriggerException, InterruptedException;

    //transform階段多線程並行執行,用於自定義處理數據行
    public void mouldRow(ETLRow row) throws TaskDataException;
    
複製代碼

完成自定義目標端插件開發後,經過spring SPI機制發佈插件

cn.vbill.middleware.porter.core.loader.DataLoader=\ cn.vbill.middleware.porter.plugin.loader.DemoLoader
複製代碼

經過載入器工廠類DataLoaderFactory查找並激活,這裏的loaderName就是在getPluginName()指定的插件名稱

public DataLoader newLoader(String loaderName) throws DataLoaderBuildException {
        for (DataLoader t : loaderTemplate) {
            if (t.isMatch(loaderName)) {
                try {
                    return t.getClass().newInstance();
                } catch (Exception e) {
                    LOGGER.error("%s", e);
                    throw new DataLoaderBuildException(e.getMessage());
                }
            }
        }
複製代碼

那麼在任務配置時如何指定呢?看這裏:

porter.task[0].loader.loaderName=目標端插件名稱
複製代碼

實現細節參考porter-plugin包下的kafka-loader、jdbc-loader、kudu-loader三個目標端

自定義數據處理插件

假設咱們要將mysql表T_USER同步到目標端Oracle T_USER_2,源端表T_USER表結構與目標端表T_USER_2一致。咱們的需求是隻保留FLAG字段等於0的用戶數據。

需求有了,接下來咱們就要實現EventProcessor接口作自定義數據過濾

package cn.vbill.middleware.porter.plugin;
	public class UserFilter implements cn.vbill.middleware.porter.core.event.s.EventProcessor {
    @Override
    public void process(ETLBucket etlBucket) {
        List<ETLRow> rows = etlBucket.getRows().stream().filter(r -> {
            //第一步 找到表名爲T_USER的記錄
            boolean tableMatch = r.getFinalTable().equalsIgnoreCase("T_USER");
            if (!tableMatch) return tableMatch;
            //第二步 找到字段FLAG的值不等於0的記錄
            boolean columnMatch = r.getColumns().stream().filter(c -> c.getFinalName().equalsIgnoreCase("FLAG")
            && (null == c.getFinalValue() || !c.getFinalValue().equals("0"))).count() > 0;
            return tableMatch && columnMatch;
        }).collect(Collectors.toList());
        //第三步 清除不符合條件的集合
        etlBucket.getRows().removeAll(rows);
    }
}

複製代碼

在任務中指定自定義數據處理插件:

porter.task[0].taskId=任務ID
porter.task[0].consumer.consumerName=CanalFetch
porter.task[0].consumer.converter=canalRow
porter.task[0].consumer.source.sourceType=CANAL
porter.task[0].consumer.source.slaveId=0
porter.task[0].consumer.source.address=127.0.0.1:3306
porter.task[0].consumer.source.database=數據庫
porter.task[0].consumer.source.username=帳號
porter.task[0].consumer.source.password=密碼
porter.task[0].consumer.source.filter=*.\.t_user
porter.task[0].consumer.eventProcessor.className=cn.vbill.middleware.porter.plugin.UserFilter
porter.task[0].consumer.eventProcessor.content=/path/UserFilter.class

porter.task[0].loader.loaderName=JdbcBatch #目標端插件
porter.task[0].loader.source.sourceType=JDBC
porter.task[0].loader.source.dbType=ORACLE
porter.task[0].loader.source.url=jdbc:oracle:thin:@//127.0.0.1:1521/oracledb
porter.task[0].loader.source.userName=demo
porter.task[0].loader.source.password=demo

porter.task[0].mapper[0].auto=false
porter.task[0].mapper[0].table=T_USER,T_USER_2

複製代碼

集羣機制

Porter的集羣模式依賴集羣插件,默認的集羣插件基於zookeeper實現。Porter任務節點和管理節點並非強制綁定關係,任務部署能夠經過任務配置文件,也能夠經過管理節點推送。管理節點還能夠管理節點、收集、展現監控指標信息等,是一個不錯的、簡化運維的管理平臺。一樣的,能夠基於zookeeper數據結構協議實現你本身的管理平臺。

集羣模式下的系統結構:

zookeeper集羣模式插件

zookeeper數據結構協議: {% asset_img zk_data_schema.png zk_data_schema.png %}

Porter的集羣機制主要有如下功能:

  • 實現節點任務的負載,當前任務節點失效後自動漂移到其餘任務節點
  • 實現任務節點與管理節點的通訊
  • 實現任務處理進度的存儲與拉取
  • 實現統計指標數據的上傳(最新的開發版本支持自定義統計指標上傳客戶端,原生支持kafka)
  • 用於節點、任務搶佔的分佈式鎖實現

基於文件系統的單機模式插件

最新開發版支持Porter任務節點以單機模式運行,不依賴管理後臺和zookeeper,經過配置文件配置任務。單機模式是一種特殊的集羣模式,僅支持部分集羣功能,但簡化了任務部署的複雜性,靈活多變。

  • 實現任務處理進度的存儲與拉取
  • 實現統計指標數據的上傳

Porter任務節點運行模式的配置方式

#zookeeper集羣配置
porter.cluster.strategy=ZOOKEEPER
porter.cluster.client.url=127.0.0.1:2181
porter.cluster.client.sessionTimeout=100000
	
#單機模式配置
porter.cluster.strategy=STANDALONE
porter.cluster.client.home=/path/.porter

複製代碼

最後

筆者在開源Porter以前有幸參與apache skywalking社區並受其感召,隨後參與sharding-sphere等多個開源項目。咱們竭盡所能提供優質的開源軟件,爲中國的開源社區貢獻一份力量。但受限於技術能力及開源社區的運營經驗,不足之處,懇求你們的批評指正。

Porter的更多實現細節,請移步開源網站。

開源中國:gitee.com/sxfad/porte…

GitHub:github.com/sxfad/porte…

Porter插件開發demo:github.com/sxfad/porte…

Porter開源:835209101

本分類文章,與「隨行付研究院」微信號文章同步,第一時間接收公衆號推送,請關注「隨行付研究院」公衆號。
複製代碼

相關文章
相關標籤/搜索