MapReduce Design Patterns(chapter 2 (part 2))(三)

Median and standard deviation

中值和標準差的計算比前面的例子複雜一點。由於這種運算是非關聯的,它們不是那麼容易的能從combiner中獲益。中值是將數據集一分爲兩等份的數值類型,一份比中值大,一部分比中值小。這須要數據集按順序完成清洗。數據必須是排序的,但存在必定障礙,由於MapReduce不會根據values排序。java

 

方差告訴咱們數據跟平均值之間的差別程度。這就要求咱們以前要先找到平均值。執行這種操做最容易的方法是複製值得列表到臨時列表,以便找到中值,或者再一次迭代集合全部數據獲得標準差。對大的數據量,這種實現可能致使java堆空間的問題,引文每一個輸入組的每一個值都放進內存處理。下一個例子就是針對這種問題的。數據結構

 

問題:給出用戶評論,計算一天中每一個小時評論長度的中值和標準差。app

 

Mapper code。Mapper會處理每條輸入記錄計算一天內每一個小時評論長度的中值(貌似事實不是這樣)。輸出鍵是小時,輸出值是評論長度。post

 

    public static class MedianStdDevMapper extends Mapper<Object, Text, IntWritable, IntWritable> {
        private IntWritable outHour = new IntWritable();
        private IntWritable outCommentLength = new IntWritable();
        private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());
            // Grab the "CreationDate" field,
            // since it is what we are grouping by
            String strDate = parsed.get("CreationDate");
            // Grab the comment to find the length
            String text = parsed.get("Text");
            // get the hour this comment was posted in
            Date creationDate = null;
            try {
                creationDate = frmt.parse(strDate);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            outHour.set(creationDate.getHours());
            // set the comment length
            outCommentLength.set(text.length());
            // write out the user ID with min max dates and count
            context.write(outHour, outCommentLength);
        }
    }

  

Reducer code。Reducer會迭代給定值得集合,並把每一個值加到內存列表裏。同時也會計算一個動態的sum和count。迭代以後,評論長度被排序,以便找出中值。若是數量是偶數,中值是中間兩個數的平均值。下面,根據動態的sum和count計算出平均值,而後迭代排序的列表計算出標準差。每一個數跟平均值的差的平方累加求和保存在一個動態sum中,這個sum的平方根就是標準差。最後輸出key,中值和標準差。this

 

    public static class MedianStdDevReducer extends Reducer<IntWritable, IntWritable, IntWritable, MedianStdDevTuple> {
        private MedianStdDevTuple result = new MedianStdDevTuple();
        private ArrayList<Double> commentLengths = new ArrayList<Double>();

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            double sum = 0;
            double count = 0;
            commentLengths.clear();
            result.setStdDev(0d);
            // Iterate through all input values for this key
            for (IntWritable val : values) {
                commentLengths.add((double) val.get());
                sum += val.get();
                ++count;
            }
            // sort commentLengths to calculate median
            Collections.sort(commentLengths);
            // if commentLengths is an even value, average middle two elements
            if (count % 2 == 0) {
                result.setMedian((commentLengths.get((int) count / 2 - 1) +
                        commentLengths.get((int) count / 2)) / 2.0f);
            } else {
                // else, set median to middle value
                result.setMedian(commentLengths.get((int) count / 2));
            }
            // calculate standard deviation
            double mean = sum / count;
            double sumOfSquares = 0.0f;
            for (double f : commentLengths) {
                sumOfSquares += (f - mean) * (f - mean);
            }
            result.setStdDev((double) Math.sqrt(sumOfSquares / (count - 1)));
            context.write(key, result);
        }
    }

  

Combiner optimization。這種狀況下不能用combiner。reducer須要全部的值去計算中值和標準差。由於combiner僅僅在一個map本地處理中間鍵值對。計算完整的中值,和標準值是不可能的。下面的例子是一種複雜一點的使用自定義的combiner的實現。code

 

Memory-conscious median and standard deviation

下面的例子跟前一個不一樣,並減小了內存的使用。把值放進列表會致使不少重複的元素。一種去重的方法是標記元素的個數。例如,對於列表< 1, 1, 1, 1, 2, 2, 3,4, 5, 5, 5 >,能夠用一個sorted map保存:(1→4, 2→2, 3→1, 4→1, 5→3)。核心的原理是同樣的:reduce階段會迭代全部值並放入內存數據結構中。數據結構和搜索的方式是改變的地方。Map很大程度上減小了內存的使用。前一個例子使用list,複雜度爲O(n),n是評論條數,本例使用map,使用鍵值對,爲O(max(m)),m是評論長度的最大值。做爲額外的補充,combiner的使用能幫助聚合評論長度的數目,並經過writable對象輸出reducer端將要使用的這個map。orm

 

問題:同前一個。對象

 

