Spark使用Java讀取mysql數據和保存數據到mysql

原文引自:http://blog.csdn.net/fengzhimohan/article/details/78471952

項目應用需要利用Spark讀取mysql數據進行數據分析,然後將分析結果保存到mysql中。 
開發環境: 
java:1.8 
IDEA 
spark:1.6.2

一.讀取mysql數據 
1.建立一個mysql數據庫 
user_test表結構如下:

-- Demo table that the Spark JDBC read example queries.
-- MySQL names this character set `utf8`; `utf-8` is not a valid character set name.
-- Column comments use single-quoted strings so the statement also works when
-- the ANSI_QUOTES SQL mode is enabled.
create table user_test (
  id int(11) default null comment 'id',
  name varchar(64) default null comment '用戶名',
  password varchar(64) default null comment '密碼',
  age int(11) default null comment '年齡'
) engine=InnoDB default charset=utf8;

2.插入數據

-- Seed user_test with four sample rows; values are in column order
-- (id, name, password, age).
INSERT INTO user_test VALUES
  (12, 'cassie', '123456', 25),
  (11, 'zhangs', '1234562', 26),
  (23, 'zhangs', '2321312', 27),
  (22, 'tom', 'asdfg', 28);

3.建立maven工程,命名爲Test,添加java類SparkMysql

 

 

添加依賴包

pom文件內容:

 

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <modelVersion>4.0.0</modelVersion>
 6 
 7     <groupId>SparkSQL</groupId>
 8     <artifactId>com.sparksql.test</artifactId>
 9     <version>1.0-SNAPSHOT</version>
10     <properties>
11          <java.version>1.8</java.version>
12     </properties>
13     <dependencies>
14         <dependency>
15             <groupId>mysql</groupId>
16             <artifactId>mysql-connector-java</artifactId>
17             <version>5.1.24</version>
18         </dependency>
19         <dependency>
20             <groupId>org.apache.hadoop</groupId>
21             <artifactId>hadoop-common</artifactId>
22             <version>2.6.0</version>
23         </dependency>
24         <dependency>
25             <groupId>net.sf.json-lib</groupId>
26             <artifactId>json-lib</artifactId>
27             <version>2.4</version>
28             <classifier>jdk15</classifier>
29         </dependency>
30 
31     </dependencies>
32 
33 </project>

4.編寫spark代碼

 

 1 import org.apache.spark.SparkConf;
 2 import org.apache.spark.api.java.JavaSparkContext;
 3 import org.apache.spark.sql.DataFrame;
 4 import org.apache.spark.sql.SQLContext;
 5 
 6 import java.util.Properties;
 7 
 8 /**
 9  * Created by Administrator on 2017/11/6.
10  */
11 public class SparkMysql {
12     public static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(SparkMysql.class);
13 
14     public static void main(String[] args) {
15         JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysql").setMaster("local[5]"));
16         SQLContext sqlContext = new SQLContext(sparkContext);
17         //讀取mysql數據
18         readMySQL(sqlContext);
19 
20         //中止SparkContext
21         sparkContext.stop();
22     }
23         private static void readMySQL(SQLContext sqlContext){
24         //jdbc.url=jdbc:mysql://localhost:3306/database
25         String url = "jdbc:mysql://localhost:3306/test";
26         //查找的表名
27         String table = "user_test";
28         //增長數據庫的用戶名(user)密碼(password),指定test數據庫的驅動(driver)
29         Properties connectionProperties = new Properties();
30         connectionProperties.put("user","root");
31         connectionProperties.put("password","123456");
32         connectionProperties.put("driver","com.mysql.jdbc.Driver");
33 
34         //SparkJdbc讀取Postgresql的products表內容
35         System.out.println("讀取test數據庫中的user_test表內容");
36         // 讀取表中全部數據
37         DataFrame jdbcDF = sqlContext.read().jdbc(url,table,connectionProperties).select("*");
38         //顯示數據
39         jdbcDF.show();
40     }
41 }

運行結果:

 

 

二.寫入數據到mysql中

 1 import org.apache.spark.SparkConf;
 2 import org.apache.spark.api.java.JavaRDD;
 3 import org.apache.spark.api.java.JavaSparkContext;
 4 import org.apache.spark.api.java.function.Function;
 5 import org.apache.spark.sql.DataFrame;
 6 import org.apache.spark.sql.Row;
 7 import org.apache.spark.sql.RowFactory;
 8 import org.apache.spark.sql.SQLContext;
 9 import org.apache.spark.sql.types.DataTypes;
10 import org.apache.spark.sql.types.StructType;
11 
12 import java.util.ArrayList;
13 import java.util.Arrays;
14 import java.util.List;
15 import java.util.Properties;
16 
17 /**
18  * Created by Administrator on 2017/11/6.
19  */
20 public class SparkMysql {
21     public static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(SparkMysql.class);
22 
23     public static void main(String[] args) {
24         JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysql").setMaster("local[5]"));
25         SQLContext sqlContext = new SQLContext(sparkContext);
26         //寫入的數據內容
27         JavaRDD<String> personData = sparkContext.parallelize(Arrays.asList("1 tom 5","2 jack 6","3 alex 7"));
28         //數據庫內容
29         String url = "jdbc:mysql://localhost:3306/test";
30         Properties connectionProperties = new Properties();
31         connectionProperties.put("user","root");
32         connectionProperties.put("password","123456");
33         connectionProperties.put("driver","com.mysql.jdbc.Driver");
34         /**
35          * 第一步:在RDD的基礎上建立類型爲Row的RDD
36          */
37         //將RDD變成以Row爲類型的RDD。Row能夠簡單理解爲Table的一行數據
38         JavaRDD<Row> personsRDD = personData.map(new Function<String,Row>(){
39             public Row call(String line) throws Exception {
40                 String[] splited = line.split(" ");
41                 return RowFactory.create(Integer.valueOf(splited[0]),splited[1],Integer.valueOf(splited[2]));
42             }
43         });
44 
45         /**
46          * 第二步:動態構造DataFrame的元數據。
47          */
48         List structFields = new ArrayList();
49         structFields.add(DataTypes.createStructField("id",DataTypes.IntegerType,true));
50         structFields.add(DataTypes.createStructField("name",DataTypes.StringType,true));
51         structFields.add(DataTypes.createStructField("age",DataTypes.IntegerType,true));
52 
53         //構建StructType,用於最後DataFrame元數據的描述
54         StructType structType = DataTypes.createStructType(structFields);
55 
56         /**
57          * 第三步:基於已有的元數據以及RDD<Row>來構造DataFrame
58          */
59         DataFrame personsDF = sqlContext.createDataFrame(personsRDD,structType);
60 
61         /**
62          * 第四步:將數據寫入到person表中
63          */
64         personsDF.write().mode("append").jdbc(url,"person",connectionProperties);
65 
66         //中止SparkContext
67         sparkContext.stop();
68     }
69  }

 

運行結果:

相關文章
相關標籤/搜索