Spark應用HanLP對中文語料進行文本挖掘--聚類詳解教程

軟件:IDEA201四、Maven、HanLP、JDK;html

用到的知識:HanLP、Spark TF-IDF、Spark kmeans、Spark mapPartition;java

用到的數據集:http://www.threedweb.cn/thread-1288-1-1.html(不須要下載,已經包含在工程裏面);git

工程下載:https://github.com/fansy1990/hanlp-test 。github

一、問題描述web

如今有一箇中文文本數據集,這個數據集已經對其中的文本作了分類,以下:算法

其中每一個文件夾中含有個數不等的文件,好比環境有200個,藝術有248個;同時,每一個文件的內容基本上就是一些新聞報道或者中文描述,以下:apache

如今須要作的就是,把這些文檔進行聚類,看其和原始給定的類別的重合度有多少,這樣也能夠反過來驗證咱們聚類算法的正確度。app

2.、解決思路:框架

2.1 文本預處理:ide

1. 因爲文件的編碼是GBK的,讀取到Spark中所有是亂碼,因此先使用Java把代碼轉爲UTF8編碼;  

2. 因爲文本存在多個文件中(大概2k多),使用Spark的wholeTextFile讀取速度太慢,因此考慮把這些文件所有合併爲一個文件,這時又結合1.的轉變編碼,因此在轉變編碼的時候就直接把全部的數據存入同一個文件中;

其存儲的格式爲: 每行: 文件名.txt\t文件內容

如:  41.txt 【 日  期 】199601....

這樣子的話,就能夠經過.txt\t 來對每行文本進行分割,獲得其文件名以及文件內容,這裏每行其實就是一個文件了。

2.2 分詞

分詞直接採用HanLP的分詞來作,HanLP這裏選擇兩種:Standard和NLP(還有一種就是HighSpeed,可是這個木有用戶自定義詞典,因此前期考慮先用兩種),具體參考:https://github.com/hankcs/HanLP ;

2.3 詞轉換爲詞向量

在Kmeans算法中,一個樣本須要使用數值類型,因此須要把文本轉爲數值向量形式,這裏在Spark中有兩種方式。其一,是使用TF-IDF;其二,使用Word2Vec。這裏暫時使用了TF-IDF算法來進行,這個算法須要提供一個numFeatures,這個值越大其效果也越好,可是相應的計算時間也越長,後面也能夠經過實驗驗證。

2.4 使用每一個文檔的詞向量進行聚類建模

在進行聚類建模的時候,須要提供一個初始的聚類個數,這裏面設置爲10,由於咱們的數據是有10個分組的。可是在實際的狀況下,通常這個值是須要經過實驗來驗證獲得的。

2.5 對聚類後的結果進行評估

這裏面採用的思路是:

1. 獲得聚類模型後,對原始數據進行分類,獲得原始文件名和預測的分類id的二元組(fileName,predictId);

2. 針對(fileName,predictId),獲得(fileNameFirstChar ,fileNameFirstChar.toInt - predictId)的值,這裏須要注意的是fileNameFirstChar其實就是表明這個文件的原始所屬類別了。

3. 這裏有一個通常假設,就是使用kmeans模型預測獲得的結果大多數是正確的,因此fileNameFirstChar.toInt-predictId獲得的衆數其實就是分類的正確的個數了(這裏可能比較難以理解,後面會有個小李子來講明這個問題);

4. 獲得每一個實際類別的預測的正確率後就能夠去平均預測率了。

5. 改變numFeatuers的值,看下是否numFeatures設置的比較大,其正確率也會比較大?

三、具體步驟:

3.1 開發環境--Maven

首先第一步,固然是開發環境了,由於用到了Spark和HanLP,因此須要在pom.xml中加入這兩個依賴:

1. <!-- 中文分詞框架 -->

2.<dependency>

3.<groupId>com.hankcs</groupId>

4.<artifactId>hanlp</artifactId>

5.<version>${hanlp.version}</version>

6.</dependency>

7.<!-- Spark dependencies -->

8.<dependency>

9.<groupId>org.apache.spark</groupId>

10.<artifactId>spark-core_2.10</artifactId>

11.<version>${spark.version}</version>

12.</dependency>

13.<dependency>

14.<groupId>org.apache.spark</groupId>

15.<artifactId>spark-mllib_2.10</artifactId>

16.<version>${spark.version}</version>

17.</dependency>

其版本爲:

<hanlp.version>portable-1.3.4</hanlp.version>、 <spark.version>1.6.0-cdh5.7.3</spark.version>。

