Original article: http://blog.csdn.net/fengzhimohan/article/details/78471952
A project needed to read MySQL data with Spark for analysis and then save the analysis results back to MySQL.
Development environment:
java: 1.8
IDEA
spark: 1.6.2
I. Reading data from MySQL
1. Create a MySQL database
The structure of the user_test table is as follows:
create table user_test (
    id       int(11)     default null comment 'id',
    name     varchar(64) default null comment 'user name',
    password varchar(64) default null comment 'password',
    age      int(11)     default null comment 'age'
) engine=InnoDB default charset=utf8;
2. Insert data into the database
insert into user_test values(12, 'cassie', '123456', 25);
insert into user_test values(11, 'zhangs', '1234562', 26);
insert into user_test values(23, 'zhangs', '2321312', 27);
insert into user_test values(22, 'tom', 'asdfg', 28);
3. Create a Maven project named Test and add a Java class named SparkMysql
Add the dependencies. Contents of the pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>SparkSQL</groupId>
    <artifactId>com.sparksql.test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.24</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier>
        </dependency>
    </dependencies>

</project>
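Note that this pom does not declare the Spark artifacts that the code below imports, so the project will not compile unless Spark is provided elsewhere on the classpath. A minimal addition, assuming Spark 1.6.2 built against Scala 2.10, would be the following two entries inside the <dependencies> block:

<!-- Spark core and Spark SQL (assumed versions; match your cluster) -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.2</version>
</dependency>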
4. Write the Spark code
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import java.util.Properties;

/**
 * Created by Administrator on 2017/11/6.
 */
public class SparkMysql {
    public static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(SparkMysql.class);

    public static void main(String[] args) {
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysql").setMaster("local[5]"));
        SQLContext sqlContext = new SQLContext(sparkContext);
        // Read the MySQL data
        readMySQL(sqlContext);

        // Stop the SparkContext
        sparkContext.stop();
    }

    private static void readMySQL(SQLContext sqlContext){
        // jdbc.url=jdbc:mysql://localhost:3306/database
        String url = "jdbc:mysql://localhost:3306/test";
        // Name of the table to query
        String table = "user_test";
        // Set the database user name (user), password (password), and the JDBC driver (driver)
        Properties connectionProperties = new Properties();
        connectionProperties.put("user","root");
        connectionProperties.put("password","123456");
        connectionProperties.put("driver","com.mysql.jdbc.Driver");

        // Read the user_test table of the test database over Spark JDBC
        System.out.println("Reading the user_test table from the test database");
        // Read all rows of the table
        DataFrame jdbcDF = sqlContext.read().jdbc(url,table,connectionProperties).select("*");
        // Show the data
        jdbcDF.show();
    }
}
Run result: jdbcDF.show() prints the contents of the user_test table to the console.
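For large tables it is usually better to let MySQL filter the rows than to load the whole table into Spark. The JDBC source accepts a parenthesized subquery in place of a table name; the following is a minimal sketch adapting readMySQL above, where the alias t and the age filter are illustrative and not from the original article:

// Push the filter down to MySQL by passing a subquery instead of a table name.
// MySQL requires an alias ("as t") for a derived table.
String query = "(select id, name, age from user_test where age > 25) as t";
DataFrame filteredDF = sqlContext.read().jdbc(url, query, connectionProperties);
filteredDF.show();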
II. Writing data to MySQL
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * Created by Administrator on 2017/11/6.
 */
public class SparkMysql {
    public static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(SparkMysql.class);

    public static void main(String[] args) {
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysql").setMaster("local[5]"));
        SQLContext sqlContext = new SQLContext(sparkContext);
        // The data to be written
        JavaRDD<String> personData = sparkContext.parallelize(Arrays.asList("1 tom 5","2 jack 6","3 alex 7"));
        // Database connection settings
        String url = "jdbc:mysql://localhost:3306/test";
        Properties connectionProperties = new Properties();
        connectionProperties.put("user","root");
        connectionProperties.put("password","123456");
        connectionProperties.put("driver","com.mysql.jdbc.Driver");

        /**
         * Step 1: build an RDD of Row on top of the raw RDD.
         */
        // Turn the RDD into an RDD of Row. A Row can be thought of as one row of a table.
        JavaRDD<Row> personsRDD = personData.map(new Function<String,Row>(){
            public Row call(String line) throws Exception {
                String[] splited = line.split(" ");
                return RowFactory.create(Integer.valueOf(splited[0]),splited[1],Integer.valueOf(splited[2]));
            }
        });

        /**
         * Step 2: construct the DataFrame metadata dynamically.
         */
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("id",DataTypes.IntegerType,true));
        structFields.add(DataTypes.createStructField("name",DataTypes.StringType,true));
        structFields.add(DataTypes.createStructField("age",DataTypes.IntegerType,true));

        // Build the StructType that describes the DataFrame schema
        StructType structType = DataTypes.createStructType(structFields);

        /**
         * Step 3: construct the DataFrame from the RDD<Row> and the schema.
         */
        DataFrame personsDF = sqlContext.createDataFrame(personsRDD,structType);

        /**
         * Step 4: write the data into the person table.
         */
        personsDF.write().mode("append").jdbc(url,"person",connectionProperties);

        // Stop the SparkContext
        sparkContext.stop();
    }
}
Run result: the three rows are appended to the person table in the test database (Spark creates the table on first write if it does not already exist).
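The article's stated goal was to read MySQL data, analyze it, and save the result back to MySQL. The two halves above can be combined into one job; the sketch below assumes the same connection settings and an illustrative result table name, user_age_count, and counts users per age before appending the counts:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import java.util.Properties;

public class SparkMysqlAnalysis {
    public static void main(String[] args) {
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysqlAnalysis").setMaster("local[5]"));
        SQLContext sqlContext = new SQLContext(sparkContext);

        String url = "jdbc:mysql://localhost:3306/test";
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "123456");
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");

        // Read the source table from MySQL.
        DataFrame users = sqlContext.read().jdbc(url, "user_test", connectionProperties);
        // Analysis step: count how many users there are per age.
        DataFrame ageCounts = users.groupBy("age").count();
        // Write the result back; "user_age_count" is a hypothetical table name.
        ageCounts.write().mode("append").jdbc(url, "user_age_count", connectionProperties);

        sparkContext.stop();
    }
}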