MapReduce Design Patterns(chapter 2(part 1))(二)

隨着天天都有更多的數據加載進系統,數據量變得很龐大。這一章專一於對你的數據頂層的,歸納性意見的設計模式,從而使你能擴展思路,但可能對局部數據是不適用的。歸納性的分析都是關於對類似數據的分組和執行統計運算,建立索引,或僅僅爲了計數。java

 

經過分組數據集計算聚合排序是一種快速獲取結果的好方法。例如,你可能想按某種規則計算出所存的錢的總數,或者按人口計算人們在互聯網花費的平均時長。對於新的數據集,你能夠開始用這些分析類型幫你計算出數據中什麼東西有趣或惟一,和哪些須要仔細研究。sql

 

本章的模式有數值聚合,反向索引,和用計數器計數。書中簡潔的MapReduce程序比其它的模式要多。這是由於根據key分組數據是MapReduce規範的核心機制:全部key經過分組聚在一塊兒並在reduces端收集。當你把想要分組的字段做爲key發送時,分組是MapReduce框架自動處理的。apache

 

Numerical Summarizations

Pattern Description

數值聚合模式是一個用於計算細節上的數據的統計值的通用模式,這種模式很容易形成誤解。這種模式下,使用combiner和理解要執行的計算很是重要。設計模式

 

Intent

根據key分組記錄並每組計算聚合值,能夠對大數據集有更高層次的認識。假設θ是咱們想要執行的聚合方法,要計算的值是列表values(v1, v2, v3,…, vn)),要想求出聚合值λ,令:λ=θ(v1, v2, v3, …, vn).θ的種類有最大值,最小值,平均值,中值,標準差。網絡

 

Motivation

如今對於不少大的數據集,咱們手動讀它們並獲得有意義的信息是很困難的。例如你的網站日誌,一個用戶每次登錄,鍵入查詢,或執行其餘明顯的動做,要想靠閱讀上TB的文本監控這個用戶實時的行爲是極其困難的。若是按天天的小時分組,計算每組記錄的數量,你將會描繪出數量的直方圖,並識別網站的活躍時間。類似的,若是把廣告按類型分組,你將會把廣告推向更好的市場定位。也許你會基於在一天有效的時間投放循環廣告。全部這種類型的問題均可以用數值聚合解決。app

Applicability

數值聚合的使用需知足如下兩個條件:框架

一、  處理數值類型數據或作計數。less

二、  數據能根據指定字段分組。oop

 

Structure

圖2-1展現了MapReduce中數值聚合執行的結構圖。MapReduce組件每部分都有詳細的描述:post

 

Figure 2-1. The structure of the numerical summarizations pattern

 

•mapper的輸出keys由分組的字段組成,values是任意相關數值型的條目。能夠假設mapper配置一張關係表,表的列跟要執行θ方法的字段關聯,而且每一行都包含mapper輸出的記錄。Mapper輸出的value包含每一列的值,輸出key將表做爲一個總體,由於每一個表都是由MapReduce的分組方法建立的。

 

Notice:分組會涉及到將大量子數據集發送到到要運行的reduce端,每一個輸入記錄都有可能成爲map的輸出。確保儘可能少的須要分析的數據發送到reduce端,而且處理好壞的輸入條件。

 

•combiner經過數值聚合能有效減小經過網絡傳給reduce的中間鍵值對的數目。若是θ方法是關聯的而且是可交換的就能達到目的。就是說,若是能任意改變值得順序和進行任意的分組計算而對最終結果無影響,就能夠用combiner。這樣的combiner在下面的部分會論述。

 

•作數值求和時能從自定義partitioner中更好的向若干reduce任務分發鍵值對受益。這種需求不多,一旦有job執行時間吃緊,數據量龐大,並且有嚴重數據傾斜時,它能發揮做用。

 

Notice:自定義的partitioner常常被人們忽略,可是,花時間理解基於此的作分組時輸出鍵的分佈和分區會提升性能(還有其它這種狀況的)。假如啓動一百個reduce任務,80個用30秒完成,其它的用25分鐘,這是很低效的。

 

