Spark技術在京東智能供應鏈預測的應用

1 背景

 前段時間京東公開了面向第二個十二年的戰略規劃,表示京東將全面走向技術化,大力發展人工智能和機器人自動化技術,將過去傳統方式構築的優點全面升級。京東Y事業部順勢成立,該事業部將以服務泛零售爲核心,着重智能供應能力的打造,核心使命是利用人工智能技術來驅動零售革新。算法

1.1   京東的供應鏈

京東一直致力於經過互聯網電商創建需求側與供給側的精準、高效匹配,供應鏈管理是零售聯調中的核心能力,是零售平臺能力的關鍵體現,也是供應商與京東緊密合做的紐帶,更是將來京東智能化商業體佈局中的核心環節。編程

目前京東在全國範圍內的倉庫數量已超過700個,按功能可劃分爲RDCFDC、大件中心倉、大件衛星倉、圖書倉和城市倉等等。RDCRegional Distribution Center)即區域分發中心,可理解爲一級倉庫,向供貨商採購的商品會優先送往這裏,通常設置在中心城市,覆蓋範圍大。FDCForward Distribution Center)即區域運轉中心,可理解爲二級倉庫,覆蓋一些中、小型城市及邊遠地區,一般會根據需求將商品從RDC調配過來。數組

clip_image002

結合人工智能、大數據等技術,京東首先從供貨商那裏合理採購定量的商品到RDC,再根據實際需求調配到FDC,而後運往離客戶最近的配送站,最後快遞員將商品帶到客戶手中。這只是京東供應鏈體系中一個普通的場景,但正由於有這樣的體系,使得京東對用戶的響應速度大大提升,用戶體驗大大提高。微信

1.2   京東供應鏈優化

用戶體驗提高的同時也伴隨着大量資金的投入和成本的提升,成本必須獲得控制,整個體系才能發揮最大的價值,因而對供應鏈的優化就顯得相當重要了。網絡

京東自打創建供應連體系的那一天起,就不斷地進行改進和優化,而且努力深刻到供應鏈的每個環節。優化實際上是一門運籌學問題,需考慮在各類決策目標之間如何平衡以達到最大收益,在這個過程當中須要考慮不少問題,把這些考慮清楚,問題就容易解決了。舉幾個簡單的例子:多線程

l  商品補貨:考慮在什麼時間,給哪一個RDC採購什麼商品,採購量是多少?架構

l  商品調撥:考慮在什麼時間,給哪一個FDC調配什麼商品,調配量是多少?app

l  倉儲運營:在大促來臨之際,倉庫和配送站要增配多少人手、多少輛貨車?框架

雖然看上去這些問題都很容易回答,但仔細想一想卻又很難給出答案,緣由就在於想要作到精確不是那麼容易的事情,就拿補貨來講,補的太多會增長庫存成本,補的太少會增長缺貨成本,只有合理的補貨量才能作到成本最低。機器學習

1.3   預測技術在京東供應鏈的做用

藉助機器學習、大數據等相關技術,京東在不少供應鏈優化問題上都已經實現系統化,由系統自動給出優化建議,並與生產系統相鏈接,實現全流程自動化。在這裏有一項技術起着相當重要的低層支撐做用--預測技術。據粗略估算,1%的預測準確度的提高能夠節約數倍的運營成本。

怎樣理解預測在供應鏈優化中的做用呢?拿商品補貨舉例,一家公司爲了保證庫房不缺貨,可能會頻繁的從供貨商那裏補充大量商品,這樣作雖然不會缺貨,但可能會形成更多賣不出去的商品積壓在倉庫中,從而使商品的週轉率下降,庫存成本增長。反之,這家公司有可能爲了追求零庫存而補不多的商品,但這就可能出現嚴重的缺貨問題,從而使現貨率下降,嚴重影響用戶體驗,缺貨成本增長。因而問題就來了,要補多少商品才合適,什麼時間補貨,這就須要權衡考慮了,最終目的是要使庫存成本和缺貨成本達到一個平衡。

