Have A Flink Try

[TOC]


TIPS
1. When passing a function into map and similar operators, you can supply an anonymous rich-function implementation and use open, close, getRuntimeContext, etc. to cover more complex needs (for example custom counters, accumulators, and broadcast variables); see the sketch below.
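A minimal sketch of that tip, assuming a Flink 1.x Scala job like the ones below; the class name CountingMapper and the accumulator name "num-lines" are illustrative, not from the original code:

import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// a rich function passed to map: open/close/getRuntimeContext become available,
// so an accumulator can be registered when the task starts
class CountingMapper extends RichMapFunction[String, (String, Int)] {
  private val lineCounter = new IntCounter()

  override def open(parameters: Configuration): Unit = {
    // register the custom accumulator with the runtime context
    getRuntimeContext.addAccumulator("num-lines", lineCounter)
  }

  override def map(value: String): (String, Int) = {
    lineCounter.add(1) // custom bookkeeping alongside the normal mapping
    (value, 1)
  }
}

Used as raw_data.map(new CountingMapper); the accumulator value can then be read from the JobExecutionResult after the job runs.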

1. DataSet processing

package cn.lang.flink.demo

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object FlinkWordCount {

  def main(args: Array[String]): Unit = {
    // env
    val env = ExecutionEnvironment.getExecutionEnvironment

    val input = "/Users/lang/Ideaproject/Github/LangPersonalProject" +
      "/flink/src/main/resources/cn/lang/flink/demo/wordcount.txt"

    /** source */
    val raw_data: DataSet[String] = env.readTextFile(input) // read from a local file; each item is one line

    /** Exception: ------------------------------------------------------ **
     * "could not find implicit value for evidence parameter of type ..." **
     * happens because map has a [R: TypeInformation] context bound that  **
     * must be satisfied; the simplest fix is the wildcard import below.  **
     */
    import org.apache.flink.api.scala._
    /** transformation */
    val result: AggregateDataSet[(String, Int)] = raw_data
      .flatMap(_.split(" ")) // split each line on spaces
      .map((_, 1)) // pair every word with a count of 1
      .groupBy(0) // group by the word at index 0
      .sum(1) // sum the counts at index 1
    
    /** sink */
    result.print()
  }
}

2. Socket stream processing

package cn.lang.flink.demo

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkStreamWordCount {
  def main(args: Array[String]): Unit = {
    // host and port to listen on
    val host = "localhost"
    val port = 9999
    // env
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    /** source */
    // listen on the socket; run {nc -lk 9999} in a terminal to type input data
    val raw_stream: DataStream[String] = env.socketTextStream(host, port)

    // avoid "could not find implicit value for evidence parameter"
    import org.apache.flink.api.scala._
    /** transformation */
    val result: DataStream[(String, Int)] = raw_stream
      .flatMap(_.split(" ")) // split each line on spaces
      .map((_, 1)) // pair every word with a count of 1
      .keyBy(0) // key by the word at index 0; note the difference from the DataSet API (groupBy)
      .sum(1) // sum the counts at index 1

    /** sink */
    result.print()
    // output after typing the word "flink" four times in a row
    //    13> (flink,1)
    //    13> (flink,2)
    //    13> (flink,3)
    //    13> (flink,4) the count keeps accumulating; note the difference from the window functions later (see the sketch after this example)

    // execute the stream work
    env.execute("FlinkStreamWordCount")
  }
}
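For contrast with the ever-growing counts above, here is a minimal sketch (same localhost:9999 assumption; the object name is illustrative) of the windowed variant hinted at in the comment, where counts restart with every 10-second tumbling window:

package cn.lang.flink.demo

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkStreamWindowedWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(10)) // tumbling 10s window: each window starts counting from zero
      .sum(1)
      .print()

    env.execute("FlinkStreamWindowedWordCount")
  }
}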

3. Kafka stream processing

package cn.lang.flink.demo

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KafkaFlinkStreamWordCount {
  def main(args: Array[String]): Unit = {
    // environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // properties; in production these can come from args or from a config file
    val ZOOKEEPER_HOST = "localhost:2181"
    val KAFKA_BROKER = "localhost:9092"
    val TRANSACTION_GROUP = "transaction"
    val IN_KAFKA_TOPIC = "first"

    // set kafka properties
    val kafkaProps = new Properties()
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)
    kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("auto.offset.reset", "latest")

    // source
    // for a local run, start ZooKeeper and Kafka, plus a console producer
    // /Users/langjiang/Software/zookeeper-3.4.10/bin/zkServer.sh start
    // /Users/langjiang/Software/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon /Users/langjiang/Software/kafka_2.11-2.1.0/config/server.properties
    // /Users/langjiang/Software/kafka_2.11-2.1.0/bin/kafka-server-stop.sh
    val transaction: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String](IN_KAFKA_TOPIC, new SimpleStringSchema(), kafkaProps))

    // transform
    val result = transaction
      .flatMap(_.split(" ")) // split each line on spaces
      .map((_, 1)) // pair every word with a count of 1
      .keyBy(0) // key by the word at index 0; note the difference from the DataSet API
      .sum(1) // sum the counts at index 1

    // sink1
    // write the final result back to Kafka; note that result is a DataStream[(String, Int)],
    // so it must be mapped to String (and FlinkKafkaProducer011 imported) before the string-schema producer accepts it
    //    val OUT_KAFKA_TOPIC = "second"
    //    result.map(_.toString).addSink(new FlinkKafkaProducer011[String](KAFKA_BROKER, OUT_KAFKA_TOPIC, new SimpleStringSchema()))

    // sink2
    result.print()
    //    13> (flink,1) // the leading number identifies the parallel subtask (core) that produced the record
    //    13> (flink,2)
    //    13> (flink,3)
    //    13> (flink,4)
    //    13> (flink,5)
    //    1> (spark,1)
    env.execute("KafkaFlinkStreamWordCount")
  }
}

4. Kafka stream window processing

package cn.lang.flink.demo

import java.util.Properties
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KafkaFlinkStreamTimeWindow {
  def main(args: Array[String]): Unit = {
    // environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // property
    val ZOOKEEPER_HOST = "localhost:2181"
    val KAFKA_BROKER = "localhost:9092"
    val TRANSACTION_GROUP = "transaction"
    val IN_KAFKA_TOPIC = "first"
    // set
    val kafkaProps = new Properties()
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)
    kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("auto.offset.reset", "latest")
    // source
    val transaction: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String](IN_KAFKA_TOPIC, new SimpleStringSchema(), kafkaProps))

    /** transform1 */
    // number of normal vs. error log lines reported within each 10s window
    //    val result = transaction
    //      .map(item => (item.contains("error"), 1)) // flag whether the line contains "error" and count 1 per line
    //      .keyBy(0) // key by the flag at index 0, so normal and error lines are counted separately
    //      .timeWindow(Time.seconds(10)) // 10s window; with a single size argument this is a tumbling window
    //      // internally this is: return window(TumblingProcessingTimeWindows.of(size))
    //      .reduce((history, add) => (history._1, history._2 + add._2)) // think of it as an accumulator's update step

    // printed once every 10s
    //    1> (false,2) : 1588391750005
    //    10> (true,1) : 1588391760003
    //    10> (true,4) : 1588391770002
    //    10> (true,1) : 1588391780000

    /** transform2 */
    // number of error log lines reported within the last 15s
    val result: DataStream[(String, Int)] = transaction
      .map(item => (item, 1)) // pair every line with a count of 1
      .filter(_._1.contains("error")) // keep only lines containing "error"
      .keyBy(0) // key by the log line itself at index 0
      .timeWindow(Time.seconds(15), Time.seconds(5)) // 15s window sliding every 5s
      .reduce((history, add) => (history._1, history._2 + add._2)) // the key point is what reduce receives; see the sketch after this example

    // printed once every 5s
    //    14> (error,1) : 1588392335002
    //    14> (error,3) : 1588392340001
    //    14> (error,3) : 1588392345004
    //    14> (error,2) : 1588392350003

    // sink
    result.map(iterm => iterm.toString() + " : " + System.currentTimeMillis()).print()
    env.execute("KafkaFlinkStreamTimeWindow")
  }
}
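A minimal standalone sketch (names are illustrative) of what the reduce call above receives: both parameters and the result share the window's element type, where history is the value accumulated so far within the window and add is the newly arrived element.

val reduceFn: ((String, Int), (String, Int)) => (String, Int) =
  (history, add) => (history._1, history._2 + add._2)

// within one window the elements are folded pairwise, so the count accumulates:
// reduceFn(reduceFn(("error", 1), ("error", 1)), ("error", 1)) == ("error", 3)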

5. Kafka stream event-time processing

package cn.lang.flink.demo

import java.text.SimpleDateFormat
import java.util.Properties

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

import scala.collection.mutable

/**
 * watermark
 * watermark = maxEventTime - delayInterval (re-evaluated as new records arrive)
 * a window is computed once the watermark passes the end of that window
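 *
 * worked example (assuming the 2000 ms delay used below and a 5 s tumbling window [0, 5000)):
 *   an event with timestamp 4000 arrives -> watermark = 4000 - 2000 = 2000, the window stays open
 *   an event with timestamp 7500 arrives -> watermark = 7500 - 2000 = 5500, which passes 5000,
 *   so the window [0, 5000) fires (note: periodic watermarks are emitted on a timer, not per record)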
 **/
object KafkaFlinkStreamEventTime {

  // returns a 13-digit timestamp with millisecond precision
  def dataToTimestamp(date: String): Long = {
    val sdf = new SimpleDateFormat("yyyy年MM月dd日HH:mm:ss")
    sdf.parse(date).getTime
  }

  def main(args: Array[String]): Unit = {
    // environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // be careful which package TimeCharacteristic is imported from; this enables event-time semantics
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // property
    val ZOOKEEPER_HOST = "localhost:2181"
    val KAFKA_BROKER = "localhost:9092"
    val TRANSACTION_GROUP = "transaction"
    val IN_KAFKA_TOPIC = "first"
    // set
    val kafkaProps = new Properties()
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)
    kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("auto.offset.reset", "latest")

    // source; each input record has the format: uid,2020年05月02日17:26:16
    val transaction = env.addSource(
      new FlinkKafkaConsumer011[String](IN_KAFKA_TOPIC, new SimpleStringSchema(), kafkaProps))
      .setParallelism(1)

    /** transform */
    /* the requirement: collect, per uid, all data reported within one window */
    val result: DataStream[mutable.HashSet[Long]] = transaction
      .map(item => Event(item.split(",")(0), dataToTimestamp(item.split(",")(1)))) // every record is wrapped into an Event carrying a 13-digit timestamp
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[Event](Time.milliseconds(2000)) {
          override def extractTimestamp(element: Event): Long = element.timestamp
        })
      .keyBy(_.uid)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .fold(new mutable.HashSet[Long]()) {
        case (set, iterm) => set += iterm.timestamp
      }

    // sink
    result.map(iterm => iterm.mkString(";") + " : " + System.currentTimeMillis()).print()

    env.execute("KafkaFlinkStreamEventTime")
  }
}

// case class that wraps the business data in its reporting format, including the event time
case class Event(uid: String, timestamp: Long) // here the timestamp is the 13-digit millisecond value produced by dataToTimestamp

// hand-rolled watermark assigner; an anonymous BoundedOutOfOrdernessTimestampExtractor (as used above) works just as well (a usage sketch follows this class)
class MyBoundedOutOfOrdernessTimestampExtractor(delayInterval: Long) extends AssignerWithPeriodicWatermarks[Event] {
  // the watermark value emitted last time (i.e. the value at the last window trigger)
  var lastEmittedWatermark: Long = 0L
  // the allowed difference between the largest event time seen so far and the emitted watermark
  var maxOutOfOrderness: Long = delayInterval
  // the largest event time among all records seen so far
  var currentMaxTimestamp: Long = lastEmittedWatermark + this.maxOutOfOrderness

  // called periodically to obtain the current watermark
  override def getCurrentWatermark: Watermark = {
    val tmp = this.currentMaxTimestamp - this.maxOutOfOrderness
    if (tmp >= lastEmittedWatermark) {
      lastEmittedWatermark = tmp
    }
    new Watermark(lastEmittedWatermark)
  }

  // extract the event timestamp from the Event case class
  override def extractTimestamp(element: Event, previousElementTimestamp: Long): Long = {
    val tmp = element.timestamp
    if (tmp > currentMaxTimestamp) {
      currentMaxTimestamp = tmp
    }
    tmp
  }
}
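A minimal usage sketch of the hand-rolled assigner above; this fragment belongs inside main, replacing the anonymous extractor, and the 2000L delay matches the 2000 ms used there:

    val withCustomWatermarks: DataStream[Event] = transaction
      .map(item => Event(item.split(",")(0), dataToTimestamp(item.split(",")(1))))
      .assignTimestampsAndWatermarks(new MyBoundedOutOfOrdernessTimestampExtractor(2000L))
    // the rest of the pipeline (keyBy(_.uid), 5 s event-time window, fold) stays unchanged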