Flink實戰-實時讀取日誌文件和kafka,持續統計TOP10熱門商品

從日誌中讀取數據作一些統計聚合,最終把想要的數據輸出,基本是每一個公司都會有的場景,好比用戶分析,商品分析,實時大屏等等,咱們最先的作法是所有讀出來,寫到redis或者es去,而後web端再去實時查詢統計,其實有不少的弊端
要麼每次請求都會去從新算一遍耗性能不說還慢的不行,業務方接受不了
或者直接把統計信息存進去,但要實時統計就比較麻煩,好比1小時存一個點,那業務方可能要的是當前10:05-11:05的數據。。。
用Flink能夠很方便的實現,這裏分別展現從csv和kafka裏讀大批量日誌,flink來作統計每小時的熱門商品,以5分鐘作一個間隔,實現上每次延遲1s輸出一次模擬實時

基本需求:

統計近一小時的熱門商品,每5分鐘更新一次
熱門度用瀏覽次數(pv)來衡量

解決思路:

在全部用戶行爲中,過濾出來瀏覽(pv)行爲進行統計
構建一個滑動窗口,窗口長度爲1小時,每次滑動5分鐘

整個實現流程:

DataStream
-> 過濾出來是瀏覽行爲的數據
-> 根據商品id進行分組生成KeydStream(key by 操做)
-> 構建滑動窗口,窗口長度1小時,每次滑動5分鐘
-> 進行聚合算總數
-> 生成一個最終輸出的DataStream

環境準備

一、新建項目
打開IDEA新建一個maven項目,UserBehaviorAnalysis,新建一個包 com.mafei.hotitems_analysis
二、父項目中準備依賴和打包相關 pom.xmljava

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.mafei</groupId>
    <artifactId>UserBehaviorAnalysis</artifactId>
    <packaging>pom</packaging>
    <version>1.0</version>
    <modules>
        <module>HotItemsAnalysis</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>

        <flink.version>1.10.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>2.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
                <!-- 用hutool只是爲了造模擬數據用,實際生產中能夠不用引入-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!--該插件用於將scala代碼編譯成class文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.4.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!--打包用 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptiorRef>jar-with-dependencies</descriptiorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

最終的項目結構:
Flink實戰-實時讀取日誌文件和kafka,持續統計TOP10熱門商品web

三、開始實戰

實現步驟主要包含幾步:redis

一、從文件中讀取數據
二、構建flink運行環境
三、按照商品id進行聚合,取出來每一個商品一段時間的數量
四、排序取TopN,而後輸出

實現代碼: 在 com.mafei.hotitems_analysis下面新建一個scala的object : HotItems.scalasql

package com.mafei.hotitems_analysis

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.sql.Timestamp
import scala.collection.mutable.ListBuffer

//定義輸入數據樣例類(跟數據源的csv對應的上)
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long )

// 定義窗口聚合結果樣例類
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

object HotItems {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) //防止亂序
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //定義取事件時間

    //從文件中讀取數據
    val inputStream = env.readTextFile("/opt/java2020_study/UserBehaviorAnalysis/HotItemsAnalysis/src/main/resources/UserBehavior.csv")

    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data =>{     //data是讀的csv每一行的數據
        val arr = data.split(",")   //按照,分割成list
        UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt,arr(3),arr(4).toLong) //把對應字段塞到樣例類裏面
      })
      .assignAscendingTimestamps(_.timestamp * 1000L) //定時時間戳爲事件時間,*1000轉爲毫秒

    //進行窗口聚合,獲得聚合結果
    val aggStream: DataStream[ItemViewCount] = dataStream
      .filter(_.behavior == "pv") //過濾出來pv行爲的數據
      .keyBy("itemId")
      .timeWindow(Time.hours(1),Time.minutes(5)) //設置一個滑動窗口,窗口大小是1小時,每次滑動5分鐘
      .aggregate(new CountAgg(), new ItemViewCountWindowResult())

    /**
     * 須要針對每個商品作分組,定義次數的狀態,由於要作topN,因此要作排序,定義延遲觸發,多久排序並輸出一次
     */

    val resultStream = aggStream
      .keyBy("windowEnd") //按照結束時間進行分組,收集當前窗口內的,取必定時間內的數據
      .process(new TopNHostItem(10))

    resultStream.print()
    env.execute("獲取商品的訪問次數")
  }
}