考慮一下極端狀況,等庫存降到零時再去補貨,這時供貨商接到補貨通知後將貨物運往倉庫。可是這麼作有個問題,由於運送過程須要時間,這段時間庫房就缺貨了。那怎麼辦呢?就是利用預測技術。利用預測咱們能夠計算出將來商品在途的這段時間裏銷量大概是多少,而後咱們讓倉庫保證這個量,低於這個量就給供貨商下達補貨通知,因而問題得以解決。總而言之,預測技術在這裏發揮了重要的做用,成爲關鍵的一個環。

2 京東預測系統

2.1 預測系統介紹

clip_image004

預測系統在整個供應鏈體系中處在最底層而且起到一個支撐的做用,支持上層的多個決策優化系統,而這些決策優化系統利用精準的預測數據結合運籌學技術得出最優的決策,並將結果提供給更上層的業務執行系統或是業務方直接使用。

目前,預測系統主要支持三大業務:銷量預測、單量預測和GMV預測。其中銷量預測主要支持商品補貨、商品調撥;單量預測主要支持倉庫、站點的運營管理;GMV預測主要支持銷售部門計劃的定製。

銷量預測按照不一樣維度又能夠分爲RDC採購預測、FDC調撥預測、城市倉調撥預測、大建倉補貨預測、全球購銷量預測和圖書促銷預測等;單量預測又可分爲庫房單量預測、配送中心單量預測和配送站單量預測等(在這裏「單量」並不是指用戶所下訂單的量,而是將訂單拆單後流轉到倉庫中的單量。例如一個用戶的訂單中包括3件物品,其中兩個大件品和一個小件品,在京東的供應鏈環節中可能會將其中兩個大件品組成一個單投放到大件倉中,而將那個小件單獨一個單投放到小件倉中,單量指的是拆單後的量);GMV預測支持到商品粒度。

2.2 預測系統架構

       clip_image006

總體架構從上至下依次是:數據源輸入層、基礎數據加工層、核心業務層、數據輸出層和下游系統。首先從外部數據源獲取咱們所需的業務數據,而後對基礎數據進行加工清洗,再經過時間序列、機器學習等人工智能技術對數據進行處理分析,最後計算出預測結果並經過多種途徑推送給下游系統使用。

l  數據源輸入層:京東數據倉庫中存儲着咱們須要的大部分業務數據,例如訂單信息、商品信息、庫存信息等等。而對於促銷計劃數據則大部分來自於採銷人員經過Web系統錄入的信息。除此以外還有一小部分數據經過文本形式直接上傳到HDFS中。

l  基礎數據加工層:在這一層主要經過Hive對基礎數據進行一些加工清洗,去掉不須要的字段,過濾不須要的維度並清洗有問題的數據。

l  核心業務層:這層是系統的的核心部分,橫向看又可分爲三層:特徵構建、預測算法和預測結果加工。縱向看是由多條業務線組成,彼此之間不發生任何交集。

Ø  特徵構建:將以前清洗過的基礎數據經過近一步的處理轉化成標準格式的特徵數據,提供給後續算法模型使用。

Ø  核心算法:利用時間序列分析、機器學習等人工智能技術進行銷量、單量的預測,是預測系統中最爲核心的部分。

Ø  預測結果加工:預測結果可能在格式和一些特殊性要求上不能知足下游系統,因此還須要根據實際狀況對其進行加工處理,好比增長標準差、促銷標識等額外信息。

l  預測結果輸出層:將最終預測結果同步回京東數據倉庫、MySqlHBase或製做成JSF接口供其餘系統遠程調用。

l  下游系統:包括下游任務流程、下游Web系統和其餘系統。

3 預測系統核心介紹

3.1 預測系統核心層技術選型

clip_image008

預測系統核心層技術主要分爲四層:基礎層、框架層、工具層和算法層

基礎層:

HDFS用來作數據存儲,Yarn用來作資源調度,BDPBig Data Platform)是京東本身研發的大數據平臺,咱們主要用它來作任務調度。

框架層:  

