如何快速作一個山寨的實時「大數據」處理

 

前言

爲啥寫這篇文章?由於我如今作的這套實時計算系統在公司裏很難玩下去了。去年年初來到ctrip,主要就是作兩個實時應用,一個是實時報警,功能是作出來了,但應用效果很差;一個是XXX(敏感應用,不敢寫出來,以XXX代替),也是實現了功能需求,但想繼續按本身的思路往下走是不可能了,我捉急的表達能力很難讓上頭去理解實時計算和傳統的request-response方式的應用不一樣點在哪裏,爲啥要這麼幹,也很難明白上頭喜歡的系統是什麼樣的,真是撓破頭。個人方式看來是作不下去了,但總以爲仍是有點價值,因此寫下來,看看有沒有也想山寨的同窗,能夠做爲參考,下面一段是扯淡,想看實際內容的請跳到系統結構。

至於爲何起這個標題:html

  • 如何:
    會介紹我目前的系統是怎麼作到如今這個程度的;以及原本想嘗試的一些開發方向。
  • 快速:
    • 這套系統基本上是我去年一我的搭起來的,最初徹底不知道如何去作實時數據處理,因此架構翻了好幾次(感謝ctrip給的機會),所以到最近的一個整個版本的話,核心代碼也就小几千行(這個要感謝開源社區),找一個熟練的java工程師來作的話,最多在幾個星期內就能搭起來,應該說建設的成本不會太大。
    • 由於起手就有兩個項目,因此一開始就瞄着通用的實時計算框架/平臺,過去一年還零散開發了兩三個額外的應用,這幾個應用基本上都是共享一套代碼,差異只存在於配置。所以當底層設施建好後,能夠很快的開發一個應用,咱們曾經有個應用大概花了半天,若是能作的完善的話,新應用的開發和部署能夠作到分鐘級,由於現實場景中大部分應用的數據獲取和計算本質上差異很小,很是類似。
  • 山寨:
    介紹的系統和那些大公司的系統確定不在一個層次上,高手們能夠繞道了。本文適合於像我同樣的三流程序員,若是不想花費太大的代價,而關注點在於如何應用現有的開源產品(咱們是storm+esper)來作能夠cover一部分狀況的實時數據處理,能夠參考下本文。
  • 實時
    上面說了開發和部署的目標是分鐘級。而整個系統的響應速度和新規則的上線速度是在秒級,但實在是無法作到毫秒級,畢竟是山寨的。用簡單的方式作到實時計算,犧牲了必定的可靠性,若是真有需求的話,須要另外的手段去彌補。
    另外最近被人用實時操做系統的定義challenge了對實時系統的理解。我仍是想表達一個觀點,互聯網裏的實時系統應該還具有一個特徵,便是關注latest的數據,實時系統不能徹底等於響應快,還對應於數據新。若是是歷史時刻的某一數據特徵,那是offline的系統去考慮的,混到實時系統中只會對系統設計拖後腿

  • 大是相對的,跟巨頭們的數據量無法比,只是ctrip這樣的流量仍是沒有壓力的。老實點,放在引號裏。
  • 數據
    業內大數據講的太多,太空,連「數據」是什麼都沒有明確的定義。我本身心中的數據要有以下的特徵(我的觀點,因此在引號裏):
    • 可描述。各個公司應該都差很少,基本數據來源於各類各樣的log,但最好仍是能用統一且簡單的方式去描述這種數據,而不是光禿禿的沒有任何schema的text,否則在其上就比較難作文章(若是隻是簡單使用的話),後面幾點特徵也很難保證。固然這種格式會比較簡單,以防止帶來太多限制
    • 可計算。提供一些通用的操做能夠對數據進行變換和處理,由數據產生數據,並能支持遞歸的迭代。這樣才能在raw data上獲得一些更有意義的數據,從而去作更有價值的事情。
    • 多條記錄Over單條記錄。在計算中,對於某一個實體(例如一個ip或是一個uid)而言,其單條記錄每每用處不大,更應該關注其一組記錄的特徵值。因此須要大量的聚合、統計操做。目前咱們作的這套系統重點就在這,本文大部份內容會與此相關。
    • 羣體記錄Over單個實體的記錄。在計算中,單個實體也沒有太大的價值,更重要的是看整個羣體的特徵,用羣體的特徵再去比對個體。找到人民大衆的特徵,再去找裏面的遵紀守法者(正常值,可用在推薦)和離經叛道者(異常點,多用於報警)。我一直想作到這個,也有大概的思路,但無法繼續嘗試了,後文會簡要介紹一些。
    • 可度量和監控。要對數據有統一的度量和監控,從而對數據有個掌控,也方便找出異常點
    • 數據Over規則。估計大多數公司的應用和ctrip的同樣,業務方只會提出一系列規則和幾個原始的數據源,指望一個萬能的規則引擎去把全部數據採進來並按照規則的邏輯去實現。大部分業務方會是自然以規則爲中心的,根據具體的case,人肉找出必定的規律,而後再看過程當中用到了什麼數據源,從而給出需求給pd。然而在大數據量的條件下,這是不成立的,一是在大量數據下作複雜的邏輯計算,從開發和運行的效率都很難保證高效;二是大數據量下,人肉很難去發現規則了,並且老的規則也很難維護,由於規則這東西難以度量和演化,也難作自動化生成。因此必須轉向數據爲中心,抽出不一樣的特徵維度,而後纔可能用高大上的機器學習等方法去自動化生成規則或模型,這個纔是王道;如今很成熟的hadoop,storm,也都是以數據爲中心,儘可能簡化計算模式。
    • 數據特徵Over業務特徵。數據處理系統只應該跟數據相關,考慮的是數據類型(double,string等少許類型)和數據特徵(單記錄大小、吞吐量、流速等),這樣才能讓系統的適用性最大化。雖然主流的說法是設計要按照業務來,但我的仍是比較頑固,認爲其本質是業務裏的數據特徵,業務自己太繁瑣,對底層系統也沒多大意義,應該把業務底下的數據特徵提取抽象出來。目前我所作的系統只適合於較短期內的數據的大量的、實時的計算,不考慮大時間跨度的數據,也暫時忍受短暫的失效。如今的幾個應用基本擁有這個特徵,因此能夠號稱開發應用只要配置就行了,雖然你們就是不相信。
  • 不是數據大
    如今不少同窗認爲跟H*的系統掛上鉤,或是用個nosql,就吹是大數據了。其本質仍是之前基於數據庫的業務驅動的應用換了個dao而已,頂多稱爲數據大。實際上「數據」和「大」都值得商榷。雖然是山寨,也要跟他們堅定劃清界限。
  • 處理
    火熱的大數據一應俱全,這裏只涉及到實時環境下的數據處理,不怎麼涉及到存儲、展示等其餘方面

