1. Preparing the Data
The input format is documents: RDD[(Long, Vector)], where the Long is the document ID and the Vector is the document's term-count vector after word segmentation.
After segmentation and format conversion, the data just needs to end up as RDD[(Long, Vector)]; a sketch of this conversion follows.
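A minimal sketch of that conversion, assuming segmentation (and stop-word removal against zh_stopwords.txt) has already happened upstream; toBagOfWords and tokenizedDocs are illustrative names, not part of the original code:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// tokenizedDocs is assumed to hold (docId, tokens) pairs produced by a
// segmenter, with stop words already removed.
def toBagOfWords(tokenizedDocs: RDD[(Long, Seq[String])]): (RDD[(Long, Vector)], Array[String]) = {
  // Assign every distinct term a column index in the vocabulary.
  val vocab: Array[String] = tokenizedDocs.flatMap(_._2).distinct().collect()
  val vocabIndex: Map[String, Int] = vocab.zipWithIndex.toMap
  // Turn each document into a sparse term-count vector over that vocabulary.
  val documents: RDD[(Long, Vector)] = tokenizedDocs.mapValues { tokens =>
    val counts = tokens.groupBy(identity).toSeq
      .map { case (w, ws) => (vocabIndex(w), ws.size.toDouble) }
    Vectors.sparse(vocab.length, counts)
  }
  (documents, vocab)
}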
2. Building the Model
import org.apache.spark.mllib.clustering._

// Online variational Bayes optimizer; optimizeDocConcentration lets the
// document concentration (alpha) be learned during training.
val ldaOptimizer = new OnlineLDAOptimizer().setOptimizeDocConcentration(true)
val lda = new LDA()
lda.setK(params.k)
  .setMaxIterations(params.maxIterations)
  .setDocConcentration(params.docConcentration)     // alpha; -1 uses Spark's default
  .setTopicConcentration(params.topicConcentration) // beta; -1 uses Spark's default
  .setOptimizer(ldaOptimizer)
  .setCheckpointInterval(10)
  .setSeed(1234)
val modelLDA: LDAModel = lda.run(corpus)  // corpus: RDD[(Long, Vector)] from step 1
modelLDA.save(sc.sparkContext, params.modelPath)  // sc is assumed to be a SparkSession
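To read the saved model back later, a minimal sketch, assuming the online optimizer above (the saved model is then a LocalLDAModel, and the path must match params.modelPath):

import org.apache.spark.mllib.clustering.LocalLDAModel

val loaded = LocalLDAModel.load(sc.sparkContext, params.modelPath)
println(s"k = ${loaded.k}, vocabSize = ${loaded.vocabSize}")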
3. Model Parameters
case class NewsParams(
  k: Int = 100,                    // number of topics
  maxIterations: Int = 100,        // training iterations
  docConcentration: Double = -1,   // alpha; -1 = Spark's default
  topicConcentration: Double = -1, // beta; -1 = Spark's default
  stopWordFile: String = "zh_stopwords.txt",
  modelPath: String = "LDAModel.14.100",
  ldaJsonPath: String = "ldaModel.14.200.json",
  vocabPath: String = "vocab_info"
)
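Hypothetical usage, just to show that individual fields can be overridden while the rest keep their defaults:

val params = NewsParams(k = 14, maxIterations = 200)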
4. Output
The results are read out through topicsMatrix and describeTopics, which give the (word, weight) pairs for each topic; a sketch of printing them appears at the end of this section. Trained with the online optimizer, the mllib LDA model is not distributed: it stores only topic information, not per-document information. To obtain per-document topic distributions, either use the LDA in the ml package, or train with the EM optimizer and use the following code:
// This cast succeeds only for a model trained with the EM optimizer.
val ldaModel = lda.run(documents)
val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
distLDAModel.topicDistributions  // RDD[(Long, Vector)]: per-document topic mixture
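Since the model in section 2 was trained with the online optimizer, the cast above would fail there. A hedged sketch of handling both cases; the LocalLDAModel branch relies on topicDistributions(documents), available since Spark 1.5:

import org.apache.spark.mllib.clustering.{DistributedLDAModel, LocalLDAModel}

modelLDA match {
  // EM-trained model: per-document mixtures for the training corpus are stored.
  case m: DistributedLDAModel => m.topicDistributions.take(5).foreach(println)
  // Online-trained model: infer mixtures for any corpus on demand.
  case m: LocalLDAModel       => m.topicDistributions(corpus).take(5).foreach(println)
}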
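Finally, a sketch of inspecting the topic side mentioned above: the ten top-weighted terms per topic. Here vocab is assumed to be the vocabulary array built during preprocessing (see section 1); it is not part of the original code:

modelLDA.describeTopics(maxTermsPerTopic = 10).zipWithIndex.foreach {
  case ((termIndices, termWeights), topicId) =>
    val terms = termIndices.zip(termWeights)
      .map { case (i, w) => f"${vocab(i)}:$w%.4f" }
    println(s"topic $topicId: ${terms.mkString(", ")}")
}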