小白的數據進階之路(上)——從Shell腳本到MapReduce

那一年,小白剛從學校畢業,學的是計算機專業。最開始他也不清楚本身想要一份怎樣的工做,只知道本身先找個互聯網公司乾乾技術再說。javascript

有一天,小白來到一家剛成立不久的小創業公司參見面試。公司雖小,但團隊倒是華麗麗的。兩位創始人都是MIT的MBA,Co-CEO。他們號稱,公司的運營、財務、市場以及銷售人員,都是從大公司高薪挖過來的。此外,他們還告訴小白,公司另有一位政府背景深厚但不肯透露姓名的神祕股東加盟。html

咱們手頭有百萬美金的風險投資,團隊也基本到位。如今萬事俱備,就差一位程序員了。java

小白日常在學校裏就很喜歡閱讀那些創業成功的勵志故事,對於能拿到風險投資的人特別崇拜。git

大家要作的是什麼產品?小白問道。程序員

這個涉及到咱們的創意,暫時保密。但能夠告訴你的是,咱們要作一款偉大的產品,它將顛覆整個互聯網行業。兩位CEO神祕地回答。github

隨後,他們又補充道,面試

咱們計劃在兩年內上市。編程

小白聽完不由心潮澎湃,隨後就入職了這家公司。網絡

公司已經有了一個能運行的網站系統,小白天天的工做就是維護這個網站。工做並不忙,日常只是讀讀代碼,改改bug。架構

忽然有一天,其中一位CEO讓小白統計一下網站的數據,好比日活躍(Dayliy Active Users)、周活躍(Weekly Active Users)、月活躍(Monthly Active Users),說是要給投資人看。

小白想了想,他手頭現有的一份基礎數據,就是訪問日誌(Access Log)。訪問日誌是天天一個文件,每一個文件裏面每一行的數據格式以下:

[時間] [用戶ID] [操做名稱] [其它參數...]複製代碼

比方說要統計日活躍,就要把一個文件(對應一天)裏出現一樣用戶ID的行進行去重,去重後的文件行數就是日活躍數據了。周活躍和月活躍與此相似,只是要分別在一週和一個月內進行去重。

小白那時只會寫Java程序,因此他寫了個Java程序來進行統計:

逐行讀取一個文件的內容,同時內存裏維護一個HashSet用於作去重判斷。每讀一行,解析出用戶ID,判斷它在HashSet裏是否存在。若是不存在,就把這個用戶ID插入HashSet;若是存在,則忽略這一行,繼續讀下一行。一個文件處理完以後,HashSet裏面存放的數據個數就是那一天對應的日活躍數據。

一樣,統計周活躍和月活躍只須要讓這個程序分別讀取7天和30天的文件進行處理。

今後之後,兩位CEO時不時地來找小白統計各類數據。小白知道,他們最近頻繁地出入於投資圈的各類會議和聚會,大概是想給公司進行第二輪融資。

每次看到數據以後,他們都一臉難以置信的表情。沒有統計錯誤吧?咱們就這麼點兒用戶嗎?

小白竟無言以對。

轉眼間一年時間過去了。小白髮現,他們與上市目標的距離跟一年前一樣遙遠。更糟糕的是,公司以前融到的錢已經花得差很少了,而第二輪融資又遲遲沒有結果,小白上個月的工資也被拖欠着沒發。因而,他果斷辭職。

截止到小白辭職的那一天,公司的日活躍數據也沒有超過四位數。


小白的第二份工做,是一家作手機App的公司。

這家公司的技術總監,自稱老王。在面試的時候,老王據說小白作過數據統計,二話沒說把他招了進來。

小白進了公司才知道,CEO之前是作財務出身,很是重視數據,天天他本身提出的各類大大小小的數據統計需求就不下十幾項。

小白天天忙着寫各類統計程序,處理各類數據格式,常常加班到晚上十一二點。並且,更令他沮喪的是,不少統計需求都是一次性的,他寫的統計程序大部分也是隻運行過一次,之後就扔到一邊再也用不到了。