Spark RDDSpark SQLHive爲主, MapReduce程序佔一小部分,是原先遺留下來的,目前正逐步替換成Spark RDD。選擇Spark除了對性能的考慮外,還考慮了Spark程序開發的高效率、多語言特性以及對機器學習算法的支持。在Spark開發語言上咱們選擇了Python,緣由有如下三點:

l  Python有不少不錯的機器學習算法包可使用,比起SparkMLlib,算法的準確度更高。咱們用GBDT作過對比,發現xgboostMLlib裏面提供的提高樹模型預測準確度高出大概5%~10%。雖然直接使用Spark自帶的機器學習框架會節省咱們的開發成本,但預測準確度對於咱們來講相當重要,每提高1%的準確度,就可能會帶來成本的成倍下降。

l  咱們的團隊中包括開發工程師和算法工程師,對於算法工程師而言他們更擅長使用Python進行數據分析,使用JavaScala會有不小的學習成本。

l  對比其餘語言,咱們發現使用Python的開發效率是最高的,而且對於一個新人,學習Python比學習其餘語言更加容易。

工具層:

一方面咱們會結合自身業務有針對性的開發一些算法,另外一方面咱們會直接使用業界比較成熟的算法和模型,這些算法都封裝在第三方Python包中。咱們比較經常使用的包有xgboostnumpypandassklearnscipyhyperopt

Xgboost:它是Gradient Boosting Machine的一個C++實現,xgboost最大的特色在於,它可以自動利用CPU的多線程進行並行,同時在算法上加以改進提升了精度。

numpy:是Python的一種開源的數值計算擴展。這種工具可用來存儲和處理大型矩陣,比Python自身的嵌套列表結構要高效的多(該結構也能夠用來表示矩陣)。

pandas:是基於NumPy 的一種工具,該工具是爲了解決數據分析任務而建立的。Pandas 歸入了大量庫和一些標準的數據模型,提供了高效地操做大型數據集所需的工具。

sklearn:是Python重要的機器學習庫,支持包括分類、迴歸、降維和聚類四大機器學習算法。還包含了特徵提取、數據處理和模型評估三大模塊。

scipy:是在NumPy庫的基礎上增長了衆多的數學、科學以及工程計算中經常使用的庫函數。例如線性代數、常微分方程數值求解、信號處理、圖像處理和稀疏矩陣等等。

算法層:

咱們用到的算法模型很是多,緣由是京東的商品品類齊全、業務複雜,須要根據不一樣的狀況採用不一樣的算法模型。咱們有一個獨立的系統來爲算法模型與商品之間創建匹配關係,有些比較複雜的預測業務還須要使用多個模型。咱們使用的算法整體上能夠分爲三類:時間序列、機器學習和結合業務開發的一些獨有的算法。

1.     機器學習算法主要包括GBDTLASSORNN

GBDT是一種迭代的決策樹算法,該算法由多棵決策樹組成,全部樹的結論累加起來作最終答案。咱們用它來預測高銷量,但歷史規律不明顯的商品。

RNN這種網絡的內部狀態能夠展現動態時序行爲。不一樣於前饋神經網絡的是,RNN能夠利用它內部的記憶來處理任意時序的輸入序列,這讓它能夠更容易處理如時序預測、語音識別等。

LASSO:該方法是一種壓縮估計。它經過構造一個罰函數獲得一個較爲精煉的模型,使得它壓縮一些係數,同時設定一些係數爲零。所以保留了子集收縮的優勢,是一種處理具備復共線性數據的有偏估計。用來預測低銷量,歷史數據平穩的商品效果較好。

2.     時間序列主要包括ARIMAHolt winters

ARIMA全稱爲自迴歸積分滑動平均模型,於70年代初提出的一個著名時間序列預測方法,咱們用它來主要預測相似庫房單量這種平穩的序列。

Holt winters又稱三次指數平滑算法,也是一個經典的時間序列算法,咱們用它來預測季節性和趨勢都很明顯的商品。

3.     結合業務開發的獨有算法包括WMAStockDTSimilarityModelNewProduct等:

WMAStockDT庫存決策樹模型,用來預測受庫存狀態影響較大的商品。

SimilarityModel類似品模型,使用指定的同類品數據來預測某商品將來銷量。NewProduct新品模型,顧名思義就是用來預測新品的銷量。

