Spring Cloud Data Flow Server for Apache Mesos 適用於mesos平臺的springcloud數據流服務器(DCOS構建)

Spring Cloud Data Flow--大數據操做工具,做爲Spring XD的替代產品,它是一個混合計算模型,結合了流數據與批量數據的處理方式。爲數據微服務提供了業務流程,包括長期的 Spring Cloud Stream 應用程序和短時間的 Spring Cloud Task 應用程序。html

以前不多寫博客,主要是國內相關資料少之又少,踩了不少坑,谷歌也沒多大幫助,深知那種無助的感受,因此記錄一下,與喜歡研究技術的朋友分享一下,但願此文會有小小的幫助,微服務的明天會更好大笑java

1.經過vagrant安裝DCOS,咱們會獲得一個Marathon端點url:http://m1.dcos/service/marathon,留做配置用,若是是其它好比CLI、GUI的安裝方式,也可配置其服務的ip地址。service安裝這裏不作過多介紹,能夠經過dcos的universe安裝,也能夠經過json文件安裝mysql

2.安裝mysql能夠經過DCOS進行安裝,若是你有一個mysql數據庫亦可沒必要安裝linux

curl -X POST http://m1.dcos/service/marathon/v2/apps -d @mysql.json -H "Content-type: application/json"

3.安裝rabbitmq,若是你有一個rabbitmq服務器亦可沒必要安裝,此處注意,對於springboot應用,應當在應用配置文件中配置mq的用戶名密碼,如:git

curl -X POST http://m1.dcos/service/marathon/v2/apps -d @rabbitmq.json -H "Content-type: application/json"

4.安裝redis,若是你有一個redis服務器亦可沒必要安裝github

curl -X POST http://m1.dcos/service/marathon/v2/apps -d @redis.json -H "Content-type: application/json"

5.安裝chronos,若是你有一個chronos服務器亦可沒必要安裝,速度慢說明拉取鏡像慢,此處共享下百度雲連接:http://pan.baidu.com/s/1dFaQvSX  密碼:3j9c,下載到agent節點直接docker load < chronos.tar,docker images查看便可,從新執行,速度槓槓滴。web

dcos package install chronos

6.獲取springclouddataflow的json配置文件redis

$ wget https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-server-mesos/v1.0.0.RELEASE/src/etc/marathon/scdf-server.json

7.直接貼上個人配置文件spring

{sql

  "id": "/spring-cloud-data-flow",

  "instances": 1,

  "cpus": 2,

  "mem": 4000,

  "disk": 3000,

  "gpus": 0,

  "backoffSeconds": 1,

  "backoffFactor": 1.15,

  "maxLaunchDelaySeconds": 3600,

  "container": {

    "type": "DOCKER",

    "docker": {

      "image": "springcloud/spring-cloud-dataflow-server-mesos:latest",

      "network": "BRIDGE",

      "portMappings": [

        {

          "containerPort": 9393,

          "hostPort": 0,

          "servicePort": 10000,

          "protocol": "tcp",

          "name": "default"

        }

      ],

      "privileged": false,

      "forcePullImage": false

    }

  },

  "healthChecks": [

    {

      "gracePeriodSeconds": 120,

      "intervalSeconds": 60,

      "timeoutSeconds": 20,

      "maxConsecutiveFailures": 0,

      "portIndex": 0,

      "path": "/management/health",

      "protocol": "HTTP",

      "ignoreHttp1xx": false

    }

  ],

  "upgradeStrategy": {

    "minimumHealthCapacity": 1,

    "maximumOverCapacity": 1

  },

  "unreachableStrategy": {

    "inactiveAfterSeconds": 300,

    "expungeAfterSeconds": 600

  },

  "killSelection": "YOUNGEST_FIRST",

  "requirePorts": true,

  "env": {

    "JDBC_DRIVER": "org.mariadb.jdbc.Driver",

    "MESOS_CHRONOS_URI": "http://172.16.1.77:10105",

    "REDIS_HOST": "172.16.1.61",

    "RABBITMQ_PORT": "6392",

    "MESOS_MARATHON_URI": "http://m1.dcos/service/marathon",

    "REDIS_PORT": "6379",

    "JDBC_PASSWORD": "1234321",

    "JDBC_URL": "jdbc:mysql://172.16.1.145:3306/test",

    "SPRING_APPLICATION_JSON": "{\"spring.cloud.deployer.mesos.marathon.apiEndpoint\":\"${MESOS_MARATHON_URI}\",\"spring.cloud.deployer.mesos.chronos.apiEndpoint\":\"${MESOS_CHRONOS_URI}\",\"spring.datasource.url\":\"${JDBC_URL}\",\"spring.datasource.driverClassName\":\"${JDBC_DRIVER}\",\"spring.datasource.username\":\"${JDBC_USERNAME}\",\"spring.datasource.password\":\"${JDBC_PASSWORD}\",\"spring.datasource.testOnBorrow\":true,\"spring.datasource.validationQuery\":\"SELECT 1\",\"spring.redis.host\":\"${REDIS_HOST}\",\"spring.redis.port\":\"${REDIS_PORT}\",\"spring.cloud.deployer.mesos.marathon.environmentVariables\":\"SPRING_RABBITMQ_HOST=${RABBITMQ_HOST},SPRING_RABBITMQ_PORT=${RABBITMQ_PORT}\",\"spring.cloud.deployer.mesos.dcos.authorizationToken\":\"${DCOS_TOKEN}\",\"spring.cloud.config.enabled\":false,\"spring.freemarker.checkTemplateLocation\":false,\"spring.cloud.deployer.mesos.marathon.memory\":\"3000\",\"spring.dataflow.embedded.database.enabled\":false}",

    "RABBITMQ_HOST": "172.16.1.77",

    "JDBC_USERNAME": "root"

  }

}

