使用pyspark模仿sqoop從oracle導數據到hive的主要功能(自動建表,分區導入,增量,解決數據換行符問題)

  最近公司開始作大數據項目,讓我使用sqoop(1.6.4版本)導數據進行數據分析計算,然而當咱們將全部的工做流都放到azkaban上時整個流程跑完須要花費13分鐘,而其中導數據(增量)就佔了4分鐘左右,老闆給我提供了使用 spark 導數據的思路,學習整理了一個多星期,終於實現了sqoop的主要功能。html

  這裏我使用的是pyspark完成的全部操做。node

  

  條件:hdfs平臺,pyspark,ubuntu系統python

  運行:我這裏是在 /usr/bin 目錄下(或者指定在此目錄下 )運行的python文件,也能夠使用系統自帶的pysparkmysql

1 ./spark-submit --jars "/home/engyne/spark/ojdbc7.jar" --master local  /home/engyne/spark/SparkDataBase.py

  其中--jars 是指定鏈接oracle的驅動,ojdbc7.jar對應的是oracle12版本,--master local /...指定的是運行的python文件git

  注意:個人代碼沒有解決中文問題,因此無論是註釋仍是代碼中都不能出現中文,記得刪除!!!github

 

  一、pyspark鏈接oracle,導數據到hive(後面的代碼須要在此篇代碼基礎上進行,重複代碼再也不copy了)sql

 1 import sys
 2 from pyspark.sql import HiveContext
 3 from pyspark import SparkConf, SparkContext, SQLContext
 4 
 5 conf = SparkConf().setAppName('inc_dd_openings')
 6 sc = SparkContext(conf=conf)
 7 sqlContext = HiveContext(sc)
 8 
 9 #如下是爲了在console中打印出表內容
10 reload(sys)
11 sys.setdefaultencoding("utf-8")
12 
13 get_df_url = "jdbc:oracle:thin:@//192.168.1.1:1521/ORCLPDB"
14 get_df_driver = "oracle.jdbc.driver.OracleDriver"
15 get_df_user = "xxx"
16 get_df_password = "xxx"
17  
18 df = sqlContext.read.format("jdbc") \
19     .option("url", get_df_url) \
20     .option("driver", get_df_driver) \
21     .option("dbtable", "STUDENT") \
22     .option("user",  get_df_user).option("password", get_df_password) \
23     .load()
24 #df.show() #能夠查看到獲取的表的內容,默認顯示20行
25 sqlContext.sql("use databaseName")    #databaseName指定使用hive中的數據庫
26 #建立臨時表
27 df.registerTempTable("tempTable")
28 #建立表並寫入數據
29 sqlContext.sql("create table STUDENT as select * from tempTable")

  二、pyspark在hive中建立動態分區表數據庫

1 #修改一下hive的默認設置以支持動態分區
2 sqlContext.sql("set hive.exec.dynamic.partition=true")
3 sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")
4 #設置hive支持建立分區文件的最大值
5 sqlContext.sql("SET hive.exec.max.dynamic.partitions=100000")
6 sqlContext.sql("SET hive.exec.max.dynamic.partitions.pernode=100000")

  這裏須要先手動建立分區表,我使用dataframe的dtypes屬性獲取到表結構,而後循環拼接表的每一個字段在hive中所對應的類型ubuntu

  最後寫入表數據的代碼是:oracle

1 sqlContext.sql("insert overwrite table STUDENT partition(AGE) SELECT ID,NAME,UPDATETIME,AGE FROM tempTable"

   三、實現增量導入數據

  我這裏使用了MySql數據庫,用來存儲增量導入的信息,建立表(job)

DROP TABLE IF EXISTS `job`;

CREATE TABLE `job` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `database_name` varchar(50) DEFAULT NULL,     --數據庫名稱
  `table_name` varchar(100) DEFAULT NULL,       --須要增量導入的表名
  `partition_column_name` varchar(100) DEFAULT NULL,        --分區的字段名(這裏只考慮對一個字段分區,若是多個字段這裏應該使用一對多表結構吧)
  `partition_column_desc` varchar(50) DEFAULT NULL,     --分區字段類型
  `check_column` varchar(50) DEFAULT NULL,      --根據(table_name中)此字段進行增量導入校驗(我這裏例子使用的是updatetime)
  `last_value` varchar(255) DEFAULT NULL,       --校驗值
  `status` int(1) NOT NULL,     --是否使用(1表示此job激活)
  PRIMARY KEY (`id`)
) INCREMENTAL=InnoDB AUTO_INCREMENT=81 DEFAULT CHARSET=utf8;

  存儲STUDENT表增量導入信息(這裏是爲了演示)