後續系統設計都是以這些爲出發點的前端

背景知識

storm

storm 是目前比較火熱的實時處理系統,雖然不能和H系的比,但資料也仍是很多,我這就默認你們已經知道storm的概況了,具體的資料就不舉了!java

國內而言,阿里系對storm的應用比較多,網上有不少的文章;在ctrip,也有另一個team在用storm作前端的用戶行爲分析,感受是蠻成功的,應該算公司裏拿得出手的項目之一了。還有不少公司也是用storm在作一些實時的業務開發。mysql

storm自己只是提供了一個實時處理的框架,幫咱們解決了大部分的可靠性,數據流向組織,併發性等基本問題,可是實際的數據處理仍是要根據業務需求去開發代碼適配。所以它只是解決了實時計算的組織和管理,而對計算自己是沒有支持的,直接用是達不到我想要的「不寫代碼只配置」的效果,因此我把重心放在esper上,storm只做爲外部的容器,幫我作數據的簡單處理和sharding,節點的自動分配和重啓,數據源的組織和數據結果的分配等等外圍的功能,計算就交給esper了。git

esper

esper 絕壁是個好東西,功能強大,但門檻過高。資料的話官網上看下例子,也有一篇很詳盡的文檔,這是html版,有興趣的同窗能夠上官網下pdf版。
簡單的介紹的話,esper是一個用於針對數據流進行流式處理的lib庫(java和.net都有),他跟傳統的數據庫不一樣之處在於:數據庫是數據先寫進來,再定義一個sql,而後去拉取數據並計算一次獲得結果;esper是定義一個計算規則,並做爲一個計算節點,而後再把數據不停的推給他,由計算節點不停的作增量計算,並出來一系列的結果。
Alt text
上圖是傳統的數據庫(也有用nosql的)的方式,目前不少項目應該都是採起這種設計,好比我要作一個5分鐘的數據統計,就須要不停的跑sql去拉數據作統計和計算(固然能夠提早作一些預聚合,數據庫也應該有觸發器之類的功能,不過只能做爲優化,也很難去掌控)。這種pull的方式容易理解,也有已經成熟到爛街的數據庫技術支持,pull的時候靠分庫分表來sharding,只是要本身寫點聚合的代碼,對單個查詢來講很快,由於只要一次數據庫查詢,一跳就完了,數據的查詢和存儲都由數據庫來保證高效。但對於數據量大,而後又要實時更新的場景來講,這種低效的方式是走不通的,圖上就能夠看到由於數據和計算節點的分離,勢必形成冗餘的數據被屢次拉取(或者要寫複雜的代碼去優化),並且實時度靠輪詢來達到。這種設計要麼只能適合於數據量和計算量比較小的場景,要麼只能適合於人傻錢多的場景。但它好理解。程序員

也有說拿redis作counter的,但這種方式仍是解決不了數據和計算分離的問題;在者,對於功能稍複雜的計算就力不從心了,並且redis的邏輯抽象程度遠不如sql,開發工做比sql都要大不少。具體的不展開了
Alt text
此圖中下方每一個方框表明一個用esper實現的實時計算引擎,配置好運算規則後,咱們主動去把數據餵過去,引擎不停的作增量計算,每次有新結果就經過回調通知咱們。這個圖是公司內部寫ppt時臨時畫的,不大恰當。由於esper是個lib,單實例的可靠性和性能無法scale,因此咱們是架在storm上,由storm去自動部署和分配多個esper進程,並在前端作sharding來達到高擴展性。github

select avg(price) from StockTickEvent.win:time(30 sec) 

