結合前文講述的數據源特色、分類、採集方式、存儲選型、數據分析、數據處理,我在這裏給出一個整體的大數據平臺的架構。值得注意的是,架構圖中去掉了監控、資源協調、安全日誌等。 node
左側是數據源,有實時流的數據(多是結構化、非結構化,但其特色是實時的),有離線數據,離線數據通常採用的多爲ETL的工具,常見的作法是在大數據平臺裏使用Sqoop或Flume去同步數據,或調一些NIO的框架去讀取加載,而後寫到HDFS裏面,固然也有一些特別的技術存儲的類型,好比HAWQ就是一個支持分佈式、支持事務一致性的開源數據庫。
從業務場景來看,若是咱們作統計分析,就可使用SQL或MapReduce或streaming或Spark。若是作查詢檢索,同步寫到HDFS的同時還要考慮寫到ES裏。若是作數據分析,能夠建一個Cube,而後再進入OLAP的場景。 算法
Lambda架構的主要思想是將大數據系統架構爲多層個層次,分別爲批處理層(batchlayer)、實時處理層(speedlayer)、服務層(servinglayer)如圖(C)。sql
理想狀態下,任何數據訪問均可以從表達式Query= function(alldata)開始,可是,若數據達到至關大的一個級別(例如PB),且還須要支持實時查詢時,就須要耗費很是龐大的資源。一個解決方式是預運算查詢函數(precomputedquery funciton)。書中將這種預運算查詢函數稱之爲Batch View(A),這樣當須要執行查詢時,能夠從BatchView中讀取結果。這樣一個預先運算好的View是能夠創建索引的,於是能夠支持隨機讀取(B)。因而系統就變成:數據庫
(A)batchview = function(all data);編程
(B)query =function(batch view)。後端
下圖給出了Lambda架構中各組件在大數據生態系統中和阿里集團的經常使用組件。數據流存儲選用不可變日誌的分佈式系統Kafa、TT、Metaq;BatchLayer數據集的存儲選用Hadoop的HDFS或者阿里雲的ODPS;BatchView的加工採用MapReduce;BatchView數據的存儲採用Mysql(查詢少許的最近結果數據)、Hbase(查詢大量的歷史結果數據)。SpeedLayer採用增量數據處理Storm、Flink;RealtimeView增量結果數據集採用內存數據庫Redis。數組
圖(H)緩存
Lambda是一個通用框架,各模塊選型不要侷限於上面給出的組件,特別是view的選型。由於View是和各業務關聯很是大的概念,View選擇組件時要根據業務的需求,選擇最合適的組件。安全
優勢:服務器
a、數據的不可變性。裏面給出的數據傳輸模型是在初始化階段對數據進行實例化,這樣的作法是能獲益良多的。可以使得大量的MapReduce工做變得有跡可循,從而便於在不一樣階段進行獨立調試。
b、強調了數據的從新計算問題。在流處理中從新計算是個主要挑戰,可是常常被忽視。比方說,某工做流的數據輸出是由輸入決定的,那麼一旦代碼發生改動,咱們將不得不從新計算來檢視變動的效度。什麼狀況下代碼會改動呢?例如需求發生變動,計算字段須要調整或者程序發出錯誤,須要進行調試。
缺點:
a、Jay Kreps認爲Lambda包含固有的開發和運維的複雜性。Lambda須要將全部的算法實現兩次,一次是爲批處理系統,另外一次是爲實時系統,還要求查詢獲得的是兩個系統結果的合併。
因爲存在以上缺點,Linkedin的Jaykreps提出了Kappa架構如圖(I):
圖(I)
1、使用Kafka或其它系統來對須要從新計算的數據進行日誌記錄,以及提供給多個訂閱者使用。例如須要從新計算30天內的數據,咱們能夠在Kafka中設置30天的數據保留值。
2、當須要進行從新計算時,啓動流處理做業的第二個實例對以前得到的數據進行處理,以後直接把結果數據放入新的數據輸出表中。
3、看成業完成時,讓應用程序直接讀取新的數據記錄表。
4、中止歷史做業,刪除舊的數據輸出表。
Kappa架構暫時未作深刻了解,在此不作評價。我我的以爲,不一樣的數據架構有各自的優缺點,咱們使用的時候只能根據應用場景,選擇更合適的架構,才能揚長避短。
在具體介紹本文內容以前,先給你們看一下Hadoop業務的總體開發流程:
從Hadoop的業務開發流程圖中能夠看出,在大數據的業務處理過程當中,對於數據的採集是十分重要的一步,也是不可避免的一步,從而引出咱們本文的主角—Flume。本文將圍繞Flume的架構、Flume的應用(日誌採集)進行詳細的介紹。
flume的特色:
flume是一個分佈式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。支持在日誌系統中定製各種數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各類數據接受方(好比文本、HDFS、Hbase等)的能力 。
flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日誌數據(字節數組形式)而且攜帶有頭信息,這些Event由Agent外部的Source生成,當Source捕獲事件後會進行特定的格式化,而後Source會把事件推入(單個或多個)Channel中。你能夠把Channel看做是一個緩衝區,它將保存事件直到Sink處理完該事件。Sink負責持久化日誌或者把事件推向另外一個Source。
先看一下咱們數據處理的主要步驟,首先是咱們SDK採集數據,採集數據以後,首先把它扔到咱們的消息隊列裏作一個基礎的持久化,以後咱們會有兩部分,一部分是實時統計,一部分是離線統計,這兩部分統計完以後會把統計結果存下來,而後提供給咱們的查詢服務,最後是咱們外部展現界面。咱們的數據平臺主要基於中間的四個綠色的部分。
關於要求,對消息隊列來講確定是吞吐量必定要大,要很是好的擴展性,若是有一個消息的波峯的話要隨時可以擴展,由於全部的東西都是分佈式的,因此要保證節點故障不會影響咱們正常的業務。
咱們的實時計算目前採用的是分鐘級別的實時,沒有精確到秒級,離線計算須要計算速度很是快,這兩部分咱們當初在考慮的時候就選用了Spark,由於Spark自己既支持實時,又支持離線,並且相對於其餘的實時的方案來講,像Flink或者是Storm和Samza來講,咱們不須要到秒級的這種實時,咱們須要的是吞吐量,因此咱們選擇Spark。實時部分用的是Spark streaming,離線部分用的是Spark offline的方案。
查詢方案由於咱們要支持多個維度的組合排序,因此咱們但願支持sql,這樣的話各類組合排序就能夠轉化成sql的group和order操做。
消息隊列咱們選擇的是Kafka,由於在咱們看來,Kafka目前是最成熟的分佈式消息隊列方案,並且它的性能、擴展性也很是好,並且支持容錯方案,你能夠經過設置冗餘來保證數據的完整性。 Kafka目前獲得了全部主流流式計算框架的支持,像Spark, Flink, Storm, Samza等等;另一個就是咱們公司的幾個創始人都來自於LinkedIn,他們以前在LinkedIn的時候就已經用過Kafka,對Kafka很是熟,因此咱們選擇了Kafka。
但選定Kafka以後咱們發現了一個問題就是消息時序的問題。首先咱們的數據採集 程中,由於不一樣的用戶網絡帶寬不同,數據多是有延遲的,晚到的消息反而可能更早發生,並且Kafka不一樣的partition之間是不保證時序的。
可是咱們全部的離線統計程序都是須要按時間統計的,因此咱們就須要一個支持時序的數據庫幫咱們把數據排好序,這裏咱們選了HBase。咱們用消息產生的時間加上咱們生成消息的ID作成它惟一的row key,進行排序和索引。
對於sql的方案來講,咱們選擇的是Phoenix。選Phoenix是由於咱們考慮了目前幾個SQL On HBase的方案,咱們發現Phoenix的效率很是好,是由於它充分的利用了HBase coprocessor的特性,在server端進行了大量的計算,因此大量減輕了client的數據壓力還有計算壓力。
還有就是它支持HBase的Column Family概念,好比說咱們要支持40個緯度的時候咱們會有一張大寬表,若是咱們把全部的列都設置一個列族的話,在查詢任意一個列的時候都須要把40列的數據都讀出來,這樣是得不償失的,因此Phoenix支持Column Family的話,咱們就能夠把不一樣的列根據它們的相關性分紅幾個列族,查詢的時候可能只會命中一個到兩個列族,這樣大大減小了讀取量。
Phoenix還支持Spark的DataSource API,支持列剪枝和行過濾的功能,並且支持數據寫入。什麼是Spark的DataSource API呢, Spark在1.2的時候提供了DataSource API,它主要是給Spark框架提供一種快速讀取外界數據的能力,這個API能夠方便的把不一樣的數據格式經過DataSource API註冊成Spark的表,而後經過Spark SQL直接讀取。它能夠充分利用Spark分佈式的優勢進行併發讀取,並且Spark自己有一個很好的優化引擎,可以極大的加快Spark SQL的執行。
由於Spark最近很是的火,因此它的社區資源很是的多,基本上全部主流的框架,像咱們常見的Phoenix,Cassandra, MongoDB都有Spark DataSource相關的實現。還有一個就是它提供了一個統一的數據類型,把全部的外部表都統一轉化成Spark的數據類型,這樣的話不一樣的外部表可以相互的關聯和操做。
在通過上述的思考以後,咱們選擇了這樣的一個數據框架。
首先咱們最下面是三個SDK,JS、安卓和iOS,採集完數據以後會發到咱們的負載均衡器,咱們的負載均衡器用的是AWS,它會自動把咱們這些數據發到咱們的server端,server在收集完數據以後會進行一個初步的清洗,把那些不規律的數據給清洗掉,而後再把那些數據發到Kafka裏,後面就進入到咱們的實時和離線過程。
最終咱們的數據會統計到HBase裏面,對外暴露的是一個sql的接口,能夠經過各類sql的組合去查詢所須要的統計數據。目前咱們用的主要版本,Spark用的仍是1.5.1,咱們本身根據咱們本身的業務需求打了一些定製的patch,Hadoop用的仍是2.5.2,HBase是0.98,Phoenix是4.7.0,咱們修復了一些小的bug,以及加了一些本身的特性,打了本身的patch。
Ceph 是一個開源、多管齊下的操做系統,由於其高性能並行文件系統的特性,有人甚至認爲它是基於Hadoop環境下的HDFS的接班人,由於自2010年就有研究者在尋找這個特性。
Apache Flink是一種能夠處理批處理任務的流處理框架。該技術可將批處理數據視做具有有限邊界的數據流,藉此將批處理任務做爲流處理的子集加以處理。爲全部處理任務採起流處理爲先的方法會產生一系列有趣的反作用。
這種流處理爲先的方法也叫作Kappa架構,與之相對的是更加被廣爲人知的Lambda架構(該架構中使用批處理做爲主要處理方法,使用流做爲補充並提供早期未經提煉的結果)。Kappa架構中會對一切進行流處理,藉此對模型進行簡化,而這一切是在最近流處理引擎逐漸成熟後纔可行的。
Flink的流處理模型在處理傳入數據時會將每一項視做真正的數據流。Flink提供的DataStream API可用於處理無盡的數據流。Flink可配合使用的基本組件包括:
爲了在計算過程當中遇到問題後可以恢復,流處理任務會在預約時間點建立快照。爲了實現狀態存儲,Flink可配合多種狀態後端系統使用,具體取決於所需實現的複雜度和持久性級別。
此外Flink的流處理能力還能夠理解「事件時間」這一律念,這是指事件實際發生的時間,此外該功能還能夠處理會話。這意味着能夠經過某種有趣的方式確保執行順序和分組。
Flink的批處理模型在很大程度上僅僅是對流處理模型的擴展。此時模型再也不從持續流中讀取數據,而是從持久存儲中以流的形式讀取有邊界的數據集。Flink會對這些處理模型使用徹底相同的運行時。
Flink能夠對批處理工做負載實現必定的優化。例如因爲批處理操做可經過持久存儲加以支持,Flink能夠不對批處理工做負載建立快照。數據依然能夠恢復,但常規處理操做能夠執行得更快。
另外一個優化是對批處理任務進行分解,這樣便可在須要的時候調用不一樣階段和組件。藉此Flink能夠與集羣的其餘用戶更好地共存。對任務提早進行分析使得Flink能夠查看須要執行的全部操做、數據集的大小,以及下游須要執行的操做步驟,藉此實現進一步的優化。
Flink目前是處理框架領域一個獨特的技術。雖然Spark也能夠執行批處理和流處理,但Spark的流處理採起的微批架構使其沒法適用於不少用例。Flink流處理爲先的方法可提供低延遲,高吞吐率,近乎逐項處理的能力。
Flink的不少組件是自行管理的。雖然這種作法較爲罕見,但出於性能方面的緣由,該技術可自行管理內存,無需依賴原生的Java垃圾回收機制。與Spark不一樣,待處理數據的特徵發生變化後Flink無需手工優化和調整,而且該技術也能夠自行處理數據分區和自動緩存等操做。
Flink會經過多種方式對工做進行分許進而優化任務。這種分析在部分程度上相似於SQL查詢規劃器對關係型數據庫所作的優化,可針對特定任務肯定最高效的實現方法。該技術還支持多階段並行執行,同時可將受阻任務的數據集合在一塊兒。對於迭代式任務,出於性能方面的考慮,Flink會嘗試在存儲數據的節點上執行相應的計算任務。此外還可進行「增量迭代」,或僅對數據中有改動的部分進行迭代。
在用戶工具方面,Flink提供了基於Web的調度視圖,藉此可輕鬆管理任務並查看系統狀態。用戶也能夠查看已提交任務的優化方案,藉此瞭解任務最終是如何在集羣中實現的。對於分析類任務,Flink提供了相似SQL的查詢,圖形化處理,以及機器學習庫,此外還支持內存計算。
Flink能很好地與其餘組件配合使用。若是配合Hadoop 堆棧使用,該技術能夠很好地融入整個環境,在任什麼時候候都只佔用必要的資源。該技術可輕鬆地與YARN、HDFS和Kafka 集成。在兼容包的幫助下,Flink還能夠運行爲其餘處理框架,例如Hadoop和Storm編寫的任務。
目前Flink最大的侷限之一在於這依然是一個很是「年幼」的項目。現實環境中該項目的大規模部署尚不如其餘處理框架那麼常見,對於Flink在縮放能力方面的侷限目前也沒有較爲深刻的研究。隨着快速開發週期的推動和兼容包等功能的完善,當愈來愈多的組織開始嘗試時,可能會出現愈來愈多的Flink部署。
Flink提供了低延遲流處理,同時可支持傳統的批處理任務。Flink也許最適合有極高流處理需求,並有少許批處理任務的組織。該技術可兼容原生Storm和Hadoop程序,可在YARN管理的集羣上運行,所以能夠很方便地進行評估。快速進展的開發工做使其值得被你們關注。
大數據系統可以使用多種處理技術。
對於僅須要批處理的工做負載,若是對時間不敏感,比其餘解決方案實現成本更低的Hadoop將會是一個好選擇。
對於僅須要流處理的工做負載,Storm可支持更普遍的語言並實現極低延遲的處理,但默認配置可能產生重複結果而且沒法保證順序。Samza與YARN和Kafka緊密集成可提供更大靈活性,更易用的多團隊使用,以及更簡單的複製和狀態管理。
對於混合型工做負載,Spark可提供高速批處理和微批處理模式的流處理。該技術的支持更完善,具有各類集成庫和工具,可實現靈活的集成。Flink提供了真正的流處理並具有批處理能力,經過深度優化可運行鍼對其餘平臺編寫的任務,提供低延遲的處理,但實際應用方面還爲時過早。
最適合的解決方案主要取決於待處理數據的狀態,對處理所需時間的需求,以及但願獲得的結果。具體是使用全功能解決方案或主要側重於某種項目的解決方案,這個問題須要慎重權衡。隨着逐漸成熟並被普遍接受,在評估任何新出現的創新型解決方案時都須要考慮相似的問題。
HBase | Cassandra | |
---|---|---|
語言 | Java | Java |
出發點 | BigTable | BigTable and Dynamo |
License | Apache | Apache |
Protocol | HTTP/REST (also Thrift) | Custom, binary (Thrift) |
數據分佈 | 表劃分爲多個region存在不一樣region server上 | 改進的一致性哈希(虛擬節點) |
存儲目標 | 大文件 | 小文件 |
一致性 | 強一致性 | 最終一致性,Quorum NRW策略 |
架構 | master/slave | p2p |
高可用性 | NameNode是HDFS的單點故障點 | P2P和去中心化設計,不會出現單點故障 |
伸縮性 | Region Server擴容,經過將自身發佈到Master,Master均勻分佈Region | 擴容需在Hash Ring上多個節點間調整數據分佈 |
讀寫性能 | 數據讀寫定位可能要經過最多6次的網絡RPC,性能較低。 | 數據讀寫定位很是快 |
數據衝突處理 | 樂觀併發控制(optimistic concurrency control) | 向量時鐘 |
臨時故障處理 | Region Server宕機,重作HLog | 數據回傳機制:某節點宕機,hash到該節點的新數據自動路由到下一節點作 hinted handoff,源節點恢復後,推送回源節點。 |
永久故障恢復 | Region Server恢復,master從新給其分配region | Merkle 哈希樹,經過Gossip協議同步Merkle Tree,維護集羣節點間的數據一致性 |
成員通訊及錯誤檢測 | Zookeeper | 基於Gossip |
CAP | 1,強一致性,0數據丟失。2,可用性低。3,擴容方便。 | 1,弱一致性,數據可能丟失。2,可用性高。3,擴容方便。 |
單純的就部署和運維hbase以及Cassandra來講,部署hbase前,須要部署的組件有zookeeper,hdfs,而後纔是hbase。對應的Cassandra就比較簡單不少,編譯完成一個jar包,單臺服務器啓動一個Cassandra進程便可。
在部署hbase的時候,可能須要規劃好,哪些機器跑hmaser,rs,zk,hdfs的相關進程等, 還有可能爲了集羣的性能,還要預先規劃好多少個rs。本身人工去部署這麼一個hbase集羣仍是比較麻煩的,更別提本身維護(阿里雲ApsaraDB-HBase你值得擁有)。
Cassandra部署的時候比較簡單,一個tar包搞定,因爲cassandra數據落本地盤,須要人爲的配置一些參數好比是否須要虛擬節點(vnode)以及多少vnode;須要基於業務的場景選擇特定的key的放置策略(partitioner),這個放置策略的選擇以及一些參數的配置須要必定的門檻。
簡單總結下:部署運維的話,hbase依賴組件多,部署麻煩一點,可是相關資料不少,下降了難度;cassandra部署依賴少,可是配置參數多,相關資料較少。
特別是使用雲HBase徹底避免了部署形成的各類麻煩,比手工部署運維任何大數據數據庫都方便太多。
總體架構從上至下依次是:數據源輸入層、基礎數據加工層、核心業務層、數據輸出層和下游系統。首先從外部數據源獲取咱們所需的業務數據,而後對基礎數據進行加工清洗,再經過時間序列、機器學習等人工智能技術對數據進行處理分析,最後計算出預測結果並經過多種途徑推送給下游系統使用。
數據源輸入層:京東數據倉庫中存儲着咱們須要的大部分業務數據,例如訂單信息、商品信息、庫存信息等等。而對於促銷計劃數據則大部分來自於採銷人員經過Web系統錄入的信息。除此以外還有一小部分數據經過文本形式直接上傳到HDFS中。
基礎數據加工層:在這一層主要經過Hive對基礎數據進行一些加工清洗,去掉不須要的字段,過濾不須要的維度並清洗有問題的數據。
核心業務層:這層是系統的的核心部分,橫向看又可分爲三層:特徵構建、預測算法和預測結果加工。縱向看是由多條業務線組成,彼此之間不發生任何交集。
特徵構建:將以前清洗過的基礎數據經過近一步的處理轉化成標準格式的特徵數據,提供給後續算法模型使用。
核心算法:利用時間序列分析、機器學習等人工智能技術進行銷量、單量的預測,是預測系統中最爲核心的部分。
預測結果加工:預測結果可能在格式和一些特殊性要求上不能知足下游系統,因此還須要根據實際狀況對其進行加工處理,好比增長標準差、促銷標識等額外信息。
預測結果輸出層:將最終預測結果同步回京東數據倉庫、MySql、HBase或製做成JSF接口供其餘系統遠程調用。
下游系統:包括下游任務流程、下游Web系統和其餘系統。
預測系統核心介紹預測系統核心層技術選型
預測系統核心層技術主要分爲四層:基礎層、框架層、工具層和算法層。
基礎層:
HDFS用來作數據存儲,Yarn用來作資源調度,BDP(Big Data Platform)是京東本身研發的大數據平臺,咱們主要用它來作任務調度。
框架層:
以Spark RDD、Spark SQL、Hive爲主, MapReduce程序佔一小部分,是原先遺留下來的,目前正逐步替換成Spark RDD。 選擇Spark除了對性能的考慮外,還考慮了Spark程序開發的高效率、多語言特性以及對機器學習算法的支持。在Spark開發語言上咱們選擇了Python,緣由有如下三點:
Python有不少不錯的機器學習算法包可使用,比起Spark的MLlib,算法的準確度更高。咱們用GBDT作過對比,發現xgboost比MLlib裏面提供的提高樹模型預測準確度高出大概5%~10%。雖然直接使用Spark自帶的機器學習框架會節省咱們的開發成本,但預測準確度對於咱們來講相當重要,每提高1%的準確度,就可能會帶來成本的成倍下降。
咱們的團隊中包括開發工程師和算法工程師,對於算法工程師而言他們更擅長使用Python進行數據分析,使用Java或Scala會有不小的學習成本。
對比其餘語言,咱們發現使用Python的開發效率是最高的,而且對於一個新人,學習Python比學習其餘語言更加容易。
工具層:
一方面咱們會結合自身業務有針對性的開發一些算法,另外一方面咱們會直接使用業界比較成熟的算法和模型,這些算法都封裝在第三方Python包中。咱們比較經常使用的包有xgboost、numpy、pandas、sklearn、scipy和hyperopt等。
Xgboost:它是Gradient Boosting Machine的一個C++實現,xgboost最大的特色在於,它可以自動利用CPU的多線程進行並行,同時在算法上加以改進提升了精度。
numpy:是Python的一種開源的數值計算擴展。這種工具可用來存儲和處理大型矩陣,比Python自身的嵌套列表結構要高效的多(該結構也能夠用來表示矩陣)。
pandas:是基於NumPy 的一種工具,該工具是爲了解決數據分析任務而建立的。Pandas 歸入了大量庫和一些標準的數據模型,提供了高效地操做大型數據集所需的工具。
sklearn:是Python重要的機器學習庫,支持包括分類、迴歸、降維和聚類四大機器學習算法。還包含了特徵提取、數據處理和模型評估三大模塊。
scipy:是在NumPy庫的基礎上增長了衆多的數學、科學以及工程計算中經常使用的庫函數。例如線性代數、常微分方程數值求解、信號處理、圖像處理和稀疏矩陣等等。
算法層:
咱們用到的算法模型很是多,緣由是京東的商品品類齊全、業務複雜,須要根據不一樣的狀況採用不一樣的算法模型。咱們有一個獨立的系統來爲算法模型與商品之間創建匹配關係,有些比較複雜的預測業務還須要使用多個模型。咱們使用的算法整體上能夠分爲三類:時間序列、機器學習和結合業務開發的一些獨有的算法。
1. 機器學習算法主要包括GBDT、LASSO和RNN :
GBDT:是一種迭代的決策樹算法,該算法由多棵決策樹組成,全部樹的結論累加起來作最終答案。咱們用它來預測高銷量,但歷史規律不明顯的商品。
RNN:這種網絡的內部狀態能夠展現動態時序行爲。不一樣於前饋神經網絡的是,RNN能夠利用它內部的記憶來處理任意時序的輸入序列,這讓它能夠更容易處理如時序預測、語音識別等。
LASSO:該方法是一種壓縮估計。它經過構造一個罰函數獲得一個較爲精煉的模型,使得它壓縮一些係數,同時設定一些係數爲零。所以保留了子集收縮的優勢,是一種處理具備復共線性數據的有偏估計。用來預測低銷量,歷史數據平穩的商品效果較好。
2. 時間序列主要包括ARIMA和Holt winters :
ARIMA:全稱爲自迴歸積分滑動平均模型,於70年代初提出的一個著名時間序列預測方法,咱們用它來主要預測相似庫房單量這種平穩的序列。
Holt winters:又稱三次指數平滑算法,也是一個經典的時間序列算法,咱們用它來預測季節性和趨勢都很明顯的商品。
3. 結合業務開發的獨有算法包括WMAStockDT、SimilarityModel和NewProduct等:
WMAStockDT:庫存決策樹模型,用來預測受庫存狀態影響較大的商品。
SimilarityModel:類似品模型,使用指定的同類品數據來預測某商品將來銷量。
NewProduct:新品模型,顧名思義就是用來預測新品的銷量。
預測系統核心流程
預測核心流程主要包括兩類:以機器學習算法爲主的流程和以時間序列分析爲主的流程。
1. 以機器學習算法爲主的流程以下:
特徵構建:經過數據分析、模型試驗肯定主要特徵,經過一系列任務生成標準格式的特徵數據。
模型選擇:不一樣的商品有不一樣的特性,因此首先會根據商品的銷量高低、新品舊品、假節日敏感性等因素分配不一樣的算法模型。
特徵選擇:對一批特徵進行篩選過濾不須要的特徵,不一樣類型的商品特徵不一樣。
樣本分區:對訓練數據進行分組,分紅多組樣本,真正訓練時針對每組樣本生成一個模型文件。通常是同類型商品被分紅一組,好比按品類維度分組,這樣作是考慮並行化以及模型的準確性。
模型參數:選擇最優的模型參數,合適的參數將提升模型的準確度,由於須要對不一樣的參數組合分別進行模型訓練和預測,因此這一步是很是耗費資源。
模型訓練:待特徵、模型、樣本都肯定好後就能夠進行模型訓練,訓練每每會耗費很長時間,訓練後會生成模型文件,存儲在HDFS中。
模型預測:讀取模型文件進行預測執行。
多模型擇優:爲了提升預測準確度,咱們可能會使用多個算法模型,當每一個模型的預測結果輸出後系統會經過一些規則來選擇一個最優的預測結果。
預測值異常攔截:咱們發現越是複雜且不易解釋的算法越容易出現極個別預測值異常偏高的狀況,這種預測偏高沒法結合歷史數據進行解釋,所以咱們會經過一些規則將這些異常值攔截下來,而且用一個更加保守的數值代替。
模型評價:計算預測準確度,咱們一般用使用mapd來做爲評價指標。
偏差分析:經過分析預測準確度得出一個偏差在不一樣維度上的分佈,以便給算法優化提供參考依據。
2. 以時間序列分析爲主的預測流程以下:
生成歷史時序:將歷史銷量、價格、庫存等數據按照規定格式生成時序數據。
節假日因子:計算節假日與銷量之間的關係,用來平滑節假日對銷量影響。
週日因子:計算週一到週日這7天與銷量的關係,用來平滑週日對銷量的影響。
促銷因子:計算促銷與銷量之間的關係,用來平滑促銷對銷量的影響。
因子平滑:歷史銷量是不穩定的,會受到節假日、促銷等影響,在這種狀況下進行預測有很大難度,因此須要利用以前計算的各種因子對歷史數據進行平滑處理。
時序預測:在一個相對平穩的銷量數據上經過算法進行預測。
因子疊加:結合將來節假日、促銷計劃等因素對預測結果進行調整。
Spark在預測核心層的應用
咱們使用Spark SQL和Spark RDD相結合的方式來編寫程序,對於通常的數據處理,咱們使用Spark的方式與其餘無異,可是對於模型訓練、預測這些須要調用算法接口的邏輯就須要考慮一下並行化的問題了。咱們平均一個訓練任務在一天處理的數據量大約在500G左右,雖然數據規模不是特別的龐大,可是Python算法包提供的算法都是單進程執行。咱們計算過,若是使用一臺機器訓練所有品類數據須要一個星期的時間,這是沒法接收的,因此咱們須要藉助Spark這種分佈式並行計算框架來將計算分攤到多個節點上實現並行化處理。
咱們實現的方法很簡單,首先須要在集羣的每一個節點上安裝所需的所有Python包,而後在編寫Spark程序時考慮經過某種規則將數據分區,好比按品類維度,經過groupByKey操做將數據從新分區,每個分區是一個樣本集合並進行獨立的訓練,以此達到並行化。流程以下圖所示:
僞碼以下:
repartitionBy方法即設置一個重分區的邏輯返回(K,V)結構RDD,train方法是訓練數據,在train方法裏面會調用Python算法包接口。saveAsPickleFile是Spark Python獨有的一個Action操做,支持將RDD保存成序列化後的sequnceFile格式的文件,在序列化過程當中會以10個一批的方式進行處理,保存模型文件很是適合。
雖然原理簡單,但存在着一個難點,即以什麼樣的規則進行分區,key應該如何設置。爲了解決這個問題咱們須要考慮幾個方面,第一就是哪些數據應該被聚合到一塊兒進行訓練,第二就是如何避免數據傾斜。
針對第一個問題咱們作了以下幾點考慮:
被分在一個分區的數據要有必定的類似性,這樣訓練的效果纔會更好,好比按品類分區就是個典型例子。
分析商品的特性,根據特性的不一樣選擇不一樣的模型,例如高銷商品和低銷商品的預測模型是不同的,即便是同一模型使用的特徵也可能不一樣,好比對促銷敏感的商品就須要更多與促銷相關特徵,相同模型相同特徵的商品應傾向於分在一個分區中。
針對第二個問題咱們採用了以下的方式解決:
對於數據量過大的分區進行隨機抽樣選取。
對於數據量過大的分區還能夠作二次拆分,好比圖書小說這個品類數據量明顯大於其餘品類,因而就能夠分析小說品類下的子品類數據量分佈狀況,並將子品類合併成新的幾個分區。
對於數據量太小這種狀況則須要考慮進行幾個分區數據的合併處理。
總之對於後兩種處理方式能夠單獨經過一個Spark任務按期運行,並將這種分區規則保存。
結合圖解Spark進行應用、優化
注:《圖解Spark:核心技術與案例實戰》爲本文做者所著。
《圖解Spark:核心技術與案例實戰》一書以Spark2.0版本爲基礎進行編寫,系統介紹了Spark核心及其生態圈組件技術。其內容包括Spark生態圈、實戰環境搭建和編程模型等,重點介紹了做業調度、容錯執行、監控管理、存儲管理以及運行架構,同時還介紹了Spark生態圈相關組件,包括了Spark SQL的即席查詢、Spark Streaming的實時流處理、MLlib的機器學習、GraphX的圖處理和Alluxio的分佈式內存文件系統等。下面介紹京東預測系統如何進行資源調度,並描述如何使用Spark存儲相關知識進行系統優化。
結合系統中的應用
在圖解Spark書的第六章描述了Spark運行架構,介紹了Spark集羣資源調度通常分爲粗粒度調度和細粒度調度兩種模式。粗粒度包括了獨立運行模式和Mesos粗粒度運行模式,在這種狀況下以整個機器做爲分配單元執行做業,該模式優勢是因爲資源長期持有減小了資源調度的時間開銷,缺點是該模式中沒法感知資源使用的變化,易形成系統資源的閒置,從而形成了資源浪費。
而細粒度包括了Yarn運行模式和Mesos細粒度運行模式,該模式的優勢是系統資源可以獲得充分利用,缺點是該模式中每一個任務都須要從管理器獲取資源,調度延遲較大、開銷較大。
因爲京東Spark集羣屬於基礎平臺,在公司內部共享這些資源,因此集羣採用的是Yarn運行模式,在這種模式下能夠根據不一樣系統所須要的資源進行靈活的管理。在YARN-Cluster模式中,當用戶向YARN集羣中提交一個應用程序後,YARN集羣將分兩個階段運行該應用程序:
第一個階段是把Spark的SparkContext做爲Application Master在YARN集羣中先啓動;第二個階段是由Application Master建立應用程序,而後爲它向Resource Manager申請資源,並啓動Executor來運行任務集,同時監控它的整個運行過程,直到運行完成。下圖爲Yarn-Cluster運行模式執行過程:
結合系統的優化
咱們都知道大數據處理的瓶頸在IO。咱們藉助Spark能夠把迭代過程當中的數據放在內存中,相比MapReduce寫到磁盤速度提升近兩個數量級;另外對於數據處理過程儘量避免Shuffle,若是不能避免則Shuffle前儘量過濾數據,減小Shuffle數據量;最後,就是使用高效的序列化和壓縮算法。在京東預測系統主要就是圍繞這些環節展開優化,相關Spark存儲原理知識能夠參見圖解Spark書第五章的詳細描述。
因爲資源限制,分配給預測系統的Spark集羣規模並非很大,在有限的資源下運行Spark應用程序確實是一個考驗,由於在這種狀況下常常會出現諸如程序計算時間太長、找不到Executor等錯誤。咱們經過調整參數、修改設計和修改程序邏輯三個方面進行優化:
參數調整
減小num-executors,調大executor-memory,這樣的目的是但願Executor有足夠的內存可使用。
查看日誌發現沒有足夠的空間存儲廣播變量,分析是因爲Cache到內存裏的數據太多耗盡了內存,因而咱們將Cache的級別適當調成MEMORY_ONLY_SER和DISK_ONLY。
針對某些任務關閉了推測機制,由於有些任務會出現暫時沒法解決的數據傾斜問題,並不是節點出現問題。
調整內存分配,對於一個Shuffle不少的任務,咱們就把Cache的內存分配比例調低,同時調高Shuffle的內存比例。
修改設計
參數的調整雖然容易作,但每每效果很差,這時候須要考慮從設計的角度去優化:
原先在訓練數據以前會先讀取歷史的幾個月甚至幾年的數據,對這些數據進行合併、轉換等一系列複雜的處理,最終生成特徵數據。因爲數據量龐大,任務有時會報錯。通過調整後當天只處理當天數據,並將結果保存到當日分區下,訓練時按天數須要讀取多個分區的數據作union操做便可。
將「模型訓練」從天天執行調整到每週執行,將「模型參數選取」從每週執行調整到每個月執行。由於這兩個任務都十分消耗資源,而且屬於不須要頻繁運行,這麼作雖然準確度會略微下降,但都在可接受範圍內。
經過拆分任務也能夠很好的解決資源不夠用的問題。能夠橫向拆分,好比原先是將100個品類數據放在一個任務中進行訓練,調整後改爲每10個品類提交一次Spark做業進行訓練。這樣雖然總體執行時間變長,可是避免了程序異常退出,保證任務能夠執行成功。除了橫向還能夠縱向拆分,即將一個包含10個Stage的Spark任務拆分紅兩個任務,每一個任務包含5個Stage,中間數據保存到HDFS中。
修改程序邏輯
爲了進一步提升程序的運行效率,經過修改程序的邏輯來提升性能,主要是在以下方面進行了改進:避免過多的Shuffle、減小Shuffle時須要傳輸的數據和處理數據傾斜問題等。
1. 避免過多的Shuffle
Spark提供了豐富的轉換操做,可使咱們完成各種複雜的數據處理工做,可是也正由於如此咱們在寫Spark程序的時候可能會遇到一個陷阱,那就是爲了使代碼變的簡潔過度依賴RDD的轉換操做,使原本僅需一次Shuffle的過程變爲了執行屢次。咱們就曾經犯過這樣一個錯誤,原本能夠經過一次groupByKey完成的操做卻使用了兩回。
業務邏輯是這樣的:咱們有三張表分別是銷量(s)、價格(p)、庫存(v),每張表有3個字段:商品id(sku_id)、品類id(category)和歷史時序數據(data),如今須要按sku_id將s、p、v數據合併,而後再按category再合併一次,最終的數據格式是:[category,[[sku_id, s , p, v], [sku_id, s , p, v], […],[…]]]。一開始咱們先按照sku_id + category做爲key進行一次groupByKey,將數據格式轉換成[sku_id, category , [s,p, v]],而後按category做爲key再groupByKey一次。
後來咱們修改成按照category做爲key只進行一次groupByKey,由於一個sku_id只會屬於一個category,因此後續的map轉換裏面只須要寫一些代碼將相同sku_id的s、p、v數據group到一塊兒就能夠了。兩次groupByKey的狀況:
修改後變爲一次groupByKey的狀況:
多表join時,若是key值相同,則可使用union+groupByKey+flatMapValues形式進行。好比:須要將銷量、庫存、價格、促銷計劃和商品信息經過商品編碼鏈接到一塊兒,一開始使用的是join轉換操做,將幾個RDD彼此join在一塊兒。後來發現這樣作運行速度很是慢,因而換成union+groypByKey+flatMapValue形式,這樣作只需進行一次Shuffle,這樣修改後運行速度比之前快多了。實例代碼以下:
若是兩個RDD須要在groupByKey後進行join操做,可使用cogroup轉換操做代替。好比, 將歷史銷量數據按品類進行合併,而後再與模型文件進行join操做,流程以下:
使用cogroup後,通過一次Shuffle就可完成了兩步操做,性能大幅提高。
2. 減小Shuffle時傳輸的數據量
在Shuffle操做前儘可能將不須要的數據過濾掉。
使用comebineyeByKey能夠高效率的實現任何複雜的聚合邏輯。
comebineyeByKey屬於聚合類操做,因爲它支持map端的聚合因此比groupByKey性能好,又因爲它的map端與reduce端能夠設置成不同的邏輯,因此它支持的場景比reduceByKey多,它的定義以下:
educeByKey和groupByKey內部實際是調用了comebineyeByKey,
咱們以前有不少複雜的沒法用reduceByKey來實現的聚合邏輯都經過groupByKey來完成的,後來所有替換爲comebineyeByKey後性能提高了很多。
3.處理數據傾斜
有些時候通過一系列轉換操做以後數據變得十分傾斜,在這樣狀況下後續的RDD計算效率會很是的糟糕,嚴重時程序報錯。遇到這種狀況一般會使用repartition這個轉換操做對RDD進行從新分區,從新分區後數據會均勻分佈在不一樣的分區中,避免了數據傾斜。若是是減小分區使用coalesce也能夠達到效果,但比起repartition不足的是分配不是那麼均勻。
參考:http://www.zhihu.com/question/19593207:
Facebook開發Cassandra初衷是用於Inbox Search,可是後來的Message System則使用了HBase,Facebook對此給出的解釋是Cassandra的
最終一致性模型
不適合Message System,HBase具備更簡單的一致性模型,固然還有其餘的緣由。HBase更加的成熟,成功的案例也比較多等等。Twitter和Digg都曾經很高調的選用Cassandra,可是最後也都放棄了,固然Twitter還有部分項目也還在使用Cassandra,可是主要的Tweet已經不是了。