大概步驟:java
- 鏈接oracle,建立一個dataframe用來接收從oracle裏面讀取的數據。
- 將dataframe的數據寫入臨時表。
- 用hiveContext.sql語句將數據寫入hive裏面。
這個程序其實對於學了spark的人來講很簡單,直接上代碼吧:sql
package com.ctbri.cgs.oracle2Hive
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.functions._
object App {
def main(args: Array[String]):Unit = {
//建立一個sparkcontext對象,用enableHiveSupport獲取了對HIVE的支持
val spark = SparkSession
.builder()
.appName("Oracle2Hive")
.master("local")
.config("spark.port.maxRetries","128")
.config("spark.sql.parquet.writeLegacyFormat",true)
.enableHiveSupport()
.getOrCreate()
//鏈接oracle
val jdbcDF = spark.read.format("jdbc").options(
Map(
"driver" -> "oracle.jdbc.driver.OracleDriver",
"url" -> "url路徑",
"user" -> "username",
"password" -> "password",
"dbtable" -> "要導出的數據表名"
)).load()
//須要轉換的列名
val colName = ArrayBuffer[String]()
val schema = jdbcDF.schema.foreach(s => {
if (s.dataType.equals(DecimalType(38, 10)) || s.dataType.equals(DecimalType(4, 0))) {
colName += s.name
}
})
//字段類型轉換
var df_int = jdbcDF
colName.foreach(name => {
df_int = df_int.withColumn(name, col(name).cast(IntegerType))
})
//建立臨時表
jdbcDF.createOrReplaceTempView("records")
spark.sql("use 庫名")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
jdbcDF.write.mode("overwrite").saveAsTable("表名")
}
}
其中須要注意的就是,我第一次寫的時候,沒有進行字段類型的轉換,致使數據能夠導入,在hive裏面也能夠查看錶屬性,可是沒法查出具體數據,緣由就是spark導入的時候,將oracle的number類型轉換成了decimal類型,致使沒法查看,其餘諸如data,char等都是成功的,進行一下類型轉換就能夠了。
文章及代碼內容參考了https://blog.csdn.net/dkl12/article/details/82347534
apache