A Quick Introduction to the Stream Processing Framework Flink -- Applying It to Real-Time Reporting

  As the business grows and data volume explodes, simple reporting and dashboard tasks can no longer rely on an RDBMS alone; they come to depend on big-data platforms such as a data warehouse.

  A data warehouse can store massive amounts of data, but it usually has some data latency, so relying entirely on the warehouse to solve real-time reporting is difficult.

  In fact, a so-called real-time report, put simply, is just this: take the data arriving right now, apply some arithmetic and aggregation, and get a series of time-related numbers.

  So the key questions are: where does the real-time data come from, and how do we process it?

 

  Generally, the most basic principles for this kind of reporting work are: do not intrude on the business code, and still be real-time.

 

  So the instinctive idea is to decouple the data with a message broker; Kafka is probably a good choice. Of course, this assumes the business systems already publish to it; if they do not, you may need another approach, such as binlog parsing.

 

  Once we have the data source, we can produce the corresponding report data.

  Since, as mentioned above, a report is basically simple arithmetic, it should be easy.

  That is, start a few Kafka consumers yourself, consume the data, run the calculations, and write the results to a database. That's all.

  So you could absolutely build exactly that. But as you know, nothing is ever that simple; think of all the edge cases you would have to handle: window and time-boundary issues, crashes and restarts, newly added business dimensions, and so on. A rough sketch of this hand-rolled approach is shown below.
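  To see why the hand-rolled route gets painful, here is a rough sketch of it: a hypothetical NaiveOrderCounter built on the plain kafka-clients API, with the aggregation held in memory and persistence omitted (the class name and message format are illustrative assumptions, not code from this project).

package com.my.flink.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Hypothetical "do it yourself" version: consume order messages and
 * aggregate counts per minute/product/channel in memory.
 * The hard parts -- window boundaries, crash recovery, rebalancing,
 * newly added dimensions -- are all left as an exercise, which is exactly the problem.
 */
public class NaiveOrderCounter {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "naive.order.counter");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Map<String, Integer> counts = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("mastertest"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(500L);
                for (ConsumerRecord<String, String> record : records) {
                    // assumed message format: dateTime,productCode,channel,orderId,money
                    String[] tokens = record.value().split(",");
                    String key = tokens[0].substring(0, 16) + "," + tokens[1] + "," + tokens[2];
                    counts.merge(key, 1, Integer::sum);
                }
                // a real job would flush finished minutes to a DB here
                System.out.println(counts);
            }
        }
    }
}

  Even this toy version already raises the questions above: when is a minute "finished", what happens after a restart, and who adds the next dimension?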

  

  Enough of that; back to the topic of this article. Scenarios like this are really just simple stream processing, and the corresponding building blocks were distilled into frameworks long ago; we only need to learn how to use them.

  Flink is one of the frameworks that does this well, and it is said to be a future trend. So why not learn it?

 

  Flink, stream computing... sounds hard!

  Actually, it isn't. Just like the solution sketched above, getting started really is that simple.

 

  OK, next let's try to get started with a small flink demo.


Notes:
  1. The demo scenario: count per-channel order volume over short time windows (the aggregation key is minute-granular; the window in the demo code is 30 seconds);
  2. The data source is Kafka;
  3. The results are written to Kafka and to the console;

 

The actual code is as follows:

package com.my.flink.kafka.consumer;

import com.my.flink.config.KafkaConstantProperties;
import com.my.flink.kafka.formatter.TestBizDataLineSplitter;
import com.my.flink.kafka.serializer.KafkaTuple4StringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * Kafka consumer job written in Java.
 *
 */
public class ConsumeKafkaByJava {

    private static final String CONSUMER_GROUP_ID = "test.flink.consumer1";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.enableCheckpointing(1000);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", KafkaConstantProperties.KAFKA_BROKER);
        kafkaProps.setProperty("group.id", CONSUMER_GROUP_ID);

        FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(
                KafkaConstantProperties.FLINK_COMPUTE_TOPIC_IN1,
                new SimpleStringSchema(),
                kafkaProps);

        DataStream<String> dataStream = env.addSource(myConsumer);
        // Tuple4 fields: order id, aggregation-dimension key, order count, order amount
        DataStream<Tuple4<String, String, Integer, Double>> counts = dataStream
                .flatMap(new TestBizDataLineSplitter())
                .keyBy(1)
                .timeWindow(Time.of(30, TimeUnit.SECONDS))
                .reduce((value1, value2) -> {
                    return new Tuple4<>(value1.f0, value1.f1, value1.f2 + value2.f2, value1.f3 + value2.f3);
                });
        
        // For now the result is written back to Kafka using the same connection properties
        counts.addSink(new FlinkKafkaProducer010<>(
                KafkaConstantProperties.FLINK_DATA_SINK_TOPIC_OUT1,
                new KafkaTuple4StringSchema(),
                kafkaProps)
        );
        // Also print both the raw stream and the aggregated results to the console
        dataStream.print();
        counts.print();
        env.execute("Test Count from Kafka data");
    }

}

  That, above, is a complete Flink aggregation job. Simple, right? Definitely simple.
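  A quick sanity check (reasoned from the splitter and reduce logic, not from an actual run): suppose two messages for the same product and channel arrive within the same 30-second window:

2019-08-01 17:39:32,P0001,channel1,201908010116100001,100
2019-08-01 17:39:45,P0001,channel1,201908010116100002,150

  Both map to the key "2019-08-01 17:39,P0001,channel1", so the window reduce emits a tuple whose last two fields are 2 and 250.0; the first field keeps the order id of whichever record the reduce happened to see as value1.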

  Of course, this class alone will not run; we still need the framework dependencies and some boilerplate scaffolding, but that part really is just copy and paste.

1. pom.xml dependencies:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.my.flink</groupId>
  <artifactId>flink-kafka-test</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.6</scala.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.3.2</version>
      <scope>compile</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10_2.11 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
      <version>1.3.2</version>
      <scope>compile</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.2.0</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <!-- keep the client version in line with the 0.10.x Kafka connector above -->
      <version>0.10.2.0</version>
      <scope>compile</scope>
    </dependency>

    <!-- flink-streaming jar; 2.11 is the Scala version suffix -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.3.2</version>
      <scope>compile</scope>
    </dependency>

<!--    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>1.3.2</version>
      <scope>compile</scope>
    </dependency>-->

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.59</version>
    </dependency>

  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <useUniqueVersions>false</useUniqueVersions>
              <classpathPrefix>lib/</classpathPrefix>
              <mainClass>com.my.flink.kafka.consumer.ConsumeKafkaByJava</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4.1</version>
        <configuration>
          <!-- get all project dependencies -->
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <!-- MainClass in manifest makes an executable jar -->
          <archive>
            <manifest>
              <mainClass>com.my.flink.kafka.consumer.ConsumeKafkaByJava</mainClass>
            </manifest>
          </archive>

        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <!-- bind to the packaging phase -->
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>

 

2. A few additional helper classes:

// 1. 
package com.my.flink.config;

/**
 * Kafka-related constant definitions.
 *
 */
public class KafkaConstantProperties {

    /**
     * Kafka broker address
     */
    public static final String KAFKA_BROKER = "127.0.0.1:9092";

    /**
     * ZooKeeper address; used by old, low-version Kafka clients, no longer needed by newer versions
     */
    public static final String ZOOKEEPER_HOST = "master:2181,slave1:2181,slave2:2181";

    /**
     * Input topic 1 for the Flink computation
     */
    public static final String FLINK_COMPUTE_TOPIC_IN1 = "mastertest";

    /**
     * Output topic for the Flink computation results written back to Kafka
     */
    public static final String FLINK_DATA_SINK_TOPIC_OUT1 = "flink_compute_result_out1";

}

// 2. 
package com.my.flink.kafka.formatter;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;

/**
 * Splits a raw business message into an aggregation tuple.
 *
 */

public final class TestBizDataLineSplitter implements FlatMapFunction<String,
                                    Tuple4<String, String, Integer, Double>> {

    private static final long serialVersionUID = 1L;

    /**
     * The map-phase expansion step.
     *
     * @param value the raw message (bizData), e.g.: 2019-08-01 17:39:32,
     *              P0001,channel1,201908010116100001,100
     *
     *              i.e. dateTimeMin,
     *              productCode, channel,
     *              orderId, money
     *              [, totalCount, totalMoney]
     *
     * @param out collector for the output, held as a 4-tuple
     *
     */
    @Override
    public void flatMap(String value, Collector<Tuple4<String, String,
                                                        Integer, Double>> out) {
        String[] tokens = value.split(",");
        // truncate the timestamp to minute granularity: "yyyy-MM-dd HH:mm"
        String time = tokens[0].substring(0, 16);
        // aggregation-dimension key: minute + productCode + channel
        String uniqDimKey = time + "," + tokens[1] + "," + tokens[2];
        // totalCount: 1, totalPremium: premium
        // TODO: model this as a POJO instead of a Tuple4

        out.collect(new Tuple4<>(tokens[3], uniqDimKey, 1, Double.valueOf(tokens[4])));
    }

}

// 3. 
package com.my.flink.kafka.serializer;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Custom Kafka (de)serialization schema for the Tuple4 result type.
 */
public class KafkaTuple4StringSchema  implements DeserializationSchema<Tuple4<String, String, Integer, Double>>, SerializationSchema<Tuple4<String, String, Integer, Double>> {

    private static final long serialVersionUID = -5784600791822349178L;

    // ------------------------------------------------------------------------
    //  Kafka Serialization
    // ------------------------------------------------------------------------

    /** The charset used to convert between strings and bytes.
     * Kept for parity with SimpleStringSchema; the conversions below use UTF-8 directly. */
    private transient Charset charset;

    /** Field separator. Note: the dimension key (f1) itself contains commas,
     * so this flat format cannot be safely round-tripped through deserialize(). */
    private String separator = ",";

    /**
     * Creates a new KafkaTuple4StringSchema that uses UTF-8 as the encoding.
     */
    public KafkaTuple4StringSchema() {
        this(StandardCharsets.UTF_8);
    }

    /**
     * Creates a new KafkaTuple4StringSchema that uses the given charset to convert between strings and bytes.
     *
     * @param charset The charset to use to convert between strings and bytes.
     */
    public KafkaTuple4StringSchema(Charset charset) {
        this.charset = checkNotNull(charset);
    }

    @Override
    public Tuple4<String, String, Integer, Double> deserialize(byte[] message) {
        String rawData = new String(message, StandardCharsets.UTF_8);
        String[] dataArr = rawData.split(separator);
        return new Tuple4<>(dataArr[0], dataArr[1],
                            Integer.valueOf(dataArr[2]), Double.valueOf(dataArr[3]));
    }

    @Override
    public boolean isEndOfStream(Tuple4<String, String, Integer, Double> nextElement) {
        return false;
    }

    @Override
    public byte[] serialize(Tuple4<String, String, Integer, Double> element) {
        // use UTF-8 explicitly so that serialize() matches the charset assumed by deserialize()
        return (element.f0 + separator +
                element.f1 + separator +
                element.f2 + separator +
                element.f3).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<Tuple4<String, String, Integer, Double>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple4<String, String, Integer, Double>>() {});
    }

}

 

  With these helper classes, plus the demo above, the job can actually run.
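  To actually feed the job some data, a throwaway producer along the following lines can be used. It is a hypothetical helper, not part of the classes above; it simply writes lines in the dateTime,productCode,channel,orderId,money format to the input topic.

package com.my.flink.kafka.producer;

import com.my.flink.config.KafkaConstantProperties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

/**
 * Hypothetical test-data producer: emits order messages of the form
 * dateTime,productCode,channel,orderId,money to the Flink input topic.
 */
public class TestOrderProducer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaConstantProperties.KAFKA_BROKER);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Random random = new Random();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                String message = fmt.format(new Date())
                        + ",P0001"
                        + ",channel" + (random.nextInt(3) + 1)
                        + ",order" + System.nanoTime()
                        + "," + (random.nextInt(100) + 1);
                producer.send(new ProducerRecord<>(KafkaConstantProperties.FLINK_COMPUTE_TOPIC_IN1, message));
                Thread.sleep(500);
            }
        }
    }
}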

 

Now let's look at the general Flink development pattern as reflected in the demo:

        // 1. Obtain the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 2. Configure and attach the data source
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", KafkaConstantProperties.KAFKA_BROKER);
        kafkaProps.setProperty("group.id", CONSUMER_GROUP_ID);
        FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(
                KafkaConstantProperties.FLINK_COMPUTE_TOPIC_IN1,
                new SimpleStringSchema(),
                kafkaProps);
        DataStream<String> dataStream = env.addSource(myConsumer);
        
        // 3. Process the data
        DataStream<Tuple4<String, String, Integer, Double>> counts = dataStream
                .flatMap(new TestBizDataLineSplitter())
                .keyBy(1)
                .timeWindow(Time.of(30, TimeUnit.SECONDS))
                .reduce((value1, value2) -> {
                    return new Tuple4<>(value1.f0, value1.f1, value1.f2 + value2.f2, value1.f3 + value2.f3);
                });

        // 4. Emit the results
        counts.addSink(new FlinkKafkaProducer010<>(
                KafkaConstantProperties.FLINK_DATA_SINK_TOPIC_OUT1,
                new KafkaTuple4StringSchema(),
                kafkaProps)
        );
        // also print to the console
        dataStream.print();
        counts.print();
        
        // 5. Submit the job for execution
        env.execute("Test Count from Kafka data");
    

  It is really just five steps, and if you think about it, apart from step 5 every one of them is strictly necessary; there is nothing superfluous.
    1. Obtain the execution environment
    2. Configure and attach the data source
    3. Process the data
    4. Emit the results
    5. Submit the job for execution

  So, does it feel complex? Apart from the boilerplate? (Boilerplate has always been copy-and-paste.)

  Does that mean we can freely adopt these frameworks to handle our workloads?

  You still have to consider your company's environment: budget, operations support, framework expertise, and so on.

  In short, getting started is easy, but do not assume the whole thing is easy. (Stay humble.)

Next, let's look at some aspects of Flink's architecture:

Like most big-data processing frameworks, Flink uses a master-slave architecture: a JobManager coordinates the job while TaskManagers execute the tasks. (Architecture diagram omitted.)

 

  Put simply, Flink is a self-managed runtime: you write code following the Flink paradigm and submit it to the cluster to run.
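  For reference, with the assembly plugin configured in the pom above, packaging and submitting the job looks roughly like this (the jar name follows from the pom coordinates; adjust it to your environment):

mvn clean package
flink run -c com.my.flink.kafka.consumer.ConsumeKafkaByJava target/flink-kafka-test-1.0-SNAPSHOT-jar-with-dependencies.jar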

Flink's levels of abstraction (from lowest to highest): stateful stream processing, the DataStream / DataSet core APIs, the Table API, and SQL. (Diagram omitted.)

 

Flink's key features:

  Supports high-throughput, low-latency, high-performance stream processing
  Supports window operations based on event time
  Supports exactly-once semantics for stateful computation
  Supports highly flexible windowing: time-, count-, and session-based windows, as well as data-driven windows
  Supports a continuous streaming model with backpressure
  Supports fault tolerance based on lightweight distributed snapshots
  Supports unified batch and stream processing
  Implements its own memory management inside the JVM
  Supports iterative computation
  Supports automatic program optimization: avoids expensive operations such as shuffle and sort where possible, and caches intermediate results when necessary
  Supports the Table API
  Supports SQL-style development

 

Key concepts explained:


  Watermark: a mechanism for measuring the progress of event time, carried along inside the data stream itself; related to it are Flink's three notions of time: event time, ingestion time, and processing time (see the event-time sketch after this list).
  DataStream is for stream processing, DataSet for batch processing.
  Window: TumblingWindow / SlidingWindow / SessionWindow / CountWindow.
  map: a one-to-one mapping over the stream; flatMap: a one-to-N mapping.
  filter: drops elements for which the predicate returns false; keyBy: partitions the DataStream so that elements with the same key land in the same partition for aggregation; reduce: merges elements into a single new value.
  Sink: output. Implement RichSinkFunction for custom output. File-based sinks: writeAsText(), writeAsCsv(), writeUsingOutputFormat / FileOutputFormat. Socket: writeToSocket. For display: print, printToErr. Custom sinks: addSink. Connectors provide interfaces to third-party systems; currently supported connectors include Apache Kafka, Apache Cassandra, Elasticsearch, Hadoop FileSystem, RabbitMQ, and Apache NiFi.
  Snapshot: Flink's checkpoints are implemented via distributed snapshots, so the terms snapshot and checkpoint are used interchangeably.
  Backpressure: typically arises when a short load spike makes the system receive data much faster than it can process it. Many everyday issues can cause it; for example, garbage-collection pauses can make incoming data pile up quickly, or a big promotion or flash sale can cause a traffic surge. If backpressure is not handled correctly, it can exhaust resources or even crash the system.
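  To make the Watermark and Window items above concrete, here is a hedged sketch (not part of the original demo) of how the same aggregation could run on event time instead of processing time, reusing the env and dataStream from the demo; parseMinuteToEpochMillis is a hypothetical helper that turns the "yyyy-MM-dd HH:mm" prefix of the dimension key into epoch milliseconds.

// Sketch only: event-time variant of the demo aggregation.
// Extra imports needed:
//   org.apache.flink.streaming.api.TimeCharacteristic
//   org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Tuple4<String, String, Integer, Double>> eventTimeCounts = dataStream
        .flatMap(new TestBizDataLineSplitter())
        // assign event timestamps parsed from the minute part of the dimension key and
        // emit watermarks that tolerate 5 seconds of out-of-order records
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, String, Integer, Double>>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(Tuple4<String, String, Integer, Double> element) {
                        return parseMinuteToEpochMillis(element.f1.split(",")[0]);
                    }
                })
        .keyBy(1)
        .timeWindow(Time.minutes(1))   // tumbling 1-minute event-time window
        .reduce((v1, v2) -> new Tuple4<>(v1.f0, v1.f1, v1.f2 + v2.f2, v1.f3 + v2.f3));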

 

A closing remark: what matters is the direction.
