RDD is an abstract class that declares methods such as map() and reduce(), but in practice a class that extends RDD usually only needs to implement two methods:
getPartitions(), which tells Spark how to split the input into partitions;
compute(), which produces every row of a given Partition ("row" is my own imprecise wording; strictly speaking it is the unit that the function processes). A minimal sketch of such a subclass follows.
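To make this concrete, here is a minimal custom RDD that implements only these two methods. It is a sketch rather than Spark source code: RangeRDD and RangePartition are made-up names, and Spark already ships a similar RDD behind sc.parallelize().

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One Partition object per slice of the range.
class RangePartition(override val index: Int, val start: Int, val end: Int) extends Partition

// An RDD over the integers [0, n), split into numSlices partitions.
class RangeRDD(sc: SparkContext, n: Int, numSlices: Int) extends RDD[Int](sc, Nil) {

  // Tell Spark how the input is sliced.
  override def getPartitions: Array[Partition] =
    (0 until numSlices).map { i =>
      new RangePartition(i, i * n / numSlices, (i + 1) * n / numSlices)
    }.toArray

  // Produce the records of one partition as an iterator.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val p = split.asInstanceOf[RangePartition]
    (p.start until p.end).iterator
  }
}

// new RangeRDD(sc, 10, 3).collect() returns Array(0, 1, ..., 9), computed in 3 partitions.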
Now take HadoopRDD, the RDD backed by an HDFS file, as a real example:
override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  // add the credentials here as this can be called before SparkContext initialized
  SparkHadoopUtil.get.addCredentials(jobConf)
  val inputFormat = getInputFormat(jobConf)
  if (inputFormat.isInstanceOf[Configurable]) {
    inputFormat.asInstanceOf[Configurable].setConf(jobConf)
  }
  // Ask the Hadoop InputFormat for its splits, then wrap each InputSplit in a HadoopPartition.
  val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
  val array = new Array[Partition](inputSplits.size)
  for (i <- 0 until inputSplits.size) {
    array(i) = new HadoopPartition(id, i, inputSplits(i))
  }
  array
}
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
  val iter = new NextIterator[(K, V)] {
    val split = theSplit.asInstanceOf[HadoopPartition]
    logInfo("Input split: " + split.inputSplit)
    var reader: RecordReader[K, V] = null
    val jobConf = getJobConf()
    val inputFormat = getInputFormat(jobConf)
    HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
      context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
    reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

    // Register an on-task-completion callback to close the input stream.
    context.addTaskCompletionListener{ context => closeIfNeeded() }

    val key: K = reader.createKey()
    val value: V = reader.createValue()

    // Set the task input metrics.
    val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
    try {
      /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
       * always at record boundaries, so tasks may need to read into other splits to complete
       * a record. */
      inputMetrics.bytesRead = split.inputSplit.value.getLength()
    } catch {
      case e: java.io.IOException =>
        logWarning("Unable to get input size to set InputMetrics for task", e)
    }
    context.taskMetrics.inputMetrics = Some(inputMetrics)

    override def getNext() = {
      try {
        finished = !reader.next(key, value)
      } catch {
        case eof: EOFException =>
          finished = true
      }
      (key, value)
    }

    override def close() {
      try {
        reader.close()
      } catch {
        case e: Exception => logWarning("Exception in RecordReader.close()", e)
      }
    }
  }
  new InterruptibleIterator[(K, V)](context, iter)
}
Now let's look at JdbcRDD, the RDD that reads from a database:
override def getPartitions: Array[Partition] = {
  // bounds are inclusive, hence the + 1 here and - 1 on end
  val length = 1 + upperBound - lowerBound
  (0 until numPartitions).map(i => {
    val start = lowerBound + ((i * length) / numPartitions).toLong
    val end = lowerBound + (((i + 1) * length) / numPartitions).toLong - 1
    new JdbcPartition(i, start, end)
  }).toArray
}

It simply splits the key range of the result set into numPartitions roughly equal slices. For example, with lowerBound = 1, upperBound = 100 and numPartitions = 3, the partitions cover [1, 33], [34, 66] and [67, 100]. Most of these parameters come from the constructor:
class JdbcRDD[T: ClassTag](
    sc: SparkContext,
    getConnection: () => Connection,
    sql: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
  context.addTaskCompletionListener{ context => closeIfNeeded() }
  val part = thePart.asInstanceOf[JdbcPartition]
  val conn = getConnection()
  val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

  // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results,
  // rather than pulling entire resultset into memory.
  // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
  if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
    stmt.setFetchSize(Integer.MIN_VALUE)
    logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
  }

  stmt.setLong(1, part.lower)
  stmt.setLong(2, part.upper)
  val rs = stmt.executeQuery()

  override def getNext: T = {
    if (rs.next()) {
      mapRow(rs)
    } else {
      finished = true
      null.asInstanceOf[T]
    }
  }

  override def close() {
    try {
      if (null != rs && ! rs.isClosed()) {
        rs.close()
      }
    } catch {
      case e: Exception => logWarning("Exception closing resultset", e)
    }
    try {
      if (null != stmt && ! stmt.isClosed()) {
        stmt.close()
      }
    } catch {
      case e: Exception => logWarning("Exception closing statement", e)
    }
    try {
      if (null != conn && ! conn.isClosed()) {
        conn.close()
      }
      logInfo("closed connection")
    } catch {
      case e: Exception => logWarning("Exception closing connection", e)
    }
  }
}
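For reference, this is roughly how JdbcRDD is used from application code. It is a hedged sketch: the JDBC URL, credentials, table and column names are hypothetical. Note that the SQL must contain exactly two '?' placeholders, which compute() binds to each partition's lower and upper bound.

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val users = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://dbhost:3306/test", "user", "password"),
  "SELECT id, name FROM users WHERE id >= ? AND id <= ?",
  lowerBound = 1L,
  upperBound = 1000000L,
  numPartitions = 10,
  mapRow = rs => (rs.getLong("id"), rs.getString("name")))

users.take(5).foreach(println)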
The content up to this point is my original work; if you repost it, please credit the blog: http://blog.csdn.net/bluejoe2000/article/details/41415087
The content below is reposted from: http://developer.51cto.com/art/201309/410276_1.html
◆ RDD的特色:函數
◆ RDD的好處oop
◆ RDD的存儲與分區性能
◆ RDD的內部表示
In the internal implementation, every RDD can be described by five properties:
1. a list of partitions;
2. a function for computing each partition;
3. a list of dependencies on parent RDDs;
4. optionally, a Partitioner for key-value RDDs;
5. optionally, a list of preferred locations for computing each partition (e.g. the block locations of an HDFS file).
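These properties can be inspected directly on any concrete RDD; a minimal sketch, assuming a running SparkContext named sc and a hypothetical HDFS path:

val file = sc.textFile("hdfs://namenode:8020/tmp/input.txt", 4)
println(file.partitions.length)                        // 1. the list of partitions
// 2. compute() is invoked by the scheduler for each partition when a job runs
println(file.dependencies)                             // 3. dependencies on parent RDDs
println(file.partitioner)                              // 4. the optional Partitioner
println(file.preferredLocations(file.partitions(0)))   // 5. preferred locations of a partition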
◆ RDD storage levels
Based on combinations of the four parameters useDisk, useMemory, deserialized and replication, RDD offers 11 storage levels (a short usage sketch follows the list):
val NONE = new StorageLevel(false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
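These levels are applied through persist() (cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)). A minimal usage sketch, assuming a SparkContext named sc and a hypothetical input path:

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("hdfs://namenode:8020/tmp/input.txt")
lines.persist(StorageLevel.MEMORY_AND_DISK_SER)   // keep serialized blocks in memory, spill to disk
lines.count()                                     // first action materializes and caches the partitions
lines.count()                                     // second action is served from the cache
lines.unpersist()                                 // drop the cached blocks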
◆ RDD defines the various operations; different kinds of data are represented by different RDD subclasses, and the corresponding operations are implemented by those concrete RDDs.
How RDDs are created
◆ There are two ways to create an RDD:
1. By reading input from the Hadoop file system (or another Hadoop-compatible storage system), for example HDFS.
2. By transforming a parent RDD into a new RDD.
◆ Let's look at creating an RDD from the Hadoop file system, e.g. val file = spark.textFile("hdfs://..."). The variable file is an RDD (in fact a HadoopRDD instance), and the core code that creates it is as follows:
// SparkContext creates the RDD from a file/directory plus an optional number of splits.
// Here you can see that Spark looks a lot like Hadoop MapReduce: it needs an InputFormat
// and the Key/Value types -- in fact Spark reuses Hadoop's InputFormat and Writable types.
def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable],
    classOf[Text], minSplits).map(pair => pair._2.toString)
}

// Inside hadoopFile(), a HadoopRDD is created from the Hadoop configuration, the InputFormat, etc.
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
◆ When an RDD is computed and reads its data from HDFS, the process is almost the same as in Hadoop MapReduce.
RDD transformations and actions
◆ An RDD supports two kinds of computation: transformations, which return another RDD, and actions, which return something other than an RDD.
◆ Transformations (e.g. map, filter, groupBy, join) are lazy: deriving one RDD from another does not execute right away. When Spark encounters a transformation it only records that the operation is needed; the computation is not actually started until an action is invoked.
◆ Actions (e.g. count, collect, save) return a result or write RDD data to a storage system, and they are what trigger Spark to start computing.
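A small sketch of this lazy/eager split (the log paths are hypothetical):

val lines  = sc.textFile("hdfs://namenode:8020/logs/app.log")   // transformation: nothing runs yet
val errors = lines.filter(_.contains("ERROR"))                  // transformation: only recorded in the lineage
val count  = errors.count()                                     // action: triggers the actual job
errors.saveAsTextFile("hdfs://namenode:8020/logs/errors")       // action: writes the RDD out to storage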