3.2 預測系統核心流程

預測核心流程主要包括兩類:以機器學習算法爲主的流程和以時間序列分析爲主的流程。

1.     以機器學習算法爲主的流程以下:

       clip_image010

特徵構建:經過數據分析、模型試驗肯定主要特徵,經過一系列任務生成標準格式的特徵數據。

模型選擇:不一樣的商品有不一樣的特性,因此首先會根據商品的銷量高低、新品舊品、假節日敏感性等因素分配不一樣的算法模型。

特徵選擇:對一批特徵進行篩選過濾不須要的特徵,不一樣類型的商品特徵不一樣。

樣本分區:對訓練數據進行分組,分紅多組樣本,真正訓練時針對每組樣本生成一個模型文件。通常是同類型商品被分紅一組,好比按品類維度分組,這樣作是考慮並行化以及模型的準確性。

模型參數:選擇最優的模型參數,合適的參數將提升模型的準確度,由於須要對不一樣的參數組合分別進行模型訓練和預測,因此這一步是很是耗費資源。

模型訓練:待特徵、模型、樣本都肯定好後就能夠進行模型訓練,訓練每每會耗費很長時間,訓練後會生成模型文件,存儲在HDFS中。

模型預測:讀取模型文件進行預測執行。

多模型擇優:爲了提升預測準確度,咱們可能會使用多個算法模型,當每一個模型的預測結果輸出後系統會經過一些規則來選擇一個最優的預測結果。

預測值異常攔截:咱們發現越是複雜且不易解釋的算法越容易出現極個別預測值異常偏高的狀況,這種預測偏高沒法結合歷史數據進行解釋,所以咱們會經過一些規則將這些異常值攔截下來,而且用一個更加保守的數值代替。

模型評價:計算預測準確度,咱們一般用使用mapd來做爲評價指標。

偏差分析:經過分析預測準確度得出一個偏差在不一樣維度上的分佈,以便給算法優化提供參考依據。

2.     以時間序列分析爲主的預測流程以下:

clip_image012

生成歷史時序:將歷史銷量、價格、庫存等數據按照規定格式生成時序數據。

節假日因子:計算節假日與銷量之間的關係,用來平滑節假日對銷量影響。

週日因子:計算週一到週日這7天與銷量的關係,用來平滑週日對銷量的影響。

促銷因子:計算促銷與銷量之間的關係,用來平滑促銷對銷量的影響。

因子平滑:歷史銷量是不穩定的,會受到節假日、促銷等影響,在這種狀況下進行預測有很大難度,因此須要利用以前計算的各種因子對歷史數據進行平滑處理。

時序預測:在一個相對平穩的銷量數據上經過算法進行預測。

因子疊加:結合將來節假日、促銷計劃等因素對預測結果進行調整。

3.3 Spark在預測核心層的應用

咱們使用Spark SQLSpark RDD相結合的方式來編寫程序,對於通常的數據處理,咱們使用Spark的方式與其餘無異,可是對於模型訓練、預測這些須要調用算法接口的邏輯就須要考慮一下並行化的問題了。咱們平均一個訓練任務在一天處理的數據量大約在500G左右,雖然數據規模不是特別的龐大,可是Python算法包提供的算法都是單進程執行。咱們計算過,若是使用一臺機器訓練所有品類數據須要一個星期的時間,這是沒法接收的,因此咱們須要藉助Spark這種分佈式並行計算框架來將計算分攤到多個節點上實現並行化處理。

咱們實現的方法很簡單,首先須要在集羣的每一個節點上安裝所需的所有Python包,而後在編寫Spark程序時考慮經過某種規則將數據分區,好比按品類維度,經過groupByKey操做將數據從新分區,每個分區是一個樣本集合並進行獨立的訓練,以此達到並行化。流程以下圖所示:

 

clip_image014

僞碼以下: 

sc.textFile("...").map(lambda x: repartitionBy(x)).groupByKey()
  .map(lambda x: train(x)).saveAsPickleFile("...")

