在電商運營工做中,營銷活動是很是重要的部分,對用戶增加和GMV都有很大幫助。對電商運營來講,如何從龐大的商品庫中篩選出賣家優質商品並推送給有須要的買家購買是每時每刻都要思索的問題,並且這個過程須要儘量快和實時。保證快和實時就能夠提高買賣雙方的用戶體驗,提升用戶粘性。算法
爲了解決上面提到的問題,閒魚研發了馬赫系統。馬赫是一個實時高性能的商品選品系統,解決在億級別商品中經過規則篩選優質商品並進行投放的場景。有了馬赫系統以後,閒魚的運營同窗能夠在馬赫系統上建立篩選規則,好比商品標題包含「小豬佩奇」、類目爲「玩具」、價格不超過100元且商品狀態爲未賣出。在運營建立規則後,馬赫系統會同時進行兩步操做,第一步是從存量商品數據篩選符合條件的商品進行打標;第二步是對商品實時變動進行規則計算,實時同步規則命中結果。數據庫
馬赫系統最大的特色是快而實時,體如今命中規模爲100w的規則能夠在10分鐘以內完成打標;商品自己變動致使的規則命中結果同步時間爲1秒鐘。運營能夠經過馬赫系統快速篩選商品向用戶投放,閒魚的流量也能夠精準投給符合條件的商品而且將流量利用到最大化。數組
那麼馬赫系統是如何解決這一典型的電商問題的呢,馬赫系統和流計算有什麼關係呢,這是下面要詳細說明的部分。框架
流計算是持續、低延遲、事件觸發的數據處理模型。流計算模型是使用實時數據集成工具,將數據實時變化傳輸到流式數據存儲,此時數據的傳輸變成實時化,將長時間累積大量的數據平攤到每一個時間點不停地小批量實時傳輸;流計算會將計算邏輯封裝爲常駐計算服務,一旦啓動就一直處於等待事件觸發狀態,當有數據流入後會觸發計算迅速獲得結果;當流計算獲得計算結果後能夠馬上將數據輸出,無需等待總體數據的計算結果。ide
閒魚實時選品系統使用的流計算框架是Blink,Blink是阿里巴巴基於開源流計算框架Flink定製研發的企業級流計算框架,能夠認爲是Flink的增強版,如今已經開源。Flink是一個高吞吐、低延遲的計算引擎,同時還提供不少高級功能。好比它提供有狀態的計算,支持狀態管理,支持強一致性的數據語義以及支持Event Time,WaterMark對消息亂序的處理等特性,爲閒魚實時選品系統的超低延時選品提供了有力支持。函數
State是指流計算過程當中計算節點的中間計算結果或元數據屬性,好比在aggregation過程當中要在state中記錄中間聚合結果,好比Apache Kafka做爲數據源時候,咱們也要記錄已經讀取記錄的offset,這些State數據在計算過程當中會進行持久化(插入或更新)。因此Blink中的State就是與時間相關的,Blink任務的內部數據(計算數據和元數據屬性)的快照。
馬赫系統會在State中保存商品合併以後的所有數據和規則運行結果數據。當商品發生變動後,馬赫系統會將商品變動信息與State保存的商品信息進行合併,並將合併的信息做爲入參運行全部規則,最後將規則運行結果與State保存的規則運行結果進行Diff後獲得最終有效的運行結果。因此Blink的State特性是馬赫系統依賴的關鍵特性。工具
Blink的Window特性特指流計算系統特有的數據分組方式,Window的建立是數據驅動的,也就是說,窗口是在屬於此窗口的第一個元素到達時建立。當窗口結束時候刪除窗口及狀態數據。Blink的Window主要包括兩種,分別爲滾動窗口(Tumble)和滑動窗口(Hop)。
滾動窗口有固定大小,在每一個窗口結束時進行一次數據計算,也就是說滾動窗口任務每通過一次固定週期就會進行一次數據計算,例如每分鐘計算一次總量。性能
滑動窗口與滾動窗口相似,窗口有固定的size,與滾動窗口不一樣的是滑動窗口能夠經過slide參數控制滑動窗口的新建頻率。所以當slide值小於窗口size的值的時候多個滑動窗口會重疊,此時數據會被分配給多個窗口,以下圖所示:測試
Blink的Window特性在數據計算統計方面有不少使用場景,馬赫系統主要使用窗口計算系統處理數據的實時速度和延時,用來進行數據統計和監控告警。ui
UDX是Blink中用戶自定義函數,能夠在任務中調用以實現一些定製邏輯。Blink的UDX包括三種,分別爲:
馬赫系統中使用了大量的UDX進行邏輯定製,包括消息解析、數據處理等。而馬赫系統最核心的商品數據合併、規則運行和結果Diff等流程就是經過UDAF實現的。
選品系統在項目立項後也設計有多套技術方案。通過多輪討論後,最終決定對兩套方案實施驗證後決定最終實現方案。
第一套方案是基於PostgreSQL的方案,PostgreSQL能夠很便捷的定義Function進行數據合併操做,在PostgreSQL的trigger上定義執行規則邏輯。基於PostgreSQL的技術實現較複雜,但能知足功能需求。不過性能測試結果顯示PostgreSQL處理小數據量(百萬級)性能較好;當trigger數量多、trigger邏輯複雜或處理億級別數據時,PostgreSQL的性能會有較大下滑,不能知足秒級選品的性能指標。所以基於PostgreSQL的方案被否決(在閒魚小商品池場景中仍在使用)。
第二套方案是基於Blink流計算方案,經過驗證發現Blink SQL很適合用來表達數據處理邏輯並且Blink性能很好,綜合對比以後最終選擇Blink流計算方案做爲實際實施的技術方案。
爲了配合使用流計算方案,馬赫系統通過設計和解耦,無縫對接Blink計算引擎。其中數據處理模塊是馬赫系統核心功能模塊,負責接入商品相關各種數據、校驗數據、合併數據、執行規則和處理執行結果並輸出等步驟,因此數據處理模塊的處理速度和延時在很大程度上能表明馬赫系統數據處理速度和延時。接下來咱們看下數據處理模塊如何與Blink深度結合將數據處理延遲降到秒級。
數據處理模塊結構如上圖,包含數據接入層、數據合併層、規則運行層和規則運行結果處理層。每層都針對流計算處理模式進行了單獨設計。
數據接入層是數據處理模塊前置,負責對接多渠道各類類型的業務數據,主要邏輯以下:
這樣設計的考慮是由於業務數據是多種多樣的,好比商品信息包括數據庫的商品表記錄、商品變動的MQ消息和算法產生的離線數據,若是直接經過Blink對接這些業務數據源的話,須要建立多個Blink任務來對接不一樣類型業務數據源,這種處理方式過重,並且數據接入邏輯與Blink緊耦合,不夠靈活。
數據接入層能夠很好的解決上述問題,數據接入層能夠靈活接入多種業務數據,而且將數據接入與Blink解耦,最終經過同一個Topic發出消息。而Blink任務只要監聽對應的Topic就能夠接二連三的收到業務數據流,觸發接下來的數據處理流程。
數據合併是數據處理流程的重要步驟,數據合併的主要做用是將商品的最新信息與內存中保存的商品信息合併供後續規則運行使用。數據合併主要邏輯是:
{key:[timestamp, value]}
,key是字段名稱,value是字段值,timestamp爲字段數據產生時間戳;數據合併有幾個前提:
舉例來講,內存中保存的商品ID=1的信息是{"desc": [1, "描述1"], "price": [4, 100.5]},數據流中商品ID=1的信息是{"desc": [2, "描述2"], "price": [3, 99.5]},那麼合併結果就是{"desc": [2, "描述2"], "price": [4, 100.5]},每一個字段的值都是最新的,表明商品當前最新信息。
當商品信息發生變化後,最新數據由數據接入層流入,經過數據合併層將數據合併到內存,Blink內存中保存的是商品當前最新的所有數據。
規則運行層是數據處理流程核心模塊,經過規則運算得出商品對各規則命中結果,邏輯以下:
這裏的規則指的是運營建立的業務規則,好比商品價格大於50且狀態爲在線。規則的輸入是通過數據合併後的商品數據,輸出是true或false,便是否命中規則條件。規則表明的是業務投放場景,馬赫系統的業務價值就是在商品發生變動後儘快判斷是否命中以前未命中的規則或是不命中以前已經命中的規則,並將命中和不命中結果儘快體現到投放場景中。
規則運行需利用Blink強大算力來保證快速執行,馬赫系統當前有將近300條規則,並且還在快速增加。這意味着每一個商品發生變動後要在Blink上運行成百上千條規則,閒魚天天有上億商品發生變動,這背後須要的運算量是很是驚人的。
讀者讀到這裏可能會奇怪,明明通過規則運行以後直接把運行結果輸出到投放場景就能夠了,不須要運行結果處理層。實際上運行結果處理層是數據處理模塊最重要的部分。
由於在實際場景中,商品的變動在大部分狀況只會命中不多一部分規則,並且命中結果也不多會變化。也就是說商品對不少規則的命中結果是沒有意義的,若是將這些命中結果也輸出的話,只會增長操做TPS,對實際結果沒有任何幫助。而篩選出有效的運行結果,這就是運行結果處理層的做用。運行結果處理層邏輯以下:
運行結果處理層利用Blink內存保存商品上一次變動後規則運行結果,並將當前變動後規則運行結果與內存中結果進行比較,計算出有效運行結果。舉例來講,商品A上一次變動後規則命中結果爲{"rule1":true, "rule2":true, "rule3":false, "rule4":false},當前變動後規則命中結果爲{"rule1":true, "rule2":false, "rule3":false, "rule4":true}。由於商品A變動後對rule1和rule3的命中結果沒有變化,因此實際有效的命中結果是{"rule2":false, "rule4":true},經過運行結果處理層處理後輸出的是有效結果的最小集,能夠極大減少無效結果輸出,提升數據處理的總體性能和效率。
雖然閒魚實時選品系統在立項之初通過預研和論證,但由於使用不少新技術框架和流計算思路,在開發過程當中遇到一些難題,包括設計和功能實現方面的,不少是設計流計算系統的典型問題。咱們就其中一個問題與各位讀者探討-規則公式轉換。
這個問題的業務場景是:運營同窗在馬赫系統頁面上篩選商品字段後保存規則,服務端是已有的老系統,邏輯是根據規則生成一段SQL,SQL的where條件和運營篩選條件相同。SQL有兩方面的做用,一方面是做爲離線規則,在離線數據庫中執行SQL篩選符合規則的離線商品數據;另外一方面是轉換成在線規則,在Blink任務中對實時商品變動數據執行規則以判斷是否命中。
由於實時規則運行使用的是MVEL表達式引擎,MVEL表達式是類Java語法的,因此問題就是將離線規則的SQL轉換成在線規則的Java表達式,二者邏輯需一致,而且需兼顧性能和效率。問題的解決方案很明確,解析SQL後將SQL操做符轉換成Java操做符,並將SQL特有語法轉成Java語法,例如A like '%test%'轉成A.contains('test')。
這個問題的難點是如何解析SQL和將解析後的語義轉成Java語句。通過調研以後給出了簡單而優雅的解決方案,主要步驟以下:
經過後序遍歷算法遍歷where條件子樹;
實際運行結果以下:
代碼邏輯以下(主要是二叉樹後續遍歷和操做符轉換,再也不詳細解釋):
馬赫系統上線以來,已經支持近400場活動和投放場景,天天處理近1.4億條消息,峯值TPS達到50000。馬赫系統已經成爲閒魚選品投放的重要支撐。
本文主要闡述馬赫系統中數據處理的具體設計方案,說明總體設計的前因後果。雖然閒魚實時選品系統針對的是商品選品,但數據處理流計算技術方案的輸入是MQ消息,輸出也是MQ消息,不與具體業務綁定,因此數據處理流計算技術方案不僅適用於商品選品,也適合其餘相似實時篩選業務場景。但願咱們的技術方案和設計思路能給你帶來一些想法和思考,也歡迎和咱們留言討論,謝謝。
原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。