select a.id, count(*) from pattern [
        every a=Status -> (timer:interval(10 sec) and not Status(id=a.id)
] group by id

esper聲稱本身是SEP(stream event processing)和cep(complex event processing)。上面從官網抄了兩個比較有表明性的例子來分別說明。redis

  • 第一個很直觀,是從StockTickEvent中計算最近30秒的平均price,當你配置好規則和StockTickEvent事件的schema,並把數據組裝成具體的StockTickEvent事件源源不斷push給引擎後,引擎會根據新數據的到來和時間的流逝(意味着老數據的expire)不斷作計算,每當值發生變化了就會經過回調來通知調用方。從這個例子能夠看出,esper採用了相似sql的寫法,其稱之爲epl,基本包含了sql的大部分用法,還算比較親切,只是用的時候思惟要轉換一下,這是一個持續的計算過程,而不是sql那種一錘子買賣
  • 第二個例子稍微複雜些,展現了esper描述事件之間關係的能力,這個例子是說當某個Status事件發生10秒以內,沒有相同id的另外一個Status事件發生,即通知調用方。

這兩個例子還只是揭露了esper能力和複雜度冰山一角。對於流式的數據處理和事件間pattern的描述,它提供了不少的底層支持和選項。基本咱們的需求都能獲得知足,一種功能還能用好多種寫法來實現,越用就越是以爲它強。
然而,強大的東西不容易掌控。我是在前公司paypal時據說這個開源軟件的,當時老闆的另一個team花了很大的力氣和資源在上面,但願作實時大數據,我離開不久據說這個team就散了,大概由於沒出好的成果,員工圖譜上,我和cto之間那條線上的老闆們都陸續黯然離開了;而我本身,是在來到ctrip才接觸,由於大概知道前同事的不順利,因此是帶着敬畏心在作,當心翼翼,儘可能讓它更易用,結果雖然功能是出來了,獎也拿了,仍是被罵,結局也離屎差很少了。因此想用它的要小心啊,不祥之物啊:)
不過失敗不能白失敗,經驗教訓要總結下的,但願能成爲它人的成功之母:算法

  • 要在esper上封裝一層給使用。esper過於複雜,一應俱全,是個大雜燴,學習曲線開始時比較陡峭;並且從上面兩張圖比較,它的思惟是跟傳統的數據庫正好相反,因此要想說服用戶能理解並直接去用就很困難。前同事推銷給paypal的analyst,結果人家不感冒,我在ctrip時也向用戶推銷過,但願用戶能直接配epl規則來完成功能,結局是一年多下來仍是隻有我一我的能寫一點。因此我花了很大力氣去嘗試在這上層去封裝,但願能封裝出更簡單,更好用的一層。如今我大概有四個應用,已經近千條epl,每一個都比上面的例子要長好多,若是直接在epl上操做是不可想象的事情。
  • 既然要作封裝,不是作全部功能的封裝。剛開始用esper,會以爲很好玩,好比我一個報警的cooldown功能就有好多種寫法。後來逐漸簡單化,嘗試只用sep那套sql like的語法(不包含pattern那種擴展),一是已經知足目前遇到所需功能;二是容易駕馭,在上面作封裝;三是正如我以前所說,想作數據爲中心而不是規則爲中心,因此只用了它簡單而有效的聚合功能,擯棄了複雜和強大的CEP功能。
  • 必定要作好監控。前公司用的商業版,帶一個稱爲「dashboard」的功能,據說沒什麼用;我本身用的開源版,沒有這方面的支持,因此很受困擾。由於當那麼大的數據量部署上去而後源源不斷的跑時,你只能看到你的輸入(源數據)和輸出(esper最後經過回調傳出來的數據),裏面一大坨計算過程是黑盒的,徹底搞不清情況,調試什麼的基本不可能。仍是須要想辦法能讓其輸出一些中間信息,作到必定的監控,方便使用。最近用戶提了點監控和報警的需求,已經想到了簡單的solution,爭取能在掛掉前實現上。
  • 界面是很重要的。以前都是一我的瞎玩,重點放在後端,一方面是前端基本沒經驗,另外一方面是後端還在摸石頭,因此忽視了前段展現。結局是我覺得實現了功能就完了,因此只作到了restful api一層。但沒有界面就向人展現理念,不討喜,雖然本身以爲仍是有獨到之處的,結果仍是留下了很爛的印象。這是一個很好的教訓。
  • 說服別人採用異步批處理的思惟。不管是storm仍是esper,都不是request-response的同步調用方式。storm的調用方通常經過queue去push數據,計算節點是本身不斷運行的,由源源不斷到來的數據trigger,多個物理上可能分離的節點處理後,產生最終結果,最後一個節點可能執行些落地的操做,但不會往回傳,雖然有drpc,但也只是模擬了一個rpc的錶殼,內部仍是經過不一樣的queue去鏈接各個計算節點的,並且是分佈式系統:queue+分佈式節點+分佈式數據來源的歸併同步,單個數據的處理很難作到毫秒級,但它本質是一種批處理的方式,是跑量的,能夠作到海量數據的秒級處理;esper也是,它的操縱對象是數據流,內部是事件驅動,因此也是一種異步批處理的方式,若是用同步調的話很難達到如此高效。其實從硬件到軟件(網絡),再到現有的大數據處理系統,基本都是異步批處理的思路。因此當最外層使用的時候,最好是去適應這種新環境下的新特徵,而不是說老的數據庫能作到100ms,你這咋不行,太差了。這個思惟方式不同,不轉換很難理解。

dashboard

我同事在opentsdb(後端是hbase)的思路上開發了一個叫dashboard的系統,能夠對海量metrics進行實時的存儲和查詢。,同事對整個先後端都進行了重寫和改進,細節上有蠻多獨到之處,我全部程序的監控,包括storm的監控和想要作的計算平臺的監控,都是基於此。這應該是ctrip裏很是好的一個系統了。sql

系統結構

下面分別從邏輯和物理上描述整個系統的結構。

邏輯結構

