最近迷上了spark,寫一個專門處理語料庫生成詞庫的項目拿來練練手, github地址:https://github.com/LiuRoy/spark_splitter。代碼實現參考wordmaker項目,有興趣的能夠看一下,此項目用到了很多很tricky的技巧提高性能,單純只想看懂源代碼能夠參考wordmaker做者的一份簡單版代碼。python
這個項目統計語料庫的結果和執行速度都還不錯,但缺點也很明顯,只能處理GBK編碼的文檔,並且不能分佈式運行,恰好最近在接觸spark,因此用python實現了裏面的算法,使之能處理更大規模的語料庫,而且同時支持GBK和UTF8兩種編碼格式。git
wordmaker提供了一個統計大規模語料庫詞彙的算法,和結巴分詞的原理不一樣,它不依賴已經統計好的詞庫或者隱馬爾可夫模型,可是一樣能獲得不錯的統計結果。原做者的文檔提到是用多個線程獨立計算各個文本塊的詞的信息,再按詞的順序分段合併,再計算各個段的字可能組成詞的機率、左右熵,獲得詞語輸出。下面就詳細的講解各個步驟:github
原始的C++代碼挺長,可是用python改寫以後不多,上文中的1 2 3步用spark實現很是簡單,代碼在split函數中。第3部過濾後的結果已經相對較小,能夠直接取出放入內存中,再計算熵過濾,在split中執行target_phrase_rdd.filter(lambda x: _filter(x))
過濾的時候能夠phrasedictmap作成spark中的廣播變量,提高分佈式計算的效率,由於只有一臺機器,因此就沒有這樣作。split代碼以下,原來很長的代碼用python很容易就實現了。算法
1 def split(self): 2 """spark處理""" 3 raw_rdd = self.sc.textFile(self.corpus_path) 4 5 utf_rdd = raw_rdd.map(lambda line: str_decode(line)) 6 hanzi_rdd = utf_rdd.flatMap(lambda line: extract_hanzi(line)) 7 8 raw_phrase_rdd = hanzi_rdd.flatMap(lambda sentence: cut_sentence(sentence)) 9 10 phrase_rdd = raw_phrase_rdd.reduceByKey(lambda x, y: x + y) 11 phrase_dict_map = dict(phrase_rdd.collect()) 12 total_count = 0 13 for _, freq in phrase_dict_map.iteritems(): 14 total_count += freq 15 16 def _filter(pair): 17 phrase, frequency = pair 18 max_ff = 0 19 for i in xrange(1, len(phrase)): 20 left = phrase[:i] 21 right = phrase[i:] 22 left_f = phrase_dict_map.get(left, 0) 23 right_f = phrase_dict_map.get(right, 0) 24 max_ff = max(left_f * right_f, max_ff) 25 return total_count * frequency / max_ff > 100.0 26 27 target_phrase_rdd = phrase_rdd.filter(lambda x: len(x[0]) >= 2 and x[1] >= 3) 28 result_phrase_rdd = target_phrase_rdd.filter(lambda x: _filter(x)) 29 self.result_phrase_set = set(result_phrase_rdd.keys().collect()) 30 self.phrase_dict_map = {key: PhraseInfo(val) for key, val in phrase_dict_map.iteritems()}
進入spark_splitter/splitter目錄,執行命令PYTHONPATH=. spark-submit spark.py
處理test/moyan.txt文本,是莫言全集,統計完成的結果在out.txt中,統計部分的結果以下,因爲算法的緣由,帶有 的 地 這種連詞詞語不能正確的切分。分佈式
(我也不知道爲何這個詞這麼多)函數