使用spark的 DataFrame 來操做mysql數據。html
DataFrame是比RDD更高一個級別的抽象,能夠應用SQL語句進行操做,詳細參考:java
https://spark.apache.org/docs/latest/sql-programming-guide.htmlmysql
這裏暫時使用spark-shell進行操做,sql
1.首先,必需要先下載一個mysql的jdbc的驅動shell
能夠從這裏下載apache
2.而後呢,就好辦了。ide
#具體的啓動spark-shell的方法(帶上mysql的driver)
$~/spark-shell --driver-class-path /path-to-mysql-jar/mysql-connector-java-5.1.34-bin.jar
#定義mysql的信息
val url="jdbc:mysql://10.181.176.226:3306/geo_info" val prop = new java.util.Properties prop.setProperty("user","geo") prop.setProperty("password","xxxxxx」)
#指定讀取條件,這裏 Array("country_code='CN'") 是where過濾條件
val cnFlight = sqlContext.read.jdbc(url,"gps_location",Array("country_code='CN'"),prop)
#而後進行groupby 操做,獲取數據集合 val emailList = cnFlight.groupBy("gps_city", "user_mail」)
#計算數目,並根據數目進行降序排序 val sorted = emailList.count().orderBy( desc("count") ) #顯示前10條 sorted.show(10) #存儲到文件(這裏會有不少分片文件。。。) sorted.rdd.saveAsTextFile("/home/qingpingzhang/data/flight_top」) #存儲到mysql表裏
sorted.write.jdbc(url,"table_name",prop)
3.具體文件編寫代碼,而後提交worker也相似,主要是DataFrame的 sqlContext聲明會不同。ui
val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
這裏若是要用spark-submit,則會有坑,即使你是用sbt的assembly來打包的一個全的jar包:url
參考:http://www.iteblog.com/archives/1300spa
[itelbog@iteblog ~]$ bin/spark-submit --master local[2] --driver-class-path lib/mysql-connector-java-5.1.35.jar --class spark.SparkToJDBC ./spark-test_2.10-1.0.jar