目前的系統設計,對一個應用,包含了四大部分
Alt text

  1. data source system. Data sources負責管理系統的輸入數據,全部輸入源經過配置的形式給出,系統儘量自動化的提取數據,並轉化爲內部的數據流
  2. variable engine. variable部分負責對原始數據流進行實時計算,以封裝的esper引擎對原始數據流進行拆分/合併/過濾/聚合操做,加工後獲得新的,更有價值的數據流。這一部分focus在計算,具體詳見後文。
  3. rule engine. 針對variable部分處理後的數據流,咱們須要過濾出符合用戶需求的部分,須要進行閾值比較,數據cooldown等工做,最後產生可直接供用戶使用的數據(稱爲Alert)。
  4. dispatch engine
    對每個rule的Alert,可能有不一樣的action(郵件、DB、調用特定url、mq),這一部分管理如何輸出數據。

因此整套系統是一個典型的輸入(part 1) --> 處理(part 2&3) --> 輸出(part 4)的結構,每一個應用只需給出四大部分的配置,就能夠獲得一個實時事件處理應用。
這裏須要補充說明的是:

  1. data source、variable、rule本質上都是數據流,每一個元素的config信息會描述它是什麼(shema,有哪些字段,字段是什麼類型,僅限於string,double,long,object等簡單類型),它從何來(它的source是什麼,datasource來自於外部,variable來至於datasource或自己,rule來自於variable),它如何獲得(對source的計算方式)
  2. data source這塊應該僅侷限於外部數據的集成和簡單處理。這點以前沒想清楚,也把一些計算功能混進去了,結果挖了個坑。
  3. rule單列開來,是由於給外部使用時須要一些額外的配置信息。咱們的系統中基本往外的數據都是以alert/alarm爲形式的,爲了方便辨識,增長了一些名稱、標示符等屬性,方便數據的外部集成。
  4. variable目前在公司內部稱爲counter,但我的仍是傾向於叫variable,主要是由於:
    • 大多數analyst的的規則和模型都是創建在變量上。我設計的初衷是爲了能擺脫肉眼對原始數據進行判斷的規則分析模式,但願能從多個數據源中抽象出供analyst去作挖掘分析的可精確描述、獨立性較強的變量。固然這種變量在實時系統中是一個隨時間變化的數據流(永遠是latest值,不停變化)。雖然失敗了,但理由還在。
    • counter這個名字只反映了聚合這一種計算類型,當時妥協改了這個名字,後面後悔了,由於要描述這個系統更難了
    • 對於整個計算過程的配置來講,都是OPERATION(input1, input2)--> output1的最簡單的範式,input和output都是系統裏的數據流,用variable的叫法更貼切。

Data Source System

對於data source來講,框架部分開發出不一樣類型數據源的數據抽取驅動,能夠對如下數據類型數據源進行數據拉取:

  1. db(mysql/sqlserver)
  2. hbase
  3. dashboard api拉取
  4. dashboard數據直連
  5. mq
  6. url拉取
  7. other

對於每一種數據源,大體只須要定義元數據信息,就能夠完成外部的數據拉取併到系統內部數據(通常稱爲event,包括name、key、timestamp、value四大固有屬性和其餘屬性)格式的轉換:

  1. 鏈接信息(dashboard url,db connection & sql, mq connection & queue name)
  2. event轉換信息。對每一種通用的數據源,能夠配置一些參數來自動完成源數據-->event的轉換。

目前大部分數據源都是根據應用寫死,但長期但願抽象出特徵來,能夠經過配置自動完成,只剩下少數特別的經過開發完成。
這一份的結果是咱們能夠從每一個datasource源源不斷的獲得數據,所以就是一條條數據流,通過alignment後(時間對齊),要求全部事件都以相同的時鐘下匯聚成一個邏輯上的總的數據流,這是系統最大的limitation)

Variable Engine

對於進入系統的數據流(stream),咱們能夠對此進行一些操做(包括但不限於split/join/aggreation/filtering),實時造成新的數據流,獲得一系列variable(能夠對應爲BI中的維度,風控模型中的variable),以下圖所示

Alt text

進入這部分系統時有兩個數據流DS1, DS2;通過處理後,獲得6個Variable(每一個Variable主要包括name,key,timestamp,value幾個固有屬性和其餘用戶定義的屬性),每一個variable其實也是隨時間變化的數據流,經過加工後的數據會更有利於做進一步的決策;最後在某一個時間點,對全部variable進行切片,能夠獲得一系列的latest值,這樣就能夠作爲決策規則(模型)的輸入依據,這一部分由rule management部分完成。

實際上,從數學的角度看,這部分工做但願以variable(數據流/維度,counter只是其中一種)這種數據流做爲基本單元,完成一個帶少許操做符的代數系統,從而整個計算過程能夠由這樣一些基本的操做符去搭建一個DAG,而不是從頭至尾所有由程序員編碼實現功能。

操做部分基本由esper完成,經過封裝,將esper實現相關的部分封裝掉,只提供邏輯上的運算符給用戶/admin,減輕使用負擔。目前只提供少許基本的操做類型:
a. (已實現)單線聚合/過濾。提供基於單個數據流(variable或data source)的按時間聚合、按條件過濾。經過此操做能夠實現split/aggregate/filter等邏輯操做
b. (已實現)雙線merge。對兩條數據流(variable)進行合併操做,實現簡單的join(根據key和timestamp嚴格匹配)。
c. (未實現)多線merge。將多條數據流(variable)根據key去全部的latest值,進行合併計算。
以上是系統中如今和將要實現的操做符,目前看效果不是很好,好比操做符a帶的功能太多,但願一個操做符就能解決多個問題,對用戶並不貼切,應該拆分爲aggregate,filter(split用多個filter來實現)。

