Problem statement: given n text files, implement word-frequency counting using the MapReduce approach.
The key word-counting code is attached below. First, a generic MapReduce module:
import itertools

class MapReduce:
    __doc__ = '''Provides a map_reduce facility'''

    @staticmethod
    def map_reduce(i, mapper, reducer):
        """
        The map_reduce method.
        :param i: the collection to MapReduce over
        :param mapper: a user-defined mapper method
        :param reducer: a user-defined reducer method
        :return: a list whose elements are the return values of the reducer
        """
        intermediate = []  # holds every (intermediate_key, intermediate_value) pair
        for (key, value) in i.items():
            intermediate.extend(mapper(key, value))
        # sorted returns a sorted list; since the list elements are tuples, the key
        # argument picks which tuple field to sort by.
        # groupby gathers adjacent duplicate elements of an iterator; its key argument
        # picks which tuple field identifies duplicates.
        # In the loop below, the key that groupby yields is an intermediate_key, and
        # group is a sequence of one or more (intermediate_key, intermediate_value)
        # pairs that share that intermediate_key.
        groups = {}
        for key, group in itertools.groupby(sorted(intermediate, key=lambda im: im[0]),
                                            key=lambda x: x[0]):
            groups[key] = [y for x, y in group]
        # groups is a dict whose keys are the intermediate_keys mentioned above and
        # whose values are lists of all intermediate_values for that intermediate_key.
        return [reducer(intermediate_key, groups[intermediate_key])
                for intermediate_key in groups]
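To see what the sort-and-group (shuffle) step inside map_reduce does, here is a minimal standalone sketch; the intermediate pairs are made up, but the two lambdas are exactly the ones used above:

import itertools

# Hypothetical intermediate pairs, as a mapper might emit them.
intermediate = [("b", 1), ("a", 1), ("b", 1)]
groups = {}
for key, group in itertools.groupby(sorted(intermediate, key=lambda im: im[0]),
                                    key=lambda x: x[0]):
    groups[key] = [y for x, y in group]
print(groups)  # {'a': [1], 'b': [1, 1]}

Sorting first is what makes groupby see all pairs for a key as adjacent; without the sort, the two ("b", 1) pairs would land in separate groups.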
Then, for the concrete word-count problem, we write our own mapper and reducer methods:
import regex  # third-party module; the standard re module does not support \p{P}

class WordCount:
    __doc__ = '''Word-frequency counting'''

    def mapper(self, input_key, input_value):
        """
        The word-count mapper method.
        :param input_key: a file name
        :param input_value: the file's text content
        :return: a list with (word, 1) as its elements
        """
        return [(word, 1)
                for word in self.remove_punctuation(input_value.lower()).split()]

    def reducer(self, intermediate_key, intermediate_value_list):
        """
        The word-count reducer method.
        :param intermediate_key: some word
        :param intermediate_value_list: the word's occurrence records, e.g. [1, 1, 1]
        :return: (word, frequency)
        """
        return intermediate_key, sum(intermediate_value_list)

    @staticmethod
    def remove_punctuation(text):
        """
        Strip punctuation from a string.
        :param text: the text
        :return: the text with punctuation removed
        """
        # \p{P} matches any Unicode punctuation character; this needs the regex
        # module, because re treats \p as a bad escape.
        return regex.sub(r"\p{P}+", "", text)
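As a quick sanity check (the file name and text here are made up, and the regex module is assumed to be installed), the mapper emits one (word, 1) pair per token and the reducer sums a word's occurrence list:

wc = WordCount()
print(wc.mapper("demo.txt", "The lamb, the lamb"))
# [('the', 1), ('lamb', 1), ('the', 1), ('lamb', 1)]
print(wc.reducer("lamb", [1, 1]))
# ('lamb', 2)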
Test it with 3 text files:
text\a.txt:
The quick brown fox jumped over the lazy grey dogs.
text\b.txt:
That's one small step for a man, one giant leap for mankind.
text\c.txt:
Mary had a little lamb,
Its fleece was white as snow;
And everywhere that Mary went,
The lamb was sure to go.
Invoke it like this:
filenames = ["text\\a.txt", "text\\b.txt", "text\\c.txt"]
i = {}
for filename in filenames:
    with open(filename) as f:
        i[filename] = f.read()
wc = WordCount()
print(MapReduce.map_reduce(i, wc.mapper, wc.reducer))
Output:
[('white', 1), ('little', 1), ('sure', 1), ('snow;', 1), ('went,', 1), ('as', 1), ('lamb,', 1), ('go.', 1), ('lamb', 1), ('its', 1), ('a', 1), ('was', 2), ('to', 1), ('fleece', 1), ('that', 1), ('the', 1), ('mary', 2), ('everywhere', 1), ('had', 1), ('and', 1)]
Note that tokens such as 'snow;', 'went,', 'lamb,' and 'go.' keep their punctuation, so 'lamb' and 'lamb,' are counted separately; this is presumably because the original run passed the \p{P} pattern to the standard re module, which does not support it, so remove_punctuation stripped nothing. (Also, only the words of text\c.txt appear in this listing.)
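The returned list is unordered. If a ranking by frequency is wanted, one small optional variant (not part of the original run) is to sort the pairs before printing:

results = MapReduce.map_reduce(i, wc.mapper, wc.reducer)
# Sort the (word, count) pairs by descending count.
for word, count in sorted(results, key=lambda pair: pair[1], reverse=True):
    print(word, count)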
The method above applies only the most basic MapReduce idea, so it cannot support large-scale tests; scheduling and similar concerns are not considered at all.