Spark Streaming是近實時(near real time)的小批處理系統。對給定的時間間隔(interval),Spark Streaming生成新的batch並對它進行一些處理。每一個batch中的數據都表明一個RDD,可是若是一些batch中沒有數據會發生什麼事情呢?Spark Streaming將會產生EmptyRDD的RDD,它的定義以下:apache
package org.apache.spark.rdd import scala.reflect.ClassTag import org.apache.spark.{Partition, SparkContext, TaskContext} /** * An RDD that has no partitions and no elements. */ private[spark] class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) { override def getPartitions: Array[Partition] = Array.empty override def compute(split: Partition, context: TaskContext): Iterator[T] = { throw new UnsupportedOperationException("empty RDD") } }
能夠看到這個RDD並不對任何父RDD有依賴關係,咱們不能調用compute方法計算每一個分區的數據。EmptyRDD的存在是爲了保證Spark Streaming中多個batch的處理是一致的。可是存在EmptyRDD有時候會產生一些問題,好比:若是你想將接收到的Streaming數據寫入HDFS中:socket
val ssc = new StreamingContext(args(0),"iteblog",Seconds(10)) val socketStream = ssc.socketTextStream("www.iteblog.com",8888) val outputDir = args(1) socketStream.foreachRDD(rdd => { rdd.saveAsTextFile(outputDir) })
當你調用foreachRDD的時候若是當前rdd是EmptyRDD,這樣會致使在HDFS上生成大量的空文件!這確定不是咱們想要的,咱們只想在存在數據的時候才寫HDFS,咱們能夠經過如下的兩種方法來避免這種狀況:ide
socketStream.foreachRDD(rdd => { if(rdd.count() != 0){ rdd.saveAsTextFile(outputDir) } })
EmptyRDD的count確定是0,因此這樣能夠避免寫空文件,或者咱們也能夠用下面方法解決:函數
socketStream.foreachRDD(rdd => { if(!rdd.partitions.isEmpty){ rdd.saveAsTextFile(outputDir) } })
EmptyRDD是沒有分區的,因此調用partitions.isEmpty
是true。這樣也能夠解決上述問題。性能
後記:雖然上面兩種方法均可以解決這個問題,可是推薦使用第二種方法。由於第一種方法調用了RDD的count函數,這是一個Action,會觸發一次Job的計算,當你的數據量比較大的時候,這可能會帶來性能方面的一些影響;而partitions.isEmpty是不須要觸發Job的。
不過若是你使用的是Sprk 1.3.0,你能夠調用isEmpty函數來判斷一個RDD是否爲空,這個函數是在SPARK-5270引入的。spa
轉載自過往記憶(http://www.iteblog.com/)scala