Spark(六):SparkSQLAndDataFrames對結構化數據集與非結構化數據的處理

一:簡單瞭解SparkSQLhtml

    Spark SQL 是結構化的數據處理一個Spark模塊。與基本的Spark RDD API不一樣,Spark SQL 所提供的接口爲Spark 提供有關數據和正在執行的計算的結構的詳細信息。Spark SQL內部使用這些額外的信息來執行額外的優化。有幾種方法與Spark SQL 包括 SQL、 DataFrames API 和數據集 API 進行交互。計算結果相同的執行引擎在使用時,獨立的 API/語言使用的表達計算。這種統一意味着開發人員很容易能夠提供最天然的方式來表達一個給定的轉換基於各類 Api 之間來回切換。java

    Spark SQL是Spark中的一個模塊,主要用於進行結構化數據的處理。它提供的最核心的編程抽象,就是DataFrame。同時Spark SQL還能夠做爲分佈式的SQL查詢引擎。Spark SQL最重要的功能之一,就是從Hive中查詢數據。sql

二:簡單瞭解DataFrame。

    DataFrame是一個以命名列方式組織的分佈式數據集,等同於關係型數據庫中的一個表,也至關於R/Python中的data frames(可是進行了更多的優化)。DataFrame能夠經過不少來源進行構建,包括:結構化的數據文件,Hive中的表,外部的關係型數據庫,以及RDD。shell

接下來是對 結構化數據集 與 非結構化數據集 的操做。數據庫

三:結構化數據集: 如何把JSON文件轉化爲DataFrame apache

3.1.在HDFS上放置了兩個JSON文件,即編程

people.json, 文件內容以下:json

{"id": "19","name": "berg","sex": "male","age": 19}
{"id": "20","name": "cccc","sex": "female","age": 20}
{"id": "21","name": "xxxx","sex": "male","age": 21}
{"id": "22","name": "jjjj","sex": "female","age": 21}

student.json,文件內容以下:數組

{"id": "1","name": "china","sex": "female","age": 100}
{"id": "19","name": "xujun","sex": "male","age": 22}

3.2 經過DataFrame的API來操做數據,熟悉下DataFrame中方法的使用: 架構

public class SparkSqlDemo {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//建立了 sqlContext的上下文,注意,它是DataFrame的起點。
		SQLContext sqlContext = new SQLContext(sc);

		//本地的JSON文件轉化爲DataFrame
		DataFrame df = sqlContext.read().json("hdfs://192.168.226.129:9000/txt/sparkshell/people.json");

		//輸出表結構
		df.printSchema();

		//顯示DataFrame的內容。
		df.show();

		//選擇name
		df.select(df.col("name")).show();

		// 選擇全部年齡大於21歲的人,只保留name字段
		df.filter(df.col("age").lt(21)).select("name").show();

		// 選擇name,並把age字段自增 1
		df.select(df.col("name"), df.col("age").plus(1)).show();

		//按年齡分組計數:
		df.groupBy("age").count().show();  // 應該有一條數據記錄爲  2 

		//把另個JSON文件轉化爲DataFrame
		DataFrame df2 = sqlContext.read().json("hdfs://192.168.226.129:9000/txt/sparkshell/student.json");

		df2.show();
		
		//表的關聯。
		df.join(df2,df.col("id").equalTo(df2.col("id"))).show();
		
		
		//以編程方式運行SQL:
		//把DataFrame對象轉化爲一個虛擬的表
		df.registerTempTable("people");
		sqlContext.sql("select age,count(*) from people group by age").show();

		System.out.println(  "-------------" );
		sqlContext.sql("select * from people").show();

	}
}

3.3 以編程方式運行 SQL 查詢並返回做爲綜合結果,經過註冊表,操做sql的方式來操做數據:

public class SparkSqlDemo1 {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//建立了 sqlContext的上下文,注意,它是DataFrame的起點。
		SQLContext sqlContext = new SQLContext(sc);

