5.4 RDD編程---綜合案例

1、求top值

任務描述:求出多個文件中數值的最大、最小值python

 

2、求最大最小值

任務描述:求出多個文件中數值的最大、最小值apache

解題思路:經過一我的造的key,讓全部的值都成爲「key」的value-list,而後對value-list進行遍歷,用兩個變量求出最大最小值。編程

代碼以下:oop

import org.apache.spark.{SparkConf, SparkContext}
object MaxAndMin {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(「MaxAndMin「).setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/spark/chapter5", 2)


    val result = lines.filter(_.trim().length>0).map(line => ("key",line.trim.toInt)).groupByKey().map(x => {
      var min = Integer.MAX_VALUE
      var max = Integer.MIN_VALUE
      for(num <- x._2){
        if(num>max){
          max = num
        }
        if(num<min){
          min = num
        }
      }
      (max,min)
    }).collect.foreach(x => {
      println("max\t"+x._1)
      println("min\t"+x._2)
    })
    }
    }

3、文件排序

任務描述:有多個輸入文件,每一個文件中的每一行內容均爲一個整數。要求讀取全部文件中的整數,進行排序後,輸出到一個新的文件中,輸出的內容個數爲每行兩個整數,第一個整數爲第二個整數的排序位次,第二個整數爲原待排序的整數。測試

因爲輸入文件有多個,產生不一樣的分區,爲了生成序號,使用HashPartitioner將中間的RDD歸約到一塊兒this

代碼以下:spa

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
object FileSort {
    def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("FileSort")
        val sc = new SparkContext(conf)
        val dataFile = "file:///usr/local/spark/mycode/rdd/data"
        val lines = sc.textFile(dataFile,3)
        var index = 0
        val result = lines.filter(_.trim().length>0).map(n=>(n.trim.toInt,"")).partitionBy(new HashPartitioner(1)).sortByKey().map(t => {
      index += 1
            (index,t._1)
        })
        result.saveAsTextFile("file:///usrl/local/spark/mycode/rdd/examples/result")
    }
}

4、二次排序

任務要求:對於一個給定的文件(數據如file1.txt所示),請對數據進行排序,首先根據第1列數據降序排序,若是第1列數據相等,則根據第2列數據降序排序。scala

二次排序,具體的實現步驟:3d

  1. 按照Ordered(繼承排序的功能)和Serializable(繼承可序列化的功能)接口實現自定義排序的key
  2. 將要進行二次排序的文件加載進來生成<key,value>類型的RDD;
  3. 使用sortByKey基於自定義的Key進行二次排序;
  4. 去除掉排序的Key,只保留排序的結果

SecondarySortKey.scala代碼以下:code

package sparkDemo
class SecondarySortKey(val first:Int,val second:Int) extends Ordered [SecondarySortKey] with Serializable {
def compare(other:SecondarySortKey):Int = {
    if (this.first - other.first !=0) {
         this.first - other.first 
    } else {
      this.second - other.second
    }
  }
}

package cn.edu.xmu.spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object SecondarySortApp {
  def main(args:Array[String]){
     val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
       val sc = new SparkContext(conf)
       val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/examples/file1.txt", 1)
       val pairWithSortKey = lines.map(line=>(new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt),line))
       val sorted = pairWithSortKey.sortByKey(false)
       val sortedResult = sorted.map(sortedLine =>sortedLine._2)
       sortedResult.collect().foreach (println)
  }
}

5、鏈接操做

任務描述:在推薦領域有一個著名的開放測試集,下載連接,該測試集包含三個文件,分別是ratings.dat、sers.dat、movies.dat,具體介紹可閱讀:README.txt。請編程實現:經過鏈接ratings.dat和movies.dat兩個文件獲得平均得分超過4.0的電影列表,採用的數據集是:ml-1m

文件1:movies.dat(MovieID::Title::Genres

文件2:ratings.dat(UserID::MovieID::Rating::Timestamp

keyBy的key保持不變,value是把原來一整串的元素的值,整個做爲新的RDD元素的一個value。

代碼以下:

import org.apache.spark._ 
import SparkContext._ 
object SparkJoin { 
  def main(args: Array[String]) { 
    if (args.length != 3 ){ 
      println("usage is WordCount <rating> <movie> <output>")      
      return 
    } 
   val conf = new SparkConf().setAppName("SparkJoin").setMaster("local")
   val sc = new SparkContext(conf)  
   // Read rating from HDFS file 
   val textFile = sc.textFile(args(0)) 

//extract (movieid, rating) 
    val rating = textFile.map(line => { 
        val fileds = line.split("::") 
        (fileds(1).toInt, fileds(2).toDouble) 
       }) 
 //get (movieid,ave_rating) 
    val movieScores = rating 
       .groupByKey() 
       .map(data => { 
         val avg = data._2.sum / data._2.size 
         (data._1, avg) 
       }) 

// Read movie from HDFS file 
     val movies = sc.textFile(args(1)) 
     val movieskey = movies.map(line => { 
       val fileds = line.split("::") 
        (fileds(0).toInt, fileds(1))   //(MovieID,MovieName)
     }).keyBy(tup => tup._1) 
  
     // by join, we get <movie, averageRating, movieName> 
     val result = movieScores 
       .keyBy(tup => tup._1) 
       .join(movieskey) 
       .filter(f => f._2._1._2 > 4.0) 
       .map(f => (f._1, f._2._1._2, f._2._2._2)) 
  
    result.saveAsTextFile(args(2)) 
  } 
} 

  

 

參考文獻:

【1】Spark編程基礎_中國大學MOOC(慕課)

相關文章
相關標籤/搜索