One main reason Kudu runs so fast, and can serve as a replacement for storage layers such as HDFS and HBase, is that predicates from ordinary SQL can be pushed down into the Kudu engine, which makes Kudu queries very fast.
Pushing predicate evaluation into the Kudu engine improves performance because it reduces the amount of data that must be streamed back to the Spark engine for further evaluation and processing.
The set of predicates that currently supports pushdown through the Spark API includes:
equals (=), greater than (>), greater than or equal (>=), less than (<), less than or equal (<=)
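As a quick check, these comparisons can also be expressed through the DataFrame API and verified with explain(). This is a minimal sketch, assuming a Kudu table named spark_kudu_tbl with an integer age column and the master addresses used in the full example below:

import org.apache.kudu.spark.kudu._
import org.apache.spark.sql.SparkSession

object PushdownCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("PushdownCheck").master("local").getOrCreate()
    // assumed table name and master addresses; match them to your own cluster
    val df = spark.read
      .options(Map(
        "kudu.table"  -> "spark_kudu_tbl",
        "kudu.master" -> "hadoop01:7051,hadoop02:7051,hadoop03:7051"))
      .kudu
    // each comparison below is in the supported set, so Kudu evaluates it;
    // explain() should list it under PushedFilters in the physical plan
    df.filter("age = 30").explain()
    df.filter("age > 30").explain()
    df.filter("age <= 30").explain()
  }
}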
Spark SQL statements that use these predicates therefore push predicate evaluation down to Kudu's storage engine, improving overall performance.
import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

/**
  * Created by angel
  */
object Predicate_pushDown {
  def main(args: Array[String]): Unit = {
    // set the master IP and the Spark parameters
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false") // fixed key: was "spark.speculationfalse"
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext

    // create a KuduContext against the Kudu masters
    val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
    val kuduContext = new KuduContext(kuduMasters, sqlContext.sparkContext)

    // TODO 1: name the Kudu table
    val kuduTableName = "spark_kudu_tbl"

    // TODO 2: configure the Kudu options
    val kuduOptions: Map[String, String] = Map(
      "kudu.table" -> kuduTableName,
      "kudu.master" -> kuduMasters)

    // TODO 3: register the Kudu table as a Spark temporary view
    //         (createOrReplaceTempView is the non-deprecated form of registerTempTable)
    sqlContext.read.options(kuduOptions).kudu.createOrReplaceTempView(kuduTableName)

    // TODO 4: run the Spark SQL statement; Spark automatically pushes the predicate into the Kudu engine
    val customerNameAgeDF = sqlContext.sql(
      s"""SELECT name, age FROM $kuduTableName WHERE age >= 30""")

    // TODO 5: show the results
    customerNameAgeDF.show()

    // TODO 6: print the Spark SQL query plan
    customerNameAgeDF.explain()
  }
}
The query plan confirms the pushdown:
== Physical Plan ==
Scan org.apache.kudu.spark.kudu.KuduRelation@781dbe44 [name#0,age#1] PushedFilters: [IsNotNull(age), *GreaterThanOrEqual(age,30)], ReadSchema: struct<name:string,age:int>
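For contrast, a condition that Spark's data source filter API cannot express as a plain column comparison stays in Spark. A minimal sketch, reusing sqlContext and kuduTableName from the example above; the arithmetic predicate is a hypothetical illustration:

// The condition wraps the column in an expression, so it cannot be described
// as a simple column-versus-literal comparison and is not handed to Kudu;
// Spark evaluates it in its own Filter step after the scan, and it does not
// appear under PushedFilters in the physical plan.
val notPushedDF = sqlContext.sql(
  s"""SELECT name, age FROM $kuduTableName WHERE age + 1 >= 31""")
notPushedDF.explain()

Rewriting such a condition as a bare column comparison (here, age >= 30) restores the pushdown.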