•reducer接收一系列與根據key分組的記錄相關聯的數值型values(v1, v2, v3,…, vn),執行方法λ =θ(v1, v2, v3, …, vn).λ的值同給定的輸入key一塊輸出。

 

Consequences

Job的輸出會由每一個reducer輸入組生成的包含一條記錄的多個文件組成。每條記錄包含key和聚合值。

 

Known uses

Word count:

就是MapReduce的hello world程序。程序對每一個單詞先輸出key爲單詞,value爲整數1,而後根據key分組。Reduce階段輸出每一個惟一單詞和整數加起來的和。第一章能夠看到這個例子。

Record count:

一種經常使用的根據特定時間週期(周,日,時等)獲取數據流量規律的分析方法。

Min/max/count:

一種計算最小,最大值,或特定事件總和的分析。例如,用戶第一次發帖時間,最後一次發帖時間,和一段時間內發帖的總數。你沒必要一次計算出這三個聚合值,其餘使用案例也列在這了,若是僅對其中某個感興趣。

Average/Median/Standard deviation:

跟最大最小求和類似,但不是一種簡單的實現,由於操做是不相關的。三個均可以用combiner,但相比單純重複reduce的邏輯,它們須要一種更復雜的處理過程。

 

Resemblances

SQL:

數值聚合模式跟sql裏分組後再聚合類似:

SELECT MIN(numericalcol1), MAX(numericalcol1), COUNT(*) FROM TABLE GROUP BY groupcol2;

Pig:

Group by部分用foreach generate替換:b = GROUP a BY groupcol2;

c = FOR EACH b GENERATE group, MIN(a.numericalcol1),

MAX(a.numericalcol1), COUNT_STAR(a);

 

Performance analysis

若是combiner適當的運用,使用這種模式能讓聚合運算可以執行的很好。MapReduce就是爲這些種類的工做出現的。跟書中大多數模式同樣,開發者須要關注使用適當的reduce的個數而且考慮可能在reduce組裏出現的數據傾斜。就是說,若是一個key產生的中間鍵值對比其餘key多,這個key對應的reducer就會比其餘reducer執行更多的工做。

 

Numerical Summarization Examples

Minimum, maximum, and count example

這三種計算都是數值聚合模式的優秀的程序。分組操做之後,reducer端只須要迭代跟分組相關聯的值並找到最小,最大和每一個key分組的和。因爲關聯性和可互換性,combiner能極大得減小須要發送的reduce端shffled的中間鍵值對。若是實現的功能恰當,reducer的代碼能夠跟combiner一致。

 

下面每部分代碼描述了這種問題的情形。

Problem:給出用戶評論內容的列表,獲得第一次和最後一次評論時間,和這個用戶評論總條數。

 

Minmaxcounttuple code。