		//本地的JSON文件轉化爲DataFrame
		DataFrame df = sqlContext.read().json("hdfs://192.168.226.129:9000/txt/sparkshell/people.json");

		//把另外一個JSON文件轉化爲DataFrame
		DataFrame df2 = sqlContext.read().json("hdfs://192.168.226.129:9000/txt/sparkshell/student.json");		

		//以編程方式運行SQL:
		//把DataFrame對象轉化爲一個虛擬的表
		df.registerTempTable("people");
		df2.registerTempTable("student");

		// 查詢虛擬表 people 中全部數據
		sqlContext.sql("select * from people").show();

		//查看某個字段  
		sqlContext.sql("select name from people ").show();

		//查看多個字段  
		sqlContext.sql("select name,age+1 from people ").show();  

		//過濾某個字段的值  
		sqlContext.sql("select name, age from people where age>=21").show();

		//count group 某個字段的值  
		sqlContext.sql("select age,count(*) countage from people group by age").show();


		//關聯: 內聯 。 
		sqlContext.sql("select * from people inner join student on people.id = student.id ").show();
		/*
	    +---+---+----+----+---+---+-----+----+
		|age| id|name| sex|age| id| name| sex|
		+---+---+----+----+---+---+-----+----+
		| 19| 19|berg|male| 22| 19|xujun|male|
		+---+---+----+----+---+---+-----+----+ 
		 */
	}
}

 

四:非結構化數據集:

    第一種方法使用反射來推斷架構 RDD 包含特定類型的對象。
    這種基於反射方法致使更簡潔的代碼和工程好當您已經知道該Schema編寫Spark應用程序時。

    建立 DataFrames 的第二個方法是經過容許您構建一個Schema,而後將它應用於現有 RDD 的編程接口。
    雖然這種方法更爲詳細,它容許您構建 DataFrames 時直到運行時才知道的列和它們的類型。

4.1  非結構化的數據集文件,user.txt,內容以下:

1,"Hadoop",20
2,"HBase", 21
3,"Zookeeper",22
4,"Hive",23
5,"Spark",24
6,"Berg",22
7,"Xujun",23

4.2  經過 class反射來註冊一張表。

        Spark SQL 支持 JavaBeans RDD 自動轉換分佈式數據集。BeanInfo,使用反射來獲取定義表的架構。目前,Spark SQL 不支持包含嵌套的 JavaBeans 或包含複雜的類型,例如列表或數組。您能夠經過建立一個類,實現可序列化並有 getter 和 setter 方法的全部其字段建立 JavaBean。

public class SparkSqlDemo2 {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//建立了 sqlContext的上下文,注意,它是DataFrame的起點。
		SQLContext sqlContext = new SQLContext(sc);

		//把加載的文本文件 並 每行轉換 JavaBean
		JavaRDD<String> rdd = sc.textFile("hdfs://192.168.226.129:9000/txt/sparkshell/user.txt");

		JavaRDD<User> userRDD = rdd.map( new Function<String, User>() {

			private static final long serialVersionUID = 1L;

			public User call(String line) throws Exception {
				String[] parts = line.split(",");
				User user = new User();
				user.setId(Integer.parseInt(parts[0].trim()));
				user.setName(parts[1].trim());
				user.setAge(Integer.parseInt(parts[2].trim()));
				return user;
			}
		});

		// collect 屬於行動算子Action 提交做業並觸發運算。
		List<User> list = userRDD.collect();
		for (User user : list) {
			System.out.println(  user );
		}

		//經過 class 反射註冊一張表
		DataFrame df = sqlContext.createDataFrame(userRDD, User.class);
		df.registerTempTable("user");

		DataFrame df1 = sqlContext.sql("SELECT id,name,age FROM user WHERE age >= 21 AND age <= 23");

		// 經過sql 查詢的結果是 DataFrame 即df1 它仍是支持 RDD的全部正常操做。
		df1.show();
		
		//而且 結果中的行列能夠按序號訪問。
		List<String> listString = df1.javaRDD().map(new Function<Row, String>() {
			
			private static final long serialVersionUID = 1L;

			public String call(Row row) {
				return "Id: " + row.getInt(0) + ", Name: "+row.getString(1) + ", Age: " + row.getInt(2);
			}
		}).collect();
		
