版權聲明: 本文爲博主原創文章,未經博主容許不得轉載。關注公衆號技術匯(ID: jishuhui_2015) 可聯繫到做者。java
上篇介紹了基於kettle的數據同步工程的搭建,entrypoint.kjb就是整個工程執行的入口。git
爲了進一步下降操做成本,讓整個數據同步過程更穩定、安全,須要進行更高層面的抽象,作成一個簡單易用的系統。github
如下是應用截圖:spring
除了選擇數據源和數據庫以外,還加入了受權碼,意味着受權範圍內的用戶才能使用該系統。sql
由於是內部使用,受權用戶還沒實現後臺管理,直接往應用數據庫裏添加,所選擇的數據源和數據庫都是經過配置文件生成的。數據庫
文末會附上GitHub上的源碼地址,有須要的讀者,能夠進行二次開發改造。安全
數據庫名稱:kettle,目前有兩張表:springboot
一、受權用戶表。表內記錄的用戶便可使用數據同步系統app
CREATE TABLE `authorized_user` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用戶ID,自增',
`user` varchar(128) NOT NULL COMMENT '用戶名,全局惟一',
`token` varchar(20) NOT NULL COMMENT '用戶的受權碼,全局惟一',
`status` char(1) NOT NULL DEFAULT 'A' COMMENT '受權用戶狀態:A-已受權,R-未受權',
`gmt_create` datetime NOT NULL COMMENT '建立時間',
`gmt_modify` datetime NOT NULL COMMENT '最後修改時間',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_index_token` (`token`) USING BTREE,
UNIQUE KEY `unique_index_user` (`user`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='受權用戶表'
複製代碼
二、同步記錄表。記錄用戶的數據同步操做異步
CREATE TABLE `sync_record` (
`sync` varchar(20) NOT NULL COMMENT '同步記錄主鍵',
`ipv4` varchar(15) NOT NULL COMMENT 'ip地址',
`from_db` varchar(100) NOT NULL COMMENT '源數據',
`to_db` varchar(100) NOT NULL COMMENT '目標數據',
`user` varchar(128) NOT NULL COMMENT '用戶名',
`token` varchar(20) NOT NULL COMMENT '用戶的受權碼',
`status` char(1) NOT NULL DEFAULT 'P' COMMENT '同步狀態:P-正在執行,S-成功,F-失敗',
`gmt_create` datetime NOT NULL COMMENT '同步建立時間',
`gmt_modify` datetime NOT NULL COMMENT '最後修改時間',
PRIMARY KEY (`sync`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='同步記錄表';
複製代碼
由於系統作得比較簡單實用,沒有什麼特別設計之處。筆者重點說三點:
一、數據源及其參數配置
在application.yml配置文件中,有這麼一段配置:
env:
entry-point: kettle/entrypoint.kjb
databases:
- taxi-user
- taxi-account
- taxi-trade
- taxi-coupon
- taxi-bi
- taxi-system
- taxi-credits
- taxi-finance
- taxi-notification
- taxi-gateway
from-dbs:
- PROD
- TEST
- LOCAL
to-dbs:
- LOCAL
- TEST
db-settings:
- name: LOCAL
host: *****
port: 3306
user: *****
password: *****
- name: TEST
host: *****
port: 3306
user: *****
password: *****
- name: PROD
host: *****
port: 3306
user: *****
password: *****
複製代碼
利用了springboot的@ConfigurationProperties的註解。
@Setter
@Getter
@ConfigurationProperties(prefix = "env")
public class EnvConfig {
private List<String> databases;
private List<String> fromDbs;
private List<String> toDbs;
private List<DBSetting> dbSettings;
public DBSetting getDBConfig(String name) {
if (StringUtils.isBlank(name)) return null;
return dbSettings.stream().filter(dbSetting -> dbSetting.getName().equalsIgnoreCase(name)).findFirst().orElse(null);
}
}
複製代碼
當中的DBSetting的定義以下所示:
@Setter
@Getter
@NoArgsConstructor
public class DBSetting {
private String name;
private String host;
private String port = "3306";
private String user = "root";
private String password;
public DBSetting(String host, String user, String password) {
this.host = host;
this.user = user;
this.password = password;
}
}
複製代碼
經過客戶端傳來的參數,便可定位到對應的參數設置。
二、集成kettle的API 由於kettle相關的jar包放在了本身搭建的nexus私服上,因此若是使用的是maven管理jar包的話,須要在settings.xml配置文件中作一點修改:
<mirror>
<id>nexus</id>
<url>公司內部的nexus的URL</url>
<mirrorOf>*,!pentaho-releases</mirrorOf>
</mirror>
複製代碼
其中的mirrorOf節點加上了!pentaho-releases,表示排除pentaho-releases。
而後,在springboot工程中的pom.xml中指定pentaho-releases的url。
<repositories>
<repository>
<id>pentaho-releases</id>
<url>https://nexus.pentaho.org/content/groups/omni/</url>
</repository>
</repositories>
複製代碼
接下來是核心的對接代碼,具體能夠參照工程源碼。
JobMeta jobMeta = getJobMeta(new ClassPathResource(envConfig.getEntryPoint()));
Job job = new Job(null, jobMeta);
//設置Variable
job.setVariable("sync", sync);
job.setVariable("TO_HOST", toDbSetting.getHost());
job.setVariable("TO_DB", form.getDb());
job.setVariable("TO_USER", toDbSetting.getUser());
job.setVariable("TO_PASSWORD", toDbSetting.getPassword());
job.setVariable("TO_PORT", toDbSetting.getPort());
job.setVariable("FROM_HOST", fromDbSetting.getHost());
job.setVariable("FROM_DB", form.getDb());
job.setVariable("FROM_USER", fromDbSetting.getUser());
job.setVariable("FROM_PASSWORD", fromDbSetting.getPassword());
job.setVariable("FROM_PORT", fromDbSetting.getPort());
job.start(); //開始執行Job
job.waitUntilFinished(); //等待Job完成
複製代碼
三、異步執行做業
由於一個Job的執行時間可能會很長,這個主要是看數據量的多少,因此一個request的來回可能會致使TIMEOUT,因此須要改成異步的模式。
其核心的思想是:啓動新的線程,客戶端定時輪詢執行結果。
筆者分兩篇文章介紹瞭如何利用kettle進行數據同步,並實現一個簡易的系統,下降操做成本和出錯率。
就介紹到這了,若有疑問,能夠留言。
歡迎fork個人工程代碼(github.com/liu-weihao/…)。