有一天,他正在加班統計數據。老王走過來,發現他用的是Java語言,感到很驚訝。老王跟小白一塊兒分析後指出,大部分數據需求呢,其實均可以從訪問日誌統計出來。而處理文本文件的日誌數據,用Shell腳本會比較方便。

因而,小白猛學了一陣Shell編程。他發現,用一些Shell命令來統計一些數據,好比日活躍,變得很是簡單。以某一天的訪問日誌文件"access.log"爲例,它每行的格式以下:

[時間] [用戶ID] [操做名稱] [其它參數...]複製代碼

只用一行命令就統計出了日活躍:

cat access.log | awk '{print $2}' | sort | uniq | wc -l複製代碼

這行命令,使用awk把access.log中的第二列(也就是用戶ID)過濾出來,而後進行排序,這樣就使得相同的用戶ID挨在了一塊兒。再通過uniq命令處理,對相鄰行進行去重,就獲得了獨立用戶ID。最後用wc命令算出一共有多少行,就是日活躍。

寫了一些Shell腳本以後,小白慢慢發現,使用一些簡單的命令就能夠很快速地對文件數據集合進行並、交、差運算。

假設a和b是兩個文件,裏面每一行看做一個數據元素,且每一行都各不相同。

那麼,計算a和b的數據並集,使用下面的命令:

cat a b | sort | uniq > a_b.union複製代碼

交集:

cat a b | sort | uniq -d > a_b.intersect複製代碼

這裏uniq命令的-d參數表示:只打印相鄰重複的行。

計算a和b的差集稍微複雜一點:

cat a_b.union b | sort | uniq -u > a_b.diff複製代碼

這裏利用了a和b的並集結果a_b.union,將它與b一塊兒進行排序以後,利用uniq的-u參數把相鄰沒有重複的行打印出來,就獲得了a和b的差集。

小白髮現,不少數據統計均可以用集合的並、交、差運算來完成。

首先,把天天的訪問日誌加工一下,就獲得了一個由獨立用戶ID組成的集合文件(每行一個用戶ID,不重複):

cat access.log | awk '{print $2}' | sort | uniq > access.log.uniq複製代碼

好比,要計算周活躍,就先收集7天的獨立用戶集合:

  • access.log.uniq.1
  • access.log.uniq.2
  • ......
  • access.log.uniq.7

把7個集合求並集就獲得周活躍:

cat access.log.uniq.[1-7] | sort | uniq | wc -l複製代碼

一樣,要計算月活躍就對30天的獨立用戶集合求並集。

再好比,計算用戶留存率(Retention),則須要用到交集。先從某一天的日誌文件中把新註冊的用戶集合分離出來,以它爲基礎:

  • 計算這個新用戶集合和1天以後的獨立活躍用戶集合的交集,這個交集與原來新用戶集合的大小的比值,就是該天的1日留存率。
  • 計算這個新用戶集合和2天以後的獨立活躍用戶集合的交集,這個交集與原來新用戶集合的大小的比值,就是該天的2日留存率。
  • ......
  • 以此類推,能夠計算出N日留存率。

再好比,相似這樣的統計需求:「在過去某段時間內執行過某個操做的用戶,他們N天以後又執行了另外某個操做的比率」,或者計算用戶深刻到某個層級的頁面留存率,基本均可以經過並集和交集來計算。而相似「使用了某個業務但沒有使用另外某個業務的用戶」的統計,則涉及到差集的計算。

在掌握了Shell腳本處理數據的一些技巧以後,小白又深刻學習了awk編程,今後他作起數據統計的任務來,愈加地輕鬆自如。而公司的CEO和產品團隊,天天仔細分析這些數據以後,有針對性地對產品進行調整,也取得了不錯的成績。

隨着用戶的增多和業務的發展,訪問日誌愈來愈大,從幾百MB到1個GB,再到幾十個GB,上百GB。統計腳本執行的時間也愈來愈長,不少統計要跑幾個小時,甚至以天來計。之前那種靈活的數據統計需求,再也作不到「立等可取」了。並且,更糟糕的是,單臺機器的內存已經捉襟見肘。機器雖然內存已經配置到了很大,但仍是常常出現嚴重的swap,之前的腳本眼看就要「跑不動」了。

