理解Spark的RDD

RDD是個抽象類,定義了諸如map()、reduce()等方法,但實際上繼承RDD的派生類通常只要實現兩個方法:html

  • def getPartitions: Array[Partition]
  • def compute(thePart: Partition, context: TaskContext): NextIterator[T]

getPartitions()用來告知怎麼將input分片;
java

compute()用來輸出每一個Partition的全部行(行是我給出的一種不許確的說法,應該是被函數處理的一個單元);mysql

以一個hdfs文件HadoopRDD爲例:sql

  override def getPartitions: Array[Partition] = {
    val jobConf = getJobConf()
    // add the credentials here as this can be called before SparkContext initialized
    SparkHadoopUtil.get.addCredentials(jobConf)
    val inputFormat = getInputFormat(jobConf)
    if (inputFormat.isInstanceOf[Configurable]) {
      inputFormat.asInstanceOf[Configurable].setConf(jobConf)
    }
    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
    val array = new Array[Partition](inputSplits.size)
    for (i <- 0 until inputSplits.size) {
      array(i) = new HadoopPartition(id, i, inputSplits(i))
    }
    array
  }

它直接將各個split包裝成RDD了,再看compute():

  override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
    val iter = new NextIterator[(K, V)] {

      val split = theSplit.asInstanceOf[HadoopPartition]
      logInfo("Input split: " + split.inputSplit)
      var reader: RecordReader[K, V] = null
      val jobConf = getJobConf()
      val inputFormat = getInputFormat(jobConf)
      HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
        context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
      reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

      // Register an on-task-completion callback to close the input stream.
      context.addTaskCompletionListener{ context => closeIfNeeded() }
      val key: K = reader.createKey()
      val value: V = reader.createValue()

      // Set the task input metrics.
      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
      try {
        /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
         * always at record boundaries, so tasks may need to read into other splits to complete
         * a record. */
        inputMetrics.bytesRead = split.inputSplit.value.getLength()
      } catch {
        case e: java.io.IOException =>
          logWarning("Unable to get input size to set InputMetrics for task", e)
      }
      context.taskMetrics.inputMetrics = Some(inputMetrics)

      override def getNext() = {
        try {
          finished = !reader.next(key, value)
        } catch {
          case eof: EOFException =>
            finished = true
        }
        (key, value)
      }

      override def close() {
        try {
          reader.close()
        } catch {
          case e: Exception => logWarning("Exception in RecordReader.close()", e)
        }
      }
    }
    new InterruptibleIterator[(K, V)](context, iter)
  }

它調用reader返回一系列的K,V鍵值對。

再來看看數據庫的JdbcRDD:數據庫

  override def getPartitions: Array[Partition] = {
    // bounds are inclusive, hence the + 1 here and - 1 on end
    val length = 1 + upperBound - lowerBound
    (0 until numPartitions).map(i => {
      val start = lowerBound + ((i * length) / numPartitions).toLong
      val end = lowerBound + (((i + 1) * length) / numPartitions).toLong - 1
      new JdbcPartition(i, start, end)
    }).toArray
  }
它直接將結果集分紅numPartitions份。其中不少參數都來自於構造函數:

class JdbcRDD[T: ClassTag](
    sc: SparkContext,
    getConnection: () => Connection,
    sql: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)

再看看compute()函數:

  override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
    context.addTaskCompletionListener{ context => closeIfNeeded() }
    val part = thePart.asInstanceOf[JdbcPartition]
    val conn = getConnection()
    val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

    // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results,
    // rather than pulling entire resultset into memory.
    // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
    if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
      stmt.setFetchSize(Integer.MIN_VALUE)
      logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
    }

    stmt.setLong(1, part.lower)
    stmt.setLong(2, part.upper)
    val rs = stmt.executeQuery()

    override def getNext: T = {
      if (rs.next()) {
        mapRow(rs)
      } else {
        finished = true
        null.asInstanceOf[T]
      }
    }

    override def close() {
      try {
        if (null != rs && ! rs.isClosed()) {
          rs.close()
        }
      } catch {
        case e: Exception => logWarning("Exception closing resultset", e)
      }
      try {
        if (null != stmt && ! stmt.isClosed()) {
          stmt.close()
        }
      } catch {
        case e: Exception => logWarning("Exception closing statement", e)
      }
      try {
        if (null != conn && ! conn.isClosed()) {
          conn.close()
        }
        logInfo("closed connection")
      } catch {
        case e: Exception => logWarning("Exception closing connection", e)
      }
    }
  }

這段代碼就是一段sql分頁查詢執行狀況(順便吐槽一下,這段代碼寫得確實比較渣。。。肯定sql裏面不會在limit前面出現整形變量?有興趣的同仁們,趕忙操起MyBatis或者Hibernate去投稿吧!)

以上內容爲本人原創,轉載請註明博客地址:http://blog.csdn.net/bluejoe2000/article/details/41415087分佈式

如下內容爲轉載,來自:http://developer.51cto.com/art/201309/410276_1.htmide