Mapper code。Mapper處理輸入記錄,輸出鍵是小時,值是sortedmapwritable對象,包含一個元素:評論長度和計數1.這個map在reducer和combiner裏多處用到。blog

 

    public static class MedianStdDevMapper extends Mapper<Object, Text, IntWritable, SortedMapWritable> {
        private IntWritable commentLength = new IntWritable();
        private static final LongWritable ONE = new LongWritable(1);
        private IntWritable outHour = new IntWritable();
        private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());
            // Grab the "CreationDate" field,
            // since it is what we are grouping by
            String strDate = parsed.get("CreationDate");
            // Grab the comment to find the length
            String text = parsed.get("Text");
            // Get the hour this comment was posted in
            Date creationDate = null;
            try {
                creationDate = frmt.parse(strDate);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            outHour.set(creationDate.getHours());
            commentLength.set(text.length());
            SortedMapWritable outCommentLength = new SortedMapWritable();
            outCommentLength.put(commentLength, ONE);
            // Write out the user ID with min max dates and count
            context.write(outHour, outCommentLength);
        }
    }

  

Reducer code。Reducer經過迭代上面的map生成一個大的treemap,key是評論長度,value是這個長度的評論的數目。排序

 

迭代之後,中值被計算出來。中值的索引由評論總數除以2得出。而後迭代treemap的entrySet找到key,需知足條件爲:previousCommentCount≤ medianIndex < commentCount,把treeMap的值加到每一步迭代的評論裏。一旦條件知足,若是有偶數條評論且中值索引等於前一條評論的,中值取前一個的長度和當前長度的平均值。不然,中值就是當前評論的長度。

 

接下來,再一次迭代treemap,計算出平方和,確保相關聯的評論長度和數目相乘。標準差就根據平方和算出來了。中值和標準差就隨着key一塊輸出。

public static class MedianStdDevReducer extends

Reducer<IntWritable, SortedMapWritable,

IntWritable, MedianStdDevTuple> {

private MedianStdDevTuple result = new MedianStdDevTuple();

private TreeMap<Integer, Long> commentLengthCounts =

new TreeMap<Integer, Long>();

public void reduce(IntWritable key, Iterable<SortedMapWritable>values,

Context context) throws IOException, InterruptedException {

float sum = 0;

long totalComments = 0;

commentLengthCounts.clear();

result.setMedian(0);

result.setStdDev(0);

for (SortedMapWritable v : values) {

for (Entry<WritableComparable, Writable> entry : v.entrySet()) {

int length = ((IntWritable) entry.getKey()).get();

long count = ((LongWritable) entry.getValue()).get();

totalComments += count;

sum += length * count;

Long storedCount = commentLengthCounts.get(length);

if (storedCount == null) {

commentLengthCounts.put(length, count);

else {

commentLengthCounts.put(length, storedCount + count);

}

}

}

long medianIndex = totalComments / 2L;

long previousComments = 0;

long comments = 0;

int prevKey = 0;

for (Entry<Integer, Long> entry : commentLengthCounts.entrySet()) {

comments = previousComments + entry.getValue();

if (previousComments ≤ medianIndex && medianIndex < comments) {

if (totalComments % 2 == 0 &&previousComments == medianIndex) {

result.setMedian((float) (entry.getKey() + prevKey) / 2.0f);

else {

result.setMedian(entry.getKey());

}

break;

}

previousComments = comments;

prevKey = entry.getKey();

}

// calculate standard deviation

float mean = sum / totalComments;

float sumOfSquares = 0.0f;

for (Entry<Integer, Long> entry : commentLengthCounts.entrySet()) {

sumOfSquares += (entry.getKey() - mean) * (entry.getKey() - mean) *

entry.getValue();

}

result.setStdDev((float) Math.sqrt(sumOfSquares / (totalComments - 1)));

context.write(key, result);

}

}

 

Combiner optimization。跟前面的例子不一樣,這裏combiner的邏輯跟reducer不一樣。Reducer計算中值和標準差,而combiner對每一個本地map的中間鍵值對聚合sortedMapWritable條目。代碼解析這些條目並在本地map聚合它們,這跟前面部分的reducer代碼是相同的。這裏用一個hashmap替換treemap,由於不須要排序,且hashmap更快。Reducer使用map計算中值和標準差,而combiner是用sortedMapWritable序列化爲reduce階段作準備。

 

public static class MedianStdDevCombiner extends

Reducer<IntWritable, SortedMapWritable, IntWritable, SortedMapWritable> {

protected void reduce(IntWritable key,

Iterable<SortedMapWritable>values, Context context)

throws IOException, InterruptedException {

SortedMapWritable outValue = new SortedMapWritable();

for (SortedMapWritable v : values) {

for (Entry<WritableComparable, Writable> entry : v.entrySet()) {

LongWritable count = (LongWritable) outValue.get(entry.getKey());

if (count != null) {

count.set(count.get()

+ ((LongWritable) entry.getValue()).get());

else {

outValue.put(entry.getKey(), new LongWritable(

((LongWritable) entry.getValue()).get()));

}

}

}

context.write(key, outValue);

}

}

 

Data flow diagram。圖2-4展現了例子的數據流程圖

 

Figure 2-4. Data flow for the standard deviation example

相關文章
相關標籤/搜索