0二、建立RDD(集合、本地文件、HDFS文件)

Spark Core提供了三種建立RDD的方式,包括:使用程序中的集合建立RDD;使用本地文件建立RDD;使用HDFS文件建立RDD。

一、並行化集合

若是要經過並行化集合來建立RDD,須要針對程序中的集合,調用SparkContext的parallelize()方法。Spark會將集合中的數據拷貝到集羣上去,造成一個分佈式的數據集合,也就是一個RDD。至關因而,集合中的部分數據會到一個節點上,而另外一部分數據會到其餘節點上。而後就能夠用並行的方式來操做這個分佈式數據集合,即RDD。
 
// 案例:1到10累加求和
val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_ + _)
 
調用parallelize()時,有一個重要的參數能夠指定,就是要將集合切分紅多少個partition。Spark會爲每個partition運行一個task來進行處理。Spark官方的建議是,爲集羣中的每一個CPU建立2~4個partition。Spark默認會根據集羣的狀況來設置partition的數量。可是也能夠在調用parallelize()方法時,傳入第二個參數,來設置RDD的partition數量。好比parallelize(arr, 10)

    1.一、Java

package sparkcore;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
/**
 * 並行化集合建立RDD 案例:累加1到10
 */
public class ParallelizeCollection {
    public static void main(String[] args) {
        // 建立SparkConf
        SparkConf conf = new SparkConf().setAppName("ParallelizeCollection").setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 要經過並行化集合的方式建立RDD,那麼就調用SparkContext以及其子類,的parallelize()方法
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        // 執行reduce算子操做
        // 至關於,先進行1 + 2 = 3;而後再用3 + 3 = 6;而後再用6 + 4 = 10。。。以此類推
        int sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            public Integer call(Integer num1, Integer num2throws Exception {
                return num1 + num2;
            }
        });
        // 輸出累加的和
        System.out.println("1到10的累加和:" + sum);
        // 關閉JavaSparkContext
        sc.close();
    }
}

    1.二、Scala

package sparkcore.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object ParallelizeCollection {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("ParallelizeCollection")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val numbers = Array(12345678910)
    val numberRDD = sc.parallelize(numbers5)
    val sum = numberRDD.reduce(_ + _)
    println("1到10的累加和:" + sum)
  }
}

二、使用本地文件和HDFS建立RDD

    
Spark是支持使用任何Hadoop支持的存儲系統上的文件建立RDD的,好比說HDFS、Cassandra、HBase以及本地文件。經過調用SparkContext的textFile()方法,能夠針對本地文件或HDFS文件建立RDD。
 
有幾個事項是須要注意的:
一、若是是針對本地文件的話,若是是在windows上本地測試,windows上有一份文件便可;若是是在spark集羣上針對linux本地文件,那麼須要將文件拷貝到全部worker節點上。
二、Spark的textFile()方法支持針對目錄、壓縮文件以及通配符進行RDD建立。
三、Spark默認會爲hdfs文件的每個block建立一個partition,可是也能夠經過textFile()的第二個參數手動設置分區數量,只能比block數量多,不能比block數量少。
 
// 案例:文件字數統計
val rdd = sc.textFile("data.txt")
val wordCount = rdd.map(line => line.length).reduce(_ + _)

Spark的textFile()除了能夠針對上述幾種普通的文件建立RDD以外,還有一些特列的方法來建立RDD:
一、SparkContext. wholeTextFiles()方法,能夠針對一個目錄中的大量小文件,返回<filename, fileContent>組成的 pair,做爲一個PairRDD,而不是普通的RDD。普通的textFile()返回的RDD中,每一個元素就是文件中的一行文本。
二、SparkContext. sequenceFile[K, V]()方法,能夠針對SequenceFile建立RDD,K和V泛型類型就是SequenceFile的key和value的類型。K和V要求必須是Hadoop的序列化類型,好比IntWritable、Text等。
三、SparkContext. hadoopRDD()方法,對於Hadoop的自定義輸入類型,能夠建立RDD。該方法接收JobConf、InputFormatClass、Key和Value的Class。
四、SparkContext.objectFile()方法,能夠針對以前調用RDD.saveAsObjectFile()建立的對象序列化的文件,反序列化文件中的數據,並建立一個RDD。

    2.一、Java

package sparkcore.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
/**
 * 使用本地文件建立RDD 案例:統計文本文件字數
 */
public class LocalFile {
    public static void main(String[] args) {
        // 建立SparkConf
        SparkConf conf = new SparkConf().setAppName("LocalFile").setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 使用SparkContext以及其子類的textFile()方法,針對本地文件建立RDD
        JavaRDD<String> lines = sc.textFile("D:/eclipse-jee-neon-3/workspace/sparkcore-java/test.txt");
        // 統計文本文件內的字數
        JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() {
            private static final long serialVersionUID = 1L;
            public Integer call(String v1throws Exception {
                return v1.length();
            }
        });
        int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            public Integer call(Integer v1, Integer v2throws Exception {
                return v1 + v2;
            }
        });
        System.out.println("文件總字數是:" + count);
        // 關閉JavaSparkContext
        sc.close();
    }
}


package sparkcore.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
/**
 * 使用HDFS文件建立RDD
 * 案例:統計文本文件字數
 */
public class HDFSFile {
    
    public static void main(String[] args) {
        // 建立SparkConf
        // 修改:去除setMaster()設置,修改setAppName()
        SparkConf conf = new SparkConf()
                .setAppName("HDFSFile"); 
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 使用SparkContext以及其子類的textFile()方法,針對HDFS文件建立RDD
        // 只要把textFile()內的路徑修改成hdfs文件路徑便可
        JavaRDD<String> lines = sc.textFile("hdfs://node1:8020/test.txt");
        
        // 統計文本文件內的字數
        JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() {
            
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(String v1) throws Exception {
                return v1.length();
            }
            
        });
        
        int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
            
        });
        
        System.out.println("文件總字數是:" + count);  
        
        // 關閉JavaSparkContext
        sc.close();
    }
}
    2.二、Scala
package sparkcore.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object LocalFile {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("LocalFile") .setMaster( "local" );
    val sc = new SparkContext(conf)
    val lines = sc.textFile("D:/eclipse-jee-neon-3/workspace/sparkcore-scala/test.txt"1);
    val count = lines.map { line => line.length() }.reduce(_ + _)
    println("file's count is " + count)
  }
}

package sparkcore.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object HDFSFile {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("HDFSFile").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("hdfs://node1:8020/test.txt"1);
    val count = lines.map { _.length() }.reduce(_ + _)
    println("file's count is " + count)
  }
}
相關文章
相關標籤/搜索