package scala

import java.util.Properties

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object TestMySQL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")            // run in local mode
      .setAppName("scalawordcount")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Build an RDD of Rows that matches the employee schema below
    val employeeRDD = sqlContext.sparkContext
      .parallelize(Array("3 Mary F 26", "4 Tom M 23"))
      .map(_.split(" "))
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("gender", StringType, true),
      StructField("age", IntegerType, true)))
    val rowRDD = employeeRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
    val employeeDF = sqlContext.createDataFrame(rowRDD, schema)

    // JDBC connection properties for MySQL (the url is passed to jdbc() directly)
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "root")
    prop.put("driver", "com.mysql.jdbc.Driver")

    // Append the two new rows to the employee table
    employeeDF.write.mode("append")
      .jdbc("jdbc:mysql://localhost:3306/sparktest", "sparktest.employee", prop)

    // Read the table back, then print the max/sum of age and the full contents
    val jdbcDF = sqlContext.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/sparktest")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "employee")
      .option("user", "root")
      .option("password", "root")
      .load()
    jdbcDF.agg("age" -> "max", "age" -> "sum").show()
    jdbcDF.show()
  }
}
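The program assumes the sparktest database and the employee table already exist in MySQL. A minimal setup sketch follows, assuming MySQL on localhost:3306 with the same root/root credentials used above; the CreateEmployeeTable object name and the column types are illustrative choices matching the DataFrame schema, not part of the original post:

import java.sql.DriverManager

object CreateEmployeeTable {
  def main(args: Array[String]): Unit = {
    // Assumed setup: MySQL on localhost:3306, user root / password root
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/?user=root&password=root")
    val stmt = conn.createStatement()
    try {
      stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS sparktest")
      // Column types are guesses consistent with the id/name/gender/age schema
      stmt.executeUpdate(
        """CREATE TABLE IF NOT EXISTS sparktest.employee (
          |  id INT, name VARCHAR(20), gender CHAR(1), age INT
          |)""".stripMargin)
    } finally {
      stmt.close()
      conn.close()
    }
  }
}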
This code is written in Scala and has been verified to run after debugging.
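Note that the MySQL JDBC driver must be on Spark's classpath when the job runs. A typical invocation looks like the line below; the connector version and application jar name are hypothetical placeholders, not taken from the original:

spark-submit --class scala.TestMySQL --jars mysql-connector-java-5.1.40-bin.jar testmysql.jar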