SparkSQL DataFrames操做

Hive中已經存在emp和dept表:mysql

select * from emp;
+--------+---------+------------+-------+-------------+---------+---------+---------+
| empno  |  ename  |    job     |  mgr  |  hiredate   |   sal   |  comm   | deptno  |
+--------+---------+------------+-------+-------------+---------+---------+---------+
| 7369   | SMITH   | CLERK      | 7902  | 1980-12-17  | 800.0   | NULL    | 20      |
| 7499   | ALLEN   | SALESMAN   | 7698  | 1981-2-20   | 1600.0  | 300.0   | 30      |
| 7521   | WARD    | SALESMAN   | 7698  | 1981-2-22   | 1250.0  | 500.0   | 30      |
| 7566   | JONES   | MANAGER    | 7839  | 1981-4-2    | 2975.0  | NULL    | 20      |
| 7654   | MARTIN  | SALESMAN   | 7698  | 1981-9-28   | 1250.0  | 1400.0  | 30      |
| 7698   | BLAKE   | MANAGER    | 7839  | 1981-5-1    | 2850.0  | NULL    | 30      |
| 7782   | CLARK   | MANAGER    | 7839  | 1981-6-9    | 2450.0  | NULL    | 10      |
| 7788   | SCOTT   | ANALYST    | 7566  | 1987-4-19   | 3000.0  | NULL    | 20      |
| 7839   | KING    | PRESIDENT  | NULL  | 1981-11-17  | 5000.0  | NULL    | 10      |
| 7844   | TURNER  | SALESMAN   | 7698  | 1981-9-8    | 1500.0  | 0.0     | 30      |
| 7876   | ADAMS   | CLERK      | 7788  | 1987-5-23   | 1100.0  | NULL    | 20      |
| 7900   | JAMES   | CLERK      | 7698  | 1981-12-3   | 950.0   | NULL    | 30      |
| 7902   | FORD    | ANALYST    | 7566  | 1981-12-3   | 3000.0  | NULL    | 20      |
| 7934   | MILLER  | CLERK      | 7782  | 1982-1-23   | 1300.0  | NULL    | 10      |
+--------+---------+------------+-------+-------------+---------+---------+---------+

select * from dept;
+---------+-------------+-----------+
| deptno  |    dname    |    loc    |
+---------+-------------+-----------+
| 10      | ACCOUNTING  | NEW YORK  |
| 20      | RESEARCH    | DALLAS    |
| 30      | SALES       | CHICAGO   |
| 40      | OPERATIONS  | BOSTON    |
+---------+-------------+-----------+

 

DataFrame經常使用功能測試sql

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val emp = hc.table("emp")    //根據hive表建立DataFrame

emp.dtypes.foreach(println)   //查看全部字段名稱和類型
    (empno,IntegerType)
    (ename,StringType)
    (job,StringType)
    (mgr,IntegerType)
    (hiredate,StringType)
    (sal,DoubleType)
    (comm,DoubleType)
    (deptno,IntegerType)
    
emp.columns.foreach(println)  //查看全部字段名稱
    empno
    ename
    job
    mgr
    hiredate
    sal
    comm
    deptno

emp.printSchema    //打印schema信息
    root
        |-- empno: integer (nullable = true)
        |-- ename: string (nullable = true)
        |-- job: string (nullable = true)
        |-- mgr: integer (nullable = true)
        |-- hiredate: string (nullable = true)
        |-- sal: double (nullable = true)
        |-- comm: double (nullable = true)
        |-- deptno: integer (nullable = true)

