從日誌中讀取數據作一些統計聚合,最終把想要的數據輸出,基本是每一個公司都會有的場景,好比用戶分析,商品分析,實時大屏等等,咱們最先的作法是所有讀出來,寫到redis或者es去,而後web端再去實時查詢統計,其實有不少的弊端 要麼每次請求都會去從新算一遍耗性能不說還慢的不行,業務方接受不了 或者直接把統計信息存進去,但要實時統計就比較麻煩,好比1小時存一個點,那業務方可能要的是當前10:05-11:05的數據。。。 用Flink能夠很方便的實現,這裏分別展現從csv和kafka裏讀大批量日誌,flink來作統計每小時的熱門商品,以5分鐘作一個間隔,實現上每次延遲1s輸出一次模擬實時
統計近一小時的熱門商品,每5分鐘更新一次 熱門度用瀏覽次數(pv)來衡量
在全部用戶行爲中,過濾出來瀏覽(pv)行爲進行統計 構建一個滑動窗口,窗口長度爲1小時,每次滑動5分鐘
DataStream -> 過濾出來是瀏覽行爲的數據 -> 根據商品id進行分組生成KeydStream(key by 操做) -> 構建滑動窗口,窗口長度1小時,每次滑動5分鐘 -> 進行聚合算總數 -> 生成一個最終輸出的DataStream
一、新建項目
打開IDEA新建一個maven項目,UserBehaviorAnalysis,新建一個包 com.mafei.hotitems_analysis
二、父項目中準備依賴和打包相關 pom.xmljava
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.mafei</groupId> <artifactId>UserBehaviorAnalysis</artifactId> <packaging>pom</packaging> <version>1.0</version> <modules> <module>HotItemsAnalysis</module> </modules> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.10.1</flink.version> <scala.binary.version>2.12</scala.binary.version> <kafka.version>2.2.0</kafka.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- 用hutool只是爲了造模擬數據用,實際生產中能夠不用引入--> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.5.6</version> </dependency> </dependencies> <build> <plugins> <!--該插件用於將scala代碼編譯成class文件 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>4.4.0</version> <executions> <execution> <goals> <goal>compile</goal> </goals> </execution> </executions> </plugin> <!--打包用 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.3.0</version> <configuration> <descriptorRefs> <descriptiorRef>jar-with-dependencies</descriptiorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
最終的項目結構:web
實現步驟主要包含幾步:redis
一、從文件中讀取數據 二、構建flink運行環境 三、按照商品id進行聚合,取出來每一個商品一段時間的數量 四、排序取TopN,而後輸出
實現代碼: 在 com.mafei.hotitems_analysis下面新建一個scala的object : HotItems.scalasql
package com.mafei.hotitems_analysis import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} import org.apache.flink.api.java.tuple.{Tuple, Tuple1} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import java.sql.Timestamp import scala.collection.mutable.ListBuffer //定義輸入數據樣例類(跟數據源的csv對應的上) case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long ) // 定義窗口聚合結果樣例類 case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long) object HotItems { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //防止亂序 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //定義取事件時間 //從文件中讀取數據 val inputStream = env.readTextFile("/opt/java2020_study/UserBehaviorAnalysis/HotItemsAnalysis/src/main/resources/UserBehavior.csv") val dataStream: DataStream[UserBehavior] = inputStream .map(data =>{ //data是讀的csv每一行的數據 val arr = data.split(",") //按照,分割成list UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt,arr(3),arr(4).toLong) //把對應字段塞到樣例類裏面 }) .assignAscendingTimestamps(_.timestamp * 1000L) //定時時間戳爲事件時間,*1000轉爲毫秒 //進行窗口聚合,獲得聚合結果 val aggStream: DataStream[ItemViewCount] = dataStream .filter(_.behavior == "pv") //過濾出來pv行爲的數據 .keyBy("itemId") .timeWindow(Time.hours(1),Time.minutes(5)) //設置一個滑動窗口,窗口大小是1小時,每次滑動5分鐘 .aggregate(new CountAgg(), new ItemViewCountWindowResult()) /** * 須要針對每個商品作分組,定義次數的狀態,由於要作topN,因此要作排序,定義延遲觸發,多久排序並輸出一次 */ val resultStream = aggStream .keyBy("windowEnd") //按照結束時間進行分組,收集當前窗口內的,取必定時間內的數據 .process(new TopNHostItem(10)) resultStream.print() env.execute("獲取商品的訪問次數") } } /** * 自定義一個預聚合函數AggregateFunction * 這裏AggregateFunction 傳入3個參數, * 第一個是輸入的數據類型,這個在 dataStream 中已經定義的輸出的類型了 * 第二個是中間的狀態,由於要計算總數,因此每次數量+1,有個值記錄下,因此是Long類型的 * 第三個是最終的輸出數據, 最終的數量,因此也是Long類型 */ class CountAgg() extends AggregateFunction[UserBehavior, Long, Long]{ override def createAccumulator(): Long = 0L // 定義初始化的數據,確定是0咯 //這裏每一個數據來了都會調用一次,因此直接在上一次結果上加一就能夠,這個acc就是中間狀態 override def add(in: UserBehavior, acc: Long): Long = acc +1 //最終輸出的數據 override def getResult(acc: Long): Long = acc //這個在session窗口中才有用,是多個狀態的時候作窗口合併的時候,這裏只有1個狀態,直接2個相加就能夠 override def merge(acc: Long, acc1: Long): Long = acc+acc1 } /** * 自定義一個窗口函數windowFunction,用於生成最終咱們要的數據結構 * WindowFunction 須要4個參數,分別是 * @tparam IN The type of the input value. 輸入類型,也就是數量 * @tparam OUT The type of the output value. 輸出類型,這個是自定義的,這裏定義一個輸出類,com.mafei.hotitems_analysis.ItemViewCount * @tparam KEY The type of the key. key的類型,由於作聚合了麼keyBy,由於keyBy輸出的是JavaTuple類型(能夠點到keyBy源碼裏面看下),因此須要定義類型的時候定義成Tuple */ class ItemViewCountWindowResult() extends WindowFunction[Long,ItemViewCount, Tuple,TimeWindow]{ override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = { val itemId = key.asInstanceOf[Tuple1[Long]].f0 //是一個一元組(只有itemId字段),因此直接這樣子取 val windowEnd = window.getEnd val count = input.iterator.next() out.collect(ItemViewCount(itemId,windowEnd, count)) } } /** * * @param topN * * KeyedProcessFunction 傳入3個參數 * K, I, O * K: 排序的key字段,這裏用的是windowEnd 但由於keyBy輸出是JavaTuple類型,因此傳的是Tuple */ class TopNHostItem(topN: Int) extends KeyedProcessFunction[Tuple,ItemViewCount,String]{ //先定義一個ListState,保存全部商品的count,id,ts等信息,由於要最終從這個結果裏面排序截取topN個,因此得提早所有存下來才行 var itemViewCountListState: ListState[ItemViewCount] = _ override def open(parameters: Configuration): Unit = { itemViewCountListState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("itemViewCountList", classOf[ItemViewCount])) } override def processElement(i: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = { // 每來一條數據,直接追加到ListState itemViewCountListState.add(i) //註冊一個定時器,windowEnd+1秒以後觸發 context.timerService().registerEventTimeTimer(i.windowEnd + 1) } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = { //爲了方便排序,定義另外一個ListBuffer,保存ListState的全部數據 val allItemListBuffer: ListBuffer[ItemViewCount] = ListBuffer() val iter = itemViewCountListState.get().iterator() while (iter.hasNext){ allItemListBuffer += iter.next() } //清空ListState的數據,已經放到allItemListBuffer 準備計算了,等下次觸發就應該是新的了 itemViewCountListState.clear() // 先按照count,從大到小排序,而後再取前N個 val sortItemViewCounts = allItemListBuffer.sortBy(_.count)(Ordering.Long.reverse).take(topN) //格式化輸出數據: val result : StringBuilder = new StringBuilder result.append("當前窗口的結束時間:\t").append(new Timestamp(timestamp -1)).append("\n") //遍歷結果列表中的每一個ItemViewCount , 輸出到一行 for(i <- sortItemViewCounts.indices){ val currentItemViewCount = sortItemViewCounts(i) result.append("Top").append(i+1).append("\t") .append("商品id = ").append(currentItemViewCount.itemId).append("\t") .append("訪問量: ").append(currentItemViewCount.count).append("\n") } result.append("---------------------------------------\n\n\n") Thread.sleep(1000) out.collect(result.toString()) } }
UserBehavior.csv 的內容:shell
1,40000,12306,pv,1609512713 5,30000,12306,pv,1609512714 4,50000,12306,pv,1609512715 2,20000,12306,pv,1609512716
運行效果和代碼結構:apache
在開發的時候UserBehavior.csv 若是想看效果,最好仍是多準備點數據,不然一行一行的手敲有點麻煩,我寫了個java腳本,GenerateData.java 運行完,就能給UserBehavior.csv模擬寫入10萬條數據作測試了bootstrap
/* @author mafei * @date 2021/1/1 */ package com.mafei.hotitems_analysis; import cn.hutool.core.io.file.FileWriter; import cn.hutool.core.util.RandomUtil; import java.time.LocalDateTime; import java.time.ZoneOffset; public class GenerateData { public static void main(String[] args) { String csvPath = "/opt/java2020_study/UserBehaviorAnalysis/HotItemsAnalysis/src/main/resources/UserBehavior.csv"; Integer[] userId= new Integer[]{1,2,3,4,5,9}; Integer[] itemId= new Integer[]{10000,20000,30000,40000,50000,60000}; int categoryId = 12306; StringBuffer content = new StringBuffer(); Long second = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8")); for (int i = 0; i < 100000; i++) { content.append(RandomUtil.randomEle(userId)).append(",") .append(RandomUtil.randomEle(itemId)).append(",") .append(categoryId).append(",pv,").append(second + i).append("\n"); } FileWriter writer = new FileWriter(csvPath); writer.write(content.toString()); } }
也是同樣,只須要把從csv改爲讀kafka就能夠windows
//一、從文件中讀取數據 // val inputStream = env.readTextFile("/opt/java2020_study/UserBehaviorAnalysis/HotItemsAnalysis/src/main/resources/UserBehavior.csv") //二、從kafka中讀取數據 val properties = new Properties() properties.setProperty("bootstrap.servers", "127.0.0.1:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("auto.offset.reset", "latest") val inputStream = env.addSource(new FlinkKafkaConsumer[String]("hotItems",new SimpleStringSchema(), properties)) //把kafka中的數據打印出來,看下具體的值 inputStream.print()
關於kafka安裝部署能夠參考以前的文章,找個centos虛擬機 http://www.javashuo.com/article/p-yisbokxi-hm.html
不少狀況下不想改配置文件裏監聽端口,又想直接連服務器上的端口,推薦使用端口轉發,把服務器端口轉到127.0.0.1就能夠,像xshell,terminus等等這些工具都有這個功能centos
累加規則---窗口內 碰到一條數據就加一(add方法) 實現AggregateFunction接口 Interface AggregateFunction<IN,ACC,OUT> 實現輸出結構----itemViewCount(itemid,windowEnd,count) 實現WindowFunction接口
-針對有狀態流的底層api 1.-keydProcessFunction會對分區後的每個子流進程處理 2.- 以windowEnd做爲key,保證分流後每一條流都在一個時間窗口內 3.-從ListState中讀取當前流的狀態,存儲數據進行輸出
分區以後每隔KeyedStream都有本身的生命週期 1. -open,初始化,從這裏能夠獲取當前流狀態 2. -processElement,處理流中每一個元素時調用 3. -onTimer: 定時調用,註冊定時器Timer並觸發以後的回調操做