/**
 * 自定義一個預聚合函數AggregateFunction
 * 這裏AggregateFunction 傳入3個參數,
 *      第一個是輸入的數據類型,這個在 dataStream 中已經定義的輸出的類型了
 *      第二個是中間的狀態,由於要計算總數,因此每次數量+1,有個值記錄下,因此是Long類型的
 *      第三個是最終的輸出數據, 最終的數量,因此也是Long類型
 */

class CountAgg() extends AggregateFunction[UserBehavior, Long, Long]{
  override def createAccumulator(): Long = 0L  // 定義初始化的數據,確定是0咯

  //這裏每一個數據來了都會調用一次,因此直接在上一次結果上加一就能夠,這個acc就是中間狀態
  override def add(in: UserBehavior, acc: Long): Long = acc +1

  //最終輸出的數據
  override def getResult(acc: Long): Long = acc

  //這個在session窗口中才有用,是多個狀態的時候作窗口合併的時候,這裏只有1個狀態,直接2個相加就能夠
  override def merge(acc: Long, acc1: Long): Long = acc+acc1
}

/**
 * 自定義一個窗口函數windowFunction,用於生成最終咱們要的數據結構
 * WindowFunction 須要4個參數,分別是
 *      @tparam IN The type of the input value.  輸入類型,也就是數量
 *      @tparam OUT The type of the output value.  輸出類型,這個是自定義的,這裏定義一個輸出類,com.mafei.hotitems_analysis.ItemViewCount
 *      @tparam KEY The type of the key.    key的類型,由於作聚合了麼keyBy,由於keyBy輸出的是JavaTuple類型(能夠點到keyBy源碼裏面看下),因此須要定義類型的時候定義成Tuple
 */
class ItemViewCountWindowResult() extends  WindowFunction[Long,ItemViewCount, Tuple,TimeWindow]{
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    val itemId = key.asInstanceOf[Tuple1[Long]].f0  //是一個一元組(只有itemId字段),因此直接這樣子取
    val windowEnd = window.getEnd
    val count = input.iterator.next()
    out.collect(ItemViewCount(itemId,windowEnd, count))
  }
}

/**
 *
 * @param topN
 *
 * KeyedProcessFunction 傳入3個參數
 *  K, I, O
 *  K: 排序的key字段,這裏用的是windowEnd 但由於keyBy輸出是JavaTuple類型,因此傳的是Tuple
 */
class TopNHostItem(topN: Int) extends KeyedProcessFunction[Tuple,ItemViewCount,String]{
  //先定義一個ListState,保存全部商品的count,id,ts等信息,由於要最終從這個結果裏面排序截取topN個,因此得提早所有存下來才行
  var itemViewCountListState: ListState[ItemViewCount] = _