對於每一個應用來講,直接拿底層框架的操做符進行配置能夠基本知足需求。在資源充足的狀況下,每一個應用能夠在框架的variable體系開發一套更高一級抽象層的操做符,來方便應用的使用。參見實時報警的實現

同時,variable部分會提供dump操做,按期的將variable值dump到hbase中,供後續查詢。這個功能主要是爲了過後分析和離線查詢,目前尚未在生產啓用。最近打算先簡單寫一路進dashboard,以利用其實時查詢和展示的能力。至因而否之後要擴展,要看最後項目的發展了

Rule Engine

Rule這塊會直接對上一部的variable進行操做,經過用戶提供的閾值來得出有價值的信息(暫且稱爲alert),而且根據後續用戶配置的action操做分發到外部處理的地方。
Alt text

目前,規則管理準備實現如下三種:

  1. (已實現)單variable固定閾值。閾值能夠寫死在規則裏,這種方式簡單,但不夠靈活,適合那種比較穩定的規則
  2. (已實現)單variable浮動閾值。閾值經過api由用戶管理,對每一個variable[key],以分鐘爲精度,進行閾值報警。這種方式使用比較麻煩,單能夠提供必定的靈活性。
  3. (未實現)多variable組合報警。多個variable經過給定的公式來報警。這個操做能夠做爲後續更復雜的規則引擎的基礎。目前是實現了兩個variable的組合報警,方便用戶使用。對於BI提供的規則和模型來講,使用多variable的組合是自然的手段,目前還沒達到這個階段,因此也沒加上。

這裏須要註明的事,rule和variable有部分重合,rule的一些功能後續也能擴展到variable裏實現,二者的區別在於:

  • variable是一種較爲穩定的邏輯對象,對它有很好的管理:它能夠做爲計算單元在整個variable引擎中做爲計算輸入;經過variable dump功能能夠有過後分析和查詢的好處。所以,當須要對結果數據進行更細緻的分析後後續處理時,能夠創建一個variable
  • 當只須要對結果數據進行分發到後續的handler時,能夠用rule,邏輯上更易懂一些。

Dispatch Engine

Dispatch引擎會將rule engine產生的alerts進行分發,供用戶進行進一步訪問。這一快將與data source一塊造成相對應的關係,目標是經過配置將alerts推送到制定的地方:

  • db
  • mq
  • url gateway
  • hbase

App Management

App 管理做爲後續計劃(nice to have),應該是沒可能實現了,本打算實現如下功能。

  1. 根據data source system, variable engine, rule engine, dispatch engine,再加上一些額外的配置,自動生成storm topology
  2. 提供app(其實就是topology)的界面化部署、啓動、中止
  3. 整合各個模塊的監控,並開發console模塊
    寫的簡單,仍是有蠻多細節要考慮的,有精力的能夠去嘗試

這一部分主要設計了邏輯上數據流的定義,以及其整個生命週期,下一段講一下咱們簡單的物理結構是如何去分別實現各個功能的。

物理結構

以前已經提到過,咱們是storm+esper的形式,esper負責內部絕大部分的計算,storm負責外圍的組織。整個系統實現起來很是簡單,如圖
Alt text

外部數據過來有兩種方式:

  1. 外部數據主動push。然而因爲storm更適合用主動向外拉的方式,因此咱們中間用了一個mq 中轉,目前是一個activemq,打算往分佈式queue遷移。
  2. 外部數據被pull。最初專門寫了一個puller的程序,去拉取各類數據,再導入到storm中。後來發現畫蛇添足了,storm自己就提供了高併發和故障恢復,因而逐漸把數據採集轉移到storm上,組織好的話,代碼仍然能夠保持簡潔性,並輕鬆得到高併發的特性。如今基本上只要不向外提供接口服務的程序都已經往storm集羣上遷了

storm的多個spout/bolt節點獲取外部數據後,成爲統一的格式event(包含name,key,timestamp和其它特定屬性),並sharding(須要key)到不一樣esper 節點,做進一步計算。

Alt text

每個esper節點,都已是運行在單個機器的單個進程上了,因此在這裏將不一樣來源的數據流對齊(須要timestamp),並做一些優化處理(好比去除冗餘的對象)。最後這些event就分配到esper引擎了。
variable計算引擎一方面將這些外部數據轉化爲簡單變量(一種我本身約定格式的esper數據流),這些簡單變量在一些操做符下能夠生成其它變量,有最簡單的操做符

  1. filtering. insert into outputVar select * from inputVar where $condition
  2. aggregation.insert into outputVar select count/sum/avg(*) as value,... from inputVar:time:win(5 min)
  3. split. 跟filter相似,用兩個或多個filter來實現便可insert into outputVar1 select * from inputVar where $condition;insert into outputVar2 select * from inputVar where not $condition
  4. join(根據實際須要有多種join方式,這裏列出一種)。insert into outputVar select inputVar1.*, inputVar2.* from inputVar1.latest(key), inputVar2.latest(key) where inputVar1.key = inputVar2.key,這個僞epl表示將兩個原始數據流,以其key做爲 單位,將最新值拼接起來,這裏還可做一些運算。舉個例子,有一個變量源數據來源於pc訪問,另外一個變量數據來源於移動app(可能用戶再家裏同時用手機和pc訪問),按ip/uid去作join,就能夠獲得這個ip的完整視圖