爲了加快統計腳本執行速度,小白打算找到一個辦法可以讓數據統計腳本在多臺機器上並行執行,並使用較小的內存就能運行。他左思右想,終於想到了一個樸素但有效的辦法。

仍是以計算日活躍爲例。他先把某一天的日誌文件從頭到尾順序掃描一遍,獲得10個用戶ID文件。對於日誌文件中出現的每一個用戶ID,他經過計算用戶ID的哈希值,來決定把這個用戶ID寫入這10個文件中的哪個。因爲是順序處理,這一步執行所須要的內存並不大,並且速度也比較快。

而後,他把獲得的10個文件拷貝到不一樣的機器上,分別進行排序、去重,並計算各自的獨立用戶數。因爲10個文件中的用戶ID相互之間沒有交集,因此最後把計算出來的10個獨立用戶數直接加起來,就獲得了這一天的日活躍數據。

依靠這種方法,小白把須要處理的數據規模下降到了原來的1/10。他發現,無論原始數據文件多麼大,他只要在第一步掃描處理文件的時候選擇拆分的文件數多一些,總能把統計問題解決掉。可是,他也看到了這種方法的一些缺點:

  • 第一步掃描處理文件的過程仍然是順序的,雖然內存佔用很少,但速度上不去。
  • 拆分後的文件要所有拷貝到別的機器上,跨機器的網絡傳輸也很耗時。
  • 最重要的一個問題,這所有的過程至關繁瑣,極易出錯,且不夠通用。

特別是最後這個問題,讓小白非常苦惱。看起來每次統計過程相似,彷佛都在重複勞動,但每次又都有些不同的地方。好比,是根據什麼規則進行文件拆分?拆分到多少份?拆分後的數據文件又是怎麼處理?哪些機器空閒可以執行這些處理?都要根據具體的統計需求和計算過程來定。

整個過程無法自動化。小白雖然手下招了兩個實習生來分擔他的工做,但涉及到這種較大數據量的統計問題時,他仍是不放心交給他們來作。

因而,小白在思考,如何才能設計出一套通用的數據計算框架,讓每一個會寫腳本的人都能分佈式地運行他們的腳本呢?

這一思考就是三年。在這期間,他無數次地感受到本身已經很是接近於那個問題背後的本質了,但每次都沒法達到融會貫通的那個突破點。

而與此同時,公司的業務發展也進入了瓶頸期。小白逐漸認識到,在原有的業務基礎上進行精耕細做的微小改進,當然能帶來必定程度的提高,但終究沒法造就巨大的價值突破。這猶如他正在思考的問題,他須要換一個視野來從新審視。

正在這時,另外一家處於高速增加期的互聯網公司要挖他過去。再三考慮以後,他選擇了一個恰當的時機提交了辭職信,告別了他的第二份工做。


小白在新公司入職之後,被分配到數據架構組。他的任務正是他一直想實現的那個目標:設計一套通用的分佈式的數據計算框架。這一次,他面臨的是動輒幾個T的大數據。