這裏對幾點作說明:(1)"image": "springcloud/spring-cloud-dataflow-server-mesos:latest",對應的agent節點docker中的鏡像,直接運行的話,首先它會下載鏡像,因爲你懂得,速度慢成翔,你可

以docker去pull喝着大茶慢慢等,若是你別處有此鏡像也可導入到你的agent節點的docker中。此處共享下導出的鏡像百度雲地址連接:http://pan.baidu.com/s/1hstbj5E  

密碼:56nk,下載後直接docker load < springdataflow.tar,若是你有搭建docker的本地遠程倉庫,也可將鏡像打個tag推送到倉庫,agent的docker直接從倉庫中pull。

(2)配置marathon還有chronos地址,也能夠配置ip地址,瀏覽器能正常訪問便可

"MESOS_MARATHON_URI": "http://m1.dcos/service/marathon",
"MESOS_CHRONOS_URI": "http://m1.dcos/service/chronos",

ip能夠經過service的detail中查看,logs能夠看到其運行日誌

(3)配置mysql

"JDBC_URL": "jdbc:mysql://",
"JDBC_DRIVER": "org.mariadb.jdbc.Driver",
"JDBC_USERNAME": "",
"JDBC_PASSWORD": "",

(4)配置mq

"RABBITMQ_HOST": "",
"RABBITMQ_PORT": "",

(5)配置redis

"REDIS_HOST": "",
"REDIS_PORT": "",

(6)若是DCOS是開啓權限認證,則須要配置

DCOS_TOKEN

獲取token方法以下:

curl https://downloads.dcos.io/binaries/cli/linux/x86-64/dcos-1.9/dcos -o dcos && 
sudo mv dcos /usr/local/bin && 
sudo chmod +x /usr/local/bin/dcos && 
dcos config set core.dcos_url http://XXXXXXXXX && 
dcos

安裝dcos的cli,配置你的master地址,而後dcos auth login 出現一個地址,http://XXXXXX/login?redirect_uri=urn:ietf:wg:oauth:2.0:oob貼到瀏覽器獲取一個token

而後複製到控制檯,登陸成功。而後 dcos config show core.dcos_acs_token 出現的token能夠copy到

DCOS_TOKEN

還能夠配置maven地址,可參閱文檔:http://docs.spring.io/spring-cloud-dataflow/docs/1.2.2.BUILD-SNAPSHOT/reference/htmlsingle/#arch-data-flow-server,可是註冊app的時候,直接配置jar的地址或者maven地址,都會出現異常,看了下源碼,在deploy的時候jar會生成一個臨時鏡像,而後經過fegin去請求marathon的api,可是請求一直會報異常null或者:reason不啦不啦的,因此我是把每個流的jar都經過docker打包鏡像push到遠程倉庫,配置本地docker倉庫地址去拉取鏡像,還有須要注意的地方就是註冊app的名稱還有stream的名稱的時候所有用小寫,否則fegin請求的時候也會報異常。如有大神知道jar或者maven配置的正確方法,請分享與我,謝謝。

8.而後就能夠經過json文件運行了,頁面也是能夠操做的

查看日誌啓動成功

9.訪問springdataflow的web端http://XXXX:AAA/dashboard,亦可經過

spring-cloud-dataflow-shell

來註冊app或者stream,此處再也不贅述。

10.註冊app

11.建立數據流

注意:至少得有一個source一個sink,因爲資源有限,processor暫時不加入(ps:跑應用真的很消耗資源)

12.應用數據流