MinMaxCountTuple類有三個屬性,並實現writable接口,用於mapper的輸出值。當用分隔符把這些值放進一個Text對象,最好建立個自定義的writable。這樣不只整潔,也沒必要擔憂從reduce階段獲取這些值是的字符串解析。這種自定義writable對象也廣泛用於這種模式下的其餘例子。下面就是代碼,本章其它writables跟這個相似,爲了簡介,咱們會省略掉。

 

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class MinMaxCountTuple implements Writable {
    private Date min = new Date();
    private Date max = new Date();
    private long count = 0;

    private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public Date getMin() {
        return min;
    }

    public void setMin(Date min) {
        this.min = min;
    }

    public Date getMax() {
        return max;
    }

    public void setMax(Date max) {
        this.max = max;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    public void readFields(DataInput in) throws IOException {
        // Read the data out in the order it is written,
        // creating new Date objects from the UNIX timestamp
        min = new Date(in.readLong());
        max = new Date(in.readLong());
        count = in.readLong();
    }

    public void write(DataOutput out) throws IOException {
        // Write the data out in the order it is read,
        // using the UNIX timestamp to represent the Date
        out.writeLong(min.getTime());
        out.writeLong(max.getTime());
        out.writeLong(count);
    }

    public String toString() {
        return frmt.format(min) + "\t" + frmt.format(max) + "\t" + count;
    }

  

Mapper code。Mapper會從每行輸入記錄(用戶id和建立數據)中抽取的xml屬性做爲輸入值,執行預處理。輸入鍵忽略掉,建立數據爲了在combiner和reduce中容易比較而轉換成java date類型。輸出鍵是用戶id,值是將要輸出的三個列:最小日期,最大日期,和用戶評論的總條數。三個列存在writable類型對象裏,前兩個時間類型,最後一個long類型。這些對reducer來講很精確,但不會影響到mapper中的使用,咱們也但願在mapper和reducer中使用相同的數據類型。在mapper中,設置最小最大建立日期。爲了充分發揮隨後講到的combiner的優點,日期輸出兩次。第三列給計數值1,代表這個用戶提交了一條評論。事實上,在reducer階段,全部的計數會被加到一塊兒,也會算出最大最小日期。

 

    public static class MinMaxCountMapper extends Mapper<Object, Text, Text, MinMaxCountTuple> {
        // Our output key and value Writables
        private Text outUserId = new Text();
        private MinMaxCountTuple outTuple = new MinMaxCountTuple();
        // This object will format the creation date string into a Date object
        private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());
            // Grab the "CreationDate" field since it is what we are finding
            // the min and max value of
            String strDate = parsed.get("CreationDate");
            // Grab the「UserID」 since it is what we are grouping by
            String userId = parsed.get("UserId");
            // Parse the string into a Date object
            Date creationDate = null;
            try {
                creationDate = frmt.parse(strDate);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            // Set the minimum and maximum date values to the creationDate
            outTuple.setMin(creationDate);
            outTuple.setMax(creationDate);
            // Set the comment count to 1
            outTuple.setCount(1);
            // Set our user ID as the output key
            outUserId.set(userId);
            // Write out the hour and the average comment length
            context.write(outUserId, outTuple);
        }
    }

  

Reducer code。Reducer會迭代全部值找出最小,最大日期和統計總和。一開始咱們對每一個輸入組初始化要輸出的結果。對組內的每一個輸入值,若是輸出結果的最小值沒設定,或比當前輸出結果中存的當前最小值小,咱們就把這個輸入值設置爲輸出結果的最小值。最大值的邏輯也是這樣,惟一不一樣的是用了大於號。每一個值的計數值被加到sum和中,跟word count程序類似。經過全部輸入值算出最大最小值之後,最終的計數就是要輸出的值。鍵和值被寫到文件系統中。

 

    public static class MinMaxCountReducer extends Reducer<Text, MinMaxCountTuple, Text, MinMaxCountTuple> {
        // Our output value Writable
        private MinMaxCountTuple result = new MinMaxCountTuple();

        public void reduce(Text key, Iterable<MinMaxCountTuple> values, Context context) throws IOException, InterruptedException {
            // Initialize our result
            result.setMin(null);
            result.setMax(null);
            result.setCount(0);
            int sum = 0;
            // Iterate through all input values for this key
            for (MinMaxCountTuple val : values) {
                // If the value's min is less than the result's min
                // Set the result's min to value's
                if (result.getMin() == null || val.getMin().compareTo(result.getMin()) < 0) {
                    result.setMin(val.getMin());
                }

                // If the value's max is more than the result's max
                // Set the result's max to value's
                if (result.getMax() == null || val.getMax().compareTo(result.getMax()) > 0) {
                    result.setMax(val.getMax());
                }
                // Add to our sum the count for value
                sum += val.getCount();
            }
            // Set our count to the number of input values
            result.setCount(sum);
            context.write(key, result);
        }
    }

  

Combiner optimization。本例的Reducer正好能夠用做job的combiner。由於咱們僅僅關心記錄條數,最小時間,最大時間。同一個用戶的多條記錄沒必要都發送到reducer。在本地map任務上能夠先算出最大最小評論日期,這樣對最終的值是沒有影響的。當計數操做是關聯的,並是可交換的時,combiner的使用不會影響計數結果。

 

Data flow diagram。圖2-2給出了mapper,combiner,reducer之間的流程來幫助咱們描述他們之間的交互過程。用數值簡單的表明日期,概念上是同樣的。Combiner極可能執行mapper輸出的全部組,決定最大最小值做爲前兩個列,並分組求和。而後輸出最小最大值,和這個心的計數和。若是combiner沒有運行在任何記錄上,在reducer裏仍然是能夠計算的。

 

 

Figure 2-2. The Min/Max/Count MapReduce data flow through the combiner

 

Average example

計算平均值,假設在分組裏面須要兩個值:須要要求和的值的個數和值的總和。這兩個值能在reduce端細緻的計算。經過遍歷集合中的每一個值,累加到一個保存總和的變量裏。以後,簡單的經過計數劃分結果,並輸出平均值。然而,在這裏咱們不能用reduce的實現當作combiner,由於平均值的計算是非關聯操做。相反,Mapper要輸出兩列數據,數值個數和平均值。每條輸入記錄計數1。Reduce經過計數和平均值的成績得到總和,累加計數做爲總的數值個數和。這樣經過動態的計數計算動態的數值和,而後輸出計數和平均值。使用這種迂迴策略,reducer代碼就能用做combiner,由於相關性獲得了保存。

 

下面代碼描述了這種問題。

問題:給出用戶評論數據,計算一天內每一個小時評論的長度的平均值。

 

Mapper code。Mapper將會處理每條輸入記錄並計算基於時間的評論內容的平均長度。輸出鍵是小時,從xml數據文件屬性中可獲得。輸出值有兩列,評論的條數和這一小時內評論的平均長度。由於mapper每次處理一條記錄,計數爲1,平均長度就是這條評論的長度。這兩個值經過自定義的writable類輸出,這個類包含兩個float數值字段,一個計數字段,還有一個平均值。

 

    public static class AverageMapper extends Mapper<Object, Text, IntWritable, CountAverageTuple> {
        private IntWritable outHour = new IntWritable();
        private CountAverageTuple outCountAverage = new CountAverageTuple();
        private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());
            // Grab the "CreationDate" field,
            // since it is what we are grouping by
            String strDate = parsed.get("CreationDate");
            // Grab the comment to find the length
            String text = parsed.get("Text");
            // get the hour this comment was posted in
            Date creationDate = null;
            try {
                creationDate = frmt.parse(strDate);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            outHour.set(creationDate.getHours());
            // get the comment length
            outCountAverage.setCount(1);
            outCountAverage.setAverage(text.length());
            // write out the hour with the comment length
            context.write(outHour, outCountAverage);
        }
    }

  

