Porter是一款數據同步中間件,主要用於解決同構/異構數據庫之間的表級別數據同步問題。前端
在微服務架構模式下深入的影響了應用和數據庫之間的關係,不像傳統多個服務共享一個數據庫,微服務架構下每一個服務都要有本身的數據庫。若是你想得到微服務帶來的好處,每一個服務獨有一個數據庫是必須的,由於微服務強調的就是鬆耦合。咱們但願數據庫就和服務同樣,要有充分的獨立性、能夠和服務一塊兒部署、一塊兒擴展、一塊兒重構。同時,還須要兼顧數據中心的數據聚合、DBA的多種數據庫備份、報表中心的業務報表等等矛盾問題。所以便產生了「Porter」項目。node
微服務改造過程當中,沒法避免的一個坎,那就是垂直拆庫,根據不一樣的子服務,把過去的「一庫多服」拆分紅「一庫一服」。mysql
不論是否是微服務架構,應用的各個模塊之間都須要頻繁的通訊、協做、共享數據,實現系統的總體價值。區別點在於單體應用是經過本地方法調用來完成;在微服務中是經過遠程API調用完成。git
而共享數據最賤的方式就是採用共享數據庫模式,也就是單體應用中最經常使用的方式,通常只有一個數據庫,如圖一庫多服和一庫一服的方式:github
一庫多服的架構模式一般會被認爲是微服務架構下的反範式,它的問題在於:web
穩定性:單點故障,一個數據庫掛掉,整批服務所有中止。服務獨立性被扼殺?spring
耦合性:數據在一塊兒,會給貪圖方便的開發或者DBA工程師編寫不少數據間高度依賴的程序或者工具;sql
擴展性:沒法針對某一個服務進行精準優化或擴展,服務會大致分爲兩個讀多寫少、寫多讀少,數據庫優化是根據服務而來的,不是一篇而論。數據庫
因此隨行付內部通常推薦的作法:是爲每個微服務準備一個單獨的數據庫,即一庫一服模式。這種模式更加適合微服務架構,它知足每個服務是獨立開發、獨立部署、獨立擴展的特性。當須要對一個服務進行升級或者數據架構改動的時候,無須影響到其餘的服務。須要對某個服務進行擴展的時候,也能夠手術式的對某一個服務進行局部擴容。apache
那麼問題來了,在改造中咱們發現,如下問題,誕生了該項目:
Porter是一個集中式的數據處理通道,全部的數據都在這個數據處理平臺匯聚、分發。Porter是一個無中心、插件友好型分佈式數據同步中間件。默認註冊中心插件實現爲zookeeper, 固然,你也能夠基於註冊中心接口實現自定義註冊中心模塊。在Porter的主流程外分佈着集羣插件、源端消費插件、源端消息轉換器插件、目標端寫入插件、告警插件、自定義數據定義插件等插件模塊,除了集羣插件、告警插件是Porter任務節點全局做用域外,其他插件模塊都隨着同步任務的不一樣而相應組合。得益於良好的設計模式,Porter才能爲你們呈現如此靈活的擴展性與易用性。
Porter始於2017年,提供數據同步功能,但並不只僅侷限於數據同步,在隨行付內部普遍使用。主要提供一下功能:
Porter節點經過註冊中心實現分佈式集羣,並根據資源需求動態擴縮容。Portert與註冊中心協商了一套任務、節點、統計接口,Porter節點經過監聽註冊中心接口數據的變化實現任務的分配管理。配置管理後臺遵照並實現註冊中心的接口規範,實現對Porter節點遠程管理。註冊中心一樣有一套分佈式鎖機制,用於任務資源的分配。
在這個機制外,Porter節點能夠經過本地配置文件的方式實現任務的定義。
原理介紹:
爲了保證數據的一致性,源端數據提取與目標端插入採用單線程順序執行,中間階段經過多線程執行提升數據處理速度。對照上圖就是SelectJob與LoadJob單線程執行,ExtractJob、TransformJob線程並行執行,而後在LoadJob階段對數據包進行排序,順序寫入目標端。
正如文章開頭所說,告警插件與註冊中心插件在多個任務間共享,每一個任務根據源端與目標端的類型、源端數據格式選擇與之相匹配的處理插件。也就是說告警插件、註冊中心插件與Porter節點配置相關,數據消費插件、目標端插件、自定義數據處理插件等插件與任務配置相關。
Porter經過SPI規範結合單例、工廠、監聽者模式等設計模式,實現了極大的靈活性與鬆耦合,知足不一樣場景的二次開發。具體涵蓋以下四個方面的插件化設計:
在common包META-INF/spring.factories有以下內容:
cn.vbill.middleware.porter.common.cluster.ClusterProvider=
cn.vbill.middleware.porter.common.cluster.impl.zookeeper.ZookeeperClusterProvider
複製代碼
摘抄ClusterProvider接口定義:
/**
* 匹配配置文件指定的註冊中心實現
* @param type
* @return
*/
boolean matches(ClusterPlugin type);
複製代碼
porter-boot的配置文件對註冊中心的配置以下:
#集羣配置
porter.cluster.strategy=ZOOKEEPER
porter.cluster.client.url=127.0.0.1:2181
porter.cluster.client.sessionTimeout=100000
複製代碼
看到這裏,有了配置文件和插件定義,咱們還差使配置生效的代碼。代碼在Porter-boot的啓動類NodeBootApplication中:
//初始化集羣提供者中間件,spring spi插件
try {
//獲取集羣配置信息
ClusterProviderProxy.INSTANCE.initialize(config.getCluster());
} catch (Exception e) {
ClusterProviderProxy.INSTANCE.stop();
throw new RuntimeException("集羣配置參數ClusterConfig初始化失敗, 數據同步節點退出!error:" + e.getMessage());
}
複製代碼
ClusterProviderProxy是一個單例枚舉類,在initialize中根據spring.factories配置的實現類順序經過實現類的matches方法匹配配置文件的配置:
List<ClusterProvider> providers = SpringFactoriesLoader.loadFactories(
ClusterProvider.class, JavaFileCompiler.getInstance());
for (ClusterProvider tmp : providers) {
if (tmp.matches(config.getStrategy())) {
tmp.start(config);
provider = tmp;
break;
}
}
複製代碼
Porter節點經過註冊ClusterListener監聽感知註冊中心的通知事件,Porter的zookeeper實如今包porter-cluster裏,經過ZookeeperClusterMonitor激活:
cn.vbill.middleware.porter.common.cluster.impl.zookeeper.ZookeeperClusterListener=\
cn.vbill.middleware.porter.cluster.zookeeper.ZKClusterTaskListener,\
cn.vbill.middleware.porter.cluster.zookeeper.ZKClusterNodeListener,\
cn.vbill.middleware.porter.cluster.zookeeper.ZKClusterStatisticListener,\
cn.vbill.middleware.porter.cluster.zookeeper.ZKClusterConfigListener
複製代碼
本地配置文件任務配置參數以下,指定了源端消費插件,源端鏈接信息,數據轉換插件,目標端插件等:
node.task[0].taskId=任務ID
node.task[0].consumer.consumerName=KafkaFetch #源端消費插件
node.task[0].consumer.converter=oggJson #數據轉換插件
node.task[0].consumer.source.sourceType=KAFKA
node.task[0].consumer.source.servers=127.0.0.1:9200
node.task[0].consumer.source.topics=kafka主題
node.task[0].consumer.source.group=消費組
node.task[0].consumer.source.oncePollSize=單次消費數據條數
node.task[0].consumer.source.pollTimeOut=100
node.task[0].loader.loaderName=JdbcBatch #目標端插件
node.task[0].loader.source.sourceName=全局目標端數據源名字
複製代碼
kafka消費插件「KafkaFetch」定義在porter-plugin/kafka-consumer包,經過META-INF/spring.factories暴露實現:
cn.vbill.middleware.porter.core.consumer.DataConsumer =
cn.vbill.middleware.porter.plugin.consumer.kafka.KafkaConsumer
複製代碼
經過消費器工廠類DataConsumerFactory查找並激活,這裏的consumerName就是在配置文件中配置的「KafkaFetch」
public DataConsumer newConsumer(String consumerName) throws DataConsumerBuildException {
for (DataConsumer t : consumerTemplate) {
if (t.isMatch(consumerName)) {
try {
return t.getClass().newInstance();
} catch (Exception e) {
LOGGER.error("%s", e);
throw new DataConsumerBuildException(e.getMessage());
}
}
}
return null;
}
複製代碼
要實現目標端載入插件須要繼承cn.vbill.middleware.porter.core.consumer.AbstractDataConsumer
//爲該插件指定的插件名稱,用於在任務配置中指定目標端插件類型
protected String getPluginName();
//LoadJob階段執單線程執行,實際的目標端插入邏輯,插入對象經過mouldRow()在TransformJob構造
public Pair<Boolean, List<SubmitStatObject>> load(ETLBucket bucket) throws TaskStopTriggerException, InterruptedException;
//transform階段多線程並行執行,用於自定義處理數據行
public void mouldRow(ETLRow row) throws TaskDataException;
複製代碼
完成自定義目標端插件開發後,經過spring SPI機制發佈插件
cn.vbill.middleware.porter.core.loader.DataLoader=\ cn.vbill.middleware.porter.plugin.loader.DemoLoader
複製代碼
經過載入器工廠類DataLoaderFactory查找並激活,這裏的loaderName就是在getPluginName()指定的插件名稱
public DataLoader newLoader(String loaderName) throws DataLoaderBuildException {
for (DataLoader t : loaderTemplate) {
if (t.isMatch(loaderName)) {
try {
return t.getClass().newInstance();
} catch (Exception e) {
LOGGER.error("%s", e);
throw new DataLoaderBuildException(e.getMessage());
}
}
}
複製代碼
那麼在任務配置時如何指定呢?看這裏:
porter.task[0].loader.loaderName=目標端插件名稱
複製代碼
實現細節參考porter-plugin包下的kafka-loader、jdbc-loader、kudu-loader三個目標端
假設咱們要將mysql表T_USER同步到目標端Oracle T_USER_2,源端表T_USER表結構與目標端表T_USER_2一致。咱們的需求是隻保留FLAG字段等於0的用戶數據。
需求有了,接下來咱們就要實現EventProcessor接口作自定義數據過濾
package cn.vbill.middleware.porter.plugin;
public class UserFilter implements cn.vbill.middleware.porter.core.event.s.EventProcessor {
@Override
public void process(ETLBucket etlBucket) {
List<ETLRow> rows = etlBucket.getRows().stream().filter(r -> {
//第一步 找到表名爲T_USER的記錄
boolean tableMatch = r.getFinalTable().equalsIgnoreCase("T_USER");
if (!tableMatch) return tableMatch;
//第二步 找到字段FLAG的值不等於0的記錄
boolean columnMatch = r.getColumns().stream().filter(c -> c.getFinalName().equalsIgnoreCase("FLAG")
&& (null == c.getFinalValue() || !c.getFinalValue().equals("0"))).count() > 0;
return tableMatch && columnMatch;
}).collect(Collectors.toList());
//第三步 清除不符合條件的集合
etlBucket.getRows().removeAll(rows);
}
}
複製代碼
在任務中指定自定義數據處理插件:
porter.task[0].taskId=任務ID
porter.task[0].consumer.consumerName=CanalFetch
porter.task[0].consumer.converter=canalRow
porter.task[0].consumer.source.sourceType=CANAL
porter.task[0].consumer.source.slaveId=0
porter.task[0].consumer.source.address=127.0.0.1:3306
porter.task[0].consumer.source.database=數據庫
porter.task[0].consumer.source.username=帳號
porter.task[0].consumer.source.password=密碼
porter.task[0].consumer.source.filter=*.\.t_user
porter.task[0].consumer.eventProcessor.className=cn.vbill.middleware.porter.plugin.UserFilter
porter.task[0].consumer.eventProcessor.content=/path/UserFilter.class
porter.task[0].loader.loaderName=JdbcBatch #目標端插件
porter.task[0].loader.source.sourceType=JDBC
porter.task[0].loader.source.dbType=ORACLE
porter.task[0].loader.source.url=jdbc:oracle:thin:@//127.0.0.1:1521/oracledb
porter.task[0].loader.source.userName=demo
porter.task[0].loader.source.password=demo
porter.task[0].mapper[0].auto=false
porter.task[0].mapper[0].table=T_USER,T_USER_2
複製代碼
Porter的集羣模式依賴集羣插件,默認的集羣插件基於zookeeper實現。Porter任務節點和管理節點並非強制綁定關係,任務部署能夠經過任務配置文件,也能夠經過管理節點推送。管理節點還能夠管理節點、收集、展現監控指標信息等,是一個不錯的、簡化運維的管理平臺。一樣的,能夠基於zookeeper數據結構協議實現你本身的管理平臺。
集羣模式下的系統結構:
zookeeper數據結構協議: {% asset_img zk_data_schema.png zk_data_schema.png %}
Porter的集羣機制主要有如下功能:
最新開發版支持Porter任務節點以單機模式運行,不依賴管理後臺和zookeeper,經過配置文件配置任務。單機模式是一種特殊的集羣模式,僅支持部分集羣功能,但簡化了任務部署的複雜性,靈活多變。
#zookeeper集羣配置
porter.cluster.strategy=ZOOKEEPER
porter.cluster.client.url=127.0.0.1:2181
porter.cluster.client.sessionTimeout=100000
#單機模式配置
porter.cluster.strategy=STANDALONE
porter.cluster.client.home=/path/.porter
複製代碼
筆者在開源Porter以前有幸參與apache skywalking社區並受其感召,隨後參與sharding-sphere等多個開源項目。咱們竭盡所能提供優質的開源軟件,爲中國的開源社區貢獻一份力量。但受限於技術能力及開源社區的運營經驗,不足之處,懇求你們的批評指正。
Porter的更多實現細節,請移步開源網站。
GitHub:github.com/sxfad/porte…
Porter插件開發demo:github.com/sxfad/porte…
Porter開源:835209101
本分類文章,與「隨行付研究院」微信號文章同步,第一時間接收公衆號推送,請關注「隨行付研究院」公衆號。
複製代碼