Spring Cloud Data Flow--大數據操做工具,做爲Spring XD的替代產品,它是一個混合計算模型,結合了流數據與批量數據的處理方式。爲數據微服務提供了業務流程,包括長期的 Spring Cloud Stream 應用程序和短時間的 Spring Cloud Task 應用程序。html
以前不多寫博客,主要是國內相關資料少之又少,踩了不少坑,谷歌也沒多大幫助,深知那種無助的感受,因此記錄一下,與喜歡研究技術的朋友分享一下,但願此文會有小小的幫助,微服務的明天會更好java
1.經過vagrant安裝DCOS,咱們會獲得一個Marathon端點url:http://m1.dcos/service/marathon,留做配置用,若是是其它好比CLI、GUI的安裝方式,也可配置其服務的ip地址。service安裝這裏不作過多介紹,能夠經過dcos的universe安裝,也能夠經過json文件安裝mysql
2.安裝mysql能夠經過DCOS進行安裝,若是你有一個mysql數據庫亦可沒必要安裝linux
curl -X POST http://m1.dcos/service/marathon/v2/apps -d @mysql.json -H "Content-type: application/json"
3.安裝rabbitmq,若是你有一個rabbitmq服務器亦可沒必要安裝,此處注意,對於springboot應用,應當在應用配置文件中配置mq的用戶名密碼,如:git
curl -X POST http://m1.dcos/service/marathon/v2/apps -d @rabbitmq.json -H "Content-type: application/json"
4.安裝redis,若是你有一個redis服務器亦可沒必要安裝github
curl -X POST http://m1.dcos/service/marathon/v2/apps -d @redis.json -H "Content-type: application/json"
5.安裝chronos,若是你有一個chronos服務器亦可沒必要安裝,速度慢說明拉取鏡像慢,此處共享下百度雲連接:http://pan.baidu.com/s/1dFaQvSX 密碼:3j9c,下載到agent節點直接docker load < chronos.tar,docker images查看便可,從新執行,速度槓槓滴。web
dcos package install chronos
6.獲取springclouddataflow的json配置文件redis
$ wget https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-server-mesos/v1.0.0.RELEASE/src/etc/marathon/scdf-server.json
7.直接貼上個人配置文件spring
{sql
"id": "/spring-cloud-data-flow",
"instances": 1,
"cpus": 2,
"mem": 4000,
"disk": 3000,
"gpus": 0,
"backoffSeconds": 1,
"backoffFactor": 1.15,
"maxLaunchDelaySeconds": 3600,
"container": {
"type": "DOCKER",
"docker": {
"image": "springcloud/spring-cloud-dataflow-server-mesos:latest",
"network": "BRIDGE",
"portMappings": [
{
"containerPort": 9393,
"hostPort": 0,
"servicePort": 10000,
"protocol": "tcp",
"name": "default"
}
],
"privileged": false,
"forcePullImage": false
}
},
"healthChecks": [
{
"gracePeriodSeconds": 120,
"intervalSeconds": 60,
"timeoutSeconds": 20,
"maxConsecutiveFailures": 0,
"portIndex": 0,
"path": "/management/health",
"protocol": "HTTP",
"ignoreHttp1xx": false
}
],
"upgradeStrategy": {
"minimumHealthCapacity": 1,
"maximumOverCapacity": 1
},
"unreachableStrategy": {
"inactiveAfterSeconds": 300,
"expungeAfterSeconds": 600
},
"killSelection": "YOUNGEST_FIRST",
"requirePorts": true,
"env": {
"JDBC_DRIVER": "org.mariadb.jdbc.Driver",
"MESOS_CHRONOS_URI": "http://172.16.1.77:10105",
"REDIS_HOST": "172.16.1.61",
"RABBITMQ_PORT": "6392",
"MESOS_MARATHON_URI": "http://m1.dcos/service/marathon",
"REDIS_PORT": "6379",
"JDBC_PASSWORD": "1234321",
"JDBC_URL": "jdbc:mysql://172.16.1.145:3306/test",
"SPRING_APPLICATION_JSON": "{\"spring.cloud.deployer.mesos.marathon.apiEndpoint\":\"${MESOS_MARATHON_URI}\",\"spring.cloud.deployer.mesos.chronos.apiEndpoint\":\"${MESOS_CHRONOS_URI}\",\"spring.datasource.url\":\"${JDBC_URL}\",\"spring.datasource.driverClassName\":\"${JDBC_DRIVER}\",\"spring.datasource.username\":\"${JDBC_USERNAME}\",\"spring.datasource.password\":\"${JDBC_PASSWORD}\",\"spring.datasource.testOnBorrow\":true,\"spring.datasource.validationQuery\":\"SELECT 1\",\"spring.redis.host\":\"${REDIS_HOST}\",\"spring.redis.port\":\"${REDIS_PORT}\",\"spring.cloud.deployer.mesos.marathon.environmentVariables\":\"SPRING_RABBITMQ_HOST=${RABBITMQ_HOST},SPRING_RABBITMQ_PORT=${RABBITMQ_PORT}\",\"spring.cloud.deployer.mesos.dcos.authorizationToken\":\"${DCOS_TOKEN}\",\"spring.cloud.config.enabled\":false,\"spring.freemarker.checkTemplateLocation\":false,\"spring.cloud.deployer.mesos.marathon.memory\":\"3000\",\"spring.dataflow.embedded.database.enabled\":false}",
"RABBITMQ_HOST": "172.16.1.77",
"JDBC_USERNAME": "root"
}
}
這裏對幾點作說明:(1)"image": "springcloud/spring-cloud-dataflow-server-mesos:latest",對應的agent節點docker中的鏡像,直接運行的話,首先它會下載鏡像,因爲你懂得,速度慢成翔,你可
以docker去pull喝着大茶慢慢等,若是你別處有此鏡像也可導入到你的agent節點的docker中。此處共享下導出的鏡像百度雲地址連接:http://pan.baidu.com/s/1hstbj5E
密碼:56nk,下載後直接docker load < springdataflow.tar,若是你有搭建docker的本地遠程倉庫,也可將鏡像打個tag推送到倉庫,agent的docker直接從倉庫中pull。
(2)配置marathon還有chronos地址,也能夠配置ip地址,瀏覽器能正常訪問便可
"MESOS_MARATHON_URI": "http://m1.dcos/service/marathon", "MESOS_CHRONOS_URI": "http://m1.dcos/service/chronos",
ip能夠經過service的detail中查看,logs能夠看到其運行日誌
(3)配置mysql
"JDBC_URL": "jdbc:mysql://", "JDBC_DRIVER": "org.mariadb.jdbc.Driver", "JDBC_USERNAME": "", "JDBC_PASSWORD": "",
(4)配置mq
"RABBITMQ_HOST": "", "RABBITMQ_PORT": "",
(5)配置redis
"REDIS_HOST": "", "REDIS_PORT": "",
(6)若是DCOS是開啓權限認證,則須要配置
DCOS_TOKEN
獲取token方法以下:
curl https://downloads.dcos.io/binaries/cli/linux/x86-64/dcos-1.9/dcos -o dcos && sudo mv dcos /usr/local/bin && sudo chmod +x /usr/local/bin/dcos && dcos config set core.dcos_url http://XXXXXXXXX && dcos
安裝dcos的cli,配置你的master地址,而後dcos auth login 出現一個地址,http://XXXXXX/login?redirect_uri=urn:ietf:wg:oauth:2.0:oob貼到瀏覽器獲取一個token
而後複製到控制檯,登陸成功。而後 dcos config show core.dcos_acs_token 出現的token能夠copy到
DCOS_TOKEN
還能夠配置maven地址,可參閱文檔:http://docs.spring.io/spring-cloud-dataflow/docs/1.2.2.BUILD-SNAPSHOT/reference/htmlsingle/#arch-data-flow-server,可是註冊app的時候,直接配置jar的地址或者maven地址,都會出現異常,看了下源碼,在deploy的時候jar會生成一個臨時鏡像,而後經過fegin去請求marathon的api,可是請求一直會報異常null或者:reason不啦不啦的,因此我是把每個流的jar都經過docker打包鏡像push到遠程倉庫,配置本地docker倉庫地址去拉取鏡像,還有須要注意的地方就是註冊app的名稱還有stream的名稱的時候所有用小寫,否則fegin請求的時候也會報異常。如有大神知道jar或者maven配置的正確方法,請分享與我,謝謝。
8.而後就能夠經過json文件運行了,頁面也是能夠操做的
查看日誌啓動成功
9.訪問springdataflow的web端http://XXXX:AAA/dashboard,亦可經過
spring-cloud-dataflow-shell
來註冊app或者stream,此處再也不贅述。
10.註冊app
11.建立數據流
注意:至少得有一個source一個sink,因爲資源有限,processor暫時不加入(ps:跑應用真的很消耗資源)
12.應用數據流
而後就能夠取services去查看,一個jar應該會啓動一個實例
大功告成哈哈哈哈哈哈哈哈哈
下面附上部分代碼
source:
application
import java.util.Date; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.context.annotation.Bean; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.Poller; import org.springframework.integration.core.MessageSource; import org.springframework.messaging.support.MessageBuilder; @EnableBinding(Source.class) @SpringBootApplication public class LoggingSourceApplication { @Bean @InboundChannelAdapter( value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1") ) public MessageSource<Long> timeMessageSource() { System.out.println(new Date() +"======================logging-source========================== execued"); return () -> { System.out.println(new Date() + "*****logging-source****** send"); return MessageBuilder.withPayload(new Date().getTime()).build(); }; } public static void main(String[] args) { SpringApplication.run(LoggingSourceApplication.class, args); } }
properties
spring.rabbitmq.host=172.16.3.183 spring.rabbitmq.username=admin spring.rabbitmq.password=admin # 本機啓動測試須要如下配置 spring.cloud.stream.default.contentType=application/json spring.cloud.stream.bindings.output.destination=source-log # 默認狀況下,Spring Cloud Stream 會在 RabbitMQ 中建立一個臨時的隊列,程序關閉, # 對應的鏈接關閉的時候,該隊列也會消失。爲此,咱們須要一個持久化的隊列,而且指定一個分組,用於保證應用服務的縮放。 # 只須要在消費者端的 binding 添加配置項 spring.cloud.stream.bindings.[channelName].group = logistic # 對應的隊列就是持久化 spring.cloud.stream.bindings.output.group=logTest spring.cloud.stream.bindings.output.binder=rabbitMq1 spring.cloud.stream.binders.rabbitMq1.type=rabbit spring.cloud.stream.default-binder=rabbitMq1 # rabbitMQ服務器地址 spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.host=172.16.3.183 # rabbitMQ服務器端口 spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.port=5672 spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.username=admin spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.password=admin spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.virtual-host=/
pom
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Camden.SR5</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
sink:
application
import java.util.Date; import java.util.Map; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; @EnableBinding(Sink.class) @SpringBootApplication public class LoggingSinkApplication { @MessageEndpoint public static class LoggingMessageEndpoint { @ServiceActivator(inputChannel = Sink.INPUT) public void logIncomingMessages(@Payload String msg, @Headers Map<String, Object> headers) { System.out.println(new Date() + "***********logging-sink**************"+ msg); headers.entrySet().forEach(e -> System.out.println(e.getKey() + '=' + e.getValue())); } } @StreamListener(Sink.INPUT) public void loggerSink(String date) { System.out.println("logging-sink Received: " + date); } @Payload public static void main(String[] args) { SpringApplication.run(LoggingSinkApplication.class, args); } }
properties
spring.rabbitmq.host=172.16.3.183 spring.rabbitmq.username=admin spring.rabbitmq.password=admin #本地測試須要配置 server.port=8090 spring.cloud.stream.default.contentType=application/json spring.cloud.stream.bindings.input.destination=source-log # 默認狀況下,Spring Cloud Stream 會在 RabbitMQ 中建立一個臨時的隊列,程序關閉, # 對應的鏈接關閉的時候,該隊列也會消失。爲此,咱們須要一個持久化的隊列,而且指定一個分組,用於保證應用服務的縮放。 # 只須要在消費者端的 binding 添加配置項 spring.cloud.stream.bindings.[channelName].group = logistic # 對應的隊列就是持久化 spring.cloud.stream.bindings.input.group=logTest spring.cloud.stream.bindings.input.binder=rabbitMq1 spring.cloud.stream.binders.rabbitMq1.type=rabbit spring.cloud.stream.default-binder=rabbitMq1 # rabbitMQ服務器地址 spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.host=172.16.3.183 # rabbitMQ服務器端口 spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.port=5672 spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.username=admin spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.password=admin spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.virtual-host=/
pom
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.4.RELEASE</version> <relativePath /> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Camden.SR5</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>