		for (String string : listString) {
			System.out.println(  string );
		}
	}
}

 

4.3   以編程方式指定 schema, 經過字段反射來映射註冊臨時表

    在某些狀況下不能提早定義 JavaBean 類 (例如,記錄的結構編碼的字符串,或將解析文本數據集和領域預計將以不一樣的方式爲不一樣的用戶),
    三個步驟,能夠以編程方式建立分佈式數據集。

    1. 從原始 RDD; 建立行 RDD

    2. 建立由 StructType 中 RDD 在步驟 1 中建立的行結構相匹配的schema。

    3.適用於行 RDD 經過 createDataFrame 方法由 SQLContext 提供的schema。

    

public class SparkSqlDemo3 {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//建立了 sqlContext的上下文,注意,它是DataFrame的起點。
		SQLContext sqlContext = new SQLContext(sc);

		//把加載的文本文件 並 每行轉換 JavaBean
		JavaRDD<String> rdd = sc.textFile("hdfs://192.168.226.129:9000/txt/sparkshell/user.txt");

		// schema 以字符串形式編碼
		String schemaString = "id name age";

		// 基於 字符串的schema生成 schema。
		List<StructField> fields = new ArrayList<StructField>();
		
		String[] str = schemaString.split(" ");
		fields.add(DataTypes.createStructField(str[0], DataTypes.IntegerType, true));
		fields.add(DataTypes.createStructField(str[1], DataTypes.StringType, true));
		fields.add(DataTypes.createStructField(str[2], DataTypes.IntegerType, true));

		StructType schema = DataTypes.createStructType(fields);  //  id name age

		JavaRDD<Row> rowRDD = rdd.map( new Function<String, Row>() {

			private static final long serialVersionUID = 1L;
			public Row call(String record) throws Exception {
				String[] fields = record.split(",");
				return RowFactory.create(Integer.parseInt(fields[0].trim()), fields[1].trim(),Integer.parseInt(fields[2].trim()));
			}
		});

		List<Row> list = rowRDD.collect();
		for (Row row : list) {
			System.out.println(  row.getInt(0) + "\t"+ row.getString(1) + "\t"+row.getInt(2)  );
		}

		//對RDD應用schema 並註冊一張表:
		DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
		System.out.println( "df : " + df);
		df.registerTempTable("user");

		df.show();
		DataFrame df2 = sqlContext.sql("SELECT id,name,age FROM user WHERE age >= 21 AND age <= 23");

		// 經過sql 查詢的結果是 DataFrame 即df1 它仍是支持 RDD的全部正常操做。
		df2.show();
		// 而且 結果中的行列能夠按序號訪問。
		List<String> listString = df2.javaRDD().map(new Function<Row, String>() {

			private static final long serialVersionUID = 1L;
			public String call(Row row) {
				System.out.println( row );
				return "Id: " + row.getInt(0) + ", Name: "+row.getString(1) + ", Age: " + row.getInt(2);
			}
		}).collect();

		for (String string : listString) {
			System.out.println(  string );
		}

	}
}

 

 

注意若是將上述代碼段中的一段,即:

        String[] str = schemaString.split(" ");

        fields.add(DataTypes.createStructField(str[0], DataTypes.IntegerType, true));

        fields.add(DataTypes.createStructField(str[1], DataTypes.StringType, true));

        fields.add(DataTypes.createStructField(str[2], DataTypes.IntegerType, true));

 

改成下面這段代碼:

   
     for (String fieldName: schemaString.split(" ")) {


            fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));

        }

將會出現如下錯誤:

        Caused by: scala.MatchError: 1 (of class java.lang.Integer)

 

那就來認識下   局部套用和部分應用 : http://www.ibm.com/developerworks/cn/java/j-jn9/

 

更多學習: 

    來自於Spark官網的學習資料:

    http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframes

相關文章
相關標籤/搜索