repartitionBy方法即設置一個重分區的邏輯返回(K,V)結構RDDtrain方法是訓練數據,在train方法裏面會調用Python算法包接口。saveAsPickleFileSpark Python獨有的一個Action操做,支持將RDD保存成序列化後的sequnceFile格式的文件,在序列化過程當中會以10個一批的方式進行處理,保存模型文件很是適合。

雖然原理簡單,但存在着一個難點,即以什麼樣的規則進行分區,key應該如何設置。爲了解決這個問題咱們須要考慮幾個方面,第一就是哪些數據應該被聚合到一塊兒進行訓練,第二就是如何避免數據傾斜。

針對第一個問題咱們作了以下幾點考慮:

l  被分在一個分區的數據要有必定的類似性,這樣訓練的效果纔會更好,好比按品類分區就是個典型例子。

l  分析商品的特性,根據特性的不一樣選擇不一樣的模型,例如高銷商品和低銷商品的預測模型是不同的,即便是同一模型使用的特徵也可能不一樣,好比對促銷敏感的商品就須要更多與促銷相關特徵,相同模型相同特徵的商品應傾向於分在一個分區中。

針對第二個問題咱們採用了以下的方式解決:

l  對於數據量過大的分區進行隨機抽樣選取。

l  對於數據量過大的分區還能夠作二次拆分,好比圖書小說這個品類數據量明顯大於其餘品類,因而就能夠分析小說品類下的子品類數據量分佈狀況,並將子品類合併成新的幾個分區。

l  對於數據量太小這種狀況則須要考慮進行幾個分區數據的合併處理。

總之對於後兩種處理方式能夠單獨經過一個Spark任務按期運行,並將這種分區規則保存。

4 結合圖解Spark書進行應用與優化

《圖解Spark:核心技術與案例實戰》一書以Spark2.0版本爲基礎進行編寫,系統介紹了Spark核心及其生態圈組件技術。其內容包括Spark生態圈、實戰環境搭建和編程模型等,重點介紹了做業調度、容錯執行、監控管理、存儲管理以及運行架構,同時還介紹了Spark生態圈相關組件,包括了Spark SQL的即席查詢、Spark Streaming的實時流處理、MLlib的機器學習、GraphX的圖處理和Alluxio的分佈式內存文件系統等。下面介紹京東預測系統如何進行資源調度,並描述如何使用Spark存儲相關知識進行系統優化。

4.1 結合系統中的應用

在圖解Spark書的第六章描述了Spark運行架構,介紹了Spark集羣資源調度通常分爲粗粒度調度和細粒度調度兩種模式。粗粒度包括了獨立運行模式和Mesos粗粒度運行模式,在這種狀況下以整個機器做爲分配單元執行做業,該模式優勢是因爲資源長期持有減小了資源調度的時間開銷,缺點是該模式中沒法感知資源使用的變化,易形成系統資源的閒置,從而形成了資源浪費。而細粒度包括了Yarn運行模式和Mesos細粒度運行模式,該模式的優勢是系統資源可以獲得充分利用,缺點是該模式中每一個任務都須要從管理器獲取資源,調度延遲較大、開銷較大。

因爲京東Spark集羣屬於基礎平臺,在公司內部共享這些資源,因此集羣採用的是Yarn運行模式,在這種模式下能夠根據不一樣系統所須要的資源進行靈活的管理。在YARN-Cluster模式中,當用戶向YARN集羣中提交一個應用程序後,YARN集羣將分兩個階段運行該應用程序:第一個階段是把SparkSparkContext做爲Application MasterYARN集羣中先啓動;第二個階段是由Application Master建立應用程序,而後爲它向Resource Manager申請資源,並啓動Executor來運行任務集,同時監控它的整個運行過程,直到運行完成。下圖爲Yarn-Cluster運行模式執行過程:

clip_image016

4.2   結合系統的優化

咱們都知道大數據處理的瓶頸在IO。咱們藉助Spark能夠把迭代過程當中的數據放在內存中,相比MapReduce寫到磁盤速度提升近兩個數量級;另外對於數據處理過程儘量避免Shuffle,若是不能避免則Shuffle前儘量過濾數據,減小Shuffle數據量;最後,就是使用高效的序列化和壓縮算法。在京東預測系統主要就是圍繞這些環節展開優化,相關Spark存儲原理知識能夠參見圖解Spark書第五章的詳細描述。