而後就能夠取services去查看,一個jar應該會啓動一個實例

大功告成哈哈哈哈哈哈哈哈哈

下面附上部分代碼

source:

application

import java.util.Date;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.MessageBuilder;
@EnableBinding(Source.class)
@SpringBootApplication
public class LoggingSourceApplication {
   @Bean
   @InboundChannelAdapter(
     value = Source.OUTPUT, 
     poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
   )
   public MessageSource<Long> timeMessageSource() {
      System.out.println(new Date() +"======================logging-source========================== execued");
       return () -> { 
          System.out.println(new Date() + "*****logging-source****** send");
          return MessageBuilder.withPayload(new Date().getTime()).build();
       };
   }

   public static void main(String[] args) {
      SpringApplication.run(LoggingSourceApplication.class, args);
   }
}

properties

spring.rabbitmq.host=172.16.3.183
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

# 本機啓動測試須要如下配置
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.output.destination=source-log
# 默認狀況下,Spring Cloud Stream 會在 RabbitMQ 中建立一個臨時的隊列,程序關閉,
# 對應的鏈接關閉的時候,該隊列也會消失。爲此,咱們須要一個持久化的隊列,而且指定一個分組,用於保證應用服務的縮放。
# 只須要在消費者端的 binding 添加配置項 spring.cloud.stream.bindings.[channelName].group = logistic
# 對應的隊列就是持久化
spring.cloud.stream.bindings.output.group=logTest
spring.cloud.stream.bindings.output.binder=rabbitMq1
spring.cloud.stream.binders.rabbitMq1.type=rabbit
spring.cloud.stream.default-binder=rabbitMq1
# rabbitMQ服務器地址
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.host=172.16.3.183
# rabbitMQ服務器端口
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.username=admin
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.password=admin
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.virtual-host=/

pom

<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>1.4.4.RELEASE</version>
   <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
   <java.version>1.8</java.version>
</properties>

<dependencies>
   <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
   </dependency>

   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
   </dependency>
</dependencies>

<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-dependencies</artifactId>
         <version>Camden.SR5</version>
         <type>pom</type>
         <scope>import</scope>
      </dependency>
   </dependencies>
</dependencyManagement>

<build>
   <plugins>
      <plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
   </plugins>
</build>

sink:

application

import java.util.Date;
import java.util.Map;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(Sink.class)
@SpringBootApplication
public class LoggingSinkApplication {

   @MessageEndpoint
   public static class LoggingMessageEndpoint {
      @ServiceActivator(inputChannel = Sink.INPUT)
      public void logIncomingMessages(@Payload String msg, @Headers Map<String, Object> headers) {
         System.out.println(new Date() + "***********logging-sink**************"+ msg);
         headers.entrySet().forEach(e -> System.out.println(e.getKey() + '=' + e.getValue()));
      }
   }
   @StreamListener(Sink.INPUT)
   public void loggerSink(String date) {
       System.out.println("logging-sink Received: " + date);
   }
   @Payload
   public static void main(String[] args) {
      SpringApplication.run(LoggingSinkApplication.class, args);
   }
}

properties

spring.rabbitmq.host=172.16.3.183
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

#本地測試須要配置
server.port=8090
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.input.destination=source-log
# 默認狀況下,Spring Cloud Stream 會在 RabbitMQ 中建立一個臨時的隊列,程序關閉,
# 對應的鏈接關閉的時候,該隊列也會消失。爲此,咱們須要一個持久化的隊列,而且指定一個分組,用於保證應用服務的縮放。
# 只須要在消費者端的 binding 添加配置項 spring.cloud.stream.bindings.[channelName].group = logistic
# 對應的隊列就是持久化
spring.cloud.stream.bindings.input.group=logTest
spring.cloud.stream.bindings.input.binder=rabbitMq1
spring.cloud.stream.binders.rabbitMq1.type=rabbit
spring.cloud.stream.default-binder=rabbitMq1
# rabbitMQ服務器地址
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.host=172.16.3.183
# rabbitMQ服務器端口
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.username=admin
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.password=admin
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.virtual-host=/

pom

<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>1.4.4.RELEASE</version>
   <relativePath /> <!-- lookup parent from repository -->
</parent>

<properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
   <java.version>1.8</java.version>
</properties>

<dependencies>
   <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
   </dependency>

   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
   </dependency>
</dependencies>

<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-dependencies</artifactId>
         <version>Camden.SR5</version>
         <type>pom</type>
         <scope>import</scope>
      </dependency>
   </dependencies>
</dependencyManagement>

<build>
   <plugins>
      <plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
   </plugins>
</build>
相關文章
相關標籤/搜索