emp.explain  //查看物理執行計劃
== Physical Plan ==
HiveTableScan [empno#0,ename#1,job#2,mgr#3,hiredate#4,sal#5,comm#6,deptno#7], (MetastoreRelation default, emp, None), None

emp.show  #默認顯示20行
    empno ename  job       mgr  hiredate   sal    comm   deptno
    7369  SMITH  CLERK     7902 1980-12-17 800.0  null   20    
    7499  ALLEN  SALESMAN  7698 1981-2-20  1600.0 300.0  30    
    7521  WARD   SALESMAN  7698 1981-2-22  1250.0 500.0  30    
    7566  JONES  MANAGER   7839 1981-4-2   2975.0 null   20    
    7654  MARTIN SALESMAN  7698 1981-9-28  1250.0 1400.0 30    
    7698  BLAKE  MANAGER   7839 1981-5-1   2850.0 null   30    
    7782  CLARK  MANAGER   7839 1981-6-9   2450.0 null   10    
    7788  SCOTT  ANALYST   7566 1987-4-19  3000.0 null   20    
    7839  KING   PRESIDENT null 1981-11-17 5000.0 null   10    
    7844  TURNER SALESMAN  7698 1981-9-8   1500.0 0.0    30    
    7876  ADAMS  CLERK     7788 1987-5-23  1100.0 null   20    
    7900  JAMES  CLERK     7698 1981-12-3  950.0  null   30    
    7902  FORD   ANALYST   7566 1981-12-3  3000.0 null   20    
    7934  MILLER CLERK     7782 1982-1-23  1300.0 null   10 

emp.show(10) #顯示指定行數

emp.limit(5).show
emp.head(3)
emp.head   #等價於head(1)
emp.first  #等價於head(1)
val emp_as = emp.as("emp_as")   #別名
emp_as.select("empno","ename","deptno").show

#查看指定列:
emp.select("empno","ename","deptno").show
emp.select($"empno",$"ename",$"deptno").show
emp.selectExpr("empno", "ename as name", "substr(ename,0,4)").show     #配合udf使用
emp.select($"empno",$"sal"+100).show  #給sal加100


#條件過濾:
emp.filter("empno>7698").show
emp.filter($"empno" > 7698).show
emp.where($"empno" > 7698).show

#排序:
emp.sort("empno").show  #默認升序
emp.sort($"empno").show
emp.sort("empno").show
emp.sort($"empno".desc).show
emp.sort($"deptno", $"empno".desc).show #多字段排序

emp.orderBy($"empno").show 
emp.orderBy($"empno".desc).show 
emp.orderBy($"deptno", $"empno".desc).show
    
#分組:    
emp.groupBy("deptno").count.show
emp.groupBy($"deptno").avg().show   #全部的列求平均值
emp.groupBy($"deptno").avg("sal").show   #sal列求平均值
emp.groupBy($"deptno").agg("sal"->"max").show   #sal取最大
emp.groupBy($"deptno").agg("sal"->"min").show   #sal取最小
emp.groupBy($"deptno").agg("sal"->"sum").show   #sal求和
emp.groupBy($"deptno").agg("sal"->"avg").show   #sal求平均值
#agg中能有的方法有: avg/max/min/sum/count


#join:    
val dept = hc.table("dept")
dept.show
emp.join(dept,emp.col("deptno") === dept.col("deptno"),"left_outer").show
emp.join(dept,emp.col("deptno") === dept.col("deptno"),"right_outer").show
emp.join(dept,emp.col("deptno") === dept.col("deptno"),"inner").show
emp.join(dept,$"emp.deptno"===$"dept.deptno" ,"inner").select("empno","ename","dname").show

 

DataFrames結合SQL使用測試apache

val emp_dept = emp.join(dept,emp.col("deptno") === dept.col("deptno"),"left_outer")
emp_dept.registerTempTable("emp_dept_temp")
hc.sql("select count(*) from emp_dept_temp").collect

 

DataFrames結合hive和mysql jdbc external datasource使用測試:oop

mysql中準備數據:測試

DROP TABLE IF EXISTS `dept`;
CREATE TABLE `dept` (
  `deptno` int(11) NOT NULL DEFAULT '0',
  `dname` varchar(30) DEFAULT NULL,
  `loc` varchar(30) DEFAULT NULL,
  PRIMARY KEY (`deptno`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `dept` VALUES ('10', 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES ('20', 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES ('30', 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES ('40', 'OPERATIONS', 'BOSTON');

 

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val emp = hc.table("emp")    
val dept_jdbc = hc.jdbc("jdbc:mysql://hadoop000:3306/hive?user=root&password=root", "dept")
emp.join(dept_jdbc, emp.col("deptno") === dept_jdbc.col("deptno"), "left_outer").show    
    
    empno ename  job       mgr  hiredate   sal    comm   deptno deptno dname      loc     
    7782  CLARK  MANAGER   7839 1981-6-9   2450.0 null   10     10     ACCOUNTING NEW YORK
    7839  KING   PRESIDENT null 1981-11-17 5000.0 null   10     10     ACCOUNTING NEW YORK
    7934  MILLER CLERK     7782 1982-1-23  1300.0 null   10     10     ACCOUNTING NEW YORK
    7369  SMITH  CLERK     7902 1980-12-17 800.0  null   20     20     RESEARCH   DALLAS  
    7566  JONES  MANAGER   7839 1981-4-2   2975.0 null   20     20     RESEARCH   DALLAS  
    7788  SCOTT  ANALYST   7566 1987-4-19  3000.0 null   20     20     RESEARCH   DALLAS  
    7876  ADAMS  CLERK     7788 1987-5-23  1100.0 null   20     20     RESEARCH   DALLAS  
    7902  FORD   ANALYST   7566 1981-12-3  3000.0 null   20     20     RESEARCH   DALLAS  
    7499  ALLEN  SALESMAN  7698 1981-2-20  1600.0 300.0  30     30     SALES      CHICAGO 
    7521  WARD   SALESMAN  7698 1981-2-22  1250.0 500.0  30     30     SALES      CHICAGO 
    7654  MARTIN SALESMAN  7698 1981-9-28  1250.0 1400.0 30     30     SALES      CHICAGO 
    7698  BLAKE  MANAGER   7839 1981-5-1   2850.0 null   30     30     SALES      CHICAGO 
    7844  TURNER SALESMAN  7698 1981-9-8   1500.0 0.0    30     30     SALES      CHICAGO 
    7900  JAMES  CLERK     7698 1981-12-3  950.0  null   30     30     SALES      CHICAGO 

 

DataFrames結合parquet和mysql jdbc external datasource使用測試:spa

相關文章
相關標籤/搜索