因爲資源限制,分配給預測系統的Spark集羣規模並非很大,在有限的資源下運行Spark應用程序確實是一個考驗,由於在這種狀況下常常會出現諸如程序計算時間太長、找不到Executor等錯誤。咱們經過調整參數、修改設計和修改程序邏輯三個方面進行優化:

   4.2.1 參數調整

l  減小num-executors,調大executor-memory,這樣的目的是但願Executor有足夠的內存可使用。

l  查看日誌發現沒有足夠的空間存儲廣播變量,分析是因爲Cache到內存裏的數據太多耗盡了內存,因而咱們將Cache的級別適當調成MEMORY_ONLY_SERDISK_ONLY

l  針對某些任務關閉了推測機制,由於有些任務會出現暫時沒法解決的數據傾斜問題,並不是節點出現問題。

l  調整內存分配,對於一個Shuffle不少的任務,咱們就把Cache的內存分配比例調低,同時調高Shuffle的內存比例。

  4.2.2 修改設計

參數的調整雖然容易作,但每每效果很差,這時候須要考慮從設計的角度去優化:

l  原先在訓練數據以前會先讀取歷史的幾個月甚至幾年的數據,對這些數據進行合併、轉換等一系列複雜的處理,最終生成特徵數據。因爲數據量龐大,任務有時會報錯。通過調整後當天只處理當天數據,並將結果保存到當日分區下,訓練時按天數須要讀取多個分區的數據作union操做便可。

l  將「模型訓練」從天天執行調整到每週執行,將「模型參數選取」從每週執行調整到每個月執行。由於這兩個任務都十分消耗資源,而且屬於不須要頻繁運行,這麼作雖然準確度會略微下降,但都在可接受範圍內。

l  經過拆分任務也能夠很好的解決資源不夠用的問題。能夠橫向拆分,好比原先是將100個品類數據放在一個任務中進行訓練,調整後改爲每10個品類提交一次Spark做業進行訓練。這樣雖然總體執行時間變長,可是避免了程序異常退出,保證任務能夠執行成功。除了橫向還能夠縱向拆分,即將一個包含10StageSpark任務拆分紅兩個任務,每一個任務包含5Stage,中間數據保存到HDFS中。

4.2.3 修改程序邏輯

爲了進一步提升程序的運行效率,經過修改程序的邏輯來提升性能,主要是在以下方面進行了改進:避免過多的Shuffle、減小Shuffle時須要傳輸的數據和處理數據傾斜問題等。

    1 避免過多的Shuffle

l  Spark提供了豐富的轉換操做,可使咱們完成各種複雜的數據處理工做,可是也正由於如此咱們在寫Spark程序時候可能會遇到一個陷阱,那就是爲了使代碼變的簡潔過度依賴RDD的轉換操做,使原本僅需一次Shuffle的過程變爲了執行屢次。咱們就曾經犯過這樣一個錯誤,原本能夠經過一次groupByKey完成的操做卻使用了兩回。業務邏輯是這樣的:咱們有三張表分別是銷量(s)、價格(p)、庫存(v),每張表有3個字段:商品idsku_id)、品類idcategory)和歷史時序數據(data),如今須要按sku_idspv數據合併,而後再按category再合併一次,最終的數據格式是:[category[[sku_id, s , p, v], [sku_id, s , p, v], […][…]]]。一開始咱們先按照sku_id + category做爲key進行一次groupByKey,將數據格式轉換成[sku_id, category , [sp, v]],而後按category做爲keygroupByKey一次。後來咱們修改成按照category做爲key只進行一次groupByKey,由於一個sku_id只會屬於一個category,因此後續的map轉換裏面只須要寫一些代碼將相同sku_idspv數據group到一塊兒就能夠了。

兩次groupByKey的狀況:

clip_image018

修改後變爲一次groupByKey的狀況:

clip_image020

