基於spark2.0文本分詞+多分類模型

文本分類 spark算法

spark2.0開始引入dataframe做爲RDD的上層封裝,以屏蔽RDD層次的複雜操做,本文使用spark milib中ml機器學習庫進行新聞文本多分類預測,包含數據預預處理,分詞,標籤和特徵向量化轉換、多分類模型訓練(包含樸素貝葉斯、邏輯迴歸、決策樹和隨機森林),分類預測和模型評估等完整的機器學習demo。本文分詞方法選用HanLP分詞工具包(文檔豐富、算法公開、代碼開源,而且經測試分詞效果比較好)。apache

1.數據預處理

1.1文本數據

本文使用的數據爲4類新聞,每條數據包含標籤,標題,時間和新聞內容,以"\u00EF"符號做爲分割符,數據格式以下:數組

首頁|文化新聞ï第十一屆全國優秀舞蹈節目展演將在武漢舉辦ï2016-07-05 19:25:00ï新華社北京7月5日電(記者周瑋)由文化部、湖北省人民政府主辦的...
首頁|財經中心|財經頻道ï新通教育收購杭州藍海旅行社100%股權 發力出境遊學市場ï2016-07-04 21:49:00ï杭州7月4日電(胡豐盛)7月4日,新通教育...
首頁|軍事新聞ï環太軍演中國參演潛水分隊開展潛水事故應急醫學處置演練ï2016-07-04 19:40:00ï夏威夷7月4日電 (李純 於超)當地時間3日,參加...
首頁|體育新聞ï斯坦科維奇盃首戰將爲王治郅舉辦退役儀式ï2016-07-04 10:39:00ï週二晚上八點,中國男籃將在北京以里約奧運陣容出戰在國內...
複製代碼

1.2預處理流程

文本清洗 -> 標籤索引化 -> 內容文本分詞 -> 去除停用詞 -> 分詞取前5000個詞做爲特徵 -> 特徵向量化 -> 保存預處理模型 -> 調用預處理模型 -> 輸出預處理數據(indexedLabel,features)bash

1.3標籤索引化

首先將文本讀取成Dataframe格式,將標籤列數據索引化,{文化,經濟,軍事和體育}向量化後爲{0,1,2,3}數據結構

/** * 數據清洗 可根據具體數據結構和業務場景的不一樣進行重寫. 注意: 輸出必需要有標籤字段"label" * @param filePath 數據路徑 * @param spark SparkSession * @return 清洗後的數據, 包含字段: "label", "title", "time", "content" */
  def clean(filePath: String, spark: SparkSession): DataFrame = {
    import spark.implicits._
    val textDF = spark.sparkContext.textFile(filePath).flatMap { line =>
      val fields = line.split("\u00EF")   //分隔符:ï,分紅標籤,標題,時間,內容
      //首頁|文化新聞ï第十一屆全國優秀舞蹈節目展演將在武漢舉辦ï2016-07-05 19:25:00ï新華社北京7月5日電(記者周瑋)由文化部...
      //首頁|財經中心|財經頻道ï上半年浙江口岸原油進口量創同期歷史新高ï2016-07-04 21:54:00ï杭州7月4日...
      if (fields.length > 3) {
        val categoryLine = fields(0)
        val categories = categoryLine.split("\\|")
        val category = categories.last
        //分紅4個標籤名和其餘,最後去除標籤爲其餘的數據
        var label = "其餘"
        if (category.contains("文化")) label = "文化"
        else if (category.contains("財經")) label = "財經"
        else if (category.contains("軍事")) label = "軍事"
        else if (category.contains("體育")) label = "體育"
        else {}
        //輸出標籤,標題,時間,內容
        val title = fields(1)
        val time = fields(2)
        val content = fields(3)
        if (!label.equals("其餘")) Some(label, title, time, content) else None
      } else None
    }.toDF("label", "title", "time", "content")
    //輸出標籤,標題,時間,內容DF
    textDF
  }
  /** * 處理label轉換爲索引形式 * @param data 輸入label字段的數據 * @return 標籤索引模型, 模型增長字段: "indexedLabel" */
  def indexrize(data: DataFrame): StringIndexerModel = {
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(data)
    labelIndexer
  }
