Task description: find the maximum and minimum of the numeric values contained in several files.
Solution approach: introduce an artificial key so that all of the values end up in the value list of that single "key"; then traverse the value list and use two variables to track the maximum and the minimum.
The code is as follows:
import org.apache.spark.{SparkConf, SparkContext}

object MaxAndMin {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MaxAndMin").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/spark/chapter5", 2)
    val result = lines.filter(_.trim().length > 0)
      .map(line => ("key", line.trim.toInt))   // put every value under one artificial key
      .groupByKey()
      .map(x => {
        var min = Integer.MAX_VALUE
        var max = Integer.MIN_VALUE
        for (num <- x._2) {
          if (num > max) { max = num }
          if (num < min) { min = num }
        }
        (max, min)
      })
      .collect
      .foreach(x => {
        println("max\t" + x._1)
        println("min\t" + x._2)
      })
  }
}
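For comparison, here is a minimal alternative sketch (the input path is taken from the code above; the object name is an assumption for illustration) that computes the same pair with reduce instead of collecting every value under one key:

import org.apache.spark.{SparkConf, SparkContext}

// A minimal alternative sketch: instead of grouping every value under one
// artificial key, reduce the RDD of integers directly, which avoids
// materializing all values as a single list.
object MaxAndMinReduce {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MaxAndMinReduce").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val nums = sc.textFile("hdfs://localhost:9000/user/hadoop/spark/chapter5", 2)
      .filter(_.trim().length > 0)
      .map(_.trim.toInt)
    // keep a running (max, min) pair per partition, then merge the partial results
    val (max, min) = nums.map(n => (n, n))
      .reduce((a, b) => (math.max(a._1, b._1), math.min(a._2, b._2)))
    println("max\t" + max)
    println("min\t" + min)
    sc.stop()
  }
}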
Task description: there are several input files, and every line of every file contains a single integer. Read all the integers from all the files, sort them, and write them to a new output file. Each output line holds two integers: the first is the rank of the second in the sorted order, and the second is the original integer.
Because there are several input files, they are read into different partitions. To generate the rank numbers, a HashPartitioner is used to bring the intermediate RDD together into a single partition.
The code is as follows:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner

object FileSort {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("FileSort")
    val sc = new SparkContext(conf)
    val dataFile = "file:///usr/local/spark/mycode/rdd/data"
    val lines = sc.textFile(dataFile, 3)
    var index = 0
    val result = lines.filter(_.trim().length > 0)
      .map(n => (n.trim.toInt, ""))
      .partitionBy(new HashPartitioner(1))   // a single partition, so one shared counter is enough
      .sortByKey()
      .map(t => {
        index += 1
        (index, t._1)   // (rank, value)
      })
    result.saveAsTextFile("file:///usr/local/spark/mycode/rdd/examples/result")
  }
}
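As an aside, here is a hedged sketch of the same ranking using zipWithIndex (the paths and the object name are assumptions for illustration); it avoids the mutable counter inside the closure and therefore also works when the sorted data stays in more than one partition:

import org.apache.spark.{SparkConf, SparkContext}

// Alternative sketch: sortByKey followed by zipWithIndex assigns ranks
// without a shared mutable counter, so it does not depend on forcing
// everything into a single partition.
object FileSortZipWithIndex {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FileSortZipWithIndex")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/data", 3)
    val result = lines.filter(_.trim().length > 0)
      .map(n => (n.trim.toInt, ""))
      .sortByKey()
      .keys
      .zipWithIndex()                      // (value, 0-based index)
      .map { case (v, i) => (i + 1, v) }   // (rank, value), ranks starting at 1
    result.saveAsTextFile("file:///usr/local/spark/mycode/rdd/examples/result2")
    sc.stop()
  }
}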
Task requirement: for a given file (with data as shown in file1.txt), sort the data: first sort in descending order by the first column; if two rows have the same value in the first column, sort them in descending order by the second column.
This is a secondary sort. The concrete implementation steps are:
1. Define a custom sort key class that implements the Ordered and Serializable traits.
2. Load the file to be sorted and map each line to a (SecondarySortKey, line) pair.
3. Call sortByKey on the custom key to perform the descending secondary sort.
4. Strip the sort key and keep only the original lines as the result.
SecondarySortKey.scala and SecondarySortApp.scala are as follows:
// SecondarySortKey.scala
package sparkDemo

class SecondarySortKey(val first: Int, val second: Int)
  extends Ordered[SecondarySortKey] with Serializable {
  def compare(other: SecondarySortKey): Int = {
    if (this.first - other.first != 0) {
      this.first - other.first
    } else {
      this.second - other.second
    }
  }
}

// SecondarySortApp.scala
package cn.edu.xmu.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import sparkDemo.SecondarySortKey   // the key class is defined in a different package

object SecondarySortApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/examples/file1.txt", 1)
    val pairWithSortKey = lines.map(line => (new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line))
    val sorted = pairWithSortKey.sortByKey(false)   // false = descending order on the custom key
    val sortedResult = sorted.map(sortedLine => sortedLine._2)
    sortedResult.collect().foreach(println)
  }
}
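For a file with exactly two integer columns, the same descending secondary sort can also be sketched with sortBy on an (Int, Int) tuple, relying on the built-in tuple ordering instead of a custom key class; the path and object name below are assumptions for illustration:

import org.apache.spark.{SparkConf, SparkContext}

// Hedged alternative sketch: the implicit Ordering for (Int, Int) already
// compares the first component, then the second, so sortBy with
// ascending = false gives the required descending secondary sort.
object SecondarySortBySketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SecondarySortBySketch").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/examples/file1.txt", 1)
    val sorted = lines.map { line =>
      val cols = line.split(" ")
      ((cols(0).toInt, cols(1).toInt), line)   // ((col1, col2), original line)
    }.sortBy(_._1, ascending = false)
     .map(_._2)
    sorted.collect().foreach(println)
    sc.stop()
  }
}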
Task description: there is a well-known open test set in the recommendation field (download link). The test set contains three files, ratings.dat, users.dat and movies.dat; see README.txt for a detailed description. Write a program that joins the ratings.dat and movies.dat files to obtain the list of movies whose average rating is above 4.0. The data set used is ml-1m.
File 1: movies.dat (MovieID::Title::Genres)
File 2: ratings.dat (UserID::MovieID::Rating::Timestamp)
With keyBy, the computed key is kept as the key of the new pair, while the entire original element, as a whole, becomes the value of the new RDD element.
The code is as follows:
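A minimal illustration of this behaviour (the two sample tuples below are made up for the example):

import org.apache.spark.{SparkConf, SparkContext}

// keyBy(f) maps every element e to the pair (f(e), e), so the whole original
// element is preserved as the value.
object KeyByDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("KeyByDemo").setMaster("local"))
    val movies = sc.parallelize(Seq((1, "Movie A"), (2, "Movie B")))
    val keyed = movies.keyBy(tup => tup._1)   // RDD[(Int, (Int, String))]
    keyed.collect().foreach(println)          // prints e.g. (1,(1,Movie A))
    sc.stop()
  }
}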
import org.apache.spark._
import SparkContext._

object SparkJoin {
  def main(args: Array[String]) {
    if (args.length != 3) {
      println("usage is SparkJoin <rating> <movie> <output>")
      return
    }
    val conf = new SparkConf().setAppName("SparkJoin").setMaster("local")
    val sc = new SparkContext(conf)

    // Read ratings from the HDFS file and extract (movieId, rating)
    val textFile = sc.textFile(args(0))
    val rating = textFile.map(line => {
      val fields = line.split("::")
      (fields(1).toInt, fields(2).toDouble)
    })

    // Compute (movieId, averageRating)
    val movieScores = rating
      .groupByKey()
      .map(data => {
        val avg = data._2.sum / data._2.size
        (data._1, avg)
      })

    // Read movies from the HDFS file as (MovieID, MovieName), keyed by MovieID
    val movies = sc.textFile(args(1))
    val movieskey = movies.map(line => {
      val fields = line.split("::")
      (fields(0).toInt, fields(1))
    }).keyBy(tup => tup._1)

    // Join to get (movieId, averageRating, movieName) and keep averages above 4.0
    val result = movieScores
      .keyBy(tup => tup._1)
      .join(movieskey)
      .filter(f => f._2._1._2 > 4.0)
      .map(f => (f._1, f._2._1._2, f._2._2._2))

    result.saveAsTextFile(args(2))
  }
}
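As a design note, groupByKey materializes every rating of a movie before the average is taken. A hedged sketch of the same average computed with reduceByKey over (sum, count) pairs is shown below; it reuses the rating RDD from the code above and only replaces the movieScores step:

// Hedged sketch of the average-rating step only: aggregating (sum, count)
// per movie avoids collecting all ratings of a movie into one list.
val avgScores = rating
  .mapValues(r => (r, 1))                               // (movieId, (rating, 1))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))    // (movieId, (sum, count))
  .mapValues { case (sum, count) => sum / count }       // (movieId, averageRating)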