Calculating the median and standard deviation is a bit more involved than the previous examples. Because these operations are not associative, they cannot benefit from a combiner as easily. The median is the value that splits a data set into two equal halves: one half of the values are larger than the median, the other half are smaller. Finding it requires the complete data set in order, and the data must be sorted, which is an obstacle because MapReduce does not sort by value.
方差告訴咱們數據跟平均值之間的差別程度。這就要求咱們以前要先找到平均值。執行這種操做最容易的方法是複製值得列表到臨時列表,以便找到中值,或者再一次迭代集合全部數據獲得標準差。對大的數據量,這種實現可能致使java堆空間的問題,引文每一個輸入組的每一個值都放進內存處理。下一個例子就是針對這種問題的。數據結構
Problem: Given a set of user comments, determine the median and standard deviation of comment lengths for each hour of the day.
Mapper code. The mapper processes each input record; it does not compute the median itself, but emits the data the reducer needs to compute the median comment length for each hour of the day. The output key is the hour, and the output value is the comment length.
public static class MedianStdDevMapper extends
        Mapper&lt;Object, Text, IntWritable, IntWritable&gt; {

    private IntWritable outHour = new IntWritable();
    private IntWritable outCommentLength = new IntWritable();

    private final static SimpleDateFormat frmt =
            new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        Map&lt;String, String&gt; parsed = MRDPUtils.transformXmlToMap(value.toString());

        // Grab the "CreationDate" field,
        // since it is what we are grouping by
        String strDate = parsed.get("CreationDate");

        // Grab the comment to find the length
        String text = parsed.get("Text");

        // Get the hour this comment was posted in
        Date creationDate = null;
        try {
            creationDate = frmt.parse(strDate);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        outHour.set(creationDate.getHours());

        // Set the comment length
        outCommentLength.set(text.length());

        // Write out the hour and the comment length
        context.write(outHour, outCommentLength);
    }
}
Reducer code. The reducer iterates over the set of values for a given key and adds each value to an in-memory list, while also keeping a running sum and count. After the iteration, the comment lengths are sorted so the median can be found. If the count is even, the median is the average of the two middle values. Next, the mean is computed from the running sum and count, and the sorted list is iterated again to compute the standard deviation: the squared difference between each value and the mean is accumulated in a running sum, and the standard deviation is the square root of that sum divided by count − 1. Finally, the key is written out together with the median and standard deviation.
public static class MedianStdDevReducer extends
        Reducer&lt;IntWritable, IntWritable, IntWritable, MedianStdDevTuple&gt; {

    private MedianStdDevTuple result = new MedianStdDevTuple();
    private ArrayList&lt;Double&gt; commentLengths = new ArrayList&lt;Double&gt;();

    public void reduce(IntWritable key, Iterable&lt;IntWritable&gt; values,
            Context context) throws IOException, InterruptedException {

        double sum = 0;
        double count = 0;
        commentLengths.clear();
        result.setStdDev(0d);

        // Iterate through all input values for this key
        for (IntWritable val : values) {
            commentLengths.add((double) val.get());
            sum += val.get();
            ++count;
        }

        // Sort commentLengths to calculate the median
        Collections.sort(commentLengths);

        // If commentLengths has an even size, average the middle two elements
        if (count % 2 == 0) {
            result.setMedian((commentLengths.get((int) count / 2 - 1)
                    + commentLengths.get((int) count / 2)) / 2.0f);
        } else {
            // Else, set the median to the middle value
            result.setMedian(commentLengths.get((int) count / 2));
        }

        // Calculate the standard deviation
        double mean = sum / count;
        double sumOfSquares = 0.0f;
        for (double f : commentLengths) {
            sumOfSquares += (f - mean) * (f - mean);
        }
        result.setStdDev((double) Math.sqrt(sumOfSquares / (count - 1)));

        context.write(key, result);
    }
}
Combiner optimization. A combiner cannot be used in this case. The reducer needs all of the values in order to compute the median and standard deviation, and because a combiner only runs over the intermediate key/value pairs of a single map task locally, computing the complete median and standard deviation there is impossible. The next example shows a more complex implementation that does use a custom combiner.
The following example differs from the previous one in that it reduces the memory footprint. Putting every value into a list produces many duplicate elements; one way to deduplicate them is to keep a count per element. For example, the list &lt; 1, 1, 1, 1, 2, 2, 3, 4, 5, 5, 5 &gt; can be stored in a sorted map as (1→4, 2→2, 3→1, 4→1, 5→3). The core principle is the same: the reduce phase iterates over all the values and puts them into an in-memory data structure; what changes is the data structure and the way it is searched. A map greatly reduces the memory usage: the previous example used a list of size O(n), where n is the number of comments, while this example uses a map of key/value pairs of size O(max(m)), where m is the maximum comment length. As an added bonus, a combiner can be used to help aggregate the counts of comment lengths and to output this map, as a Writable object, for the reducer to consume.
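As a minimal, Hadoop-independent sketch of this counting idea (the class and variable names here are purely illustrative):

import java.util.TreeMap;

// Collapse duplicate comment lengths into a sorted length -> count map.
public class LengthCountSketch {
    public static void main(String[] args) {
        int[] lengths = {1, 1, 1, 1, 2, 2, 3, 4, 5, 5, 5};
        TreeMap&lt;Integer, Long&gt; counts = new TreeMap&lt;Integer, Long&gt;();
        for (int len : lengths) {
            Long stored = counts.get(len);
            counts.put(len, stored == null ? 1L : stored + 1L);
        }
        System.out.println(counts); // prints {1=4, 2=2, 3=1, 4=1, 5=3}
    }
}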
Problem: Same as the previous example.
Mapper code. The mapper processes each input record; the output key is the hour, and the value is a SortedMapWritable object containing a single element: the comment length mapped to a count of 1. This map is used again in both the reducer and the combiner.
public static class MedianStdDevMapper extends
        Mapper&lt;Object, Text, IntWritable, SortedMapWritable&gt; {

    private IntWritable commentLength = new IntWritable();
    private static final LongWritable ONE = new LongWritable(1);
    private IntWritable outHour = new IntWritable();

    private final static SimpleDateFormat frmt =
            new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        Map&lt;String, String&gt; parsed = MRDPUtils.transformXmlToMap(value.toString());

        // Grab the "CreationDate" field,
        // since it is what we are grouping by
        String strDate = parsed.get("CreationDate");

        // Grab the comment to find the length
        String text = parsed.get("Text");

        // Get the hour this comment was posted in
        Date creationDate = null;
        try {
            creationDate = frmt.parse(strDate);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        outHour.set(creationDate.getHours());
        commentLength.set(text.length());

        SortedMapWritable outCommentLength = new SortedMapWritable();
        outCommentLength.put(commentLength, ONE);

        // Write out the hour and the single-entry length -> count map
        context.write(outHour, outCommentLength);
    }
}
Reducer code. The reducer iterates over the maps described above to build one large TreeMap whose key is the comment length and whose value is the number of comments with that length.
After that iteration, the median is calculated. The median index is the total number of comments divided by two. The entrySet of the TreeMap is then iterated to find the key satisfying the condition previousComments ≤ medianIndex &lt; comments, adding each entry's value to the running comment count along the way. Once the condition is met, if there is an even number of comments and medianIndex equals previousComments, the median is the average of the previous length and the current length; otherwise, the median is simply the current comment length.
Next, the TreeMap is iterated once more to calculate the sum of squares, making sure each squared deviation is multiplied by the count of comments with that length. The standard deviation is computed from this sum of squares, and the median and standard deviation are written out along with the key.
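In formula form, this is the same sample standard deviation as before, just grouped by comment length, where $c_l$ is the count of comments of length $l$ and $n = \sum_l c_l$ is the total number of comments:

$$\bar{x} = \frac{1}{n}\sum_{l} c_l\, l \qquad s = \sqrt{\frac{\sum_{l} c_l\left(l - \bar{x}\right)^2}{n - 1}}$$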
public static class MedianStdDevReducer extends
        Reducer&lt;IntWritable, SortedMapWritable, IntWritable, MedianStdDevTuple&gt; {

    private MedianStdDevTuple result = new MedianStdDevTuple();
    private TreeMap&lt;Integer, Long&gt; commentLengthCounts =
            new TreeMap&lt;Integer, Long&gt;();

    public void reduce(IntWritable key, Iterable&lt;SortedMapWritable&gt; values,
            Context context) throws IOException, InterruptedException {

        float sum = 0;
        long totalComments = 0;
        commentLengthCounts.clear();
        result.setMedian(0);
        result.setStdDev(0);

        // Aggregate the incoming maps into one length -> count TreeMap
        for (SortedMapWritable v : values) {
            for (Entry&lt;WritableComparable, Writable&gt; entry : v.entrySet()) {
                int length = ((IntWritable) entry.getKey()).get();
                long count = ((LongWritable) entry.getValue()).get();

                totalComments += count;
                sum += length * count;

                Long storedCount = commentLengthCounts.get(length);
                if (storedCount == null) {
                    commentLengthCounts.put(length, count);
                } else {
                    commentLengthCounts.put(length, storedCount + count);
                }
            }
        }

        // Find the median by walking the cumulative counts
        long medianIndex = totalComments / 2L;
        long previousComments = 0;
        long comments = 0;
        int prevKey = 0;
        for (Entry&lt;Integer, Long&gt; entry : commentLengthCounts.entrySet()) {
            comments = previousComments + entry.getValue();
            if (previousComments &lt;= medianIndex &amp;&amp; medianIndex &lt; comments) {
                if (totalComments % 2 == 0 &amp;&amp; previousComments == medianIndex) {
                    result.setMedian((float) (entry.getKey() + prevKey) / 2.0f);
                } else {
                    result.setMedian(entry.getKey());
                }
                break;
            }
            previousComments = comments;
            prevKey = entry.getKey();
        }

        // Calculate the standard deviation from the length counts
        float mean = sum / totalComments;
        float sumOfSquares = 0.0f;
        for (Entry&lt;Integer, Long&gt; entry : commentLengthCounts.entrySet()) {
            sumOfSquares += (entry.getKey() - mean) * (entry.getKey() - mean)
                    * entry.getValue();
        }
        result.setStdDev((float) Math.sqrt(sumOfSquares / (totalComments - 1)));

        context.write(key, result);
    }
}
Combiner optimization. Unlike the previous example, the combiner logic here differs from the reducer. The reducer calculates the median and standard deviation, while the combiner aggregates the SortedMapWritable entries of each local map task's intermediate key/value pairs. The code that parses the entries and aggregates them into a local map is the same as the reducer code in the previous section; a HashMap can be used there in place of a TreeMap, because sorting is unnecessary and a HashMap is generally faster. While the reducer uses the map to calculate the median and standard deviation, the combiner outputs a SortedMapWritable so that it can be serialized for the reduce phase.
public static class MedianStdDevCombiner extends
        Reducer&lt;IntWritable, SortedMapWritable, IntWritable, SortedMapWritable&gt; {

    protected void reduce(IntWritable key,
            Iterable&lt;SortedMapWritable&gt; values, Context context)
            throws IOException, InterruptedException {

        SortedMapWritable outValue = new SortedMapWritable();

        // Merge the incoming maps, summing the counts per comment length
        for (SortedMapWritable v : values) {
            for (Entry&lt;WritableComparable, Writable&gt; entry : v.entrySet()) {
                LongWritable count = (LongWritable) outValue.get(entry.getKey());
                if (count != null) {
                    count.set(count.get()
                            + ((LongWritable) entry.getValue()).get());
                } else {
                    outValue.put(entry.getKey(), new LongWritable(
                            ((LongWritable) entry.getValue()).get()));
                }
            }
        }
        context.write(key, outValue);
    }
}
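To actually take effect, the combiner has to be registered in the job driver. The following is only a sketch, not code from the original text: MedianStdDevDriver is an assumed enclosing class name, and the set* calls are the standard org.apache.hadoop.mapreduce.Job API.

// Driver configuration sketch (class name MedianStdDevDriver is an assumption)
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Median and Standard Deviation of Comment Lengths");
job.setJarByClass(MedianStdDevDriver.class);
job.setMapperClass(MedianStdDevMapper.class);
// The combiner's output types match the map output types, so it may run any number of times
job.setCombinerClass(MedianStdDevCombiner.class);
job.setReducerClass(MedianStdDevReducer.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(SortedMapWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(MedianStdDevTuple.class);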
Data flow diagram. Figure 2-4 shows the data flow for this example.
Figure 2-4. Data flow for the standard deviation example