Submitting Flink Jobs via the RESTful Approach: A Detailed Guide

Flink offers a rich set of client-side operations for submitting jobs. This article expands on the RESTful approach; for the other four approaches, see the dedicated walkthrough on client operations in the flink-china tutorial series.

Flink has supported job submission through RESTClient since version 1.7, though RESTClient may be unfamiliar to many. Submitting a job with RESTClient means, in other words, that Flink can be driven through API calls: you submit the Flink code you have written to a Flink cluster for execution. This article demonstrates reading data from Kafka and triggering the job from a Spring Cloud microservice; the concrete steps follow.

Writing the Flink Program

Create a new Spring Boot parent project and add a flink-service module. Put the following code in flink-service, then package the project. The point of packaging is to bundle the JARs the Flink job needs at runtime, so that when the job is submitted via RESTClient they are shipped to the cluster along with it.

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReadFromsinkKafka {
  public static void main(String[] args) throws Exception {
    // Create the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Kafka consumer configuration
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.11.12:9092");
    properties.setProperty("group.id", "flinksink");

    // Consume the "flinktest" topic as a stream of strings
    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("flinktest",
        new SimpleStringSchema(), properties));

    // Log each record to stdout, then print the stream
    stream.map(new MapFunction<String, String>() {
      private static final long serialVersionUID = -6867736771747690202L;
      @Override
      public String map(String value) throws Exception {
        System.out.println(value);
        return value;
      }
    }).print();

    env.execute();
  }
}
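As a side note, the consumer's start position can be pinned explicitly instead of relying on the group's committed offsets. A minimal sketch, reusing env and properties from the program above (the setStartFromLatest call is my addition, not part of the original program):

// Build the consumer separately so its start position can be configured
FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("flinktest", new SimpleStringSchema(), properties);
consumer.setStartFromLatest(); // or setStartFromEarliest() to replay the topic
DataStream<String> stream = env.addSource(consumer);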

The packaging configuration is shared below. In the pom, remove spring-boot-maven-plugin, Spring Cloud's default packaging plugin, and replace it with the following. Packaging this way produces two JARs, flink-service-1.0-SNAPSHOT-kafka.jar and flink-service-1.0-SNAPSHOT.jar: flink-service-1.0-SNAPSHOT-kafka.jar holds the Flink code you wrote, while flink-service-1.0-SNAPSHOT.jar holds the Kafka base, client, and other JARs your Flink program needs at runtime.

<build>
  <plugins>
    <!-- Unpack the Kafka connector and client classes into target/classes -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-dependency-plugin</artifactId>
      <version>2.9</version>
      <executions>
        <execution>
          <id>unpack</id>
          <phase>prepare-package</phase>
          <goals>
            <goal>unpack</goal>
          </goals>
          <configuration>
            <artifactItems>
              <!-- Kafka connector -->
              <artifactItem>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <type>jar</type>
                <overWrite>false</overWrite>
                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                <includes>org/apache/flink/**</includes>
              </artifactItem>
              <!-- Kafka connector base -->
              <artifactItem>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <type>jar</type>
                <overWrite>false</overWrite>
                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                <includes>org/apache/flink/**</includes>
              </artifactItem>
              <!-- Kafka client -->
              <artifactItem>
                <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.0.1</version>
                <type>jar</type>
                <overWrite>false</overWrite>
                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                <includes>org/apache/**</includes>
              </artifactItem>
            </artifactItems>
          </configuration>
        </execution>
      </executions>
    </plugin>
    <!-- Build a second JAR, classified "kafka", containing only the Flink job class -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <executions>
        <execution>
          <id>kafka</id>
          <phase>package</phase>
          <goals>
            <goal>jar</goal>
          </goals>
          <configuration>
            <classifier>kafka</classifier>
            <archive>
              <manifestEntries>
                <program-class>com.flink.kafka.ReadFromsinkKafka</program-class>
              </manifestEntries>
            </archive>
            <includes>
              <include>**/com/flink/kafka/ReadFromsinkKafka.class</include>
            </includes>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Implementing the RESTful Invocation

Create a new controller and add the code below. The key to submitting a Flink job through RESTClient here is the createRemoteEnvironment method, which connects to a remote Flink environment; once you hold that execution environment, calling env.execute() submits the job to the remote cluster for execution.

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("flink")
public class FlinkController {
  @RequestMapping(value = "/test", method = RequestMethod.POST)
  public void test() throws Exception {
    // JARs shipped to the cluster: the job classes and their Kafka dependencies
    String[] jars = {"flink-service/target/flink-service-1.0-SNAPSHOT-kafka.jar",
        "flink-service/target/flink-service-1.0-SNAPSHOT.jar"};

    // Connect to the remote JobManager: host, REST port, parallelism, JARs to upload
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createRemoteEnvironment("192.168.11.11", 8081, 2, jars);

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.11.12:9092");
    properties.setProperty("group.id", "flinksink");

    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("flinktest",
        new SimpleStringSchema(), properties));
    stream.map(new MapFunction<String, String>() {
      private static final long serialVersionUID = -6867736771747690202L;
      @Override
      public String map(String value) throws Exception {
        System.out.println(value);
        return value;
      }
    }).print();

    // Submits the job graph to the remote cluster for execution
    env.execute();
  }
}
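Once the Spring Boot application is running, the job can be kicked off with any HTTP client. A minimal sketch using Spring's RestTemplate (the localhost:8080 base URL is an assumption about where the application is listening; adjust to your deployment):

import org.springframework.web.client.RestTemplate;

public class SubmitJobExample {
  public static void main(String[] args) {
    // Assumes the Spring Boot app runs locally on port 8080
    new RestTemplate().postForEntity("http://localhost:8080/flink/test", null, Void.class);
  }
}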

Submission Test

I have verified that jobs submitted this way run on both Flink standalone clusters and YARN clusters. Write some data into Kafka and you can see it appear in the Flink logs. Due to layout constraints, the figures for this article can be viewed in the article of the same name on my WeChat public account.
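To produce test data, here is a minimal sketch using the kafka-clients API that is already on the classpath (the broker address is taken from the job configuration above; the class name and message text are mine, for illustration only):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProduceTestData {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "192.168.11.12:9092");
    props.setProperty("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    // try-with-resources flushes and closes the producer on exit
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("flinktest", "hello flink"));
    }
  }
}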

For the latest Flink news, follow the WeChat public account of the same name, Flink實戰應用指南 (Flink Practical Application Guide).

Recommended reading: A Detailed Guide to Integrating Flink with TensorFlow
