大數據學習——sparkSql對接mysql

1 上傳 jar 包

2 加載 mysql 驅動包

[root@mini1 bin]#  ./spark-shell --master spark://mini1:7077 --jars mysql-connector-java-5.1.32.jar --driver-class-path mysql-connector-java-5.1.32.jar   
-- Department lookup table: number, name, location.
CREATE TABLE dept (
    deptno INT,
    dname  VARCHAR(14),
    loc    VARCHAR(13)
);

-- Employee table; deptno logically references dept.deptno (no FK declared here).
CREATE TABLE emp (
    eno     INT,            -- employee number
    ename   VARCHAR(10),
    job     VARCHAR(9),
    mgr     INT,            -- manager's employee number (eno)
    hirdate DATE,           -- original spelling kept: every query below uses "hirdate"
    sal     INT,
    comm    INT,            -- commission; NULL for non-sales staff
    deptno  INT NOT NULL
);
-- Seed data for dept and emp.
-- FIX: explicit column lists added — INSERT without a column list breaks
-- silently if the table schema ever changes (column added/reordered).
INSERT INTO dept (deptno, dname, loc) VALUES(10,'ACCOUNTING','NEW YORK');
INSERT INTO dept (deptno, dname, loc) VALUES(20,'RESEARCH','DALLAS');
INSERT INTO dept (deptno, dname, loc) VALUES(30,'SALES','CHICAGO');
INSERT INTO dept (deptno, dname, loc) VALUES(40,'OPERATIONS','BOSTON');
INSERT INTO emp (eno, ename, job, mgr, hirdate, sal, comm, deptno) VALUES(7369,'SMITH','CLERK',7902,'1980-12-17',800,NULL,20);
INSERT INTO emp (eno, ename, job, mgr, hirdate, sal, comm, deptno) VALUES(7499,'ALLEN','SALESMAN',7698,'1981-02-20',1600,300,30);
INSERT INTO emp (eno, ename, job, mgr, hirdate, sal, comm, deptno) VALUES(7521,'WARD','SALESMAN',7698,'1981-02-22',1250,500,30);
INSERT INTO emp (eno, ename, job, mgr, hirdate, sal, comm, deptno) VALUES(7566,'JONES','MANAGER',7839,'1981-04-02',2975,NULL,20);
INSERT INTO emp (eno, ename, job, mgr, hirdate, sal, comm, deptno) VALUES(7654,'MARTIN','SALESMAN',7698,'1981-09-28',1250,1400,30);
INSERT INTO emp (eno, ename, job, mgr, hirdate, sal, comm, deptno) VALUES(7698,'BLAKE','MANAGER',7839,'1981-05-01',2850,NULL,30);
INSERT INTO emp (eno, ename, job, mgr, hirdate, sal, comm, deptno) VALUES(7782,'CLARK','MANAGER',7839,'1981-06-09',2450,NULL,10);
INSERT INTO emp (eno, ename, job, mgr, hirdate, sal, comm, deptno) VALUES(7788,'SCOTT','ANALYST',7566,'1987-06-13',3000,NULL,20);
INSERT INTO emp (eno, ename, job, mgr, hirdate, sal, comm, deptno) VALUES(7839,'KING','PRESIDENT',NULL,'1981-11-17',5000,NULL,10);
INSERT INTO emp (eno, ename, job, mgr, hirdate, sal, comm, deptno) VALUES(7844,'TURNER','SALESMAN',7698,'1981-09-08',1500,0,30);
INSERT INTO emp (eno, ename, job, mgr, hirdate, sal, comm, deptno) VALUES(7876,'ADAMS','CLERK',7788,'1987-06-13',1100,NULL,20);
INSERT INTO emp (eno, ename, job, mgr, hirdate, sal, comm, deptno) VALUES(7900,'JAMES','CLERK',7698,'1981-12-03',950,NULL,30);
INSERT INTO emp (eno, ename, job, mgr, hirdate, sal, comm, deptno) VALUES(7902,'FORD','ANALYST',7566,'1981-12-03',3000,NULL,20);
INSERT INTO emp (eno, ename, job, mgr, hirdate, sal, comm, deptno) VALUES(7934,'MILLER','CLERK',7782,'1983-01-23',1300,NULL,10);

 

