DataFrame提供了
一條聯結全部主流數據源並自動轉化爲可並行處理格式的渠道,經過它Spark能取悅大數據生態鏈上的全部玩家,不管是善用R的數據科學家,慣用SQL的商業分析師,仍是在乎效率和實時性的統計工程師。php
以一個常見的場景 -- 日誌解析爲例,有時咱們須要用到一些額外的結構化數據(好比作IP和地址的映射),一般這樣的數據會存在MySQL,而訪問的方式有兩種:一是每一個worker遠程去檢索數據庫,弊端是耗費額外的網絡I/O資源;二是使用JdbcRDD
的API轉化爲RDD格式,而後編寫繁複的函數去實現檢索,顯然要寫更多的代碼。而如今Spark一行代碼就能實現從MySQL到DataFrame
的轉化,而且支持SQL查詢。java
在上一篇已經對文本格式進行測試,如今對hive hbase mysql oracle 以及臨時表之間join查詢作測試mysql
1.訪問mysqlsql
除了JSON以外,DataFrame
如今已經能支持MySQL、Hive、HDFS、PostgreSQL等外部數據源,而對關係數據庫的讀取,是經過jdbc
實現的。shell
1
2
3
4
5
6
7
8
9
10
11
12
|
bin/spark-shell --driver-
class
-path ./lib/mysql-connector-java-
5.1
.
24
-bin.jar
val
sc
=
new
org.apache.spark.SparkContext
val
sqlContext
=
new
org.apache.spark.sql.SQLContext(sc)
val
jdbcDF
=
sqlContext.load(
"jdbc"
, Map(
"url"
->
"jdbc:mysql://192.168.0.110:3306/hidata?user=root&password=123456"
,
"dbtable"
->
"loadinfo"
))
bin/spark-sql --driver-
class
-path ./lib/mysql-connector-java-
5.1
.
24
-bin.jar
spark-sql> create temporary table jdbcmysql using org.apache.spark.sql.jdbc options(url
"jdbc:mysql://192.168.0.110:3306/hidata?user=root&password=123456"
,dbtable
"loadinfo"
)
spark-sql>select * from jdbcmysql;
//注意src是hive原本就存在的表,在spark sql中不用創建臨時表,直接能夠進行操做
//實現hive和mysql中表的聯合查詢
select * from src join jdbcmysql on (src.key
=
jdbcmysql.id);
|
2.訪問Oracle數據庫
同理,但注意鏈接的URL不同,也是試了很久apache
1
2
|
bin/spark-shell --driver-
class
-path ./lib/ojdbc
6
.jar
val
jdbcDF
=
sqlContext.load(
"jdbc"
, Map(
"url"
->
"jdbc:oracle:thin:kang/123456@192.168.0.110:1521:orcl"
,
"dbtable"
->
"TEST"
))
|
Spark十八般武藝又能夠派上用場了。json
錯誤的URL:網絡
1
2
|
val
jdbcDF
=
sqlContext.load(
"jdbc"
, Map(
"url"
->
"jdbc:oracle:thin:@192.168.0.110:1521:orcl&user=kang&password=123456"
,
"dbtable"
->
"TEST"
))
val
jdbcDF
=
sqlContext.load(
"jdbc"
, Map(
"url"
->
"jdbc:oracle:thin:@192.168.0.110:1521/orcl&user=kang&password=123456"
,
"dbtable"
->
"TEST"
))
|
報錯類型:看起來最像的解決辦法,留着之後用oracle
java.sql.SQLException: Io : NL Exception was generated錯誤解決(jdbc數據源問題)
解決Oracle ORA-12505, TNS:listener does not currently know of SID given in connect
第一種方式,會告知沒法識別SID,其實在鏈接時將orcl&user=kang&password=123456都當作其SID,其實就接近了。通常平時用jdbc鏈接數據庫,url user password都分開,學習一下這種方式^^
Oracle的JDBC url三種方式:這
1
2
3
4
5
6
|
1
.普通SID方式
jdbc
:
oracle
:
thin
:
username/password
@
x.x.x.
1
:
1521
:
SID
2
.普通ServerName方式
jdbc
:
oracle
:
thin
:
username/password
@
//x.x.x.1:1522/ABCD
3
.RAC方式
jdbc
:
oracle
:
thin
:@
(DESCRIPTION
=
(ADDRESS
_
LIST
=
(ADDRESS
=
(PROTOCOL
=
TCP)(HOST
=
x.x.x.
1
)(PORT
=
1521
))(ADDRESS
=
(PROTOCOL
=
TCP)(HOST
=
x.x.x.
2
)(PORT
=
1521
)))(LOAD
_
BALANCE
=
yes)(CONNECT
_
DATA
=
(SERVER
=
DEDICATED)(SERVICE
_
NAME
=
xxrac)))
|
具體參看這裏
3.訪問hive
hive和spark sql的關係,參見
其實spark sql從一開始就支持hive。Spark提供了一個HiveContext
的上下文,實際上是SQLContext
的一個子類,但從做用上來講,sqlContext
也支持Hive數據源。只要在部署Spark的時候加入Hive選項,並把已有的hive-site.xml
文件挪到$SPARK_HOME/conf
路徑下,咱們就能夠直接用Spark查詢包含已有元數據的Hive表了。
1.Spark-sql方式
spark-sql是Spark bin目錄下的一個可執行腳本,它的目的是經過這個腳本執行Hive的命令,即原來經過
hive>輸入的指令能夠經過spark-sql>輸入的指令來完成。
spark-sql可使用內置的Hive metadata-store,也可使用已經獨立安裝的Hive的metadata store
配置步驟:
1. 將Hive的conf目錄的hive-site.xml拷貝到Spark的conf目錄
2. 將hive-site.xml中關於時間的配置的時間單位,好比ms,s所有刪除掉
錯誤信息:Exception in thread "main" java.lang.RuntimeException: java.lang.NumberFormatException: For input string: "5s" 一直覺得是輸入格式的問題。。
3. 將mysql jdbc的驅動添加到Spark的Classpath上
1
|
export SPARK
_
CLASSPATH
=
$SPARK
_
CLASSPATH
:
/home/hadoop/software/spark-
1.2
.
0
-bin-hadoop
2.4
/lib/mysql-connector-java-
5.1
.
34
.jar
|
1
2
3
|
[hadoop
@
hadoop bin]$ ./spark-sql
Spark assembly has been built
with
Hive, including Datanucleus jars on classpath
SET spark.sql.hive.version
=
0.13
.
1
|
提示編譯的時候要帶2個參數
從新編譯:./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -DskipTests -Dhadoop.version=2.4.1 -Phive -Phive-thriftserver
在Spark-default中已經指定
建立表
1
2
3
|
spark-sql> create table word
6
(id int,word string) row format delimited fields terminated by
','
stored as textfile ;
OK
Time taken
:
10.852
seconds
|
導入數據
1
2
3
4
5
6
7
|
spark-sql> load data local inpath
'/home/hadoop/word.txt'
into table word
6
;
Copying data from file
:
/home/hadoop/word.txt
Copying file
:
file
:
/home/hadoop/word.txt
Loading data to table default.word
6
Table default.word
6
stats
:
[numFiles
=
1
, numRows
=
0
, totalSize
=
31
, rawDataSize
=
0
]
OK
Time taken
:
2.307
seconds
|
與其餘數據源聯合查詢
1
|
select * from src join jdbcmysql on (src.key
=
jdbcmysql.id);
|
2.Spark-shell方式
1
|
sqlContext.sql(
"select count(*) from hive_people"
).show()
|
4.將dataframe數據寫入Hive分區表
DataFrame將數據寫入hive中時,默認的是hive默認數據庫,insertInto沒有指定數據庫的參數,使用下面方式將數據寫入hive表或者hive表的分區中。這
一、將DataFrame數據寫入到Hive表中
從DataFrame類中能夠看到與hive表有關的寫入Api有如下幾個:
1
2
3
4
|
registerTempTable(tableName
:
String)
:
Unit,
insertInto(tableName
:
String)
:
Unit
insertInto(tableName
:
String, overwrite
:
Boolean)
:
Unit
saveAsTable(tableName
:
String, source
:
String, mode
:
[size
=
13.3333320617676
px]SaveMode, options
:
Map[String, String])
:
Unit
|
還有不少重載函數,不一一列舉
registerTempTable函數是建立spark臨時表
insertInto函數是向表中寫入數據,能夠看出此函數不能指定數據庫和分區等信息,不能夠直接進行寫入。
向hive數據倉庫寫入數據必須指定數據庫,hive數據表創建能夠在hive上創建,或者使用hiveContext.sql(「create table ....")
下面語句是向指定數據庫數據表中寫入數據:
1
2
3
4
5
6
7
|
case
class
Person(name
:
String,col
1
:
Int,col
2
:
String)
val
sc
=
new
org.apache.spark.SparkContext
val
hiveContext
=
new
org.apache.spark.sql.hive.HiveContext(sc)
import
hiveContext.implicits.
_
hiveContext.sql(
"use DataBaseName"
)
val
data
=
sc.textFile(
"path"
).map(x
=
>x.split(
"\\s+"
)).map(x
=
>Person(x(
0
),x(
1
).toInt,x(
2
)))
<
br
>
data.toDF()
insertInto(
"tableName"
)
|
建立一個case類將RDD中數據類型轉爲case類型,而後經過toDF轉換爲DataFrame,調用insertInto函數時,首先指定數據庫,使用的是hiveContext.sql("use DataBaseName")語句,就能夠將DataFrame數據寫入hive數據表中了
二、將DataFrame數據寫入hive指定數據表的分區中
hive數據表創建能夠在hive上創建,或者使用hiveContext.sql(「create table ...."),使用saveAsTable時數據存儲格式有限,默認格式爲parquet,能夠指定爲json,若是有其餘格式指定,儘可能使用語句來創建hive表。
將數據寫入分區表的思路是:首先將DataFrame數據寫入臨時表,以後是由hiveContext.sql語句將數據寫入hive分區表中。具體操做以下:
1
2
3
4
5
6
7
8
|
case
class
Person(name
:
String,col
1
:
Int,col
2
:
String)
val
sc
=
new
org.apache.spark.SparkContext
val
hiveContext
=
new
org.apache.spark.sql.hive.HiveContext(sc)
import
hiveContext.implicits.
_
hiveContext.sql(
"use DataBaseName"
)
val
data
=
sc.textFile(
"path"
).map(x
=
>x.split(
"\\s+"
)).map(x
=
>Person(x(
0
),x(
1
).toInt,x(
2
)))
data.toDF().registerTempTable(
"table1"
)
hiveContext.sql(
"insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1"
)
|
使用以上方式就能夠將dataframe數據寫入hive分區表了。