Commonly used RDD methods in spark-shell

centos 7.2     spark 2.3.3      scala 2.11.11    java 1.8.0_202-ea

The examples below use Scala syntax in spark-shell.

 

1. distinct: remove duplicate records

val c = sc.parallelize(List("Gnu","Cat","Rat","Dog","Gnu","Rat"),2)      // initialize an RDD, spreading the data evenly across 2 partitions

c.distinct.collect

>>res1: Array[String] = Array(Dog,Gnu,Cat,Rat)

 

2. c.first

first returns the first record of the RDD's first partition

>>res2: String = Gnu

 

3. filter: keep only the records that satisfy a predicate

val a = sc.parallelize(1 to 10,3)

val b = a.filter(_ % 2 == 0)

b.collect

>>res3:Array[Int] = Array(2,4,6,8,10)

 

4. filterByRange          returns the records whose keys fall within the given range; only usable on key-sorted RDDs

val randRDD = sc.parallelize(List((2,"cat"),(6,"mouse"),(7,"cup"),(3,"book"),(4,"tv"),(1,"screen"),(5,"heater")),3)

val sortedRDD = randRDD.sortByKey()

sortedRDD.filterByRange(1,3).collect

>>res4:Array[(Int,String)] = Array((1,screen),(2,cat),(3,book))
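The range check itself can be sketched without Spark. Below is a plain-Scala sketch of the same semantics, assuming inclusive bounds on both ends (the helper name and Seq-based signature are illustrative, not Spark API):

```scala
// Plain-Scala sketch of filterByRange semantics: keep only pairs whose
// key lies in [lower, upper], both ends inclusive.
def filterByRange[V](pairs: Seq[(Int, V)], lower: Int, upper: Int): Seq[(Int, V)] =
  pairs.filter { case (k, _) => k >= lower && k <= upper }

// Same data as above, sorted by key first (mirroring sortByKey).
val sorted = List((2,"cat"),(6,"mouse"),(7,"cup"),(3,"book"),
                  (4,"tv"),(1,"screen"),(5,"heater")).sortBy(_._1)
println(filterByRange(sorted, 1, 3)) // List((1,screen), (2,cat), (3,book))
```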

 

5. foreach                    apply a function to every record in the RDD

val c = sc.parallelize(List("cat","dog","tiger","lion","gnu"),3)

c.foreach(x => println(x + " is ym"))

>>lion is ym

gnu is ym

cat is ym

tiger is ym

dog is ym

 

6. foreachPartition        iterate over the RDD one partition at a time (the function is called once per partition, receiving its iterator)

val b = sc.parallelize(List(1,2,3,4,5,6,7,8),3)

b.foreachPartition(x => println(x.reduce(_ + _ )))

>> 6

15

15

 

7. fullOuterJoin

rdd1.fullOuterJoin(rdd2)         full outer join of two pair RDDs; for matching keys every value combination is produced, and keys present on only one side are kept, with None filling the missing side

val pairRDD1 = sc.parallelize(List(("cat",2),("cat",5),("book",40)))

val pairRDD2 = sc.parallelize(List(("cat",2),("cup",5),("book",40)))

pairRDD1.fullOuterJoin(pairRDD2).collect

>>res5: Array[(String,(Option[Int],Option[Int]))] = Array((book,(Some(40),Some(40))),  (cup,(None,Some(5))),  (cat,(Some(2),Some(2))),  (cat,(Some(5),Some(2))))
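The join semantics can be mimicked on plain Scala collections. This is a Spark-free sketch (the standalone `fullOuterJoin` helper and its Seq-based signature are illustrative assumptions, not Spark API):

```scala
// Plain-Scala sketch of full outer join on pair sequences:
// every key from either side appears in the result; a missing side is None.
def fullOuterJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (Option[V], Option[W]))] = {
  val l = left.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }
  val r = right.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }
  (l.keySet ++ r.keySet).toSeq.flatMap { k =>
    val lvs: Seq[Option[V]] = if (l.contains(k)) l(k).map(Some(_)) else Seq(None)
    val rvs: Seq[Option[W]] = if (r.contains(k)) r(k).map(Some(_)) else Seq(None)
    for (a <- lvs; b <- rvs) yield (k, (a, b))
  }
}
```

Running it on the pairRDD1/pairRDD2 data above reproduces the four rows of res5, up to ordering.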

 

8. groupBy   group the records according to a given rule

val a = sc.parallelize(1 to 9,3)

a.groupBy(x => {if (x % 2 == 0) "even" else "odd" }).collect

>> res6:Array[(String,Seq[Int])] = Array((even,ArrayBuffer(2,4,6,8)),(odd,ArrayBuffer(1,3,5,7,9)))

 

The function passed to groupBy can also be written as a named method:

def myfunc(a:Int):Int =
{
  a % 2
}

a.groupBy(myfunc).collect

a.groupBy(x => myfunc(x),3).collect

a.groupBy(myfunc(_),1).collect
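Scala's collection groupBy behaves the same way, which makes the grouping rule easy to check outside Spark:

```scala
// The same even/odd grouping on a plain Scala range.
val grouped = (1 to 9).groupBy(x => if (x % 2 == 0) "even" else "odd")
println(grouped("even")) // Vector(2, 4, 6, 8)
println(grouped("odd"))  // Vector(1, 3, 5, 7, 9)
```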

 

Example: combine groupBy with a custom partitioner that controls how the grouped data is partitioned

 

package sometest

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object SparkApplication{
  def main(args:Array[String]){
    val conf = new SparkConf().setAppName("GroupPartition").setMaster("spark://master:7077")
    val sc = new SparkContext(conf)
    val a = sc.parallelize(1 to 9 , 3)
    val p = new MyPartitioner()
    val b = a.groupBy((x:Int) => {x},p) // repartition by the custom rule p, then group
   // b has the form RDD[(Int,Iterable[Int])], e.g. (1,CompactBuffer(1))

    def myfunc(index:Int,iter:Iterator[(Int,Iterable[Int])]): Iterator[(Int,(Iterable[Int],Int))] = {
      iter.map(a => (index,(a._2,a._1))) // a._2 is the second element of the tuple a
    }
    val c = b.mapPartitionsWithIndex(myfunc)
    println("This is Result for My :")
    c.collect().foreach(println)
  }
}



The custom partitioning rule:
package sometest
import org.apache.spark.Partitioner

/**
* Custom data partitioning rule
**/
class MyPartitioner extends Partitioner{
  def numPartitions:Int = 2 // number of partitions
  def getPartition(key:Any):Int =
  {
    val code = key match
      {
        case null => 0
        case key:Int => key % numPartitions // modulo
        case _ => key.hashCode % numPartitions
      }
    if(code < 0 ){ // fix up negative hashCode results
            code + numPartitions  
            }
    else{
        code
      }
  }
  override def equals(other:Any):Boolean = // standard Java equality check; Spark uses it internally to decide whether two RDDs are partitioned the same way
  {
    other match
    {
      case h:MyPartitioner => h.numPartitions == numPartitions
      case _ => false
    }
  }
}
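The partition-selection logic can be exercised without a cluster. Here is a plain-Scala copy of the same logic (the standalone function is illustrative; in Spark it lives inside the Partitioner subclass):

```scala
// Same logic as MyPartitioner.getPartition, extracted for a quick check.
def getPartition(key: Any, numPartitions: Int): Int = {
  val code = key match {
    case null   => 0
    case k: Int => k % numPartitions
    case k      => k.hashCode % numPartitions
  }
  if (code < 0) code + numPartitions else code // fix up negative results
}

println(getPartition(4, 2))  // 0
println(getPartition(3, 2))  // 1
println(getPartition(-3, 2)) // 1, after the negative fix-up
```

The negative branch matters because Java's `%` keeps the sign of the left operand, so `-3 % 2` is `-1`, not `1`.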

 

After packaging this as sparkAction.jar, run it with:  spark-submit --class sometest.SparkApplication ~/sparkAction.jar

The output is:

This is Result for My :
(0,(CompactBuffer(4),4))
(0,(CompactBuffer(6),6))
(0,(CompactBuffer(8),8))
(0,(CompactBuffer(2),2))
(0,(CompactBuffer(1),1))
(0,(CompactBuffer(3),3))
(0,(CompactBuffer(7),7))
(0,(CompactBuffer(9),9))
(0,(CompactBuffer(5),5))

 
9. groupByKey [Pair]
Similar to groupBy, except that the grouping is driven by the existing keys, whereas groupBy applies its function to every record
val a = sc.parallelize(List("dog","tiger","lion","cat","spider","eagle"),2)
val b = a.keyBy(_.length)
b.groupByKey.collect

Output res11: Array[(Int,Iterable[String])] = Array((4,CompactBuffer(lion)),(6,CompactBuffer(spider)),(3,CompactBuffer(dog,cat)),(5,CompactBuffer(tiger,eagle)))
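keyBy followed by groupByKey collapses to a single groupBy on plain Scala collections, which shows the same grouping directly:

```scala
// keyBy(_.length) + groupByKey is equivalent to grouping by word length.
val words = List("dog","tiger","lion","cat","spider","eagle")
val byLength = words.groupBy(_.length)
println(byLength(3)) // List(dog, cat)
println(byLength(5)) // List(tiger, eagle)
```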





10. histogram [Double] compute a histogram of the data (an exact graphical representation of the distribution of numeric data)

Finds the minimum and maximum of the given data, splits that range evenly into n buckets, and counts how many of the given values fall into each bucket.
Conventionally the bucket ranges go on the horizontal axis and the per-bucket counts on the vertical axis.

val a = sc.parallelize(List(1.1,1.2,1.3,2.0,2.1,7.4,7.5,7.6,8.8,9.0),3)
a.histogram(5) // split the sample data into 5 buckets
res11: (Array[Double],Array[Long]) = (Array(1.1,2.68,4.26,5.84,7.42,9.0),Array(5,0,0,1,4))
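The bucket boundaries above come from splitting [min, max] into equal-width ranges. A plain-Scala sketch of that computation (an assumed simplification of Spark's implementation; the last bucket is closed on both ends so the maximum is counted):

```scala
// Equal-width histogram: n buckets spanning [min, max].
def histogram(data: Seq[Double], buckets: Int): (Array[Double], Array[Long]) = {
  val (lo, hi) = (data.min, data.max)
  val width = (hi - lo) / buckets
  val bounds = Array.tabulate(buckets + 1)(i => lo + i * width)
  val counts = Array.fill(buckets)(0L)
  for (x <- data) {
    val i = math.min(((x - lo) / width).toInt, buckets - 1) // clamp so max lands in the last bucket
    counts(i) += 1
  }
  (bounds, counts)
}

val data = List(1.1,1.2,1.3,2.0,2.1,7.4,7.5,7.6,8.8,9.0)
val (bounds, counts) = histogram(data, 5)
println(counts.toList) // List(5, 0, 0, 1, 4)
```

With min 1.1 and max 9.0, each bucket is (9.0 - 1.1) / 5 = 1.58 wide, which reproduces the boundaries 1.1, 2.68, 4.26, 5.84, 7.42, 9.0 shown in res11.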



11. intersection returns the intersection of two RDDs (an inner join on the elements)
val x=sc.parallelize(1 to 20)
val y =sc.parallelize(10 to 30)
val z = x.intersection(y)
z.collect
res74: Array[Int] = Array(16,17,18,10,19,11,20,12,13,14,15)

Inner join:
val a = sc.parallelize(List("dog","salmon","salmon","rat","elephant"),3)
val b = a.keyBy(_.length) // Array[(Int,String)] = Array((3,dog),(6,salmon),(6,salmon),(3,rat),(8,elephant))
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)

val d = c.keyBy(_.length)
b.join(d).collect
Output res0: Array[(Int,(String,String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)),
(6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))


12. keys [Pair] returns all the keys of a key-value RDD

val a = sc.parallelize(List((3,"dog"),(5,"tiger"),(4,"lion"),(3,"cat"),(7,"panther"),(5,"eagle")),2)
a.keys.collect
res2: Array[Int] = Array(3,5,4,3,7,5)


13. lookup find the values stored under a given key
val a = sc.parallelize(List((3,"dog"),(5,"tiger"),(4,"lion"),(3,"cat"),(7,"panther"),(5,"eagle")),2)
a.lookup(5)
res8: Seq[String] = WrappedArray(tiger,eagle)

14. max returns the maximum value
Reusing a from above:
a.max
res9: (Int,String) = (7,panther)

val y =sc.parallelize(10 to 30)
y.max
res10: Int = 30
15. mean arithmetic mean
y.mean
res13: Double = 20.0


16. persist, cache set the RDD's storage level
val c = sc.parallelize(List("Gnu","Cat","Rat","Dog","Gnu","Rat"),2)
c.getStorageLevel
res14: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)
c.cache
res15: c.type = ParallelCollectionRDD[41] at parallelize at <console>:24
c.getStorageLevel
res16:org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)


17. sample draw a random sample of the data at a given fraction

sample(withReplacement, fraction, seed)
withReplacement: whether to sample with replacement
fraction: the fraction of the data to sample
seed: seed for the random number generator

val a = sc.parallelize(1 to 10000,3)
a.sample(false,0.1,0).count
res17: Long = 1032
a.sample(true,0.3,0).count
res18: Long = 3110
a.sample(true,0.3,13).count
res20: Long = 2952

18. saveAsTextFile save as text data (the default file system is HDFS); textFile reads text data back

val a = sc.parallelize(11 to 19,3)
a.saveAsTextFile("test/tf") // actually saves into the directory test/tf; with a parallelism factor of 3, each partition maps to one part-000x file
val b = sc.textFile("test/tf")
b.collect
res4: Array[String] = Array(11,12,13,14,15,16,17,18,19)

19. take returns the first N records of the dataset

val b = sc.parallelize(List("dog","cat","ape","salmon","gnu"),2)
b.take(2)
res5: Array[String] = Array(dog,cat)

20. union, ++ the union of two RDDs (concatenates the two RDDs)

val a = sc.parallelize(1 to 5,1)
val b = sc.parallelize(5 to 7,1)
(a++b).collect
Array[Int] = Array(1,2,3,4,5,5,6,7)
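Sampling without replacement can be sketched as an independent Bernoulli trial per element (an assumed simplification; Spark's sampler is more elaborate, but the expected sample size is the same, fraction * n):

```scala
import scala.util.Random

// Keep each element independently with probability `fraction`.
def sample[T](data: Seq[T], fraction: Double, seed: Long): Seq[T] = {
  val rng = new Random(seed)
  data.filter(_ => rng.nextDouble() < fraction)
}

val s = sample(1 to 10000, 0.1, 0)
println(s.size) // close to 1000, the expected sample size
```

This also explains why the counts above (1032, 3110, 2952) hover around fraction * 10000 rather than hitting it exactly.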