// Spark Week 1 homework

package wikipedia

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.log4j.{Level,Logger}


/** A single Wikipedia article, reduced to its title and body text. */
case class WikipediaArticle(title: String, text: String) {
    /** Tests whether `lang` appears in the article text as a whole token.
      *
      * The text is split on the space character only, and a token has to be
      * exactly equal to `lang` to count as a mention.
      *
      * @param lang Language to look for (e.g. "Scala")
      * @return `true` if some space-delimited token of `text` equals `lang`
      */
    def mentionsLanguage(lang: String): Boolean = {
        val tokens = text.split(' ')
        tokens.exists(token => token == lang)
    }
}

/** Ranks a fixed list of programming languages by the number of Wikipedia
  * articles that mention them, using three different Spark strategies, and
  * records how long each strategy takes.
  */
object WikipediaRanking {
    // Only report errors from loggers under "org", so the timing report is
    // not buried in log output.
    Logger.getLogger("org").setLevel(Level.ERROR)

    /** The languages to rank. */
    val langs: List[String] = List(
        "JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
        "Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy")

    /** Spark configuration: master `local[*]`, application name "Wikipedia". */
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Wikipedia")

    /** The Spark context, built from [[conf]]. */
    val sc: SparkContext = new SparkContext(conf)

    /** Every article, obtained by reading the file at `WikipediaData.filePath`
      * as text and applying `WikipediaData.parse` to each line.
      */
    val wikiRdd: RDD[WikipediaArticle] = sc.textFile(WikipediaData.filePath).map(WikipediaData.parse)

    /** Returns the number of articles in which the language `lang` occurs.
      *
      * @param lang language name to look for
      * @param rdd  articles to search
      * @return number of articles for which `mentionsLanguage(lang)` is true
      */
    def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int =
        rdd.filter(_.mentionsLanguage(lang)).count().toInt

    /** (1) Ranks the languages by running [[occurrencesOfLang]] once per language.
      *
      * Note: this operation is long-running. It can potentially run for
      * several seconds.
      *
      * @param langs languages to rank
      * @param rdd   articles to search; marked for caching as a side effect
      * @return (language, article count) pairs in decreasing order of count
      */
    def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
        // One counting job is launched per language, so keep the articles in
        // memory instead of recomputing them for every job.
        rdd.cache()
        // sortBy(_._2) orders by count, ascending by default; reversing the
        // result gives the required decreasing order.
        langs.map(lang => (lang, occurrencesOfLang(lang, rdd))).sortBy(_._2).reverse
    }

    /** Computes an inverted index of the set of articles, mapping each language
      * to the Wikipedia pages in which it occurs.
      *
      * @param langs languages to index
      * @param rdd   articles to index
      * @return one entry per language mentioned by at least one article
      */
    def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
        // Emit one (language, article) pair for every language the article mentions.
        val langArticlePairs = rdd.flatMap { article =>
            langs.filter(lang => article.mentionsLanguage(lang))
                .map(lang => (lang, article))
        }
        langArticlePairs.groupByKey()
    }

    /** (2) Computes the language ranking from an inverted index.
      *
      * Note: this operation is long-running. It can potentially run for
      * several seconds.
      *
      * @param index inverted index as produced by [[makeIndex]]
      * @return (language, article count) pairs in decreasing order of count
      */
    def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] =
        index.mapValues(_.size).sortBy(-_._2).collect().toList

    /** (3) Computes the ranking with `reduceByKey`, combining the computation
      * of the index and of the ranking into a single pass.
      *
      * Note: this operation is long-running. It can potentially run for
      * several seconds.
      *
      * @param langs languages to rank
      * @param rdd   articles to search
      * @return (language, article count) pairs in decreasing order of count
      */
    def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
        rdd.flatMap { article =>
            // Same as langs.filter(lang => article.mentionsLanguage(lang)).
            langs.filter(article.mentionsLanguage)
                .map((_, 1))
        }.reduceByKey(_ + _)
            .sortBy(_._2)
            .collect()
            .toList
            .reverse
    }

    /** Runs the three ranking strategies, prints the collected timing report
      * and stops the Spark context.
      *
      * @param args command-line arguments (unused)
      */
    def main(args: Array[String]): Unit = {
        try {
            /* Languages ranked according to (1) */
            val langsRanked: List[(String, Int)] = timed("Part 1: naive ranking", rankLangs(langs, wikiRdd))

            /* An inverted index mapping languages to wikipedia pages on which they appear */
            def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)

            /* Languages ranked according to (2), using the inverted index */
            val langsRanked2: List[(String, Int)] = timed("Part 2: ranking using inverted index", rankLangsUsingIndex(index))

            /* Languages ranked according to (3) */
            val langsRanked3: List[(String, Int)] = timed("Part 3: ranking using reduceByKey", rankLangsReduceByKey(langs, wikiRdd))

            /* Output the speed of each ranking */
            println(timing)
        } finally {
            // Stop the context even when one of the rankings throws.
            sc.stop()
        }
    }

    /** Accumulates one line per [[timed]] call. */
    val timing: StringBuffer = new StringBuffer

    /** Evaluates `code`, appends its wall-clock duration to [[timing]] and
      * returns its result.
      *
      * @param label description used in the timing line
      * @param code  by-name expression, evaluated exactly once between the two clock reads
      * @return the value produced by `code`
      */
    def timed[T](label: String, code: => T): T = {
        val start = System.currentTimeMillis()
        val result = code
        val stop = System.currentTimeMillis()
        timing.append(s"Processing $label took ${stop - start} ms.\n")
        result
    }
}
// Related articles
// Related tags / search