MLlib協同過濾ALS算法初探

ALS算法作協同過濾大體就是創建用戶商品矩陣,根據評分值以解數獨的形式解出來java

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.mllib.recommendation.{ALS, Rating }
import org.apache.spark.{SparkContext, SparkConf}


/**
 * Created by hadoop on 2015/7/20.
 */
object MLlibCF {

  def main(args: Array[String]) {
    val time = new SimpleDateFormat("MMddHHmm").format(new Date())
    val sparkConf = new SparkConf().setAppName("MLlibCF-"+time)
    sparkConf.set("mapreduce.framework.name", "yarn")
    sparkConf.set("spark.rdd.compress", "true")//是否須要壓縮序列化的rdd分區,犧牲cpu時間提升空間利用率
    sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")//配置序列化的接口
    sparkConf.set("spark.storage.memoryFraction", "0.2")
    sparkConf.set("spark.scheduler.mode", "FAIR")
    sparkConf.set("spark.ui.port", "4042")
    sparkConf.set("spark.akka.frameSize", "100")

    val sc = new SparkContext(sparkConf)

    val data = sc.textFile("hdfs://namenode:9000/data/test_in/mahout1.txt", 1)

    //對讀取的文件進行預處理,並放入Rating容器中
    val ratings = data.map(_.split(",") match{
      case(Array(user, product, rate))
        => Rating(user.toInt, product.toInt, rate.toDouble)
    })
    //須要求出的值
    val user1 = sc.parallelize(List("1,105","1,106","2,105","2,107","3,102")).map(
    _.split(",") match {
      case (Array(user, product))
      => (user.toInt, product.toInt)
    })

    val rank = 10
    val numIterations = 20
    //創建ALS模型
    val model = ALS.train(ratings, rank, numIterations, 0.01)
    
    //讀取須要的值
    val predictions = model.predict(user1).map{
      case Rating(user, product, rate)
        => ((user, product), rate)
    }


    predictions.saveAsTextFile("hdfs://10.207.0.217:9000/data/test_out/zk/MLlib-"+time)

  }
}
相關文章
相關標籤/搜索