insert  into `job`(`id`,`database_name`,`table_name`,`partition_column_name`,`partition_column_desc`,`check_column`,`last_value`,`status`)values (1,'test_datebase','STUDENT','AGE','string','UPDATETIME','2018-07-30',1)

  python 鏈接MySql的方法我這裏就直接懟代碼了,具體詳解你們就看菜鳥教程

  Ubuntu須要安裝MySQLdb(   sudo apt-get install python-mysqldb   )

import MySQLdb

# insert        update        delete
def conMysqlDB_exec(sqlStr):
    db = MySQLdb.connect("192.168.xxx.xxx", "xx", "xx", "xx", charset='utf8' )
    cursor = db.cursor()
    try:
        cursor.execute(sqlStr)
        db.commit()
        result = True
    except:
        print("---->MySqlError: execute error")
        result = False
        db.rollback()
    db.close
    return result

# select
def conMysqlDB_fetchall(sqlStr):
    db = MySQLdb.connect("192.168.xxx.xxx", "xx", "xx", "xx", charset='utf8' )
    cursor = db.cursor()
    results = []
    try:
        cursor.execute(sqlStr)
        results = cursor.fetchall()
    except:
        print("---->MySqlError: unable to fecth data")
    db.close
    return results

  查詢增量信息,使用spark進行導入

findJobSql = "SELECT * FROM job where status=1"
result
= conMysqlDB_fetchall(findJobSql) databaseName = val[1] tableName = val[2] partitionColumnName = val[3] partitionColumnDesc = val[4] checkColumn = val[5] lastValue = val[6] sqlContext.sql("use database") df = sqlContext.read.format("jdbc") \ .option("url", "jdbc:oracle:thin:@//192.168.xxx.xxx:1521/ORCLPDB") \ .option("driver", "oracle.jdbc.driver.OracleDriver") \ .option("dbtable", "(select * from %s where to_char(%s, 'yyyy-MM-dd')>'%s')" % (tableName, checkColumn, lastValue)) \ #這裏是關鍵,直接查詢出新增的數據,這樣後面的速度才能提高,不然要對整個表的dataframe進行操做,慢死了,千萬不要相信dataframe的filter,where這些東西,4萬多條數據要查3分鐘!!! .option("user", "xxx").option("password", "xxx") \ .load()
def  max(a, b):
    if a>b:
      return a
    else:
      return b
try: #獲取到新增字段的最大值!!!(這塊也困了我很久)這裏使用的是python的reduce函數,調用的max方法 nowLastValue = df.rdd.reduce(max)[checkColumn]
    df.registerTempTable("temp")#寫入內容
    saveSql = "insert into table student select * from temp"
    sqlContext.sql(saveSql)
    #更新mysql表,使lastValue是表最新值
    updataJobSql = "UPDATE job SET last_value='%s' WHERE table_name='%s'" % (nowLastValue, tableName)
    if conMysqlDB_exec(updataJobSql):
        print("---->SUCCESS: incremental import success")
except ValueError:
    print("---->INFO: No new data added!")
except:
    print("---->ERROR: other error")

  四、解決導入數據換行符問題

  有時候oracle中的數據中會存在換行符(" \n ")然而hive1.1.0中數據換行默認識別的也是\n,最坑的是還不能對它進行修改(目前我沒有查出修改的方法,你們要是有辦法歡迎在評論區討論)那我只能對數據進行處理了,之前使用sqoop的時候也有這個問題,所幸sqoop有解決換行符的語句,,,,巴拉巴拉,,,扯遠了

  解決換行符須要dataframe的map方法,而後使用lambda表達式進行replace,總結好就是下面的代碼(第3行)

  解釋:這是個for循環裏面加if else 判斷,整個須要用  [ ]  包起來,沒錯這是個list ,若是不包就報錯,lambda x 獲取到的是表中一行行的數據,for循環對每一行進行遍歷,而後對一行中每一個字段進行判斷,是不是unicode或者str類型,(通常只有是這兩個類型才存在換行符)若是是則進行replace處理,不然不作處理。

  轉化好以後這是個rdd類型的數據,須要轉化爲dataframe類型才能寫入hive

1 #df自帶獲取schema的方法,不要學我去拼湊出來(😓)
2 schema = df.schema
3 rdd = df.map(lambda x : [(x[i].replace("\n","").replace("\r","") if isinstance(x[i], unicode) or isinstance(x[i], str) else x[i]) for i in range(len(x))])
4 df = sqlContext.createDataFrame(rdd, schema)

 


 

  完成代碼我已經上傳到github上了https://github.com/yangzijia/learnSpark ,

  總結:使用spark進行數據導入和增量導入與sqoop作對比,80張表,sqoop 4分鐘多,使用此方法,0.7分鐘,(同是沒有新數據的前提下),普通導表,此方法5分鐘,80張大表(外網oracle),sqoop的話我就不說了,當時一張200萬數據的表導了一夜。。。

 

  初次寫這麼多話,內容不是很緊湊,若是你們還有其餘的問題,歡迎在評論區留言提意見。

相關文章
相關標籤/搜索