小白作了無數次調研,自學了不少知識,最後,他從Lisp以及其它一些函數式語言的map和reduce原語中得到了靈感。他從新設計了整個數據處理過程,以下圖:

  • (1) 輸入多個文件。不少數據統計都須要輸入多個文件,好比統計周活躍就須要同時輸入7個日誌文件。
  • (2) 把每一個文件進行邏輯分塊,分紅指定大小的數據塊,每一個數據塊稱爲InputSplit。
    • 因爲要處理很大的文件,因此首先必需要進行分塊,這樣接下來才便於數據塊的處理和傳輸。
    • 並且,這裏的文件分塊是與業務無關的,這與小白在上家公司根據計算某個哈希值來進行文件拆分是不一樣的。以前根據哈希值進行的文件拆分,須要編程人員根據具體統計需求來肯定如何進行哈希,好比根據什麼字段進行哈希,以及哈希成多少份。而如今的文件分塊,只用考慮分塊大小就能夠了。分塊過程與業務無關,意味着這個過程能夠寫進框架,而無需使用者操心了。
    • 另外,還須要注意的一點是,這裏的分塊只是邏輯上的,並無真正地把文件切成不少個小文件。實際上那樣作是成本很高的。所謂邏輯上的分塊,就是說每一個InputSplit只須要指明當前分塊是對應哪個文件、起始字節位置以及分塊長度就能夠了。
  • (3) 爲每個InputSplit分配一個Mapper任務,它容許調度到不一樣機器上並行執行。這個Mapper定義了一個高度抽象的map操做,它的輸入是一對key-value,而輸出則是key-value的列表。這裏可能產生的疑問點有下面幾個:
    • 輸入的InputSplit每次傳多少數據給Mapper呢?這些數據又是怎麼變成key-value的格式的呢?實際上,InputSplit的數據確實要通過必定的變換,一部分一部分地變換成key-value的格式傳進Mapper。這個轉換過程使用者本身能夠指定。而對於通常的文本輸入文件來講(好比訪問日誌),數據是一行一行傳給Mapper的,其中value=當前行,key=當前行在輸入文件中的位置。
    • Mapper裏須要對輸入的key-value執行什麼處理呢?這其實正是須要使用者來實現的部分。通常來講呢,須要對輸入的一行數據進行解析,獲得關鍵的字段。以統計日活躍爲例,這裏至少應該從輸入的一行數據中解析出「用戶ID」字段。
    • Mapper輸出的key和value怎麼肯定呢?這裏輸出的key很關鍵。整個系統保證,從Mapper輸出的相同的key,無論這些key是從同一個Mapper輸出的,仍是從不一樣Mapper輸出的,它們後續都會歸類到同一個Reducer過程當中去處理。好比要統計日活躍,那麼這裏就但願相同的用戶ID最終要送到一個地方去處理(計數),因此輸出的key就應該是用戶ID。對於輸出日活躍的例子,輸出的value是什麼並不重要。
  • (4) Mapper輸出的key-value列表,根據key的值哈希到不一樣的數據分塊中,這裏的數據塊被稱爲Partition。後面有多少個Reducer,每一個Mapper的輸出就對應多少個Partition。最終,一個Mapper的輸出,會根據(PartitionId, key)來排序。這樣,Mapper輸出到同一個Partition中的key-value就是有序的了。這裏的過程其實有點相似於小白在前一家公司根據哈希值來進行文件拆分的作法,但那裏是對所有數據進行拆分,這裏只是對當前InputSplit的部分數據進行劃分,數據規模已經減少了。
  • (5) 從各個Mapper收集對應的Partition數據,並進行歸併排序,而後將每一個key和它所對應的全部value傳給Reducer處理。Reducer也能夠調度到不一樣機器上並行執行。
    • 因爲數據在傳給Reducer處理以前進行了排序,因此前面全部Mapper輸出的同一個Partition內部相同key的數據都已經挨在了一塊兒,所以能夠把這些挨着的數據一次性傳給Reducer。若是相同key的數據特別多,那麼也沒有關係,由於這裏傳給Reducer的value列表是以Iterator的形式傳遞的,並非所有在內存裏的列表。
    • Reducer在處理後再輸出本身的key-value,存儲到輸出文件中。每一個Reducer對應一個輸出文件。

上面的數據處理過程,通常狀況下使用者只須要關心Map(3)和Reduce(5)兩個過程,即重寫Mapper和Reducer。所以,小白把這個數據處理系統稱爲MapReduce。

仍是以統計日活躍爲例,使用者須要重寫的Mapper和Reducer代碼以下:

public class MyMapper extends Mapper<Object, Text, Text, Text> {
    private final static Text empty = new Text("");
    private Text userId = new Text();

    public void map(Object key, Text value, Context context ) throws IOException, InterruptedException {
        //value格式: [時間] [用戶ID] [操做名稱] [其它參數...]
        StringTokenizer itr = new StringTokenizer(value.toString());
        //先跳過第一個字段
        if (itr.hasMoreTokens()) itr.nextToken();
        if (itr.hasMoreTokens()) {
            //找到用戶ID字段
            userId.set(itr.nextToken());
            //輸出用戶ID
            context.write(userId, empty);
        }
    }
}