複製代碼
predictDF.select("label","indexedLabel").show(10, truncate = false)
複製代碼

1.4內容字段分詞

處理內容字段,首先要進行分詞,而後去除停用詞以及轉換爲特徵向量,方便分類模型進行訓練和預測。本文模仿spark的ml包下的StopWordsRemover類建立了Segmenter類,用於對數據進行分詞,其內部調用了HanLP分詞工具。閉包

因爲spark自帶的StopWordsRemover等使用的閉包僅限於ml包,自定義的類沒法調用,故只是採用了與StopWordsRemover相似的使用形式,內部結構並不相同,而且因爲以上緣由,Segmenter類沒有繼承Transformer類,故沒法進行pipeline管道操做,故在分類模型超參數調優過程當中,沒有加入分詞模型的參數調優。app

/** * 分詞過程,包括"分詞", "去除停用詞" * @param data 輸入須要分詞的字段的數據"content" * @param params 分詞參數 * @return 分詞處理後的DataFrame,增長字段: "tokens", "removed" */
  def segment(data: DataFrame, params: PreprocessParam): DataFrame = {
    val spark = data.sparkSession
    //設置分詞模型
    val segmenter = new Segmenter()
      .setSegmentType(params.segmentType) //分詞方式
      .isDelEn(params.delEn)              //是否去除英語單詞
      .isDelNum(params.delNum)            //是否去除數字
      .addNature(params.addNature)        //是否添加詞性
      .setMinTermLen(params.minTermLen)   //最小詞長度
      .setMinTermNum(params.minTermNum)   //行最小詞數
      .setInputCol("content")             //輸入內容字段
      .setOutputCol("tokens")             //輸出分詞後的字段
    //進行分詞
    val segDF = segmenter.transform(data)
複製代碼

1.5去除停用詞

分詞以後,須要對一些經常使用的無心義詞如:「的」、「咱們」、「是」等(統稱爲「停用詞」)進行去除。這些詞沒有多大的意義,但這些詞不去掉會強烈的干擾咱們對特徵的抽取效果。(好比:在體育分類中,「的」出現500次,「足球」共出現300次,但顯然足球更能表示體育分類,而「的」反而影響體育分類的結果。dom

去除停用詞的操做咱們直接調用ml包中的StopWordsRemover類:eclipse

//讀取停用詞數據
    val stopWordArray = spark.sparkContext.textFile(params.stopwordFilePath).collect()
    //設置停用詞模型
    val remover = new StopWordsRemover()
      .setStopWords(stopWordArray)
      .setInputCol(segmenter.getOutputCol)   //讀取"tokens"字段
      .setOutputCol("removed")               //輸出刪除停用詞後的字段"removed"
    //刪除停用詞
    val removedDF = remover.transform(segDF)
    removedDF
  }
複製代碼

1.6特徵向量化

因爲目前經常使用的分類、聚類等算法都是基於向量空間模型VSM(即將對象向量化爲一個N維向量,映射成N維超空間中的一個點),VSM將數據轉換爲向量形式,便於對大規模數據進行矩陣操做等,也能夠經過計算超空間中兩個點之間的距離(通常是餘弦距離)來計算兩個向量之間的類似度。所以,咱們須要將通過處理的語料轉換爲向量形式,這個過程叫作向量化。機器學習

這裏咱們也調用spark提供的向量化類CountVectorizer類進行向量化操做:

/** * 特徵向量化處理,包括詞彙表過濾 * @param data 輸入向量化的字段"removed" * @param params 配置參數 * @return 向量模型 */
 def vectorize(data: DataFrame, params: PreprocessParam): CountVectorizerModel = {
   //設置向量模型
   val vectorizer = new CountVectorizer()
     .setVocabSize(params.vocabSize)
     .setInputCol("removed")
     .setOutputCol("features")
   val parentVecModel = vectorizer.fit(data)
   //過濾停用詞中沒有的數字features
   val numPattern = "[0-9]+".r
   val vocabulary = parentVecModel.vocabulary.flatMap {
     term => if (term.length == 1 || term.matches(numPattern.regex)) None else Some(term)
   }
   val vecModel = new CountVectorizerModel(Identifiable.randomUID("cntVec"), vocabulary)
     .setInputCol("removed")
     .setOutputCol("features")
   vecModel
 }
複製代碼

將字段"content"先進行分詞和去除停用詞獲得"removed",再將全部詞做爲特徵,進行特徵向量化獲得"features"字段:

在模型中能夠設置出現次數最多的前5000個詞做爲分類用的特徵,下圖5000後有兩個數組,第一個數值表示對應前5000個詞的第幾個詞,第二組表示對應第一組出現的詞在本條數據中的出現的次數,取出一條完整的數據看看:

1.7數據處理模型訓練、保存和調用

爲了方便每一個模型單獨訓練和預測,將預處理也做爲數據處理的模型進行訓練,保存和調用,方法以下:

/** * 訓練預處理模型 * @param filePath 數據路徑 * @param spark SparkSession * @return (預處理後的數據,索引模型,向量模型) * 數據包括字段: "label", "indexedLabel", "title", "time", "content", "tokens", "removed", "features" */
  def train(filePath: String, spark: SparkSession): (DataFrame, StringIndexerModel, CountVectorizerModel) = {

    val params = new PreprocessParam             //預處理參數
    val cleanDF = this.clean(filePath, spark)    //讀取DF,清洗數據
    val indexModel = this.indexrize(cleanDF)     //調用索引模型
    val indexDF = indexModel.transform(cleanDF)  //標籤索引化
    val segDF = this.segment(indexDF, params)    //將內容字段分詞
    val vecModel = this.vectorize(segDF, params) //調用向量模型
    val trainDF = vecModel.transform(segDF)      //內容分詞特徵向量化
    this.saveModel(indexModel, vecModel, params) //保存模型

    (trainDF, indexModel, vecModel)
  }
  /** * 擬合預處理模型 * @param filePath 數據路徑 * @param spark SparkSession * @return (預處理後的數據,索引模型,向量模型) */
  def predict(filePath: String, spark: SparkSession): (DataFrame, StringIndexerModel, CountVectorizerModel) = {
    val params = new PreprocessParam                    //預處理參數
    val cleanDF = this.clean(filePath, spark)           //讀取DF,清洗數據
    val (indexModel, vecModel) = this.loadModel(params) //加載索引和向量模型
    val indexDF = indexModel.transform(cleanDF)         //標籤索引化
    val segDF = this.segment(indexDF, params)           //內容字段分詞
    val predictDF = vecModel.transform(segDF)           //內容分詞特徵向量化
    (predictDF, indexModel, vecModel)
  }
複製代碼

調用預處理模型,數據處理後的結果取出5條:

+-----+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+
|label|               title|               time|             content|indexedLabel|              tokens|             removed|            features|
+-----+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+
|財經 |西南乳業巨頭新但願北..|2016-06-27 10:46:00|京華時報訊(記者胡笑...|         1.0|[京華, 時報訊, 記者, 胡笑紅...|[京華, 時報訊, 記者, 胡笑紅...|(5000,[3,4,14,22,...|
|文化 |全國篆刻名家大做在大..|2016-06-02 21:53:00|內江6月2日電王爵陳 ...|         3.0|[內江, 6月, 日電, 王爵, ...|[內江, 6月, 日電, 王爵, ...|(5000,[0,8,10,13,...|
|文化 |世界海洋日進入8天倒...|2016-05-31 15:38:00|北京5月31日電,記者...|         3.0|[北京, 5月, 日電, 記者, ...|[北京, 5月, 日電, 記者, ...|(5000,[0,3,10,13,...|
|軍事 |英媒評中國徵兵放寬體..|2016-06-02 08:30:00|參考消息網英媒稱,隨..|         0.0|[參考消息, 隨着, 中國軍隊, ...|[參考消息, 中國軍隊, 放寬, ...|(5000,[0,5,12,14,...|
|財經 |2016年二十國集團峯會..|2016-06-25 18:52:00|新華社廈門6月25日記...|         1.0|[新華社, 廈門, 6月, 日電,...|[新華社, 廈門, 6月, 日電,...|(5000,[3,8,10,12,...|
+-----+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+
only showing top 5 rows
複製代碼

2.多分類模型訓練和超參數調優

本文選用了經常使用的4中多分類模型對文本數據進行訓練,利用了管道Pipeline + 網格搜索Gridsearch + 交叉驗證CrossValidator 進行參數調優,直接將參數調優放在了訓練模型裏,將獲得的最優模型保存。

2.1樸素貝葉斯

樸素貝葉斯算法原理

樸素貝葉斯算法是基於貝葉斯定理與特徵條件獨立假設的分類方法。

條件機率

P(A|B)表示事件B已經發生的前提下,事件A發生的機率,叫作事件B發生下事件A的條件機率。其基本求解公式爲:

貝葉斯定理即是基於條件機率,經過P(A|B)來求P(B|A):

特徵條件獨立假設

樸素貝葉斯模型

經常使用的模型主要有3個,多項式、伯努利和高斯模型:

  • 當特徵是離散的時候,使用多項式模型。
  • 伯努利模型也適用於離散特徵的狀況,所不一樣的是,伯努利模型中每一個特徵的取值只能是1和0,以文本分類爲例,某個單詞在文檔中出現過,則其特徵值爲1,不然爲0,而本文是把單詞出現的次數做爲特徵,因此不適應於伯努利模型
  • 當特徵是連續變量的時候,多項式模型及時加入平滑係數也很難描述分類特徵,所以須要使用高斯模型

平滑係數

超參數平滑係數α,做用是防止後驗機率爲0,當α = 1時,稱做Laplace平滑,當0 < α < 1時,稱做Lidstone平滑,α = 0時不作平滑。本文主要對平滑係數進行調參。

/** * NB模型訓練處理過程 * @param data 訓練數據集 * @return nbBestModel */
  def train(data: DataFrame): NaiveBayesModel = {
    val params = new ClassParam
    //NB分類模型管道訓練調參
    data.persist()
    data.show(5)
    //NB模型
    val nbModel = new NaiveBayes()
      .setModelType(params.nbModelType) //多項式模型或者伯努利模型
      .setSmoothing(params.smoothing)   //平滑係數
      .setLabelCol("indexedLabel")
      .setFeaturesCol("features")
    //創建管道,模型只有一個 stages = 0
    val pipeline = new Pipeline()
      .setStages(Array(nbModel))
    //創建網格搜索
    val paramGrid = new ParamGridBuilder()
      //.addGrid(nbModel.modelType, Array("multinomial", "bernoulli"))
      //伯努利模型須要特徵爲01的數據
      .addGrid(nbModel.smoothing, Array(0.01, 0.1, 0.2, 0.5))
      .build()
    //創建evaluator,必需要保證驗證的標籤列是向量化後的標籤
    val evaluator = new BinaryClassificationEvaluator()
      .setLabelCol("indexedLabel")
    //創建一個交叉驗證的評估器,設置評估器的參數
    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(2)
    //運行交叉驗證評估器,獲得最佳參數集的模型
    val cvModel = cv.fit(data)
    //獲取最優邏輯迴歸模型
    val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
    val bestNBModel = bestModel.stages(0).asInstanceOf[NaiveBayesModel]
    println("類的數量(標籤可使用的值): " + bestNBModel.numClasses)
    println("模型所接受的特徵的數量: " + bestNBModel.numFeatures)
    println("最優的modelType的值爲: "+ bestNBModel.explainParam(bestNBModel.modelType))
    println("最優的smoothing的值爲: "+ bestNBModel.explainParam(bestNBModel.smoothing))
    //更新最優樸素貝葉斯模型,並訓練數據
    val nbBestModel = new NaiveBayes()
      .setModelType(bestNBModel.getModelType) //多項式模型或者伯努利模型
      .setSmoothing(bestNBModel.getSmoothing) //平滑係數
      .setLabelCol("indexedLabel")
      .setFeaturesCol("features")
      .fit(data)

    this.saveModel(nbBestModel, params)
    data.unpersist()
    nbBestModel
  }
複製代碼

後續的三個算法原理網上都有不少,訓練的代碼也相似,本文只給出模型調參的部分代碼。

2.2邏輯迴歸

//LR模型
    val lrModel = new LogisticRegression()
      .setMaxIter(bestLRModel.getMaxIter)    //模型最大迭代次數
      .setRegParam(bestLRModel.getRegParam)  //正則化參數
      .setElasticNetParam(params.elasticNetParam) //L1範式比例, L1/(L1 + L2)
      .setTol(params.converTol)          //模型收斂閾值
      .setLabelCol("indexedLabel")       //設置索引化標籤字段
      .setFeaturesCol("features")        //設置向量化文本特徵字段

    //創建網格搜索
    val paramGrid = new ParamGridBuilder()
      .addGrid(lrModel.maxIter, Array(5, 10))
      .addGrid(lrModel.regParam, Array(0.1, 0.2))
      .build()
複製代碼

2.3決策樹

//決策樹模型
    val dtModel = new DecisionTreeClassifier()
      .setMinInfoGain(params.minInfoGain)  //最小信息增益閾值
      .setMaxDepth(params.maxDepth)        //決策樹最大深度
      .setImpurity(params.impurity)        //節點不純度和信息增益方法gini, entropy
      .setLabelCol("indexedLabel")         //設置索引化標籤字段
      .setFeaturesCol("features")          //設置向量化文本特徵字段
    //創建網格搜索
    val paramGrid = new ParamGridBuilder()
      .addGrid(dtModel.minInfoGain, Array(0.0, 0.1))
      .addGrid(dtModel.maxDepth, Array(10, 20))
      .addGrid(dtModel.impurity, Array("gini", "entropy"))
      .build()
複製代碼

2.4隨機森林

隨機森林模型經常須要調試以提升算法效果的兩個參數:numTrees,maxDepth

  • numTrees:增長決策樹的個數會下降預測結果的方差,這樣在測試時會有更高的accuracy。訓練時間大體與numTrees呈線性增加關係
  • maxDepth:限定決策樹的最大可能深度。最終的決策樹的深度可能要比maxDepth小
  • minInfoGain:最小信息增益(設置閾值),但因爲其它終止條件或者是被剪枝的緣故小於該值將不帶繼續分叉
  • maxBins:連續特徵離散化時選用的最大分桶個數,而且決定每一個節點如何分裂。(25,28,31)
  • impurity:計算信息增益的指標,熵和gini不純度("entropy", "gini")
  • minInstancesPerNode:若是某個節點的樣本數量小於該值,則該節點將再也不被分叉。(設置閾值)
  • auto:在每一個節點分裂時是否自動選擇參與的特徵個數
  • seed:隨機數生成種子

實際上要想得到一個適當的閾值是至關困難的。高閾值可能致使過度簡化的樹,而低閾值可能簡化不夠。

預剪枝方法 minInfoGain、minInstancesPerNode 其實是經過不斷修改中止條件來獲得合理的結果,這並非一個好辦法,事實上 咱們經常甚至不知道要尋找什麼樣的結果。這樣就須要對樹進行後剪枝了(後剪枝不須要用戶指定參數,是更爲理想化的剪枝方法)

//隨機森林模型(不加fit)
    val rfModel = new RandomForestClassifier()
      .setMaxDepth(params.maxDepth)          //決策樹最大深度
      .setNumTrees(params.numTrees)          //設置決策樹個數
      .setMinInfoGain(params.minInfoGain)  //最小信息增益閾值
      .setImpurity(params.impurity)        //信息增益的指標,選擇熵或者gini不純度
      //.setMaxBins(params.maxBins) //最大分桶個數,用於連續特徵離散化時決定每一個節點如何分裂
      .setLabelCol("indexedLabel")           //設置索引化標籤字段
      .setFeaturesCol("features")            //設置向量化文本特徵字段
//創建網格搜索
    val paramGrid = new ParamGridBuilder()
      .addGrid(rfModel.maxDepth, Array(5, 10, 20))
      .addGrid(rfModel.numTrees, Array(5, 10, 20))
      .addGrid(rfModel.minInfoGain, Array(0.0, 0.1, 0.5))
      .build()
複製代碼

3.多分類模型預測和模型評估

3.1模型評估類MulticlassClassificationEvaluator

機器學期通常都須要一個量化指標來衡量其效果:這個模型的準確率、召回率和F1值(這3個指標是評判模型預測能力經常使用的一組指標),spark提供了用於多分類模型評估的類MulticlassClassificationEvaluator,並將3個指標同時輸出

object Evaluations extends Serializable {
  /** * 多分類結果評估 * @param data 分類結果 * @return (準確率, 召回率, F1) */
  def multiClassEvaluate(data: RDD[(Double, Double)]): (Double, Double, Double) = {
    val metrics = new MulticlassMetrics(data)
    val weightedPrecision = metrics.weightedPrecision
    val weightedRecall = metrics.weightedRecall
    val f1 = metrics.weightedFMeasure

    (weightedPrecision, weightedRecall, f1)
  }
}
複製代碼

3.2四個多分類模型預測結果和模型評估

以邏輯迴歸爲例,預測結果以下圖,"probability"中4個值表示4個類別的預測機率:

4個分類模型的評估結果以下:

評估模型代碼:

/** * Description: 多分類模型預測結果評估對比 * Created by wy in 2019/4/16 10:07 */
object MultiClassEvalution {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder
      .master("local")
      .appName("Multi_Class_Evaluation_Demo")
      .getOrCreate()

    val filePath = "data/dataTest/predict"

    //預處理(清洗、分詞、向量化)
    val preprocessor = new Preprocessor
    val (predictDF, indexModel, _) = preprocessor.predict(filePath, spark)

    predictDF.select("content","removed", "features").show(1, truncate = false)
    //樸素貝葉斯模型預測
    val nbClassifier = new NBClassifier
    val nbPredictions = nbClassifier.predict(predictDF, indexModel)

    //邏輯迴歸模型預測
    val lrClassifier = new LRClassifier //import Classification.LogisticRegression.LRClassifier
    val lrPredictions = lrClassifier.predict(predictDF, indexModel)

    //決策樹模型預測
    val dtClassifier = new DTClassifier
    val dtPredictions = dtClassifier.predict(predictDF, indexModel)

    //隨機森林模型預測
    val rfClassifier = new RFClassifier
    val rfPredictions = rfClassifier.predict(predictDF, indexModel)

    //多個模型評估
    val predictions = Seq(nbPredictions, lrPredictions, dtPredictions, rfPredictions)
    val classNames = Seq("樸素貝葉斯模型", "邏輯迴歸模型", "決策樹模型", "隨機森林模型")

    for (i <- 0 to 3) {
      val prediction = predictions(i)
      val className = classNames(i)

      val resultRDD = prediction.select("prediction", "indexedLabel").rdd.map {
        case Row(prediction: Double, label: Double) => (prediction, label)
      }

      val (precision, recall, f1) = Evaluations.multiClassEvaluate(resultRDD)
      println(s"\n========= $className 評估結果 ==========")
      println(s"加權準確率:$precision")
      println(s"加權召回率:$recall")
      println(s"F1值:$f1")
    }
  }
}
複製代碼
相關文章
相關標籤/搜索