3 建立 SQLContext,經過 JDBC 讀取 mysql 表

// Build a SQLContext from the spark-shell's SparkContext (Spark 1.x API).
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Load the mysql `emp` table as a DataFrame over JDBC.
// NOTE(review): credentials are hard-coded in the URL options — acceptable for
// a tutorial, never for production code.
val empDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.74.100:3306/test", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "emp", "user" -> "root", "password" -> "123456")).load()

// Load the mysql `dept` table the same way.
val deptDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.74.100:3306/test", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "dept", "user" -> "root", "password" -> "123456")).load()

 

4 讀取數據(注意 mysql 服務要先啓動)

// Preview both DataFrames (requires the mysql server to be reachable).
empDF.show()
deptDF.show()

// Register temp tables so the exercise queries below can be executed via
// sqlContext.sql(...). (registerTempTable is the Spark 1.x name; Spark 2+
// renamed it to createOrReplaceTempView.)
empDF.registerTempTable("emp")
deptDF.registerTempTable("dept")
 

 

1.列出至少有一個員工的全部部門。
分析:每一個部門有多少員工  ------ 根據部門編號進行分組
-- Departments having at least one employee, with their headcount.
SELECT deptno, COUNT(*)
FROM emp
GROUP BY deptno
HAVING COUNT(*) >= 1;

