Spark SQL支持兩種不同的方法將現有的RDDs轉換爲數據集(Dataset)。
第一種方法:使用反射來推斷包含特定對象類型的RDD的模式(schema)。這種基於反射的方法使代碼更加簡潔,而且當您在編寫Spark應用程序時已經瞭解了模式時,它能夠很好地工作。
第一種方法代碼實例(Java版本)實現:
數據準備:studentData.txt

1001,20,zhangsan
1002,17,lisi
1003,24,wangwu
1004,16,zhaogang
本地模式代碼實現:
package com.unicom.ljs.spark220.study;
import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.VoidFunction;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SQLContext;
/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-01-20 08:58 * @version: v1.0 * @description: com.unicom.ljs.spark220.study */public class RDD2DataFrameReflect { public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD2DataFrameReflect"); JavaSparkContext sc = new JavaSparkContext(sparkConf); SQLContext sqlContext=new SQLContext(sc);
JavaRDD<String> lines = sc.textFile("C:\\Users\\Administrator\\Desktop\\studentData.txt"); JavaRDD<Student2> studentRDD = lines.map(new Function<String, Student2>() { @Override public Student2 call(String line) throws Exception { String[] split = line.split(","); Student2 student=new Student2(); student.setId(Integer.valueOf(split[0])); student.setAge(Integer.valueOf(split[1])); student.setName(split[2]); return student; } }); //使用反射方式將RDD轉換成dataFrame //將Student.calss傳遞進去,其實就是利用反射的方式來建立DataFrame Dataset<Row> dataFrame = sqlContext.createDataFrame(studentRDD, Student2.class); //拿到DataFrame以後將其註冊爲臨時表,而後針對其中的數據執行SQL語句 dataFrame.registerTempTable("studentTable");
//針對student臨時表,執行sql語句查詢年齡小於18歲的學生, /*DataFrame rowDF */ Dataset<Row> dataset = sqlContext.sql("select * from studentTable where age < 18"); JavaRDD<Row> rowJavaRDD = dataset.toJavaRDD(); JavaRDD<Student2> ageRDD = rowJavaRDD.map(new Function<Row, Student2>() { @Override public Student2 call(Row row) throws Exception { Student2 student = new Student2(); student.setId(row.getInt(0)); student.setAge(row.getInt(1)); student.setName(row.getString(2));
return student; } }); ageRDD.foreach(new VoidFunction<Student2>() { @Override public void call(Student2 student) throws Exception { System.out.println(student.toString()); } }); }}
Student2類:
package com.unicom.ljs.spark220.study;
import java.io.Serializable;
/**
 * JavaBean holding one student record (id, age, name).
 *
 * Implements Serializable because Spark ships instances between executors,
 * and follows the getter/setter bean convention so Spark SQL can infer a
 * DataFrame schema from it via reflection.
 *
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-01-20 08:57
 * @version: v1.0
 * @description: com.unicom.ljs.spark220.study
 */
public class Student2 implements Serializable {
    int id;
    int age;
    String name;

    /** @return the student id */
    public int getId() {
        return this.id;
    }

    /** @param id the student id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the student's age in years */
    public int getAge() {
        return this.age;
    }

    /** @param age the student's age in years */
    public void setAge(int age) {
        this.age = age;
    }

    /** @return the student's name */
    public String getName() {
        return this.name;
    }

    /** @param name the student's name */
    public void setName(String name) {
        this.name = name;
    }

    /** Renders the record as {@code Student2{id=.., age=.., name='..'}}. */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("Student2{");
        sb.append("id=").append(this.id);
        sb.append(", age=").append(this.age);
        sb.append(", name='").append(this.name).append('\'');
        sb.append('}');
        return sb.toString();
    }
}
pom.xml關鍵依賴:
<properties>
    <spark.version>2.2.0</spark.version>
    <scala.version>2.11.8</scala.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>