Flink| 實時須要分析

 

 

========================實時流量統計web

 1. 實時熱門商品HotItems
 每隔 5 分鐘輸出最近一小時內點擊量最多的前 N 個商品。
抽取出業務時間戳,告訴 Flink 框架基於業務時間作窗口
• 過濾出點擊行爲數據
• 按一小時的窗口大小,每 5 分鐘統計一次,作滑動窗口聚合( Sliding Window)
• 按每一個窗口聚合,輸出每一個窗口中點擊量前 N 名的商品



2. 實時流量統計 NetworkFlow
 "實時流量統計" 對於一個電商平臺而言,用戶登
錄的入口流量、不一樣頁面的訪問流量 都是值得分析的重要數據,而這些數據,能夠
簡單地從 web 服務器的日誌中提取出來。

實現"熱門頁面瀏覽數"的統計,也就是讀取服務器日誌中的每
一行log統計在一段時間內用戶訪問每個url的次數,而後排序輸出顯示。
具體作法爲:
每隔 5 秒,輸出最近 10 分鐘內訪問 量最多的前 N 個 URL。能夠看出,這個需求與以前「實時熱門商品統計」很是相似,
因此咱們徹底能夠借鑑此前的代碼。



3. PV 網站頁面流量 - PageView
  衡量網站流量一個最簡單的指標,就是網站的頁面瀏覽量(Page View PV );
用戶每次打開一個頁面便記錄 1 次 PV ,屢次打開同一頁面則瀏覽量累計。通常來講PV 與來訪者的數量成正比,可是 PV 並不直接決定頁面的真實來訪者數量,
如同一個來訪者經過不斷的刷新頁面,也能夠製造出很是高的 PV 。
咱們知道,用戶瀏覽頁面時,會從瀏覽器向網絡服務器 發出一個請求 Request網絡服務器接到這個請求後,會將該請求對應的一個網頁( Page )發送給瀏覽器
從而產生了一個 PV。因此咱們的統計方法,能夠是從 web 服務器的日誌中去提取對應的頁面訪問而後統計,就向上一節中的作法同樣;也能夠直接從埋點日誌中提
取用戶發來的頁面請求,從而統計出總瀏覽量。

實現一個網站總瀏覽量的統計。能夠設置滾動時間窗口,實時統計每小時內的網站 PV


4. UV 獨立訪客數
* 上例中,咱們統計的是全部用戶對頁面的全部瀏覽行爲,也就是說,同一用戶的瀏覽行爲會被重複統計。而在實際應用中,咱們每每還會關注,在一段
* 時間內到底有多少不一樣的用戶訪問了網站。另一個統計流量的重要指標是網站的獨立訪客數(Unique Visitor UV )。 UV指的是一段時間(好比一小時)內訪問網站 的 總人數, 1 天內同一訪客的屢次訪問
* 只記錄爲一個訪客。經過 IP 和 cookie 通常 是判斷 UV 值的兩種方式。 當客戶端第一次訪問某個網站服務器的時候,網站服務器會給這個客戶端的電腦發出 一個 Cookie
* 一般放在這個客戶端電腦的 C 盤當中。在這個 Cookie 中會分配一個獨一無二的編號,這其中會記錄一些訪問服務器的信息,如訪問時間,訪問了哪些頁面等等。當你下次再訪問這個服務器的時候,服務器就能夠直接從你的電腦中找到上一次放進去的
* Cookie 文件,而且對其進行一些更新,但那個獨一無二的編號是不會變的。
* 此例中能夠根據 userId 來區分不一樣的用戶。


5. 使用布隆過濾器查重-過濾的UV統計
/**
* 上例中,把全部數據的userId 都存在了窗口計算的狀態裏,在窗口收集數據的過程當中,狀態會不斷增大。通常狀況下,只要不超出內存的承受範圍,
* 這種作法也沒什麼問題;但若是咱們遇到的數據量很大呢?把全部數據暫存放到內存裏,顯然不是一個好注意。咱們會想到,能夠利用 redis這種內存級 k v 數據庫,爲咱們作一個緩存。
* 但若是咱們遇到的狀況很是極端,數據大到驚人呢?好比上億級的用戶,要去重計算 UV 。
* 若是放到redis 中,億級的用戶id (每一個 20 字節左右的話)可能須要幾G甚至幾十G的空間來存儲。固然放到 redis 中,用集羣進行擴展也不是不能夠,但明顯
* 代價太大了。一個更好的想法是,其實咱們不須要完整地存儲用戶ID 的信息,只要知道他在不在就好了。因此其實咱們能夠進行壓縮處理,用一位( bit )就能夠表示一個用戶
* 的狀態。這個思想的具體實現就是布隆過濾器( Bloom Filter )。
* 本質上布隆過濾器是一種數據結構,比較巧妙的機率型數據結構(probabilisticdata structure ),特色是高效地插入和查詢,能夠用來告訴你 「某樣東西必定不存在或者可能存在」。
* 它自己是一個很長的二進制向量,既然是二進制的向量,那麼顯而易見的,存放的不是 0 ,就是 1 。 相比於傳統的 List 、 Set 、 Map 等數據結構,它更高效、佔用空間更少,
* 可是缺點是其返回的結果是機率性的,而不是確切的。
* 咱們的目標就是,利用某種方法(通常是Hash 函數)把每一個數據,對應到一個位圖的某一位上去;若是數據存在,那一位就是1,不存在則爲 0 。
*/

 

 