2.列出薪金比「SMITH」多的全部員工。(是否支持子查詢)
分析:先查詢出SMITH工資  : select sal from emp where ename='SMITH';
-- Employees paid more than SMITH (scalar subquery fetches SMITH's salary).
SELECT *
FROM emp
WHERE sal > (SELECT sal FROM emp WHERE ename = 'SMITH');

3***** 列出全部員工的姓名及其直接上級的姓名。
分析:表自映射,爲表起別名,進行關聯  t1 表模擬員工表 t2 表保存直接上級信息
-- Each employee's name with the name of his direct manager (self-join).
-- FIX: the emp table's key column is `eno` (see CREATE TABLE emp), not `empno`;
-- the original condition `t1.MGR = t2.empno` referenced a nonexistent column.
-- Also rewritten from an implicit comma join to an explicit INNER JOIN.
SELECT t1.ename 員工姓名, t2.ename 直接上級
FROM emp t1
INNER JOIN emp t2 ON t1.mgr = t2.eno;

4.列出受僱日期早於其直接上級的全部員工。
分析:原理和上題相似
-- Employees hired earlier than their direct manager.
-- FIX: implicit comma join replaced with an explicit INNER JOIN (join condition
-- in ON, filter in WHERE) and the missing statement terminator added.
SELECT t1.*, t2.hirdate
FROM emp t1
INNER JOIN emp t2 ON t1.mgr = t2.eno
WHERE t1.hirdate < t2.hirdate;

5.列出部門名稱和這些部門的員工信息,同時列出那些沒有員工的部門。
分析:部門沒員工也要顯示 --- 外鏈接。不管怎樣部門信息必定要顯示,經過部門去關聯員工
-- Every department with its employees; departments without employees still
-- appear (their emp columns are NULL). LEFT JOIN == LEFT OUTER JOIN.
SELECT *
FROM dept
LEFT JOIN emp ON dept.deptno = emp.deptno;

6.列出全部「CLERK」(辦事員)的姓名及其部門名稱。
分析:查找job爲CLERK 員工姓名和部門名稱 
員工姓名 emp表
部門名稱 dept表
-- Name, department name and job of every CLERK.
-- FIX: implicit comma join rewritten as an explicit INNER JOIN; the join
-- condition lives in ON, the row filter in WHERE.
SELECT emp.ename, dept.dname, emp.job
FROM emp
INNER JOIN dept ON emp.deptno = dept.deptno
WHERE emp.job = 'CLERK';

7.列出最低薪金大於1500的各類工做。
分析:工做的最低薪金 ---- 按工做分組,求最低薪金
-- Intermediate step: minimum salary for each job.
SELECT MIN(sal)
FROM emp
GROUP BY job;
大於1500 是一個分組條件 --- having 
-- Jobs whose minimum salary exceeds 1500 (group filter => HAVING, not WHERE).
SELECT job, MIN(sal)
FROM emp
GROUP BY job
HAVING MIN(sal) > 1500;

8.列出在部門「SALES」(銷售部)工做的員工的姓名,假定不知道銷售部的部門編號。
分析:員工姓名位於 emp  部門名稱 dept
-- Names of employees in the SALES department, without knowing its deptno.
-- FIX: implicit comma join rewritten as an explicit INNER JOIN.
SELECT emp.ename
FROM emp
INNER JOIN dept ON emp.deptno = dept.deptno
WHERE dept.dname = 'SALES';

9.列出薪金高於公司平均薪金的全部員工。
分析:先求公司平均薪金 select avg(sal) from emp;
-- Employees earning more than the company-wide average salary.
SELECT *
FROM emp
WHERE sal > (SELECT AVG(sal) FROM emp);

10.列出與「SCOTT」從事相同工做的全部員工。
分析:先查詢SCOTT : select job from emp where ename ='SCOTT';
-- Everyone doing the same job as SCOTT, excluding SCOTT himself.
SELECT *
FROM emp
WHERE ename <> 'SCOTT'
  AND job = (SELECT job FROM emp WHERE ename = 'SCOTT');





13.列出在每一個部門工做的員工數量、平均工資。
分析:按部門分組
-- Headcount and average salary per department.
SELECT deptno, COUNT(*), AVG(sal)
FROM emp
GROUP BY deptno;

14.列出全部員工的姓名、部門名稱和工資。
分析:
-- Every employee's name, department name and salary.
-- FIX: implicit comma join rewritten as an explicit INNER JOIN.
SELECT emp.ename, dept.dname, emp.sal
FROM emp
INNER JOIN dept ON emp.deptno = dept.deptno;

15.列出全部部門的詳細信息和部門人數。
分析:
-- Full details of every department together with its headcount.
-- FIX: the original inner comma join silently dropped departments with no
-- employees (dept 40, OPERATIONS) and returned only deptno, while the exercise
-- asks for ALL departments' details. LEFT JOIN keeps empty departments, and
-- COUNT(emp.deptno) (not COUNT(*)) yields 0 for them because COUNT over a
-- column ignores the NULLs produced by the outer join.
SELECT dept.deptno, dept.dname, dept.loc, COUNT(emp.deptno)
FROM dept
LEFT JOIN emp ON emp.deptno = dept.deptno
GROUP BY dept.deptno, dept.dname, dept.loc;
 

16.列出各類工做的最低工資。
分析:各個工做 分組 , 最低工資 min
-- Lowest salary for each kind of job.
SELECT job, MIN(sal)
FROM emp
GROUP BY job;

17.列出各個部門的MANAGER(經理)的最低薪金。
分析:where job='MANAGER' 過濾全部不是經理數據
-- Lowest MANAGER salary per department: filter rows first (WHERE), then group.
SELECT deptno, MIN(sal)
FROM emp
WHERE job = 'MANAGER'
GROUP BY deptno;

18.列出全部員工的年工資,按年薪從低到高排序。 
分析: select ename, sal*12 from emp order by sal*12 asc;

19.查出emp表中薪水在3000以上(包括3000)的全部員工的員工號、姓名、薪水。
分析: select * from emp where sal >= 3000;


22.查詢出emp表中全部的工做種類(無重複)
分析: select distinct job from emp;

23.查詢出全部獎金(comm)字段不爲空的人員的全部信息。
分析:不爲空 is not null
-- Employees whose commission is set (NULL needs IS NOT NULL, never <> NULL).
SELECT *
FROM emp
WHERE comm IS NOT NULL;

24.查詢出薪水在800到2500之間(閉區間)全部員工的信息。(注:使用兩種方式實現and以及between and)
分析:select * from emp where sal >= 800 and sal <= 2500;
-- Salary in the closed range [800, 2500] — BETWEEN variant of the AND form above.
SELECT *
FROM emp
WHERE sal BETWEEN 800 AND 2500;

25.查詢出員工號爲7521,7900,7782的全部員工的信息。(注:使用兩種方式實現,or以及in)
分析:select * from emp where eno in(7521,7900,7782);
-- Same three employees selected with OR instead of IN.
SELECT *
FROM emp
WHERE eno = 7521
   OR eno = 7900
   OR eno = 7782;

26.查詢出名字中有「A」字符,而且薪水在1000以上(不包括1000)的全部員工信息。
分析: 模糊查詢
-- Names containing 'A' with salary strictly above 1000.
SELECT *
FROM emp
WHERE ename LIKE '%A%'
  AND sal > 1000;

27.查詢出名字第三個字母是「M」的全部員工信息。
分析:第三個字母 __M%
-- Third letter is 'M': two single-char wildcards then 'M'.
SELECT *
FROM emp
WHERE ename LIKE '__M%';

28.將全部員工按薪水升序排序,薪水相同的按照入職時間降序排序。
分析:select * from emp order by sal asc, hirdate desc;(注意:emp 表中的列名是 hirdate,原文誤寫爲 hiredate)

29.將全部員工按照名字首字母升序排序,首字母相同的按照薪水降序排序。
分析:SUBSTRING('字符串',第幾個字符,長度);  ---- 首字母 substring(ename,1,1)
-- Sort by first letter of the name ascending, ties broken by salary descending.
SELECT *
FROM emp
ORDER BY SUBSTRING(ename, 1, 1) ASC, sal DESC;

 

 

5 往 mysql 數據庫寫數據

package org.apache.spark

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, DataFrameHolder, SQLContext}

