Spark: Custom Sorting

An exploration of custom sorting in Spark.

Method 1: Define a class that extends Ordered and Serializable. Parallelize the driver-side data into an RDD, map each record into that custom class, and sort the resulting RDD directly.

package com.rz.spark.base

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Custom sorting
object CustomSort1 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Sort rule: descending by fv (face value); if fv values are equal, ascending by age
    val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 78 100", "xiaolong 66 66")

    // Parallelize the driver-side data into an RDD
    val lines:RDD[String] = sc.parallelize(users)

    // Split each line and build a User
    val userRDD: RDD[User] = lines.map(line => {
      val fields = line.split(" ")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toInt
      //(name, age, fv)
      new User(name, age, fv)
    })

    // A tuple RDD sorted by a single field would not satisfy the rule:
    // tpRDD.sortBy(tp => tp._3, false)

    // Sort the RDD of User objects using their own comparison rule
    val sorted: RDD[User] = userRDD.sortBy(u=>u)
    val result = sorted.collect()
    println(result.toBuffer)


    sc.stop()
  }
}

// Data is transferred over the network during the shuffle, so it must be serializable
class User(val name:String, val age:Int, val fv:Int) extends Ordered[User] with Serializable {
  override def compare(that: User): Int = {
    if (this.fv == that.fv){
      this.age - that.age
    }else{
      -(this.fv - that.fv)
    }
  }

  override def toString: String = s"name: $name, age: $age, fv: $fv"
}

Method 2: Same idea, a custom class extending Ordered and Serializable, but the data stays as tuples in the RDD; the class is used only as the sort rule passed to sortBy.

package com.rz.spark.base

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CustomSort2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Sort rule: descending by fv; if fv values are equal, ascending by age
    val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66")

    // Parallelize the driver-side data into an RDD
    val lines:RDD[String] = sc.parallelize(users)

    // Split each line into a (name, age, fv) tuple
    val userRDD: RDD[(String, Int, Int)] = lines.map(line => {
      val fields = line.split(" ")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toInt
      (name, age, fv)
    })

    // Sort by passing in a rule; the data format is unchanged, only the order changes.
    // Boy is a regular class (not a case class), so each element is built with `new`.
    val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> new Boy(tp._2,tp._3))
    val result = sorted.collect()
    println(result.toBuffer)

    sc.stop()
  }
}

// Data is transferred over the network during the shuffle, so it must be serializable
class Boy(val age:Int, val fv:Int) extends Ordered[Boy] with Serializable {
  override def compare(that: Boy): Int = {
    if (this.fv == that.fv){
      this.age - that.age
    }else{
      -(this.fv - that.fv)
    }
  }
}

Method 3: Use a case class (instantiable without `new`) as the sort rule.

package com.rz.spark.base

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CustomSort3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Sort rule: descending by fv; if fv values are equal, ascending by age
    val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66")

    // Parallelize the driver-side data into an RDD
    val lines:RDD[String] = sc.parallelize(users)

    // Split each line into a (name, age, fv) tuple
    val userRDD: RDD[(String, Int, Int)] = lines.map(line => {
      val fields = line.split(" ")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toInt
      (name, age, fv)
    })

    // Sort by passing in a rule; the data format is unchanged, only the order changes
    val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> Man(tp._2,tp._3))
    val result = sorted.collect()
    println(result.toBuffer)

    sc.stop()
  }
}

// Data is transferred over the network during the shuffle, so it must be serializable.
// A case class is already serializable and needs no `new`. (Drawback: the rule is
// hard-coded; sorting by a new rule requires an implicit Ordering instead.)
case class Man(age:Int, fv:Int) extends Ordered[Man]{
  override def compare(that: Man): Int = {
    if (this.fv == that.fv){
      this.age - that.age
    }else{
      -(this.fv - that.fv)
    }
  }
}

Method 4: Specify a flexible sort rule through an implicit Ordering parameter.

package com.rz.spark.base

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CustomSort4 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Sort rule: descending by fv; if fv values are equal, ascending by age
    val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66")

    // Parallelize the driver-side data into an RDD
    val lines:RDD[String] = sc.parallelize(users)

    // Split each line into a (name, age, fv) tuple
    val userRDD: RDD[(String, Int, Int)] = lines.map(line => {
      val fields = line.split(" ")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toInt
      (name, age, fv)
    })

    // Sort by passing in a rule; the data format is unchanged, only the order changes.
    // The Ordering is supplied to sortBy as an implicit parameter.
    import SortRules.OrderingHero
    val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> Hero(tp._2,tp._3))
    val result = sorted.collect()
    println(result.toBuffer)

    sc.stop()
  }
}

// Data is transferred over the network during the shuffle, so it must be serializable.
// The case class is serializable and fixes no sort rule; the rule comes from the implicit Ordering.
case class Hero(age:Int, fv:Int)
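The `SortRules` object that `import SortRules.OrderingHero` refers to is not shown in the post. A plausible reconstruction (an assumption, matching the same rule used throughout: descending fv, then ascending age) would look like this; `Hero` is repeated here only so the sketch is self-contained:

```scala
// Mirrors the case class defined above
case class Hero(age: Int, fv: Int)

// Reconstruction of the missing SortRules object (assumed, not from the original post)
object SortRules {
  // Implicit Ordering picked up by sortBy through its implicit parameter list
  implicit object OrderingHero extends Ordering[Hero] {
    override def compare(x: Hero, y: Hero): Int = {
      if (x.fv == y.fv) x.age - y.age // equal fv: ascending age
      else y.fv - x.fv                // otherwise: descending fv
    }
  }
}
```

Because the rule lives outside the data class, a different ordering can be swapped in by importing a different implicit.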

Method 5: Tuples have a built-in comparison: the first elements are compared first, and only if they are equal is the second compared. Exploit this rule, and fall back to a custom class only when it is not enough.

package com.rz.spark.base

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CustomSort5 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Sort rule: descending by fv; if fv values are equal, ascending by age
    val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66")

    // Parallelize the driver-side data into an RDD
    val lines:RDD[String] = sc.parallelize(users)

    // Split each line into a (name, age, fv) tuple
    val userRDD: RDD[(String, Int, Int)] = lines.map(line => {
      val fields = line.split(" ")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toInt
      (name, age, fv)
    })

    // Sort using the tuple's built-in comparison: first element first, then the second.
    // Negating fv turns the ascending sort into a descending one.
    val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> (-tp._3,tp._2))
    val result = sorted.collect()
    println(result.toBuffer)

    sc.stop()
  }
}
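The element-wise tuple comparison that Method 5 relies on can be checked in plain Scala, without Spark:

```scala
// Tuples compare left to right: first elements first, then the second, and so on
val data = List(("xiaohong", 30, 50), ("xiaobai", 90, 50), ("xiaolong", 66, 66))
// Same sort key as in the Spark job: descending fv, then ascending age
val sorted = data.sortBy(t => (-t._3, t._2))
println(sorted)
// xiaolong (fv 66) first; among the fv-50 pair, xiaohong (age 30) before xiaobai (age 90)
```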

Method 6: Similar to Method 5, but a custom implicit Ordering supplies the sort rule.

package com.rz.spark.base

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CustomSort6 {  
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Sort rule: descending by fv; if fv values are equal, ascending by age
    val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66")

    // Parallelize the driver-side data into an RDD
    val lines:RDD[String] = sc.parallelize(users)

    // Split each line into a (name, age, fv) tuple
    val userRDD: RDD[(String, Int, Int)] = lines.map(line => {
      val fields = line.split(" ")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toInt
      (name, age, fv)
    })

    // Supply the rule through an implicit Ordering on the tuple type:
    // descending fv, then ascending age
    implicit val rules: Ordering[(String, Int, Int)] =
      Ordering[(Int, Int)].on(t => (-t._3, t._2))
    val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp => tp)
    val result = sorted.collect()
    println(result.toBuffer)

    sc.stop()
  }
}
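The implicit-Ordering idea behind Method 6 can also be exercised in plain Scala; `Ordering.on` derives an Ordering for the full tuple from a `(-fv, age)` key (the value name here is illustrative):

```scala
// Implicit Ordering: descending fv (3rd field), then ascending age (2nd field)
implicit val byFvThenAge: Ordering[(String, Int, Int)] =
  Ordering[(Int, Int)].on(t => (-t._3, t._2))

val users = List(("xiaohong", 30, 50), ("xiaobai", 90, 50), ("xiaolong", 66, 66))
// `sorted` picks up the implicit Ordering in scope, just as RDD.sortBy does
println(users.sorted)
```

A locally defined implicit takes precedence over the default Tuple3 Ordering, so the custom rule wins without any change to the data.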