判斷當前最大的時間戳 <= 當前的watermark,就返回一個TriggerResult.FIRE(觸發);不然就註冊一個定時器(關窗的操做)redis

 

 

 
TriggerResult的類型:CONTINUE-什麼都不作繼續處理窗口;FIRE觸發窗口的計算操做但並不會關閉窗口清除它的狀態;PURGE清除窗口的狀態;FIRE_AND_PURGE觸發並清除掉;

 

 

 

 

 

 redis:數據庫

 

 

==========================市場營銷商業指標統計分析===========
* 對於電商企業來講,通常會經過各類不一樣的渠道對本身的APP進行市場推廣,而這些渠道的統計數據(好比,不一樣網站上廣告連接的點擊量、APP下載量)就成了市場營銷的重要商業指標。
* 首先考察分渠道的市場推廣統計。
* 須要自定義一個測試源SimulatedEventSource來生成用戶行爲的事件流。
*1. 分渠道統計 AppMarketingByChannel.scala
 
/**
* 2. 不分渠道(總量)統計
* 一樣咱們還能夠考察不分渠道的市場推廣統計,這樣獲得的就是全部渠道推廣的總量 AppMarketing.scala 。
* /


/**
* 電商網站的市場營銷商業指標中,除了自身的APP 推廣,還會考慮到頁面上的廣告投放(包括本身經營的產品和其它網站的廣告)。 因此廣告相關的統計分析,也是市場營銷的重要指標。
* 對於廣告的統計,最簡單也最重要的就是頁面廣告的點擊量,網站每每須要根據廣告點擊量來制定訂價策略和調整推廣方式,並且也能夠藉此收集用戶的偏好信息。
* 更加具體的應用是,咱們能夠根據用戶的地理位置進行劃分,從而總結出不一樣省份用戶對不一樣廣告的偏好,這樣更有助於廣告的精準投放。
* 3. 頁面廣告點擊量統計
* 接下來咱們就進行頁面廣告按照省份劃分的點擊量的統計。AdStatisticsByGeo .scala 文件 。
* 自定義一些測試數據AdClickLog,用來生成用戶點擊廣告行爲的事件流。
* 主函數以 province 進行 keyBy ,而後開一小時的時間窗口,滑動距離爲5秒,統計窗口內的點擊事件數量。
*
* 廣告點擊量統計,同一用戶的重複點擊是會疊加計算的。在實際場景中,同一用戶確實可能反覆點開同一個廣告,這也說明了用戶對廣告更大的興趣;
* 可是若是用戶在一段時間很是頻繁地點擊廣告,這顯然不是一個正常行爲,有刷點擊量的嫌疑。因此咱們能夠對一段時間內(好比一天內)的用戶點擊行爲進行約束,
* 若是對同一個廣告點擊超過必定限額(好比 100 次),應該把該用戶加入黑名單並報警,此後其點擊行爲不該該再統計。
* 4. 黑名單過濾



==========================惡意登陸監控==================
* 對於網站而言,用戶登陸並非頻繁的業務操做。若是一個用戶短期內頻繁登陸失敗,就有多是出現了程序的惡意攻擊,好比密碼暴力破解。所以咱們考慮,
* 應該對用戶的登陸失敗動做進行統計,具體來講,若是同一用戶(能夠是不一樣 IP)在2秒以內連續兩次登陸失敗,就認爲存在惡意登陸的風險,輸出相關的信息進行
* 報警提示。這是電商網站、也是幾乎全部 網站風控的基本一環。
* 1. 狀態編程的方式實現:LoginFail .scala
* 因爲一樣引入了時間,咱們能夠想到,最簡單的方法其實與以前的熱門統計相似,只須要按照用戶 ID 分流,而後遇到登陸失敗的事件時將其保存在 ListState 中,
* 而後設置一個定時器,2秒後觸發。定時器觸發時檢查狀態中的登陸失敗事件個數,若是大於等於2,那麼就輸出報警信息。
*
* 新建一個單例對象。 定義樣例類LoginEvent ,這是輸入的登陸事件流。登陸數據本應該從UserBehavior日誌裏提取
* 因爲UserBehavior.csv中沒有作相關埋點,從另外一個文件 LoginLog.csv 中讀取登陸數據 。
*
*
* 2. 優化操做:
* 第一次的代碼實現中咱們能夠看到,直接把每次登陸失敗的數據存起來、設置定時器一段時間後再讀取,這種作法儘管簡單,但和咱們開始的需求仍是略有差別
* 的。這種作法只能隔 2 秒以後去判斷一下這期間是否有屢次失敗登陸,而不是在一次登陸失敗以後、再一次登陸失敗時就馬上報警。這個需求若是嚴格實現起來,相
* 當於要判斷任意緊鄰的事件,是否符合某種模式。因而咱們能夠想到,這個需求其實能夠不用定時器觸發,直接在狀態中存取上一次登陸失敗的事件,每次都作判斷和比對,就能夠實現最初的需求。
* 在代碼MatchFunction中刪掉onTimer processElement
*
 
* 咱們經過對狀態編程的改進,去掉了定時器,在 process function 中作了
* 更多的邏輯處 理,實現了最初的需求。不過這種方法裏有不少的條件判斷,目前僅僅實現的是檢測「連續2次登陸失敗」,這是最簡單的情形。
* 若是須要檢測更屢次,內部邏輯顯然會變得很是複雜。那有什麼方式能夠方便地實現呢?
* flink爲咱們提供了CEP Complex Event Processing ,復瑣事件處理庫,用於在流中篩選符合某種複雜模式的事件。
* 3. 基於 CEP 來完成這個模塊的實現。

========================訂單支付實時監控=========================

      在電商網站中,訂單的支付做爲直接與營銷收入掛鉤的一環,在業務流程中很是重要。對於訂單而言,爲了正確控制業務流程,也爲了增長用戶的支付意願,網
 站通常會設置一個支付失效時間,超過一段時間不支付的訂單就會被取消。另外,對於訂單的支付,咱們還應保證用戶支付的正確性,這能夠經過第三方支付平臺的
 對於訂單的支付,咱們還應保證用戶支付的正確性,這能夠經過第三方支付平臺的交易數據來作一個實時對帳。編程

  將實現這兩個需求。交易數據來作一個實時對帳。瀏覽器

* 在電商平臺中最終創造收入和利潤的是用戶下單購買的環節;更具體一點,是用戶真正完成支付動做的時候。用戶下單的行爲能夠代表用戶對商品的需求,但
* 在現實中,並非每次下單都會被用戶馬上支付。當拖延一段時間後,用戶支付的意願會下降。因此爲了讓用戶更有緊迫感從而提升支付轉化率,同時也爲了防範訂
* 單支付環節的安全風險,電商網站每每會對訂單狀態進行監控,設置一個失效時間(好比 15 分鐘),若是下單後一段時間仍未支付,訂單就會被 取消。
* 使用 CEP 實現
* 利用 CEP 庫來實現這個功能。咱們先將事件流按照訂單號orderId分流,
* 定義這樣的一個事件模式:在15分鐘內,事件「create」與pay非嚴格緊鄰,這樣調用.select 方法時,就能夠同時獲取到匹配出的事件和超時未匹配的事件。
* 1. CEP實現訂單超時報警

 

 

* 2. 用狀態編程來實現:
* 咱們一樣能夠利用Process Function ,自定義實現檢測訂單超時的功能。爲了簡化問題,咱們只考慮超時報警的情形,在 pay 事件超時未發生的狀況下,輸出超時報警信息。
* 一個簡單的思路是,能夠在訂單的create 事件到來後註冊定時器,15分鐘後觸發;而後再用一個布爾類型的 Value 狀態來做爲標識位,代表 pay 事件是否發生過。
* 若是 pay 事件已經發生,狀態被置爲 true ,那麼就再也不須要 作什麼操做;而若是 pay事件一直沒來,狀態一直爲 false,到定時器觸發時,就應該輸出超時報警信息。
* 如今只考慮兩種狀況:①來一個create,來一個pay create後邊有pay就正常匹配,若是沒來就超時報警
* 亂序的數據,有可能create和pay的前後順序
* 超時報警的狀況: 遇到create設一個定時器,遇到pay改一個狀態(或者不刪定時器,直接設定一個狀態看有沒有pay來過,有則定時器觸發時說是正常的,沒有就超時報警

-----來自兩條流的訂單交易匹配----------
* 對於訂單支付事件,用戶支付完成其實並不算完,咱們還得確認平臺帳戶上是否到帳了。而每每這會來自不一樣的日誌信息,因此咱們要同時讀入兩條流的數據來
* 作合併處理。這裏咱們利用 connect 將兩條流進行鏈接,
* 1. 用自定義的CoProcessFunction 進行處理。


* 2. 雙流join
* window join(Tumbling Window Join、 Sliding Window Join)適用於兩條流join,後邊還要開窗口的分析
*Interval join(區間join)適用於傳感器報警(溫度煙霧出現異常,它倆時間得匹配上在同一時間範圍內同時出現,溫度又升高的很快)
* Join中當作狀態保存起來
*此需求是兩條流匹配上就能夠了



統計類:讀取數據、作簡單包裝轉換map、filter、按某個字段分組,開窗,作聚合
排序| TopN:再作一個ProcessFunction,把全部數據都收集到排序輸出;
以上是基於DataStreamAPI,也能夠用高級API、TableAPI和FlinkSQL
業務流程中的狀態作檢測輸出和警告:自定義編程、狀態
事件邏輯、風控:CEP
相關文章
相關標籤/搜索