Reducer code。Reducer代碼迭代某小時內全部值並保存在兩個本地變量:動態的count變量和動態的sum變量。對每一個值,count乘上平均值加到sum上。Count簡單的加到動態count變量裏。迭代以後,輸入key,動態count,和經過動態count,動態sum計算出來的平均值寫到文件中。

 

    public static class AverageReducer extends Reducer<IntWritable, CountAverageTuple, IntWritable, CountAverageTuple> {
        private CountAverageTuple result = new CountAverageTuple();

        public void reduce(IntWritable key, Iterable<CountAverageTuple> values, Context context) throws IOException, InterruptedException {
            double sum = 0;
            long count = 0;
            // Iterate through all input values for this key
            for (CountAverageTuple val : values) {
                sum += val.getCount() * val.getAverage();
                count += val.getCount();
            }
            result.setCount(count);
            result.setAverage(sum / count);
            context.write(key, result);
        }
    }

  

Combiner optimization。計算平均值時,當reduce代碼輸出計數和平均值時能夠用做combiner。求平均值不是相關聯的操做,若是count和平均值(原文爲count,本人認爲有誤)從reducer一塊輸出,這兩個值的乘積能夠保存起來用於reduce階段的sum。若是不輸出這個count,combiner就不能用,由於平均數的平均數並非正確的平均數。通常來講,count和平均值一塊寫到文件系統不會有問題。然而,若是count妨礙了接下來的分析,那就去掉count,編寫一個跟reduce類似的combiner的實現。這兩種實現的惟一區別是寫不寫count。

 

Data flow diagram。圖2-3展現了流程圖。

Figure 2-3. Data flow for the average example

相關文章
相關標籤/搜索