原文連接:https://www.baeldung.com/spring-cloud-data-flow-etljava
做者:Norberto Ritzmannspring
譯者:Emmasql
Spring Cloud Data Flow是一個用於構建實時數據管道和批處理過程的雲原生工具包。 Spring Cloud Data Flow已準備好用於一系列數據處理用例,如簡單的導入/導出,ETL處理,事件流和預測分析。mongodb
在本教程中,咱們將學習使用流管道實時提取轉換和加載(ETL)的示例,該管道從JDBC數據庫中提取數據,將其轉換爲簡單的POJO並將其加載到MongoDB中。docker
ETL - 提取,轉換和加載 -一般被認爲將數據從多個數據庫和系統批量加載到公共數據倉庫中的過程。在此數據倉庫中,能夠在不影響系統總體性能的狀況下進行大量數據分析處理。shell
然而,新趨勢正在改變如何作到這一點的方式。 ETL仍然能夠將數據傳輸到數據倉庫和數據池。數據庫
如今,能夠藉助Spring Cloud Data Flow的事件流體系架構使用流來完成此操做。服務器
藉助Spring Cloud Data Flow(SCDF),開發人員能夠建立兩種風格的數據管道:架構
使用Spring Cloud Stream的長效實時流應用程序app
使用Spring Cloud Task的批處理短時間任務應用程序
在本文中,咱們將介紹第一個,基於Spring Cloud Stream的長效流媒體應用程序。
SCDF管道流由不一樣的步驟組成,其中每一步都是使用Spring Cloud Stream微框架以Spring Boot樣式構建的應用程序。這些應用程序集成了像Apache Kafka或RabbitMQ等的消息中間件。
這些應用程序分爲源,處理器和接收器。與ETL過程相比,咱們能夠說源是「提取」,處理器是「轉換器」,接收器是「加載」部分。
在某些狀況下,咱們能夠在管道的一個或多個步驟中使用應用程序啓動器。這意味着咱們不須要爲每一步實現新的應用程序,而是配置已實現的現有應用程序啓動器。
能夠在此處找到應用程序啓動器列表。
在開始以前,咱們須要選擇這個複雜部署的部分。要定義的第一部分是SCDF服務器。
爲了進行測試,咱們將使用SCDF Server Local進行本地開發。對於生產部署,咱們稍後能夠選擇雲本機運行時,如SCDF Server Kubernetes。咱們能夠在這裏找到服務器運行列表。
如今,檢查系統要求是否知足運行此服務器。
要運行SCDF服務器,咱們必須定義並設置兩個依賴項:
消息中間件
關係數據庫管理系統(the RDBMS)
咱們將使用RabbitMQ做爲消息中間件,咱們選擇PostgreSQL做爲RDBMS來存儲咱們的管道流定義。
爲了運行RabbitMQ,能夠在此處下載最新版本並使用默認配置啓動RabbitMQ實例或運行如下Docker命令:
docker run --name dataflow-rabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management
最後的設置步驟:在默認端口5432上安裝並運行PostgreSQL RDBMS。以後,建立一個數據庫,SCDF可使用如下腳本存儲其流定義:
CREATE DATABASE dataflow;
咱們能夠選擇使用docker-compose啓動服務器,或者將其做爲Java應用程序啓動來運行SCDF Server Local。
在這裏,咱們將SCDF Server Local做爲Java應用程序運行。爲了配置應用程序,咱們必須將配置定義爲Java應用程序參數。咱們在系統路徑中須要配置Java 8。
爲了託管jar和依賴項,咱們須要爲SCDF Server建立一個主文件夾,並將SCDF Server Local發行版下載到此文件夾中。您能夠在此處下載SCDF Server Local最新分行版。
此外,咱們須要建立一個lib文件夾並在其中放置JDBC驅動程序。這裏提供了最新版本的PostgreSQL驅動程序。
最後,運行SCDF本地服務器:
$java -Dloader.path=lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \
--spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dataflow \
--spring.datasource.username=postgres_username \
--spring.datasource.password=postgres_password \
--spring.datasource.driver-class-name=org.postgresql.Driver \
--spring.rabbitmq.host=127.0.0.1 \
--spring.rabbitmq.port=5672 \
--spring.rabbitmq.username=guest \
--spring.rabbitmq.password=guest
咱們能夠經過查看此URL來檢查它是否正在運行:http://localhost:9393/dashboard
SCDF Shell是一個命令行工具,能夠輕鬆組合和部署咱們的應用程序和管道。這些Shell命令在Spring Cloud Data Flow Server REST API上運行。
在此處得到最新版本的jar,而且下載到SCDF主文件夾中。完成後,運行如下命令(根據須要更新版本):
$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar
____ ____ _ __
/ ___| _ __ _ __(_)_ __ __ _ / ___| | ___ _ _ __| |
\___ \| '_ \| '__| | '_ \ / _` | | | | |/ _ \| | | |/ _` |
___) | |_) | | | | | | | (_| | | |___| | (_) | |_| | (_| |
|____/| .__/|_| |_|_| |_|\__, | \____|_|\___/ \__,_|\__,_|
____ |_| _ __|___/ __________
| _ \ __ _| |_ __ _ | ___| | _____ __ \ \ \ \ \ \
| | | |/ _` | __/ _` | | |_ | |/ _ \ \ /\ / / \ \ \ \ \ \
| |_| | (_| | || (_| | | _| | | (_) \ V V / / / / / / /
|____/ \__,_|\__\__,_| |_| |_|\___/ \_/\_/ /_/_/_/_/_/
Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
若是最後一行中得到「server-unknown:>」而不是「dataflow:>」,則表示您沒有在localhost上運行SCDF服務器。在這種狀況下,請運行如下命令以鏈接到另外一臺主機:
server-unknown:>dataflow config server http://{host}
如今,Shell鏈接到SCDF服務器,咱們能夠運行咱們的命令。
咱們在Shell中須要作的第一件事就是導入應用程序啓動器。在Spring Boot 2.0.x中找到RabbitMQ + Maven的最新版本,並運行如下命令(再次聲明,根據須要更新版本,此處爲「Darwin-SR1」):
$ dataflow:>app import --uri http://bit.ly/Darwin-SR1-stream-applications-rabbit-maven
檢查應用程序是否安裝完成,請運行如下Shell命令:
$ dataflow:> app list
所以,咱們應該看到一個包含全部已安裝應用程序的表。
此外,SCDF提供了一個名爲Flo的圖形界面,咱們能夠經過如下地址訪問:http://localhost:9393/dashboard。可是,它的使用不在本文的範圍內。
咱們如今建立咱們的流管道。爲此,咱們將使用JDBC Source應用啓動程序從關係數據庫中提取信息。
此外,咱們將建立一個自定義處理器,用於轉換信息結構和一個自定義接收器,將數據加載到MongoDB中。
建立一個名爲crm的數據庫和一個名爲customer的表:
CREATE DATABASE crm;
CREATE TABLE customer (
id bigint NOT NULL,
imported boolean DEFAULT false,
customer_name character varying(50),
PRIMARY KEY(id)
)
請注意,咱們正在使用導入的標誌,該標誌將存儲已導入的記錄。若有必要,咱們還能夠將此信息存儲在另外一個表中。
如今,插入一些數據:
INSERT INTO customer(id, customer_name, imported) VALUES (1, 'John Doe', false);
對於轉換步驟,咱們將把源表customer_name字段簡單轉換爲新字段名稱。其餘轉換能夠在這裏完成,但儘可能保持簡短例子。
爲此,咱們將建立一個名爲customer-transform的新項目。最簡單的方法是使用Spring Initializr站點建立項目。看到網站後,選擇一個Group和一個Artifact名稱。咱們將分別使用com.customer和customer-transform。
完成後,單擊「生成項目」按鈕下載項目。而後,解壓縮項目並將其導入喜歡的IDE,並將如下依賴項添加到pom.xml:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
如今咱們開始爲字段名稱轉換進行編碼。爲此,咱們將建立Customer類做爲適配器。此類將經過setName()方法接收customer_name,並將經過getName方法輸出其值。
@JsonProperty註釋在JSON反序列化到Java時執行轉換:
public class Customer {
private Long id;
private String name;
@JsonProperty("customer_name")
public void setName(String name) {
this.name = name;
}
@JsonProperty("name")
public String getName() {
return name;
}
// Getters and Setters
}
處理器須要從輸入接收數據,進行轉換並將結果綁定到輸出通道。建立一個類來執行此操做:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;
@EnableBinding(Processor.class)
public class CustomerProcessorConfiguration {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Customer convertToPojo(Customer payload) {
return payload;
}
}
在上面的代碼中,咱們能夠觀察到轉換是自動發生的。輸入接收JSON數據,Jackson使用set方法將其反序列化爲Customer對象。
與輸入相反,輸出使用get方法將數據序列化爲JSON。
與轉換步驟相似,咱們將建立另外一個maven項目,名爲customer-mongodb-sink。再次,訪問Spring Initializr,Group名爲com.customer,Artifact名爲customer-mongodb-sink。而後,在依賴項搜索框中鍵入「MongoDB」並下載項目。
接下來,項目解壓縮並導入IDE.
而後,添加與customer-transform項目中相同的額外依賴項。
如今咱們將建立另外一個Customer類,用於在此步驟中接收輸入:
import org.springframework.data.mongodb.core.mapping.Document;
@Document(collection="customer")
public class Customer {
private Long id;
private String name;
// Getters and Setters
}
爲了接收Customer,咱們將建立一個Listener類,它將使用CustomerRepository保存客戶實體:
@EnableBinding(Sink.class)
public class CustomerListener {
@Autowired
private CustomerRepository repository;
@StreamListener(Sink.INPUT)
public void save(Customer customer) {
repository.save(customer);
}
}
在這種狀況下,CustomerRepository是Spring Data的MongoRepository:
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface CustomerRepository extends MongoRepository<Customer, Long> {
}
如今,兩個自定義應用程序均可以在SCDF服務器上註冊。爲了實現這個目標,先使用Maven命令mvn install編譯這兩個項目。
以後,使用Spring Cloud Data Flow Shell註冊它們:
app register --name customer-transform --type processor --uri maven://com.customer:customer-transform:0.0.1-SNAPSHOT
app register --name customer-mongodb-sink --type sink --uri maven://com.customer:customer-mongodb-sink:jar:0.0.1-SNAPSHOT
最後,檢查應用程序是否存儲在SCDF中,在shell中運行application list命令:
app list
所以,咱們應該在結果表中看到這兩個應用程序。
DSL定義應用程序之間的配置和數據流。 SCDF DSL很簡單。在第一個單詞中,咱們定義應用程序的名稱,而後是配置。
此外,語法是受Unix啓發的Pipeline語法,它使用垂直條(也稱爲「管道」)來鏈接多個應用程序:
http --port=8181 | log
建立端口是8181HTTP應用程序,該應用程序將收到的任何正文有效負載發送到日誌。
如今,讓咱們看看如何建立JDBC Source的DSL流定義。
JDBC Source的關鍵配置是查詢和更新。查詢將選擇未讀記錄,而更新將更改標誌以防止從新讀取當前記錄。
此外,咱們將定義JDBC Source以30秒的固定延遲輪詢並輪詢最多1000行。最後,咱們將定義鏈接的配置,如驅動程序,用戶名,密碼和鏈接URL:
jdbc
--query='SELECT id, customer_name FROM public.customer WHERE imported = false'
--update='UPDATE public.customer SET imported = true WHERE id in (:id)'
--max-rows-per-poll=1000
--fixed-delay=30 --time-unit=SECONDS
--driver-class-name=org.postgresql.Driver
--url=jdbc:postgresql://localhost:5432/crm
--username=postgres
--password=postgres
能夠在此處找到更多JDBC Source配置屬性。
因爲咱們沒有在customer-mongodb-sink的application.properties中定義鏈接配置,咱們將經過DSL參數進行配置。
咱們的應用程序徹底基於MongoDataAutoConfiguration。您能夠在此處查看其餘可能的配置。基本上,咱們將定義spring.data.mongodb.uri:
customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main
首先,要建立最終的流定義,請返回到Shell並執行如下命令(沒有換行符,剛剛插入它們以便於閱讀):
stream create --name jdbc-to-mongodb
--definition "jdbc
--query='SELECT id, customer_name FROM public.customer WHERE imported=false'
--fixed-delay=30
--max-rows-per-poll=1000
--update='UPDATE customer SET imported=true WHERE id in (:id)'
--time-unit=SECONDS
--password=postgres
--driver-class-name=org.postgresql.Driver
--username=postgres
--url=jdbc:postgresql://localhost:5432/crm | customer-transform | customer-mongodb-sink
--spring.data.mongodb.uri=mongodb://localhost/main"
此DSL流被定義名爲jdbc-to-mongodb。接下來,咱們將按名稱部署流:
stream deploy --name jdbc-to-mongodb
最後,咱們應該在日誌輸出中看到全部可用日誌的位置:
Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink
Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-transform
Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc
在本文中,咱們已經看到了使用Spring Cloud Data Flow的ETL數據管道的完整示例。
最值得注意的是,咱們看到了應用啓動程序的配置,使用Spring Cloud Data Flow Shell建立了一個ETL流管道,併爲咱們的讀取,轉換和寫數據實現了自定義應用程序。
與往常同樣,示例代碼能夠在GitHub中找到。