public class MyReducer extends Reducer<Text,Text,Text,Text> {
    private final static Text empty = new Text("");

    public void reduce(Text key, Iterable<Text> values, Context context ) throws IOException, InterruptedException {
        //key就是用戶ID
        //重複的用戶ID只輸出一個, 去重
        context.write(key, empty);
    }
}複製代碼

假設配置了r個Reducer,那麼通過上面的代碼執行完畢以後,會獲得r個輸出文件。其中每一個文件由不重複的用戶ID組成,且不一樣文件之間不存在交集。所以,這些輸出文件就記錄了全部日活躍用戶,它們的行數累加,就獲得了日活躍數。

設計出MapReduce的概念以後,小白髮現,這是一個頗有效的抽象。它不只能完成日常的數據統計任務,它還有更普遍的一些用途,下面是幾個例子:

  • 分佈式的Grep操做。
  • 反轉網頁引用關係。這在搜索引擎中會用到。輸入數據是網頁source到網頁target的引用關係,如今要把這些引用關係倒過來。讓Mapper輸出(target, source),在Reducer中把同一個target頁面對應的全部source頁面歸到一塊兒,輸出:(target, list(source))。
  • 分佈式排序。原本每一個Reducer的輸出文件內部數據是有序的,但不一樣Reducer的輸出文件之間不是有序的。爲了能作到全局有序,這裏須要在Mapper完成後生成Partition的時候定製一下劃分規則,保證Partition之間是有序的便可。

故事以外的說明

首先,本文出現的故事情節純屬虛構,但裏面出現的技術和思考是真實的。本文在嘗試用一個先後貫穿的故事主線來講明數據統計以及MapReduce的設計思路,重點在於思惟的先後連貫,而不在於細節的面面俱到。所以,有不少重要的技術細節是本文沒有涵蓋的,但讀者們可能須要注意。好比:

  • 本文假設數據都在訪問日誌中可以取到。實際中要複雜得多,數據有不少來源,格式也會比較複雜。在數據統計以前會有一個很重要的ETL過程(Extract-Transform-Load)。
  • 本文在介紹MapReduce的時候,是仿照Hadoop裏的相關實現來進行的,而Hadoop是受谷歌的Jeffrey Dean在2004年發表的一篇論文所啓發的。那篇論文叫作《MapReduce: Simplified Data Processing on Large Clusters》,下載地址:research.google.com/archive/map…
  • 本文沒有對Hadoop集羣的資源管理和任務調度監控系統進行介紹,在Hadoop裏這一部分叫作YARN。它很是重要。
  • 爲了讓Mapper和Reducer在不一樣的機器上都能對文件進行讀寫,實際上還須要一個分佈式文件系統來支撐。在Hadoop裏這部分是HDFS。
  • Hadoop和HDFS的一個重要設計思想是,移動計算自己比移動數據成本更低。所以,Mapper的執行會盡可能就近執行。這部分本文沒有涉及。
  • 關於輸入的InputSplit的邊界問題。原始輸入文件進行邏輯分塊的時候,邊界可能在任意的字節位置。但對於文本輸入文件來講,Mapper接收到的數據都是整行的數據,這是爲何呢?這是由於對一個InputSplit進行輸入處理的時候,在邊界附近也通過了特殊處理。具體作法是:在InputSplit結尾的地方越過邊界多讀一行,而在InputSplit開始的時候跳過第一行數據。
  • 在每一個Mapper結束的時候,還能夠執行一個Combiner,對數據進行局部的合併,以減少從Mapper到Reducer的數據傳輸。可是要知道,從Mapper執行,到排序(Sort and Spill),再到Combiner執行,再到Partition的生成,這一部分至關複雜,在實際應用的時候還需深刻理解、多加當心。
  • Hadoop官網的文檔不是很給力。這裏推薦一個介紹Hadoop運行原理的很是不錯的網站:ercoppa.github.io/HadoopInter…

(完)

其它精選文章

相關文章
相關標籤/搜索