這些基本的操做符都很是簡單,不會太多,開發起來也比較容易。只是要定義一下配置項,以及運算到底層esper語句的映射。咱們目前新加一個基本variable操做的話,後端只用新增一個文件便可,但前端比較難作,尚未想到很好的解決方法。
固然,對於一些特殊的應用,簡單的操做符可能抽象度較低,難以直接使用,能夠在簡單操做之上進一步封裝,詳細的參考case study裏面實時報警的就能夠了

變量出來以後,就能夠經過rule來過濾出咱們感興趣的內容了:

  1. 單變量規則:insert into rule1 select * from inputVar where value > 5
  2. 多變量規則:insert into rule1 select ... from inputVar1.latest(key), inputVar2.latest(key) where inputVar1.key = inputVar2.key and (inputVar1.value 5 or inputVar2.value <10)

最後產生的結果,經過各類方式送到系統下游的各個handler。因爲storm這種靈活的代碼運行框架,這一點很容易作到,不詳細敘述了。

case study

實時報警

ctrip內部用dashboard系統對各個應用的內部狀態進行監控,包括物理的/邏輯的,利用hbase的特性,得到了實時聚合和展現的能力。咱們的實時報警項目,就但願能自動化的去拉取數據,找出異常點。
公司內部也有其餘報警系統,會對訂單這些做一些同環比,閾值報警,系統跑的很好頗有效。咱們這邊的報警系統更多的在於系統監控方面,會有一些更大的挑戰,一是數量更大,我可能要對每一個hostip,每一個url,甚至是他們的組合做爲單位去檢查並報警;二是簡單的閾值規則對上層業務來講有價值,對底層來講沒太大指導做用,超出定義的閾值是比較常見的狀況,結局就是大量的報警郵件發出來了,但沒人關心,本身反倒成了公司內部最大的垃圾郵件製造商,因此須要各類更加複雜類型的報警:

  1. 連續上升超過50%的報警。原始數據-->簡單變量-->(單變量聚合,取相鄰兩個值的比例合成一個新變量)-->比例變量-->(閾值爲大於1.5即報警)-->done
  2. 同環比報警。咱們的數據源拉取配了時間參數,能夠拉取當前時間的數據,也能拉取一段時間前這個時段的數據,因而兩條原始數據流-->兩個簡單變量-->(雙變量join後,新數據/老數據)-->同環比變量-->閾值報警-->done
  3. 多個metrics值組合報警,好比一個metrics描述了>10s延時的統計,另外一個metrics描述了全部延時的統計,我想知道佔比的狀況。兩個metrics對應的原始數據流-->簡單變量-->(雙變量join,分子/分母)-->比例變量-->閾值報警-->done
  4. 多閾值報警。單閾值的話,規則過於剛性,容易誤報。因此添加了閾值管理功能,經過開放api接口,用戶能夠以時間、key爲單位定義不一樣的閾值,這樣能夠達到分時段報警(多個time的閾值),差別報警(不一樣key對應於不一樣閾值),基線報警(天天的每一分鐘預先算好閾值並寫入)。這得益於esper能夠輕易的調用java的方法,只作簡單的開發就能夠擴充esper的功能。
  5. 。。。

這裏能夠看出,儘管有各類複雜的規則類型,但基本的操做是相同的,因此只要在基本操做上封裝一層便可。下面是個人一條規則配置,因爲沒有太多資源專門管理這些五花八門的規則,我將全部配置項揉到了一塊兒,看着複雜些,但管理成本稍微低些(由於各個部分的配置相互間能夠排列組合,分開來成本過高)。

{
    "namespace": "ns",
    "group": "group,
    "name": "rulename", // 這三項只是名字標識,方便rule管理
    "config": {
      "dashboardUrl": "http://xxxxx",  // dashboard url,返回json數據
      "timeAdjustment": 300,   // 表示取多久以前的數據,同環比就在這個配置項有不一樣
      "dataType": "Single", // 是否拉取到的值直接報警,仍是要作同環比,或者是多值計算
      "period": 0,  // 如下四項配置同環比的參數
      "ops": "-",
      "oldCondition": "",
      "newCondition": "",
      "secondUrl": "",  // 如下四項配置配置雙metrics計算 
      "secondTimeAdjustment": 0,
      "dualOps": "-",
      "firstCondition": "",
      "secondCondition": "",
      "valueType": "", // 如下三項配置是對以前的原始值或計算值直接檢測,仍是要看變化率(差或比值)
      "formerPointCondition": "",
      "latterPointCondition": "",
      "triggerType": "fixed",  // 如下四項配置閾值類型,要麼直接給閾值,要麼給個標識符,用戶本身去寫複雜的閾值
      "lower": 0,
      "upper": 30000,
      "thresholdName": "",
      "conditionWindow": 0, // 如下兩項控制規則屢次命中才報警,減小誤報
      "conditionCount": 0,
      "cooldown": 600, // 報警cooldown,防止短時間內重複報警
      "to": "wenlu@ctrip.com", // 報警郵件配置
      "cc": "",
      "bcc": "",
      "mailInterval": 60,
      "cats": "",
      "catsEnv": "",
      "catsLevel": "info",
      "catsMessage": "",
      "catsDevid": "",
      "catsName": "",
      "catsPriority": "info",
      "type": "MetricsAlertingRule", // 表示規則類型,不一樣值會觸發不一樣操做
      "desc": ""
    },
    "type": "MetricsAlertingRule",
    "desc": "",
    "status": "on",
    "app_subid": "auto@hotel_product_common_utility_logging_responsetime_30s" // 自動生成底層數據時用給的標識符
  }
View Code

 