l  多表join時,若是key值相同,則可使用union+groupByKey+flatMapValues形式進行。好比:須要將銷量、庫存、價格、促銷計劃和商品信息經過商品編碼鏈接到一塊兒,一開始使用的是join轉換操做,將幾個RDD彼此join在一塊兒。後來發現這樣作運行速度很是慢,因而換成union+groypByKey+flatMapValue形式,這樣作只需進行一次Shuffle,這樣修改後運行速度比之前快多了。

實例代碼以下:

rdd1 = rdd1.mapValues(lambda x: (1, x))
rdd2 = rdd2.mapValues(lambda x: (2, x))
rdd3 = rdd3.mapValues(lambda x: (3, x))
def dispatch(seq):
    vbuf, wbuf, xbuf = [], [], []
    for (n, v) in seq:           
        if n == 1: vbuf.append(v)        
        elif n == 2: wbuf.append(v)         
        elif n == 3: xbuf.append(v)    
    return [(v, w, x) for v in vbuf for w in wbuf for x in xbuf]
new_rdd = sc.union([rdd1, rdd3, rdd3]).groupByKey().flatMapValues(lambda x: dispatch(x))

l  若是兩個RDD須要在groupByKey後進行join操做,可使用cogroup轉換操做代替。好比, 將歷史銷量數據按品類進行合併,而後再與模型文件進行join操做,流程以下:

[sku_id, category , sales] 
-> [category, [[sku_id, sales], […]]] 
-> [category, [[[sku_id, sales], […]], [model1, model2]]]

 

使用cogroup後,通過一次Shuffle就可完成了兩步操做,性能大幅提高。

    2.  減小Shuffle時傳輸的數據量

l  Shuffle操做前儘可能將不須要的數據過濾掉。

l  使用comebineyeByKey能夠高效率的實現任何複雜的聚合邏輯。

comebineyeByKey屬於聚合類操做,因爲它支持map端的聚合因此比groupByKey性能好,又因爲它的map端與reduce端能夠設置成不同的邏輯,因此它支持的場景比reduceByKey多,它的定義以下 

def combineByKey(self, createCombiner, mergeValue,mergeCombiners, numPartitions)

 reduceByKeygroupByKey內部實際是調用了comebineyeByKey

def reduceByKey(self, func, numPartitions=None):
      return self.combineByKey(lambda x: x, func, func, numPartitions)
def groupByKey(self, numPartitions=None):
     def createCombiner(x):
          return [x]
     def mergeValue(xs, x):
          xs.append(x)
          return xs
     def mergeCombiners(a, b):
          a.extend(b)
          return a
     return self.combineByKey(createCombiner, mergeValue,  mergeCombiners,numPartitions).mapValues(lambda x: ResultIterable(x))

咱們以前有不少複雜的沒法用reduceByKey來實現的聚合邏輯都經過groupByKey來完成的,後來所有替換爲comebineyeByKey後性能提高了很多。

     3.   處理數據傾斜

有些時候通過一系列轉換操做以後數據變得十分傾斜,在這樣狀況下後續的RDD計算效率會很是的糟糕,嚴重時程序報錯。遇到這種狀況一般會使用repartition這個轉換操做對RDD進行從新分區,從新分區後數據會均勻分佈在不一樣的分區中,避免了數據傾斜。若是是減小分區使用coalesce也能夠達到效果,但比起repartition不足的是分配不是那麼均勻。

5  小結

雖然京東的預測系統已經穩定運行了很長一段時間,可是咱們也看到系統自己還存在着不少待改進的地方,接下來咱們會在預測準確度的提升、系統性能的優化、多業務支持的便捷性上進行改進。將來,隨着大數據、人工智能技術在京東供應鏈管理中的使用愈來愈多,預測系統也將發揮出更大做用,對於京東預測系統的研發工做也將是充滿着挑戰與樂趣。

 

該文本人發表於2017-03-06 InfoQ的大數據雜談公衆號,微信號:BigdataTina2016

連接地址爲 https://mp.weixin.qq.com/s/35c06LQHVsyG-dy4FgJZnA

相關文章
相關標籤/搜索