利用Spark SQL實現輕量級用戶數據查詢

簡介

當人們把愈來愈多的大數據存儲在HDFS或者AWS的S3上,一般下一個問題是如何讓全公司範圍的員工可以方便的查詢這些數據。一個選項是創建一個SQL-on-Hadoop系統,讓用戶使用SQL或者類SQL語言來查詢數據,可是這些SQL-on-Hadoop系統每每比較複雜,須要必定的開發和維護工做量。html

另外一個選項是,若是你已經有了Spark或者Hadoop YARN集羣,那麼利用Spark SQL,經過編寫少許的代碼,你就能夠創建一個輕量級的工具,讓用戶本身提交SQL語句,來獲取他們須要的數據。java

主要思路

這裏的思路是編寫一個Spark程序,在其中設置DataFrame(Spark SQL中的數據表)的數據格式(schema),而後用戶能夠經過Spark程序的參數,指定一個SQL查詢,進而執行這個查詢。sql

示例代碼

讓咱們經過一個具體的例子,來展現如何經過代碼實現這樣的功能。具體代碼參見這裏,下面是一些簡略解釋。apache

數據文件

咱們在AWS S3中有兩個文件: "s3n://bopublic/demo/selfservicequery/customers.json" 和 "s3n://bopublic/demo/selfservicequery/orders.json"。json

Spark程序

咱們編寫完Spark程序後,用戶能夠經過如下命令行執行SQL語句:微信

java -cp ... YourJob -q "select * from customers join orders on customers.key = orders.customerKey"ide

建立DataFrame數據格式(schema)

在Spark中,StructType類用來定義DataFrame的數據格式(schema)。下面代碼展現如何建立"customers"數據表的schema.工具

private static StructType createCustomerTableSchema() {
StructField[] fields = new StructField[] {
new StructField("key", DataTypes.IntegerType, true,
Metadata.empty()),
new StructField("name", DataTypes.StringType, true,
Metadata.empty()),
new StructField("address", DataTypes.StringType, true,
Metadata.empty())
};oop

StructType structType = new StructType(fields);
return structType;
}大數據

在Spark中加載數據

SparkConf conf = new SparkConf().setMaster(master).setAppName(
SparkSqlWithExplicitSchema.class.getSimpleName());

JavaSparkContext sc = new JavaSparkContext(conf);

String customerS3Path = "s3n://bopublic/demo/selfservicequery/customers.json";

JavaRDD customerRDD = sc.textFile(customerS3Path).mapPartitions(new ParseJson(customerTableSchema));

建立SQLContext和DataFrame

SQLContext sqlContext = new SQLContext(sc);

DataFrame customerDF = sqlContext.createDataFrame(customerRDD, createCustomerTableSchema());
customerDF.registerTempTable("customers");

運行SQL

String query = (get SQL query from program arguments);
DataFrame resultDF = sqlContext.sql(query);

輸出結果

// SerializeToCsv is a class to convert DataFrame row data to CSV. See full source code for details.

JavaRDD csvRDD = resultDF.toJavaRDD().map(new SerializeToCsv());

一點討論

顯式建立Schema

因爲Spark SQL能夠自動根據JSON檢測出數據格式,也許有人認爲咱們不須要顯式地建立DataFrame Schema。這裏咱們仍然顯式建立schema,有兩個緣由:

  • JSON文件運行忽略某些屬性,當這些屬性的值是缺省值的時候。當這種狀況發生的時候,Spark SQL檢測不出這些被忽略的屬性格式。

  • 對於其餘數據格式,好比CSV,Spark SQL無法檢測出schema,顯式建立schema使得咱們仍然能夠查詢這些數據源。

Spark Thrift JDBC/ODBC Server

Spark自帶一個Thrift JDBC/ODBC Server,人們可使用"beeline"工具鏈接上來執行SQL查詢,參考Spark文檔

這是方法也值得一試,可是目前Spark Thrift Server還未成熟到能夠產品化應用的階段。


掃描微信二維碼聯繫做者
掃描微信二維碼聯繫做者

相關文章
相關標籤/搜索