/**
  * Reads whitespace-separated person records from HDFS, converts them into a
  * DataFrame via a case-class RDD, and writes the result into the mysql
  * `person` table over JDBC (Spark 1.x SQLContext API).
  * Created by Administrator on 2019/6/13.
  */
object JDBCsparksql {
  def main(args: Array[String]) {
    // Local-mode Spark context for a standalone demo run.
    val conf = new SparkConf().setAppName("spark-joindemo").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // NOTE(review): the path says person.json but the lines are parsed as
    // space-separated "id name age" fields, not JSON — confirm the input format.
    val file = sc.textFile("hdfs://mini1:9000/person.json")
    val personRDD: RDD[Person] = file.map(_.split(" ")).map(x => Person(x(0).toLong, x(1), x(2).toInt))

    import sqlContext.implicits._
    val personDF: DataFrame = personRDD.toDF()
    // Registering the temp table lets sqlContext.sql("select * from person")
    // below read the DataFrame by name.
    personDF.registerTempTable("person")
    // JDBC connection credentials (hard-coded for the tutorial).
    val p: Properties = new Properties()
    p.put("user", "root")
    p.put("password", "123456")

    // "overwrite" drops and recreates the target mysql table before writing.
    sqlContext.sql("select * from person").write.mode("overwrite").jdbc("jdbc:mysql://192.168.74.100:3306/test", "person", p)
    sc.stop()
  }

}

// Row schema for the person records: one line of the input file / one row of
// the mysql `person` table.
case class Person(id: Long, name: String, age: Int)

相關文章
相關標籤/搜索