package wikipedia

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.log4j.{Level, Logger}

/** A Wikipedia article reduced to its title and its body text. */
case class WikipediaArticle(title: String, text: String) {

  /**
   * Checks whether `lang` appears as one of the space-separated tokens of `text`.
   * The match is exact: a token such as "Scala," does not count as "Scala".
   *
   * @param lang Language to look for (e.g. "Scala")
   * @return Whether the text of this article mentions `lang` or not
   */
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

/** Ranks programming languages by the number of Wikipedia articles that mention them. */
object WikipediaRanking {

  // Logging setup: restrict the "org" logger hierarchy to ERROR level.
  Logger.getLogger("org").setLevel(Level.ERROR)

  /** Languages whose popularity is ranked. */
  val langs = List(
    "JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
    "Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy")

  /** Spark configuration: local master using all available cores, application name "Wikipedia". */
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Wikipedia")

  /** Spark context built from [[conf]]. */
  val sc: SparkContext = new SparkContext(conf)

  // Hint: use a combination of `sc.textFile`, `WikipediaData.filePath` and `WikipediaData.parse`
  /**
   * All articles, one per line of the input file.
   * NOTE(review): `WikipediaData` is defined outside this file; presumably `parse` turns one
   * line of text into a `WikipediaArticle` -- confirm against its definition.
   */
  val wikiRdd: RDD[WikipediaArticle] =
    sc.textFile(WikipediaData.filePath).map(WikipediaData.parse)

  /** Returns the number of articles on which the language `lang` occurs.
   *  Hint1: consider using method `aggregate` on RDD[T].
   *  Hint2: consider using method `mentionsLanguage` on `WikipediaArticle`
   *
   *  @param lang language name to look for
   *  @param rdd  articles to search
   *  @return number of articles in `rdd` that mention `lang` (the `Long` count is narrowed to `Int`)
   */
  def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int =
    rdd.filter(_.mentionsLanguage(lang)).count().toInt

  /* (1) Use `occurrencesOfLang` to compute the ranking of the languages
   *     (`val langs`) by determining the number of Wikipedia articles that
   *     mention each language at least once. Don't forget to sort the
   *     languages by their occurrence, in decreasing order!
   *
   *   Note: this operation is long-running. It can potentially run for
   *   several seconds.
   */
  /**
   * Ranks `langs` by running one `occurrencesOfLang` job per language.
   *
   * @param langs languages to rank
   * @param rdd   articles to search; marked for caching as a side effect
   * @return (language, article count) pairs in decreasing order of count
   */
  def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
    // Allow the data to be kept in memory, since it is scanned once per language.
    rdd.cache()
    // For every language, find the number of articles that mention it.
    // `sortBy(_._2)` orders by that count (`sortBy(_._1)` would order by language name);
    // the default order is ascending, hence the trailing `.reverse`.
    langs.map(lang => (lang, occurrencesOfLang(lang, rdd))).sortBy(_._2).reverse
  }

  /* Compute an inverted index of the set of articles, mapping each language
   * to the Wikipedia pages in which it occurs.
   */
  /**
   * Builds the inverted index by emitting one (language, article) pair per mention
   * and grouping the pairs by language.
   *
   * @param langs languages to index
   * @param rdd   articles to index
   * @return for each language mentioned at least once, the articles that mention it
   */
  def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
    val articlesByLanguage = rdd.flatMap { article =>
      langs
        .filter(lang => article.mentionsLanguage(lang))
        .map(lang => (lang, article))
    }
    articlesByLanguage.groupByKey
  }

  /* (2) Compute the language ranking again, but now using the inverted index. Can you notice
   *     a performance improvement?
   *
   *   Note: this operation is long-running. It can potentially run for
   *   several seconds.
   */
  /**
   * Ranks languages by the size of their entry in the inverted index.
   *
   * @param index inverted index as produced by `makeIndex`
   * @return (language, article count) pairs in decreasing order of count
   */
  def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] =
    index.mapValues(_.size).sortBy(-_._2).collect().toList

  /* (3) Use `reduceByKey` so that the computation of the index and the ranking are combined.
   *     Can you notice an improvement in performance compared to measuring *both* the computation of the index
   *     and the computation of the ranking? If so, can you think of a reason?
   *
   *   Note: this operation is long-running. It can potentially run for
   *   several seconds.
   */
  /**
   * Ranks languages by emitting (language, 1) per mention and summing the ones per language.
   *
   * @param langs languages to rank
   * @param rdd   articles to search
   * @return (language, article count) pairs in decreasing order of count
   */
  def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
    rdd.flatMap { article =>
      // Equivalent to langs.filter(lang => article.mentionsLanguage(lang))
      // or langs.filter(article.mentionsLanguage(_)).
      langs.filter(article.mentionsLanguage).map((_, 1))
    }.reduceByKey(_ + _)
      .sortBy(_._2)
      .collect()
      .toList
      .reverse
  }

  /**
   * Runs the three ranking strategies, prints how long each one took and stops the Spark context.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    try {
      /* Languages ranked according to (1) */
      val langsRanked: List[(String, Int)] = timed("Part 1: naive ranking", rankLangs(langs, wikiRdd))

      /* An inverted index mapping languages to wikipedia pages on which they appear */
      // Kept as a `def` so the index is built inside the timed section of Part 2.
      def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)

      /* Languages ranked according to (2), using the inverted index */
      val langsRanked2: List[(String, Int)] = timed("Part 2: ranking using inverted index", rankLangsUsingIndex(index))

      /* Languages ranked according to (3) */
      val langsRanked3: List[(String, Int)] = timed("Part 3: ranking using reduceByKey", rankLangsReduceByKey(langs, wikiRdd))

      /* Output the speed of each ranking */
      println(timing)
    } finally {
      // Stop the context even when one of the rankings throws.
      sc.stop()
    }
  }

  /** Accumulates one timing line per call to `timed`. */
  val timing = new StringBuffer

  /**
   * Evaluates `code`, appends its wall-clock duration to `timing` and returns its result.
   *
   * @param label description used in the timing line
   * @param code  by-name expression to measure; evaluated exactly once
   * @return the value produced by `code`
   */
  def timed[T](label: String, code: => T): T = {
    val start = System.currentTimeMillis()
    val result = code
    val stop = System.currentTimeMillis()
    timing.append(s"Processing $label took ${stop - start} ms.\n")
    result
  }
}