◆ RDD的特色:函數

  1. 它是在集羣節點上的不可變的、已分區的集合對象。
  2. 經過並行轉換的方式來建立如(map, filter, join, etc)。
  3. 失敗自動重建。
  4. 能夠控制存儲級別(內存、磁盤等)來進行重用。
  5. 必須是可序列化的。
  6. 是靜態類型的。

◆ RDD的好處oop

  1. RDD只能從持久存儲或經過Transformations操做產生,相比於分佈式共享內存(DSM)能夠更高效實現容錯,對於丟失部分數據分區只需根據它的lineage就可從新計算出來,而不須要作特定的Checkpoint。
  2. RDD的不變性,能夠實現類Hadoop MapReduce的推測式執行。
  3. RDD的數據分區特性,能夠經過數據的本地性來提升性能,這與Hadoop MapReduce是同樣的。
  4. RDD都是可序列化的,在內存不足時可自動降級爲磁盤存儲,把RDD存儲於磁盤上,這時性能會有大的降低但不會差於如今的MapReduce。

◆ RDD的存儲與分區性能

  1. 用戶能夠選擇不一樣的存儲級別存儲RDD以便重用。
  2. 當前RDD默認是存儲於內存,但當內存不足時,RDD會spill到disk。
  3. RDD在須要進行分區把數據分佈於集羣中時會根據每條記錄Key進行分區(如Hash 分區),以此保證兩個數據集在Join時能高效。

◆ RDD的內部表示

在RDD的內部實現中每一個RDD均可以使用5個方面的特性來表示:

  1. 分區列表(數據塊列表)
  2. 計算每一個分片的函數(根據父RDD計算出此RDD)
  3. 對父RDD的依賴列表
  4. 對key-value RDD的Partitioner【可選】
  5. 每一個數據分片的預約義地址列表(如HDFS上的數據塊的地址)【可選】

◆ RDD的存儲級別

RDD根據useDisk、useMemory、deserialized、replication四個參數的組合提供了11種存儲級別:

 
 
 
 
  1. val NONE = new StorageLevel(falsefalsefalse)   
  2.     val DISK_ONLY = new StorageLevel(truefalsefalse)   
  3.     val DISK_ONLY_2 = new StorageLevel(truefalsefalse, 2)   
  4.     val MEMORY_ONLY = new StorageLevel(falsetruetrue)   
  5.     val MEMORY_ONLY_2 = new StorageLevel(falsetruetrue, 2)   
  6.     val MEMORY_ONLY_SER = new StorageLevel(falsetruefalse)   
  7.     val MEMORY_ONLY_SER_2 = new StorageLevel(falsetruefalse, 2)   
  8.     val MEMORY_AND_DISK = new StorageLevel(truetruetrue)   
  9.     val MEMORY_AND_DISK_2 = new StorageLevel(truetruetrue, 2)   
  10.     val MEMORY_AND_DISK_SER = new StorageLevel(truetruefalse)   
  11.     val MEMORY_AND_DISK_SER_2 = new StorageLevel(truetruefalse, 2)  

◆ RDD定義了各類操做,不一樣類型的數據由不一樣的RDD類抽象表示,不一樣的操做也由RDD進行抽實現。

RDD的生成

◆ RDD有兩種建立方式:

一、從Hadoop文件系統(或與Hadoop兼容的其它存儲系統)輸入(例如HDFS)建立。

二、從父RDD轉換獲得新RDD。

◆ 下面來看一從Hadoop文件系統生成RDD的方式,如:val file = spark.textFile("hdfs://..."),file變量就是RDD(實際是HadoopRDD實例),生成的它的核心代碼以下:

 
 
 
 
  1. // SparkContext根據文件/目錄及可選的分片數建立RDD, 這裏咱們能夠看到Spark與Hadoop MapReduce很像   
  2.    // 須要InputFormat, Key、Value的類型,其實Spark使用的Hadoop的InputFormat, Writable類型。   
  3.    def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {   
  4.        hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable],   
  5.        classOf[Text], minSplits) .map(pair => pair._2.toString) }  
  6.    
  7.    // 根據Hadoop配置,及InputFormat等建立HadoopRDD    
  8.    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) 

◆ 對RDD進行計算時,RDD從HDFS讀取數據時與Hadoop MapReduce幾乎同樣的:

RDD的轉換與操做

◆ 對於RDD能夠有兩種計算方式:轉換(返回值仍是一個RDD)與操做(返回值不是一個RDD)。

◆ 轉換(Transformations) (如:map, filter, groupBy, join等),Transformations操做是Lazy的,也就是說從一個RDD轉換生成另外一個RDD的操做不是立刻執行,Spark在遇到Transformations操做時只會記錄須要這樣的操做,並不會去執行,須要等到有Actions操做的時候纔會真正啓動計算過程進行計算。

◆ 操做(Actions) (如:count, collect, save等),Actions操做會返回結果或把RDD數據寫到存儲系統中。Actions是觸發Spark啓動計算的動因。

相關文章
相關標籤/搜索