  override def open(parameters: Configuration): Unit = {

    itemViewCountListState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("itemViewCountList", classOf[ItemViewCount]))
  }

  override def processElement(i: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
    // 每來一條數據,直接追加到ListState
    itemViewCountListState.add(i)

    //註冊一個定時器,windowEnd+1秒以後觸發
    context.timerService().registerEventTimeTimer(i.windowEnd + 1)

  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {

    //爲了方便排序,定義另外一個ListBuffer,保存ListState的全部數據
    val allItemListBuffer: ListBuffer[ItemViewCount] = ListBuffer()
    val iter = itemViewCountListState.get().iterator()
    while (iter.hasNext){
      allItemListBuffer += iter.next()
    }

    //清空ListState的數據,已經放到allItemListBuffer 準備計算了,等下次觸發就應該是新的了
    itemViewCountListState.clear()

    // 先按照count,從大到小排序,而後再取前N個
    val sortItemViewCounts = allItemListBuffer.sortBy(_.count)(Ordering.Long.reverse).take(topN)

    //格式化輸出數據:
    val result : StringBuilder = new StringBuilder
    result.append("當前窗口的結束時間:\t").append(new Timestamp(timestamp -1)).append("\n")

    //遍歷結果列表中的每一個ItemViewCount , 輸出到一行
    for(i <- sortItemViewCounts.indices){
      val currentItemViewCount = sortItemViewCounts(i)
      result.append("Top").append(i+1).append("\t")
        .append("商品id = ").append(currentItemViewCount.itemId).append("\t")
        .append("訪問量: ").append(currentItemViewCount.count).append("\n")
    }

    result.append("---------------------------------------\n\n\n")
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}

UserBehavior.csv 的內容:shell

1,40000,12306,pv,1609512713
5,30000,12306,pv,1609512714
4,50000,12306,pv,1609512715
2,20000,12306,pv,1609512716

運行效果和代碼結構:
Flink實戰-實時讀取日誌文件和kafka,持續統計TOP10熱門商品apache

在開發的時候UserBehavior.csv 若是想看效果,最好仍是多準備點數據,不然一行一行的手敲有點麻煩,我寫了個java腳本,GenerateData.java 運行完,就能給UserBehavior.csv模擬寫入10萬條數據作測試了bootstrap

/*
  @author mafei
 * @date 2021/1/1
 */
package com.mafei.hotitems_analysis;

import cn.hutool.core.io.file.FileWriter;
import cn.hutool.core.util.RandomUtil;

import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class GenerateData {
    public static void main(String[] args) {
        String csvPath = "/opt/java2020_study/UserBehaviorAnalysis/HotItemsAnalysis/src/main/resources/UserBehavior.csv";
        Integer[] userId= new Integer[]{1,2,3,4,5,9};
        Integer[] itemId= new Integer[]{10000,20000,30000,40000,50000,60000};
        int categoryId = 12306;
        StringBuffer content = new StringBuffer();
        Long second = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8"));

        for (int i = 0; i < 100000; i++) {
            content.append(RandomUtil.randomEle(userId)).append(",")
                    .append(RandomUtil.randomEle(itemId)).append(",")
                    .append(categoryId).append(",pv,").append(second + i).append("\n");
        }
        FileWriter writer = new FileWriter(csvPath);
        writer.write(content.toString());
    }
}

從kakfa中讀取數據

也是同樣,只須要把從csv改爲讀kafka就能夠windows

//一、從文件中讀取數據
//    val inputStream = env.readTextFile("/opt/java2020_study/UserBehaviorAnalysis/HotItemsAnalysis/src/main/resources/UserBehavior.csv")

    //二、從kafka中讀取數據
    val properties = new Properties()

    properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("auto.offset.reset", "latest")
    val inputStream = env.addSource(new FlinkKafkaConsumer[String]("hotItems",new SimpleStringSchema(), properties))
    //把kafka中的數據打印出來,看下具體的值
    inputStream.print()

關於kafka安裝部署能夠參考以前的文章,找個centos虛擬機 http://www.javashuo.com/article/p-yisbokxi-hm.html
不少狀況下不想改配置文件裏監聽端口,又想直接連服務器上的端口,推薦使用端口轉發,把服務器端口轉到127.0.0.1就能夠,像xshell,terminus等等這些工具都有這個功能
Flink實戰-實時讀取日誌文件和kafka,持續統計TOP10熱門商品centos

實現思路圖解:

Flink實戰-實時讀取日誌文件和kafka,持續統計TOP10熱門商品

Flink實戰-實時讀取日誌文件和kafka,持續統計TOP10熱門商品

設置時間窗口

Flink實戰-實時讀取日誌文件和kafka,持續統計TOP10熱門商品

Flink實戰-實時讀取日誌文件和kafka,持續統計TOP10熱門商品

再作窗口聚合

Flink實戰-實時讀取日誌文件和kafka,持續統計TOP10熱門商品

窗口聚合策略--每出現一次就加一

Flink實戰-實時讀取日誌文件和kafka,持續統計TOP10熱門商品

累加規則---窗口內  碰到一條數據就加一(add方法)
實現AggregateFunction接口
Interface AggregateFunction<IN,ACC,OUT>

實現輸出結構----itemViewCount(itemid,windowEnd,count)
實現WindowFunction接口

Flink實戰-實時讀取日誌文件和kafka,持續統計TOP10熱門商品

進行統計整理輸出----keyBy("windowEnd")

Flink實戰-實時讀取日誌文件和kafka,持續統計TOP10熱門商品
Flink實戰-實時讀取日誌文件和kafka,持續統計TOP10熱門商品

最終輸出排序----keydProcessFunction

-針對有狀態流的底層api
1.-keydProcessFunction會對分區後的每個子流進程處理
2.- 以windowEnd做爲key,保證分流後每一條流都在一個時間窗口內
3.-從ListState中讀取當前流的狀態,存儲數據進行輸出

用ProcessFunction來定義KeyedStream的處理邏輯

分區以後每隔KeyedStream都有本身的生命週期
1.  -open,初始化,從這裏能夠獲取當前流狀態
2.  -processElement,處理流中每一個元素時調用
3.  -onTimer: 定時調用,註冊定時器Timer並觸發以後的回調操做

Flink實戰-實時讀取日誌文件和kafka,持續統計TOP10熱門商品

相關文章
相關標籤/搜索