[TOC]
TIPS
1. When using operators such as map, the function you pass in can be an anonymous implementation of a rich function; through methods such as open, close, and getRuntimeContext you can cover more complex needs, for example custom counters, accumulators, and broadcast variables (see the sketch below).
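A minimal sketch of that pattern, not taken from the original project: the object name, the sample input, and the accumulator name "record-count" are made up for illustration. An anonymous RichMapFunction registers an IntCounter inside open() through getRuntimeContext, and the accumulator value is read back from the JobExecutionResult after the job finishes.

```scala
package cn.lang.flink.demo

import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.io.PrintingOutputFormat
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object RichMapAccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // hypothetical sample input, just to have something to count
    val words: DataSet[String] = env.fromElements("flink", "spark", "flink")

    val mapped = words.map(new RichMapFunction[String, (String, Int)] {
      // the accumulator instance lives in the function object
      private val counter = new IntCounter()

      override def open(parameters: Configuration): Unit = {
        // register the accumulator through the runtime context
        getRuntimeContext.addAccumulator("record-count", counter)
      }

      override def map(value: String): (String, Int) = {
        counter.add(1) // count every record that passes through map
        (value, 1)
      }
    })

    // use a lazy sink so that env.execute returns the JobExecutionResult
    mapped.output(new PrintingOutputFormat[(String, Int)]())

    val jobResult = env.execute("RichMapAccumulatorDemo")
    // read the accumulator result collected across all parallel subtasks
    val total: java.lang.Integer = jobResult.getAccumulatorResult("record-count")
    println("records seen: " + total)
  }
}
```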
```scala
package cn.lang.flink.demo

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object FlinkWordCount {
  def main(args: Array[String]): Unit = {
    // env
    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = "/Users/lang/Ideaproject/Github/LangPersonalProject" +
      "/flink/src/main/resources/cn/lang/flink/demo/wordcount.txt"

    /** source */
    val raw_data: DataSet[String] = env.readTextFile(input) // read data from a local file; each item is one line

    /** Exception: ------------------------------------------------------
     * flink could not find implicit value for evidence parameter of type
     * this happens because map has an [R: TypeInformation] context bound that must be satisfied;
     * the recommended fix is the direct import below
     */
    import org.apache.flink.api.scala._

    /** transformation */
    val result: AggregateDataSet[(String, Int)] = raw_data
      .flatMap(_.split(" ")) // split each line on spaces
      .map((_, 1))           // map each word to a count of 1
      .groupBy(0)            // group by the word at index 0
      .sum(1)                // sum the counts at index 1

    /** sink */
    result.print()
  }
}
```
```scala
package cn.lang.flink.demo

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkStreamWordCount {
  def main(args: Array[String]): Unit = {
    // the host and port to listen on
    val host = "localhost"
    val port = 9999

    // env
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    /** source */
    // listen on a socket; run {nc -lk 9999} in a terminal to type in data
    val raw_stream: DataStream[String] = env.socketTextStream(host, port)

    // avoid "could not find the implicit"
    import org.apache.flink.api.scala._

    /** transformation */
    val result: DataStream[(String, Int)] = raw_stream
      .flatMap(_.split(" ")) // split each line on spaces
      .map((_, 1))           // map each word to a count of 1
      .keyBy(0)              // key by the word at index 0; note the difference from the DataSet API
      .sum(1)                // sum the counts at index 1

    /** sink */
    result.print()
    // output after typing the word "flink" four times in a row:
    // 13> (flink,1)
    // 13> (flink,2)
    // 13> (flink,3)
    // 13> (flink,4)  the result keeps accumulating; contrast this with the window functions later

    // execute the streaming job
    env.execute("FlinkStreamWordCount")
  }
}
```
```scala
package cn.lang.flink.demo

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KafkaFlinkStreamWordCount {
  def main(args: Array[String]): Unit = {
    // environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // properties: in production these could be read from args or a config file
    val ZOOKEEPER_HOST = "localhost:2181"
    val KAFKA_BROKER = "localhost:9092"
    val TRANSACTION_GROUP = "transaction"
    val IN_KAFKA_TOPIC = "first"

    // set kafka properties
    val kafkaProps = new Properties()
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)
    kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("auto.offset.reset", "latest")

    // source
    // to run locally, start zookeeper, kafka, and a console producer first:
    // /Users/langjiang/Software/zookeeper-3.4.10/bin/zkServer.sh start
    // /Users/langjiang/Software/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon /Users/langjiang/Software/kafka_2.11-2.1.0/config/server.properties
    // /Users/langjiang/Software/kafka_2.11-2.1.0/bin/kafka-server-stop.sh
    val transaction: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String](IN_KAFKA_TOPIC, new SimpleStringSchema(), kafkaProps))

    // transform
    val result = transaction
      .flatMap(_.split(" ")) // split each line on spaces
      .map((_, 1))           // map each word to a count of 1
      .keyBy(0)              // key by the word at index 0; note the difference from the DataSet API
      .sum(1)                // sum the counts at index 1

    // sink1
    // write the final result back to Kafka
    // val OUT_KAFKA_TOPIC = "second"
    // result.addSink(new FlinkKafkaProducer011[(String)](KAFKA_BROKER, OUT_KAFKA_TOPIC, new SimpleStringSchema()))

    // sink2
    result.print()
    // 13> (flink,1)  the leading number is the core doing the computation
    // 13> (flink,2)
    // 13> (flink,3)
    // 13> (flink,4)
    // 13> (flink,5)
    // 1> (spark,1)

    env.execute("KafkaFlinkStreamWordCount")
  }
}
```
```scala
package cn.lang.flink.demo

import java.util.Properties

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KafkaFlinkStreamTimeWindow {
  def main(args: Array[String]): Unit = {
    // environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // property
    val ZOOKEEPER_HOST = "localhost:2181"
    val KAFKA_BROKER = "localhost:9092"
    val TRANSACTION_GROUP = "transaction"
    val IN_KAFKA_TOPIC = "first"

    // set
    val kafkaProps = new Properties()
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)
    kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("auto.offset.reset", "latest")

    // source
    val transaction: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String](IN_KAFKA_TOPIC, new SimpleStringSchema(), kafkaProps))

    /** transform1 */
    // count of normal and error log lines reported within every 10 s
    // val result = transaction
    //   .map(iterm => (iterm.contains("error"), 1)) // mark each line by whether it contains "error"
    //   .keyBy(0)                       // key by index 0 (normal vs error) and count each group separately
    //   .timeWindow(Time.seconds(10))   // a 10 s window; with a single argument this is a tumbling window,
    //                                   // i.e. it returns window(TumblingProcessingTimeWindows.of(size))
    //   .reduce((history, add) => (history._1, history._2 + add._2)) // think of it as an accumulator update
    // printed once every 10 s:
    // 1> (false,2) : 1588391750005
    // 10> (true,1) : 1588391760003
    // 10> (true,4) : 1588391770002
    // 10> (true,1) : 1588391780000

    /** transform2 */
    // count of error log lines reported within the last 15 s
    val result: DataStream[(String, Int)] = transaction
      .map(iterm => (iterm, 1))       // pair each line with a count of 1
      .filter(_._1.contains("error")) // filter out normal lines
      .keyBy(0)                       // key by the log line at index 0
      .timeWindow(Time.seconds(15), Time.seconds(5)) // a 15 s window sliding every 5 s
      .reduce((history, add) => (history._1, history._2 + add._2)) // the key point is understanding what reduce takes
    // printed once every 5 s:
    // 14> (error,1) : 1588392335002
    // 14> (error,3) : 1588392340001
    // 14> (error,3) : 1588392345004
    // 14> (error,2) : 1588392350003

    // sink
    result.map(iterm => iterm.toString() + " : " + System.currentTimeMillis()).print()

    env.execute("KafkaFlinkStreamTimeWindow")
  }
}
```
```scala
package cn.lang.flink.demo

import java.text.SimpleDateFormat
import java.util.Properties

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

import scala.collection.mutable

/**
 * watermark
 * watermark = maxEventTime - delayTimeInternal (differs for every record)
 * a window calculation is triggered once watermark > timeWindowDeadline
 **/
object KafkaFlinkStreamEventTime {
  // returns a 13-digit timestamp, i.e. millisecond precision
  def dataToTimestamp(date: String): Long = {
    val sdf = new SimpleDateFormat("yyyy年MM月dd日HH:mm:ss")
    sdf.parse(date).getTime
  }

  def main(args: Array[String]): Unit = {
    // environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // be careful which package this is imported from; this enables the event-time mechanism
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // property
    val ZOOKEEPER_HOST = "localhost:2181"
    val KAFKA_BROKER = "localhost:9092"
    val TRANSACTION_GROUP = "transaction"
    val IN_KAFKA_TOPIC = "first"

    // set
    val kafkaProps = new Properties()
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)
    kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("auto.offset.reset", "latest")

    // source; every input record has the format: uid,2020年05月02日17:26:16
    val transaction = env.addSource(
      new FlinkKafkaConsumer011[String](IN_KAFKA_TOPIC, new SimpleStringSchema(), kafkaProps))
      .setParallelism(1)

    /** transform */
    /* the requirement: collect all data reported by the same uid within a window */
    val result: DataStream[mutable.HashSet[Long]] = transaction
      .map(iterm => Event(iterm.split(",")(0), dataToTimestamp(iterm.split(",")(1)))) // every record now carries a 13-digit timestamp
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[Event](Time.milliseconds(2000)) {
          override def extractTimestamp(element: Event): Long = element.timestamp
        })
      .keyBy(_.uid)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .fold(new mutable.HashSet[Long]()) { case (set, iterm) => set += iterm.timestamp }

    // sink
    result.map(iterm => iterm.mkString(";") + " : " + System.currentTimeMillis()).print()

    env.execute("KafkaFlinkStreamEventTime")
  }
}

// the case class that business data is wrapped into; it carries the event time
case class Event(uid: String, timestamp: Long) // the reported timestamp is 10 digits by default

// a watermark assigner class; an anonymous BoundedOutOfOrdernessTimestampExtractor subclass works as well
class MyBoundedOutOfOrdernessTimestampExtractor(delayInterval: Long) extends AssignerWithPeriodicWatermarks[Event] {
  // the watermark emitted last time (i.e. the watermark when the window was last triggered)
  var lastEmittedWatermark: Long = 0L
  // the allowed gap between the largest event time seen so far and the emitted watermark
  var maxOutOfOrderness: Long = delayInterval
  // the largest event time among all records seen so far
  var currentMaxTimestamp: Long = lastEmittedWatermark + this.maxOutOfOrderness

  // get the current watermark
  override def getCurrentWatermark: Watermark = {
    val tmp = this.currentMaxTimestamp - this.maxOutOfOrderness
    if (tmp >= lastEmittedWatermark) {
      lastEmittedWatermark = tmp
    }
    new Watermark(lastEmittedWatermark)
  }

  // extract the timestamp from the data case class
  override def extractTimestamp(element: Event, previousElementTimestamp: Long): Long = {
    val tmp = element.timestamp
    if (tmp > currentMaxTimestamp) {
      currentMaxTimestamp = tmp
    }
    tmp
  }
}
```
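To see the formula watermark = maxEventTime - delay in action, here is a minimal sketch, not part of the original post, that drives the MyBoundedOutOfOrdernessTimestampExtractor and Event defined above by hand (outside of any Flink job) and prints the watermark after each record; the uid and the millisecond timestamps are made up for illustration.

```scala
package cn.lang.flink.demo

object WatermarkAssignerDemo {
  def main(args: Array[String]): Unit = {
    // delayInterval = 2000 ms, matching the anonymous extractor used in the job above
    val assigner = new MyBoundedOutOfOrdernessTimestampExtractor(2000L)

    // made-up, slightly out-of-order event times (13-digit millisecond timestamps)
    val events = Seq(
      Event("u1", 1588392335000L),
      Event("u1", 1588392337000L),
      Event("u1", 1588392336000L)) // late, but within the 2000 ms bound

    for (e <- events) {
      assigner.extractTimestamp(e, -1L) // updates currentMaxTimestamp
      // watermark = largest event time seen so far - 2000 ms, never moving backwards
      println(s"after ${e.timestamp}: watermark = ${assigner.getCurrentWatermark.getTimestamp}")
    }
    // expected output:
    // after 1588392335000: watermark = 1588392333000
    // after 1588392337000: watermark = 1588392335000
    // after 1588392336000: watermark = 1588392335000  (the late record does not lower the watermark)
  }
}
```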