這算是一條頂層規則,系統會自動生成一系列底層的data source/variable/rule/action,對頂層的CRUD操做也會自動映射爲底層的操做。
這樣開發一條新規則只用考慮如何去運用底層的計算平臺,能夠在更高的抽象層次上去開發,而不是從頭至尾從新開發一遍。
整體而言,實時報警的功能是實現了,但運用的很差,並且是給底層人民用的,可視度不高。

XXX應用

XXX應用是敏感項目,會講的含糊些。
這個項目數據量比較大,基本上對ctrip的每次訪問都要促發一系列的運算和規則檢驗。過濾後,每秒k級數據,目前有幾十個變量和規則,意味着每秒上w次的聚合操做和檢測。生產上用了三臺機器跑storm集羣,實時上藉助於storm+esper的高效,單臺測試機(16g內存+6核cpu),已經能基本扛得住如此量的運行(不過最近隨着變量的增長和流量的上升已經比較勉強,須要考慮可能的優化了)。
公司另一個項目,用的是我以前提的數據庫的方式。他們的數據跨度比較大,須要很長時間段的數據,而不侷限於當前,總數據量是xxx項目當前時間段數據的幾倍,但計算觸發頻次較低,每秒幾十次,若是不算db的話,用了10臺服務器,聽說cpu使用率20%不到。雖然我很不喜歡拿這兩個系統去比較,由於解決的問題和適用的場景不同,就好像拿喬丹和貝利去比較,但由此得出XXX項目效率比較低這種結論是很難讓人心服口服的。esper這種基於本地內存的效率遠不是基於db或者是redis這種異地內存的系統能夠比擬的,他的滑動窗口的計算效率也不是簡單算法就能超過的。若是說可靠性和成熟度卻是可讓人服帖的。

這個項目一方面展現了當前架構能扛得住中等流量的衝擊,另外一方面本意是嘗試讓用戶能自主的動態的建立變量:
Alt text
上圖的拓撲是根據用戶的需求演化而來,其實具備必定的普適性,經過必定的過濾找到具體的感興趣的數據,進行聚合獲得統計特徵,在此之上才能建出靈活的規則來。
圖中最下方有個join變量,目前還沒用到,是爲了多數據源(多設備或者多數據中心的數據)的數據整合。有種說法是直接從數據源上進行合併,但這樣會增長數據源的複雜度,破壞總體的結構,還不能徹底覆蓋;另外一方面,join過濾後的數據遠比join原始數據高效的多得多
這一套以前一半用代碼,一半靠實時計算系統的規則,最近才抽象出來,原本打算徹底遷移到實時計算系統中,把整個圖的掌控交給用戶,由用戶去控制整個DAG的結構構造和節點配置,這樣靈活性比較高,經過統一的監控功能可讓用戶掌控每一環節的具體信息。這讓我想起去年有家國內公司來推銷大數據的,他們學術背景是可視計算。如今想來若是這套能實現的話,是否是也有點可視計算的味道了。
不過現實是這條路已經斷掉了,一是咱們team,尤爲是個人前端能力還比較差,另外一方面老闆以爲這個太複雜,易用性比較差,不能讓人家去管理樹(內部介紹還停留在樹一層),因而改爲了與業務適配,拓撲定死的結構,只讓用戶配底層的聚合變量,內部稱爲counter。如今只但願能知足用戶需求,能多多的建出變量來,這樣纔有往下一步走的可能。

一些實現細節和演進方向

如下基本是未實現,只是思考過的部分

  1. 要增強監控。以前對variable的監控力度不夠,整個系統徹底變成黑盒。其實能夠經過諸如簡單的epl如insert into varInfo select count(*),.... from inputVar.win:time_length(1 min)就能夠得到每一個變量的運行時統計,直接導入ctrip的dashboard系統,就能夠得到監控和展現能力,這樣還能導入到實時報警應用裏面去,得到實時報警能力(用戶提的需求,因此說多讓用戶參與討論是頗有用的,不要把用戶當傻瓜)。對XXX項目而言,這些統計信息甚至能做爲業務指標使用,例如若是我須要公司某一業務線的訪問量,只要配個變量+監控就ok了。沒什麼難度,但應該頗有用
  2. 提升sharding的靈活性。目前只在前端作一次sharding,都是事先決定的(根據event裏的key字段),能夠考慮改進這塊,根據後續變量的sharding來自動選擇、複製和分發數據。
  3. 多迭代。目前esper節點只有一層,若是有更復雜的功能,能夠考慮用mq做中轉,或是創建多層esper節點,從而實現多道處理。
  4. 多集羣配合。幾個集羣配合起來工做,好比在多數據中心環境下,每一個數據中心部署一套算各自的集羣,只要把處理過的數據再統一的聚合一遍,就能夠得到統一的結果,簡單又不失效率;另外第一點也提到了XXX項目的監控數據能夠由實時報警去檢測異常,但二者核心代碼是一致的,實時報警自己的統計也能用來監控本身,這種帶點遞歸的特性仍是比較好玩的。
  5. 2-4都描述了複雜化整個計算流程,其實甚至能夠經過對整個DAG圖進行分析去自動分配到各個節點,自動去作同步的操做。不過這些都過於複雜了,如今還只能想一想
  6. 這套實時系統有可靠性的弱點。由於基於內存的計算,若是某一節點不幸掛掉了,內存裏的數據就丟了。這個目前沒有太好的辦法,要麼作主從備份(一樣的計算分配到不一樣的機器,只取其中一個的計算結果),要麼用storm的可靠性方式,掛了後恢復到某一個時間段開始重放,不過這就要看業務是否能容忍至少一次的一致性。要作到通用的徹底的可靠度很是具備挑戰性,仍是得根據業務對數據可靠性的不一樣需求才能作出合適的設計。
  7. 因爲可靠性的因素,目前這套系統比較推薦用於短時數據的分析,這樣即便掛了,storm重啓後也不會有很大的影響,不少業務都能容忍。對於時間跨度比較長的數據分析和聚合,按個人想法,須要摒棄掉esper,但數據庫那種方式仍是太慢,必定要作好流式計算(或者說是online的計算)。我目前的思路是隻採用storm,將每一個步驟分攤到各個節點:
    • 對於過濾和join比較簡單,直接來就能夠了
    • 對於基於滑動窗口的聚合,稍微麻煩些,但像count和sum這類都是能夠流式計算的,對於每一個進了變量窗口的事件,咱們只要添加一個進入事件和窗口事件後的退出事件便可,計算節點只要順序處理這些事件便可。
    • count distinct的比較麻煩些。能夠用一些hashmap的技術去優化,不過我更傾向於採用分段做基數估計的方法去取近似值。
      這樣子的話項目規模會大很多,效率確定比單機版的esper差,但可控性加強,方即可靠性和一致性方面的工做。還只是想法,沒有詳細思考過
  8. 對於歷史性的數據,若是跨度很是大,能夠仍舊採用老的db的方式,但能夠有優化,能夠隔一段時間用hadoop跑一下,作些預處理工做,儘可能減小在線的計算量。事實上,咱們的實時系統自己能夠也改形成後臺job的形式,計算寫基於老數據的變量,若是和實時版的變量相結合,會很大擴展適用範圍。
  9. 續上一點,最理想的情況就是把實時變量提供接口暴露出來(下一步有時間會嘗試,這下要測下整個系統的延時度了),與其餘系統產生的歷史數據相集成,能有個通用的rule引擎去統一運用這些數據。這樣的話,每一個系統各司其職,有的系統跑得慢,但穩定,能夠利用較多的數據;有的系統雖然可靠性很難保證,但跑得快,能夠提供掌控當前情況的能力。