3.2 文件轉爲UTF-8編碼及存儲到一個文件

 

這部份內容能夠直接參考:src/main/java/demo02_transform_encoding.TransformEncodingToOne 這裏的實現,由於是Java基本的操做,這裏就不加以分析了。

3.3 Scala調用HanLP進行中文分詞

Scala調用HanLP進行分詞和Java的是同樣的,同時,由於這裏有些詞語格式不正常,因此把這些特殊的詞語添加到自定義詞典中,其示例以下:

1.import com.hankcs.hanlp.dictionary.CustomDictionary

2.import com.hankcs.hanlp.dictionary.stopword.CoreStopWordDictionary

3.import com.hankcs.hanlp.tokenizer.StandardTokenizer

4.import scala.collection.JavaConversions._

5./**

6.* Scala 分詞測試

7.* Created by fansy on 2017/8/25.

 8.*/

9.object SegmentDemo {

10.def main(args: Array[String]) {

11.val sentense = "41,【 日 期 】19960104 【 版 號 】1 【 標 題 】合巢蕪高速公路巢蕪段竣工 【 做 者 】彭建中 【 正 文 】 安徽合(肥)巢(湖)蕪(湖)高速公路巢蕪段日前竣工通車並投入營運。合巢蕪 高速公路是國家規劃的京福綜合運輸網的重要幹線路段,是交通部肯定1995年建成 的全國10條重點公路之一。該條高速公路正線長88千米。(彭建中)"

12.CustomDictionary.add("日 期")

13.CustomDictionary.add("版 號")

14.CustomDictionary.add("標 題")

15.CustomDictionary.add("做 者")

16.CustomDictionary.add("正 文")

17.val list = StandardTokenizer.segment(sentense)

18.CoreStopWordDictionary.apply(list)

19.println(list.map(x => x.word.replaceAll(" ","")).mkString(","))

20.}

21.}

運行完成後,便可獲得分詞的結果,以下:

考慮到使用方便,這裏把分詞封裝成一個函數:

1./**

2.* String 分詞

3.* @param sentense

4.* @return

5.*/

6.def transform(sentense:String):List[String] ={

7.val list = StandardTokenizer.segment(sentense)

8.CoreStopWordDictionary.apply(list)

9.list.map(x => x.word.replaceAll(" ","")).toList

10.}

11.}

 

輸入便是一箇中文的文本,輸出就是分詞的結果,同時去掉了一些經常使用的停用詞。

3.4 求TF-IDF

在Spark裏面求TF-IDF,能夠直接調用Spark內置的算法模塊便可,同時在Spark的該算法模塊中還對求得的結果進行了維度變換(能夠理解爲特徵選擇或「降維」,固然這裏的降維多是提高維度)。代碼以下:

1.val docs = sc.textFile(input_data).map{x => val t = x.split(".txt\t");(t(0),transform(t(1)))}

2..toDF("fileName", "sentence_words")

3.

4.// 3. 求TF

5.println("calculating TF ...")

6.val hashingTF = new HashingTF()

7..setInputCol("sentence_words").setOutputCol("rawFeatures").setNumFeatures(numFeatures)

8.val featurizedData = hashingTF.transform(docs)

9.

10.// 4. 求IDF

11.println("calculating IDF ...")

12.val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")

13.val idfModel = idf.fit(featurizedData)

14.val rescaledData = idfModel.transform(featurizedData).cache()

變量docs是一個DataFrame[fileName, sentence_words] ,通過HashingTF後,變成了變量 featurizedData ,一樣是一個DataFrame[fileName,sentence_words, rawFeatures]。這裏經過setInputCol以及SetOutputCol能夠設置輸入以及輸出列名(列名是針對DataFrame來講的,不知道的能夠看下DataFrame的API)。

接着,通過IDF模型,獲得變量 rescaledData ,其DataFrame[fileName,sentence_words, rawFeatures, features] 。

執行結果爲:

3.5 創建KMeans模型

 

直接參考官網給定例子便可:

1.println("creating kmeans model ...")

2.val kmeans = new KMeans().setK(k).setSeed(1L)

3.val model = kmeans.fit(rescaledData)

4.// Evaluate clustering by computing Within Set Sum of Squared Errors.

5.println("calculating wssse ...")

6.val WSSSE = model.computeCost(rescaledData)

7.println(s"Within Set Sum of Squared Errors = $WSSSE")

這裏有計算cost值的,可是這個值評估不是很準確,好比我numFeature設置爲2000的話,那麼這個值就很大,可是其實其正確率會比較大的。

3.6 模型評估

