Spark Streaming中空batches處理的兩種方法

Spark Streaming是近實時(near real time)的小批處理系統。對給定的時間間隔(interval),Spark Streaming生成新的batch並對它進行一些處理。每一個batch中的數據都表明一個RDD,可是若是一些batch中沒有數據會發生什麼事情呢?Spark Streaming將會產生EmptyRDD的RDD,它的定義以下:apache

package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.{Partition, SparkContext, TaskContext}

/**
 * An RDD that has no partitions and no elements.
 */
private[spark] class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {

  override def getPartitions: Array[Partition] = Array.empty

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    throw new UnsupportedOperationException("empty RDD")
  }
}

能夠看到這個RDD並不對任何父RDD有依賴關係,咱們不能調用compute方法計算每一個分區的數據。EmptyRDD的存在是爲了保證Spark Streaming中多個batch的處理是一致的。可是存在EmptyRDD有時候會產生一些問題,好比:若是你想將接收到的Streaming數據寫入HDFS中:socket

val ssc = new StreamingContext(args(0),"iteblog",Seconds(10))
val socketStream = ssc.socketTextStream("www.iteblog.com",8888)
val outputDir = args(1)

socketStream.foreachRDD(rdd => {
  rdd.saveAsTextFile(outputDir)
})

當你調用foreachRDD的時候若是當前rdd是EmptyRDD,這樣會致使在HDFS上生成大量的空文件!這確定不是咱們想要的,咱們只想在存在數據的時候才寫HDFS,咱們能夠經過如下的兩種方法來避免這種狀況:ide

socketStream.foreachRDD(rdd => {
  if(rdd.count() != 0){
  	rdd.saveAsTextFile(outputDir)
  }
})

EmptyRDD的count確定是0,因此這樣能夠避免寫空文件,或者咱們也能夠用下面方法解決:函數

socketStream.foreachRDD(rdd => {
  if(!rdd.partitions.isEmpty){
  	rdd.saveAsTextFile(outputDir)
  }
})

EmptyRDD是沒有分區的,因此調用partitions.isEmpty是true。這樣也能夠解決上述問題。性能

後記:雖然上面兩種方法均可以解決這個問題,可是推薦使用第二種方法。由於第一種方法調用了RDD的count函數,這是一個Action,會觸發一次Job的計算,當你的數據量比較大的時候,這可能會帶來性能方面的一些影響;而partitions.isEmpty是不須要觸發Job的。
  不過若是你使用的是Sprk 1.3.0,你能夠調用isEmpty函數來判斷一個RDD是否爲空,這個函數是在SPARK-5270引入的。spa

轉載自過往記憶(http://www.iteblog.com/)scala

相關文章
相關標籤/搜索