文本分類
spark
算法
spark2.0開始引入dataframe做爲RDD的上層封裝,以屏蔽RDD層次的複雜操做,本文使用spark milib中ml機器學習庫進行新聞文本多分類預測,包含數據預預處理,分詞,標籤和特徵向量化轉換、多分類模型訓練(包含樸素貝葉斯、邏輯迴歸、決策樹和隨機森林),分類預測和模型評估等完整的機器學習demo。本文分詞方法選用HanLP分詞工具包(文檔豐富、算法公開、代碼開源,而且經測試分詞效果比較好)。apache
本文使用的數據爲4類新聞,每條數據包含標籤,標題,時間和新聞內容,以"\u00EF"符號做爲分割符,數據格式以下:數組
首頁|文化新聞ï第十一屆全國優秀舞蹈節目展演將在武漢舉辦ï2016-07-05 19:25:00ï新華社北京7月5日電(記者周瑋)由文化部、湖北省人民政府主辦的...
首頁|財經中心|財經頻道ï新通教育收購杭州藍海旅行社100%股權 發力出境遊學市場ï2016-07-04 21:49:00ï杭州7月4日電(胡豐盛)7月4日,新通教育...
首頁|軍事新聞ï環太軍演中國參演潛水分隊開展潛水事故應急醫學處置演練ï2016-07-04 19:40:00ï夏威夷7月4日電 (李純 於超)當地時間3日,參加...
首頁|體育新聞ï斯坦科維奇盃首戰將爲王治郅舉辦退役儀式ï2016-07-04 10:39:00ï週二晚上八點,中國男籃將在北京以里約奧運陣容出戰在國內...
複製代碼
文本清洗 -> 標籤索引化 -> 內容文本分詞 -> 去除停用詞 -> 分詞取前5000個詞做爲特徵 -> 特徵向量化 -> 保存預處理模型 -> 調用預處理模型 -> 輸出預處理數據(indexedLabel,features)bash
首先將文本讀取成Dataframe格式,將標籤列數據索引化,{文化,經濟,軍事和體育}向量化後爲{0,1,2,3}數據結構
/** * 數據清洗 可根據具體數據結構和業務場景的不一樣進行重寫. 注意: 輸出必需要有標籤字段"label" * @param filePath 數據路徑 * @param spark SparkSession * @return 清洗後的數據, 包含字段: "label", "title", "time", "content" */
def clean(filePath: String, spark: SparkSession): DataFrame = {
import spark.implicits._
val textDF = spark.sparkContext.textFile(filePath).flatMap { line =>
val fields = line.split("\u00EF") //分隔符:ï,分紅標籤,標題,時間,內容
//首頁|文化新聞ï第十一屆全國優秀舞蹈節目展演將在武漢舉辦ï2016-07-05 19:25:00ï新華社北京7月5日電(記者周瑋)由文化部...
//首頁|財經中心|財經頻道ï上半年浙江口岸原油進口量創同期歷史新高ï2016-07-04 21:54:00ï杭州7月4日...
if (fields.length > 3) {
val categoryLine = fields(0)
val categories = categoryLine.split("\\|")
val category = categories.last
//分紅4個標籤名和其餘,最後去除標籤爲其餘的數據
var label = "其餘"
if (category.contains("文化")) label = "文化"
else if (category.contains("財經")) label = "財經"
else if (category.contains("軍事")) label = "軍事"
else if (category.contains("體育")) label = "體育"
else {}
//輸出標籤,標題,時間,內容
val title = fields(1)
val time = fields(2)
val content = fields(3)
if (!label.equals("其餘")) Some(label, title, time, content) else None
} else None
}.toDF("label", "title", "time", "content")
//輸出標籤,標題,時間,內容DF
textDF
}
/** * 處理label轉換爲索引形式 * @param data 輸入label字段的數據 * @return 標籤索引模型, 模型增長字段: "indexedLabel" */
def indexrize(data: DataFrame): StringIndexerModel = {
val labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
.fit(data)
labelIndexer
}
複製代碼
predictDF.select("label","indexedLabel").show(10, truncate = false)
複製代碼
處理內容字段,首先要進行分詞,而後去除停用詞以及轉換爲特徵向量,方便分類模型進行訓練和預測。本文模仿spark的ml包下的StopWordsRemover類建立了Segmenter類,用於對數據進行分詞,其內部調用了HanLP分詞工具。閉包
因爲spark自帶的StopWordsRemover等使用的閉包僅限於ml包,自定義的類沒法調用,故只是採用了與StopWordsRemover相似的使用形式,內部結構並不相同,而且因爲以上緣由,Segmenter類沒有繼承Transformer類,故沒法進行pipeline管道操做,故在分類模型超參數調優過程當中,沒有加入分詞模型的參數調優。app
/** * 分詞過程,包括"分詞", "去除停用詞" * @param data 輸入須要分詞的字段的數據"content" * @param params 分詞參數 * @return 分詞處理後的DataFrame,增長字段: "tokens", "removed" */
def segment(data: DataFrame, params: PreprocessParam): DataFrame = {
val spark = data.sparkSession
//設置分詞模型
val segmenter = new Segmenter()
.setSegmentType(params.segmentType) //分詞方式
.isDelEn(params.delEn) //是否去除英語單詞
.isDelNum(params.delNum) //是否去除數字
.addNature(params.addNature) //是否添加詞性
.setMinTermLen(params.minTermLen) //最小詞長度
.setMinTermNum(params.minTermNum) //行最小詞數
.setInputCol("content") //輸入內容字段
.setOutputCol("tokens") //輸出分詞後的字段
//進行分詞
val segDF = segmenter.transform(data)
複製代碼
分詞以後,須要對一些經常使用的無心義詞如:「的」、「咱們」、「是」等(統稱爲「停用詞」)進行去除。這些詞沒有多大的意義,但這些詞不去掉會強烈的干擾咱們對特徵的抽取效果。(好比:在體育分類中,「的」出現500次,「足球」共出現300次,但顯然足球更能表示體育分類,而「的」反而影響體育分類的結果。dom
去除停用詞的操做咱們直接調用ml包中的StopWordsRemover類:eclipse
//讀取停用詞數據
val stopWordArray = spark.sparkContext.textFile(params.stopwordFilePath).collect()
//設置停用詞模型
val remover = new StopWordsRemover()
.setStopWords(stopWordArray)
.setInputCol(segmenter.getOutputCol) //讀取"tokens"字段
.setOutputCol("removed") //輸出刪除停用詞後的字段"removed"
//刪除停用詞
val removedDF = remover.transform(segDF)
removedDF
}
複製代碼
因爲目前經常使用的分類、聚類等算法都是基於向量空間模型VSM(即將對象向量化爲一個N維向量,映射成N維超空間中的一個點),VSM將數據轉換爲向量形式,便於對大規模數據進行矩陣操做等,也能夠經過計算超空間中兩個點之間的距離(通常是餘弦距離)來計算兩個向量之間的類似度。所以,咱們須要將通過處理的語料轉換爲向量形式,這個過程叫作向量化。機器學習
這裏咱們也調用spark提供的向量化類CountVectorizer類進行向量化操做:
/** * 特徵向量化處理,包括詞彙表過濾 * @param data 輸入向量化的字段"removed" * @param params 配置參數 * @return 向量模型 */
def vectorize(data: DataFrame, params: PreprocessParam): CountVectorizerModel = {
//設置向量模型
val vectorizer = new CountVectorizer()
.setVocabSize(params.vocabSize)
.setInputCol("removed")
.setOutputCol("features")
val parentVecModel = vectorizer.fit(data)
//過濾停用詞中沒有的數字features
val numPattern = "[0-9]+".r
val vocabulary = parentVecModel.vocabulary.flatMap {
term => if (term.length == 1 || term.matches(numPattern.regex)) None else Some(term)
}
val vecModel = new CountVectorizerModel(Identifiable.randomUID("cntVec"), vocabulary)
.setInputCol("removed")
.setOutputCol("features")
vecModel
}
複製代碼
將字段"content"先進行分詞和去除停用詞獲得"removed",再將全部詞做爲特徵,進行特徵向量化獲得"features"字段:
在模型中能夠設置出現次數最多的前5000個詞做爲分類用的特徵,下圖5000後有兩個數組,第一個數值表示對應前5000個詞的第幾個詞,第二組表示對應第一組出現的詞在本條數據中的出現的次數,取出一條完整的數據看看:爲了方便每一個模型單獨訓練和預測,將預處理也做爲數據處理的模型進行訓練,保存和調用,方法以下:
/** * 訓練預處理模型 * @param filePath 數據路徑 * @param spark SparkSession * @return (預處理後的數據,索引模型,向量模型) * 數據包括字段: "label", "indexedLabel", "title", "time", "content", "tokens", "removed", "features" */
def train(filePath: String, spark: SparkSession): (DataFrame, StringIndexerModel, CountVectorizerModel) = {
val params = new PreprocessParam //預處理參數
val cleanDF = this.clean(filePath, spark) //讀取DF,清洗數據
val indexModel = this.indexrize(cleanDF) //調用索引模型
val indexDF = indexModel.transform(cleanDF) //標籤索引化
val segDF = this.segment(indexDF, params) //將內容字段分詞
val vecModel = this.vectorize(segDF, params) //調用向量模型
val trainDF = vecModel.transform(segDF) //內容分詞特徵向量化
this.saveModel(indexModel, vecModel, params) //保存模型
(trainDF, indexModel, vecModel)
}
/** * 擬合預處理模型 * @param filePath 數據路徑 * @param spark SparkSession * @return (預處理後的數據,索引模型,向量模型) */
def predict(filePath: String, spark: SparkSession): (DataFrame, StringIndexerModel, CountVectorizerModel) = {
val params = new PreprocessParam //預處理參數
val cleanDF = this.clean(filePath, spark) //讀取DF,清洗數據
val (indexModel, vecModel) = this.loadModel(params) //加載索引和向量模型
val indexDF = indexModel.transform(cleanDF) //標籤索引化
val segDF = this.segment(indexDF, params) //內容字段分詞
val predictDF = vecModel.transform(segDF) //內容分詞特徵向量化
(predictDF, indexModel, vecModel)
}
複製代碼
調用預處理模型,數據處理後的結果取出5條:
+-----+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+
|label| title| time| content|indexedLabel| tokens| removed| features|
+-----+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+
|財經 |西南乳業巨頭新但願北..|2016-06-27 10:46:00|京華時報訊(記者胡笑...| 1.0|[京華, 時報訊, 記者, 胡笑紅...|[京華, 時報訊, 記者, 胡笑紅...|(5000,[3,4,14,22,...|
|文化 |全國篆刻名家大做在大..|2016-06-02 21:53:00|內江6月2日電王爵陳 ...| 3.0|[內江, 6月, 日電, 王爵, ...|[內江, 6月, 日電, 王爵, ...|(5000,[0,8,10,13,...|
|文化 |世界海洋日進入8天倒...|2016-05-31 15:38:00|北京5月31日電,記者...| 3.0|[北京, 5月, 日電, 記者, ...|[北京, 5月, 日電, 記者, ...|(5000,[0,3,10,13,...|
|軍事 |英媒評中國徵兵放寬體..|2016-06-02 08:30:00|參考消息網英媒稱,隨..| 0.0|[參考消息, 隨着, 中國軍隊, ...|[參考消息, 中國軍隊, 放寬, ...|(5000,[0,5,12,14,...|
|財經 |2016年二十國集團峯會..|2016-06-25 18:52:00|新華社廈門6月25日記...| 1.0|[新華社, 廈門, 6月, 日電,...|[新華社, 廈門, 6月, 日電,...|(5000,[3,8,10,12,...|
+-----+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+
only showing top 5 rows
複製代碼
本文選用了經常使用的4中多分類模型對文本數據進行訓練,利用了管道Pipeline + 網格搜索Gridsearch + 交叉驗證CrossValidator 進行參數調優,直接將參數調優放在了訓練模型裏,將獲得的最優模型保存。
樸素貝葉斯算法是基於貝葉斯定理與特徵條件獨立假設的分類方法。
條件機率
P(A|B)表示事件B已經發生的前提下,事件A發生的機率,叫作事件B發生下事件A的條件機率。其基本求解公式爲:
貝葉斯定理即是基於條件機率,經過P(A|B)來求P(B|A): 特徵條件獨立假設 樸素貝葉斯模型經常使用的模型主要有3個,多項式、伯努利和高斯模型:
平滑係數
超參數平滑係數α,做用是防止後驗機率爲0,當α = 1時,稱做Laplace平滑,當0 < α < 1時,稱做Lidstone平滑,α = 0時不作平滑。本文主要對平滑係數進行調參。
/** * NB模型訓練處理過程 * @param data 訓練數據集 * @return nbBestModel */
def train(data: DataFrame): NaiveBayesModel = {
val params = new ClassParam
//NB分類模型管道訓練調參
data.persist()
data.show(5)
//NB模型
val nbModel = new NaiveBayes()
.setModelType(params.nbModelType) //多項式模型或者伯努利模型
.setSmoothing(params.smoothing) //平滑係數
.setLabelCol("indexedLabel")
.setFeaturesCol("features")
//創建管道,模型只有一個 stages = 0
val pipeline = new Pipeline()
.setStages(Array(nbModel))
//創建網格搜索
val paramGrid = new ParamGridBuilder()
//.addGrid(nbModel.modelType, Array("multinomial", "bernoulli"))
//伯努利模型須要特徵爲01的數據
.addGrid(nbModel.smoothing, Array(0.01, 0.1, 0.2, 0.5))
.build()
//創建evaluator,必需要保證驗證的標籤列是向量化後的標籤
val evaluator = new BinaryClassificationEvaluator()
.setLabelCol("indexedLabel")
//創建一個交叉驗證的評估器,設置評估器的參數
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2)
//運行交叉驗證評估器,獲得最佳參數集的模型
val cvModel = cv.fit(data)
//獲取最優邏輯迴歸模型
val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
val bestNBModel = bestModel.stages(0).asInstanceOf[NaiveBayesModel]
println("類的數量(標籤可使用的值): " + bestNBModel.numClasses)
println("模型所接受的特徵的數量: " + bestNBModel.numFeatures)
println("最優的modelType的值爲: "+ bestNBModel.explainParam(bestNBModel.modelType))
println("最優的smoothing的值爲: "+ bestNBModel.explainParam(bestNBModel.smoothing))
//更新最優樸素貝葉斯模型,並訓練數據
val nbBestModel = new NaiveBayes()
.setModelType(bestNBModel.getModelType) //多項式模型或者伯努利模型
.setSmoothing(bestNBModel.getSmoothing) //平滑係數
.setLabelCol("indexedLabel")
.setFeaturesCol("features")
.fit(data)
this.saveModel(nbBestModel, params)
data.unpersist()
nbBestModel
}
複製代碼
後續的三個算法原理網上都有不少,訓練的代碼也相似,本文只給出模型調參的部分代碼。
//LR模型
val lrModel = new LogisticRegression()
.setMaxIter(bestLRModel.getMaxIter) //模型最大迭代次數
.setRegParam(bestLRModel.getRegParam) //正則化參數
.setElasticNetParam(params.elasticNetParam) //L1範式比例, L1/(L1 + L2)
.setTol(params.converTol) //模型收斂閾值
.setLabelCol("indexedLabel") //設置索引化標籤字段
.setFeaturesCol("features") //設置向量化文本特徵字段
//創建網格搜索
val paramGrid = new ParamGridBuilder()
.addGrid(lrModel.maxIter, Array(5, 10))
.addGrid(lrModel.regParam, Array(0.1, 0.2))
.build()
複製代碼
//決策樹模型
val dtModel = new DecisionTreeClassifier()
.setMinInfoGain(params.minInfoGain) //最小信息增益閾值
.setMaxDepth(params.maxDepth) //決策樹最大深度
.setImpurity(params.impurity) //節點不純度和信息增益方法gini, entropy
.setLabelCol("indexedLabel") //設置索引化標籤字段
.setFeaturesCol("features") //設置向量化文本特徵字段
//創建網格搜索
val paramGrid = new ParamGridBuilder()
.addGrid(dtModel.minInfoGain, Array(0.0, 0.1))
.addGrid(dtModel.maxDepth, Array(10, 20))
.addGrid(dtModel.impurity, Array("gini", "entropy"))
.build()
複製代碼
隨機森林模型經常須要調試以提升算法效果的兩個參數:numTrees,maxDepth
實際上要想得到一個適當的閾值是至關困難的。高閾值可能致使過度簡化的樹,而低閾值可能簡化不夠。
預剪枝方法 minInfoGain、minInstancesPerNode 其實是經過不斷修改中止條件來獲得合理的結果,這並非一個好辦法,事實上 咱們經常甚至不知道要尋找什麼樣的結果。這樣就須要對樹進行後剪枝了(後剪枝不須要用戶指定參數,是更爲理想化的剪枝方法)
//隨機森林模型(不加fit)
val rfModel = new RandomForestClassifier()
.setMaxDepth(params.maxDepth) //決策樹最大深度
.setNumTrees(params.numTrees) //設置決策樹個數
.setMinInfoGain(params.minInfoGain) //最小信息增益閾值
.setImpurity(params.impurity) //信息增益的指標,選擇熵或者gini不純度
//.setMaxBins(params.maxBins) //最大分桶個數,用於連續特徵離散化時決定每一個節點如何分裂
.setLabelCol("indexedLabel") //設置索引化標籤字段
.setFeaturesCol("features") //設置向量化文本特徵字段
//創建網格搜索
val paramGrid = new ParamGridBuilder()
.addGrid(rfModel.maxDepth, Array(5, 10, 20))
.addGrid(rfModel.numTrees, Array(5, 10, 20))
.addGrid(rfModel.minInfoGain, Array(0.0, 0.1, 0.5))
.build()
複製代碼
機器學期通常都須要一個量化指標來衡量其效果:這個模型的準確率、召回率和F1值(這3個指標是評判模型預測能力經常使用的一組指標),spark提供了用於多分類模型評估的類MulticlassClassificationEvaluator,並將3個指標同時輸出
object Evaluations extends Serializable {
/** * 多分類結果評估 * @param data 分類結果 * @return (準確率, 召回率, F1) */
def multiClassEvaluate(data: RDD[(Double, Double)]): (Double, Double, Double) = {
val metrics = new MulticlassMetrics(data)
val weightedPrecision = metrics.weightedPrecision
val weightedRecall = metrics.weightedRecall
val f1 = metrics.weightedFMeasure
(weightedPrecision, weightedRecall, f1)
}
}
複製代碼
以邏輯迴歸爲例,預測結果以下圖,"probability"中4個值表示4個類別的預測機率:
4個分類模型的評估結果以下: 評估模型代碼:/** * Description: 多分類模型預測結果評估對比 * Created by wy in 2019/4/16 10:07 */
object MultiClassEvalution {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder
.master("local")
.appName("Multi_Class_Evaluation_Demo")
.getOrCreate()
val filePath = "data/dataTest/predict"
//預處理(清洗、分詞、向量化)
val preprocessor = new Preprocessor
val (predictDF, indexModel, _) = preprocessor.predict(filePath, spark)
predictDF.select("content","removed", "features").show(1, truncate = false)
//樸素貝葉斯模型預測
val nbClassifier = new NBClassifier
val nbPredictions = nbClassifier.predict(predictDF, indexModel)
//邏輯迴歸模型預測
val lrClassifier = new LRClassifier //import Classification.LogisticRegression.LRClassifier
val lrPredictions = lrClassifier.predict(predictDF, indexModel)
//決策樹模型預測
val dtClassifier = new DTClassifier
val dtPredictions = dtClassifier.predict(predictDF, indexModel)
//隨機森林模型預測
val rfClassifier = new RFClassifier
val rfPredictions = rfClassifier.predict(predictDF, indexModel)
//多個模型評估
val predictions = Seq(nbPredictions, lrPredictions, dtPredictions, rfPredictions)
val classNames = Seq("樸素貝葉斯模型", "邏輯迴歸模型", "決策樹模型", "隨機森林模型")
for (i <- 0 to 3) {
val prediction = predictions(i)
val className = classNames(i)
val resultRDD = prediction.select("prediction", "indexedLabel").rdd.map {
case Row(prediction: Double, label: Double) => (prediction, label)
}
val (precision, recall, f1) = Evaluations.multiClassEvaluate(resultRDD)
println(s"\n========= $className 評估結果 ==========")
println(s"加權準確率:$precision")
println(s"加權召回率:$recall")
println(s"F1值:$f1")
}
}
}
複製代碼