隨着Spark SQL的正式發佈,以及它對DataFrame的支持,它可能會取代HIVE成爲愈來愈重要的針對結構型數據進行分析的平臺。在博客文章What’s new for Spark SQL in Spark 1.3中,Databricks的工程師Michael Armbrust着重介紹了改進了的Data Source API。 sql
咱們在對結構型數據進行分析時,總不可避免會遭遇多種數據源的狀況。這些數據源包括Json、CSV、Parquet、關係型數據庫以及NoSQL數據庫。咱們天然但願可以以統一的接口來訪問這些多姿多態的數據源。數據庫
在咱們產品的應用場景中,須要訪問PostgreSQL的數據以進行數據分析。咱們能夠經過Spark SQL提供的JDBC來訪問,前提是須要PostgreSQL的driver。方法是在build.sbt中添加對應版本的driver依賴。例如:apache
libraryDependencies ++= {
val sparkVersion = "1.3.0"
Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.postgresql" % "postgresql" % "9.4-1201-jdbc41"
)
}服務器
根據Spark SQL的官方文檔,在調用Data Sources API時,能夠經過SQLContext加載遠程數據庫爲Data Frame或Spark SQL臨時表。加載時,能夠傳入的參數(屬性)包括:url、dbtable、driver、partitionColumn、lowerBound、upperBound與numPartitions。post
PostgreSQL Driver的類名爲org.postgresql.Driver。因爲屬性沒有user和password,所以要將它們做爲url的一部分。假設咱們要鏈接的數據庫服務器IP爲192.168.1.110,端口爲5432,用戶名和密碼均爲test,數據庫爲demo,要查詢的數據表爲tab_users,則訪問PostgreSQL的代碼以下所示:性能
object PostgreSqlApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("FromPostgreSql").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)ui
val query = "(SELECT * FROM tab_users) as USERS"
val url = "jdbc:postgresql://192.168.1.110:5432/demo?user=test&password=test"
val users = sqlContext.load("jdbc", Map(
"url" -> url,
"driver" -> "org.postgresql.Driver",
"dbtable" -> query
))url
users.foreach(println)
}
}spa
上面的代碼將查詢語句直接放在query變量中,並傳遞給SQLContext用以加載。另外一種方式是直接傳遞表名,而後經過調用registerTempTable()方法來註冊臨時表,並調用sql()方法執行查詢:postgresql
object PostgreSqlApp {
def main(args: Array[String]): Unit = {
//val sparkConf = new SparkConf().setAppName("FromPostgreSql").setMaster("local[2]")
val sparkConf = new SparkConf().setAppName("SparkSQL_Select_Table")
.set("spark.driver.allowMultipleContexts", "true")
.set("spark.sql.shuffle.partitions","12")
//本地啓動
.setMaster("local[2]");
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val url = "jdbc:postgresql://192.168.1.110:5432/demo?user=test&password=test"
val dataFrame = sqlContext.read.format( "jdbc" ).options(
Map( "url" -> url, "user" -> "root", "password" -> "root", "dbtable" -> "users" )).load()
dataFrame.registerTempTable("USERS")
val users = sqlContext.sql("select * from USERS")
users.foreach(println)
}
}
從性能角度考慮,還能夠在建立SQLContext時,設置一些配置項,例如:
val sqlContext = new SQLContext(sc)sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "10000")