這裏的模型評估直接使用一個小李子來講明:好比,如今有這樣的數據:

其中,1開頭,2開頭和4開頭的屬於同一類文檔,後面的0,3,2,1等,表明這個文檔被模型分類的結果,那麼能夠很容易的看出針對1開頭的文檔,

其分類正確的有4個,其中("123.txt",3)以及(「126.txt」,1)是分類錯誤的結果,這是由於,在這個類別中預測的結果中0是最多的,因此0是和1開頭的文檔對應起來的,這也就是前面的假設。

1. 把同一類文檔分到同一個partition中;

1.val data = sc.parallelize(t)

2.val file_index = data.map(_._1.charAt(0)).distinct.zipWithIndex().collect().toMap

3.println(file_index)

4.val partitionData = data.partitionBy(MyPartitioner(file_index))

這裏的file_index,是對不一樣類的文檔進行編號,這個編號就對應每一個partition,看MyPartitioner的實現:

1.case class MyPartitioner(file_index:Map[Char,Long]) extends Partitioner

2.override def getPartition(key: Any): Int = key match {

3.case _ => file_index.getOrElse(key.toString.charAt(0),0L).toInt

4.}

 

5..override def numPartitions: Int = file_index.size

6.}

2. 針對每一個partition進行整合操做:

在整合每一個partition以前,咱們先看下咱們自定義的MyPartitioner是否在正常工做,能夠打印下結果:

1.val tt = partitionData.mapPartitionsWithIndex((index: Int, it: Iterator[(String,Int)]) => it.toList.map(x => (index,x)).toIterator)

2.tt.collect().foreach(println(_))

運行以下:

其中第一列表明每一個partition的id,第二列是數據,發現其數據確實是按照預期進行處理的;接着能夠針對每一個partition進行數據整合:

1.// firstCharInFileName , firstCharInFileName - predictType

2.val combined = partitionData.map(x =>( (x._1.charAt(0), Integer.parseInt(x._1.charAt(0)+"") - x._2),1) )

3..mapPartitions{f => var aMap = Map[(Char,Int),Int]();

4.for(t <- f){

5.if (aMap.contains(t._1)){

6.aMap = aMap.updated(t._1,aMap.getOrElse(t._1,0)+1)

7.}else{

8.aMap = aMap + t

9.}

10.}

11.val aList = aMap.toList

12.val total= aList.map(_._2).sum

13.val total_right = aList.map(_._2).max

14.List((aList.head._1._1,total,total_right)).toIterator

15.// aMap.toIterator //打印各個partition的總結

16. }

在整合以前先執行一個map操做,把數據變成((fileNameFirstChar, fileNameFirstChar.toInt - predictId), 1),其中fileNameFirstChar表明文件的第一個字符,其實也就是文件的所屬實際類別,後面的fileNameFirstChar.toInt-predictId 其實就是判斷預測的結果是否對了,這個值的衆數就是預測對的;最後一個值代碼前面的這個鍵值對出現的次數,其實就是統計屬於某個類別的實際文件個數以及預測對的文件個數,分別對應上面的total和total_right變量;輸出結果爲:

(4,6,3)

(1,6,4)

(2,6,4)

發現其打印的結果是正確的,第一列表明文件名開頭,第二個表明屬於這個文件的個數,第三列表明預測正確的個數

這裏須要注意的是,這裏由於文本的實際類別和文件名是一致的,因此才能夠這樣處理,若是實際數據的話,那麼mapPartitions函數須要更改。

3. 針對數據結果進行統計:

最後只須要進行簡單的計算便可:

1.for(re <- result ){

2.println("文檔"+re._1+"開頭的 文檔總數:"+ re._2+",分類正確的有:"+re._3+",分類正確率是:"+(re._3*100.0/re._2)+"%")

3.}

4.val averageRate = result.map(_._3).sum *100.0 / result.map(_._2).sum

5.println("平均正確率爲:"+averageRate+"%")

輸出結果爲:

4. 實驗

  設置不一樣的numFeature,好比使用200和2000,其對比結果爲:

因此設置numFeatures值越大,其準確率也越高,不過計算也比較複雜。

 5. 總結

1. HanLP的使用相對比較簡單,這裏只使用了分詞及停用詞,感謝開源;

2. Spark裏面的TF-IDF以及Word2Vector使用比較簡單,不過使用這個須要先分詞;

3. 這裏是在IDEA裏面運行的,若是使用Spark-submit的提交方式,那麼須要把hanpl的jar包加入,這個有待驗證

 

文章來源於fansy1990的博客

相關文章
相關標籤/搜索