更大的視角

其實爲啥要這麼折騰,除了人懶想寫點一本萬利的代碼外,主要是從前公司那裏產生了完整的生態系統纔是最重要的這一個想法。因此力圖往這個方向靠。
Alt text
此圖是我在paypal待了一年後最大的感覺:別看技術爛,只要生態系統建成了,整個系統能有機的跑起來,就能源源不斷的帶來收益。
整個系統是一個循環往復的過程:

  1. ops在目前的技術條件下是最重要的,任何的規則系統,總得有人去給你作標註,作出大量的樣本數據來,這是作任何分析系統的基礎。paypal ops的director就自豪的認爲google沒作過paypal就是由於沒「人」的參與;而在ctrip,目前感受幾個應用最缺的就是這塊,沒有樣本數據很難去作任何自動化,規則只能是出過後每次人工去猜想,後續新規則的制定和老規則的維護都很難跟上。即便是公司技術能力突增,會用機器學習了,也很難發力,你只能用用無監督學習,但找到的特徵估計十有八九不是想要的。這一點是致命傷,公司也很難向這個方向投入資源,一是還沒發展到這步,二是得考慮劃不划算
  2. ops產生的樣本數據經由analyst分析,概括出一些變量,就能夠訓練處一部分規則和模型,只要有足夠的數據,就能夠獲得還行的成果。再拿paypal舉例,基本上看家本領就靠teradata數據倉庫,當時聽一個pd的team leader說他們最複雜的方法就是logstic regression了,由於這樣出來的東西好理解(聽說如今也開始高大上,採用神經網絡了,幫他們打個廣告)。別看不是很複雜,但確實頗有效,支撐公司了好久(固然他們的準確率要求不高,由於背後有千把個ops撐腰呢)。切換到ctrip,沒有ops做支撐,因此個人用戶們定規則都當心翼翼的,每次都只有些raw log去看,而後去定規則,這些規則都沒法度量和跟蹤維護。這就是我要作這套變量系統的初衷,其實原本我只要把人家的規則實現就行了,但這永遠是打補丁的方式,不是可持續發展的道路。本來是但願能有套靈活建變量的系統,這樣能夠很輕鬆的讓用戶去見上百個變量,這樣對每一個ip/uid,我都能有成百上千個更有意義的數據(或者叫維度),理論上說,造成了一個risk profile。若是有樣本的話,能夠與BI對接,自動化都能產生規則出來;即便沒有樣本,經過監控統計,也能比raw log更好的揭示系統情況。
  3. ops產生的規則和模型最終由pd的在線系統去run,從而抓出感興趣的數據來。這一塊純粹是執行了。不過對我本身實現的系統而言,還指望達到能靈活的自動化的創建變量,而不是每一個變量都要去考編碼來實現
  4. 整個系統周而復始的運行,不斷調整,不斷演進。

這這個一套系統纔是完整的有機體,本文所述的內容都只是描述了其中最不重要的在線那一塊,而創建整個體系和完善離線系統纔是更重要的,這個是真正作一個完善有用的實時系統須要解決的。我本身還沒作到,就很少說了

總結

想說的都說了,沒想到這麼長。最後只想說句作實時計算真不容易,越作越以爲能力和經驗上的不足益發明顯了,已經不是簡單搭個開源軟件就能搞定的。但願後面能看到別人的作法,有機會能跟風。

什麼,你竟然看到結束了,請你喝酸梅湯。

相關文章
相關標籤/搜索