導入HDFS的數據到Hive

1. 經過Hive view

CREATE EXTERNAL TABLE if not exists finance.json_serde_optd_table (
  retCode string,
  retMsg string,
  data array<struct< secid:string,="" tradedate:date,="" optid:string,="" ticker:string,="" secshortname:string,="" exchangecd:string,="" presettleprice:double,="" precloseprice:double,="" openprice:double,="" highestprice:double,="" lowestprice:double,="" closeprice:double,="" settlprice:double,="" turnovervol:double,="" turnovervalue:double,="" openint:int="">>)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 'hdfs://wdp.xxxxx.cn:8020/nifi/finance1/optd/';
create table if not exists finance.tb_optd
as
SELECT b.data.secID,
		b.data.tradeDate,
		b.data.optID,
		b.data.ticker,
		b.data.secShortName,
		b.data.exchangeCD,
		b.data.preSettlePrice,
		b.data.preClosePrice,
		b.data.openPrice,
		b.data.highestPrice,
		b.data.lowestPrice,
		b.data.closePrice,
		b.data.settlPrice,
		b.data.turnoverVol,
		b.data.turnoverValue,
		b.data.openInt
FROM finance.json_serde_optd_table LATERAL VIEW explode(json_serde_optd_table.data) b AS data;
 

2. 經過Zeppelin

 

%dep
z.load("/usr/hdp/2.4.2.0-258/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar");

 

// 定義導入的hive對象集合

case class HiveConfig(database: String, modelName: String, hdfsPath: String, schema: String, schema_tb: String);
var hiveConfigList = List[HiveConfig]();
 
// 建立equd數據結構
// 定義json結構
val schema_json_equd_serde ="""  retCode string,
                              retMsg string,
                              data array<struct< secid="" :="" string,="" tradedate="" date,="" ticker="" secshortname="" exchangecd="" precloseprice="" double,="" actprecloseprice:="" openprice="" highestprice="" lowestprice="" closeprice="" turnovervol="" turnovervalue="" dealamount="" int,="" turnoverrate="" accumadjfactor="" negmarketvalue="" marketvalue="" pe="" pe1="" pb="" isopen="" int="">>""";
var schema_equd ="""b.data.secID,
            		b.data.ticker,
            		b.data.secShortName,
            		b.data.exchangeCD,
            		b.data.tradeDate,
            		b.data.preClosePrice,
            		b.data.actPreClosePrice,
            		b.data.openPrice,
            		b.data.highestPrice,
            		b.data.lowestPrice,
            		b.data.closePrice,
            		b.data.turnoverVol,
            		b.data.turnoverValue,
            		b.data.dealAmount,
            		b.data.turnoverRate,
            		b.data.accumAdjFactor,
            		b.data.negMarketValue,
            		b.data.marketValue,
            		b.data.PE,
            		b.data.PE1,
            		b.data.PB,
            		b.data.isOpen""";
hiveConfigList  = hiveConfigList :+ HiveConfig("finance", "equd", "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/", schema_json_equd_serde, schema_equd);

 

// 建立idxd數據結構
// 定義json結構
val schema_json_idxd_serde ="""  retCode string,
                              retMsg string,
                              data array<struct< indexid:string,="" tradedate:date,="" ticker:string,="" porgfullname:string,="" secshortname:string,="" exchangecd:string,="" precloseindex:double,="" openindex:double,="" lowestindex:double,="" highestindex:double,="" closeindex:double,="" turnovervol:double,="" turnovervalue:double,="" chg:double,="" chgpct:double="">>""";
var schema_idxd ="""b.data.indexID,
            		b.data.tradeDate,
            		b.data.ticker,
            		b.data.porgFullName,
            		b.data.secShortName,
            		b.data.exchangeCD,
            		b.data.preCloseIndex,
            		b.data.openIndex,
            		b.data.lowestIndex,
            		b.data.highestIndex,
            		b.data.closeIndex,
            		b.data.turnoverVol,
            		b.data.turnoverValue,
            		b.data.CHG,
            		b.data.CHGPct""";
hiveConfigList = hiveConfigList :+ HiveConfig("finance", "idxd", "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/", schema_json_idxd_serde, schema_idxd);

 

// 循環加載數據中
  def loadDataToHive(args:HiveConfig){
    val loadPath = args.hdfsPath + args.modelName;
    val tb_json_serde = "json_serde_" + args.modelName +"_table";
    val tb= "tb_" + args.modelName;
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    if(args.database != "" && args.schema != "") {
        print("正在建立項目..." + args.modelName)
        hiveContext.sql("CREATE DATABASE IF NOT EXISTS " + args.database);
        print("正在構造擴展模型...");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb_json_serde + "(" + args.schema + ") row format serde 'org.apache.hive.hcatalog.data.JsonSerDe' LOCATION " + "'" + loadPath + "/'");
        println("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb + " as select " + args.schema_tb + " from " + args.database + "." + tb_json_serde + " LATERAL VIEW explode(" + tb_json_serde + ".data) b AS data");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb + " as select " + args.schema_tb + " from " + args.database + "." + tb_json_serde + " LATERAL VIEW explode(" + tb_json_serde + ".data) b AS data");
        println(args.modelName + " 擴展模型加載已完成!");
    }
  }
  hiveConfigList.size;
  hiveConfigList.foreach { x => loadDataToHive(x) };

 

 3. 第二種取法

因爲data是json數據裏的一個數組,因此上面的轉換複雜了一點。下面這種方法是先把json裏data數組取出來放到hdfs,而後直接用下面的語句放到hive:java

用splitjson 來提取、分隔 data 數組sql

NewImage

CREATE EXTERNAL TABLE if not exists finance.awen_optd (
  secid string,
  tradedate date,
  optid string,
  ticker string,
  secshortname string,
  exchangecd string,
  presettleprice double,
  precloseprice double,
  openprice double,
  highestprice double,
  lowestprice double,
  closeprice double,
  settlprice double,
  turnovervol double,
  turnovervalue double,
  openint int)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 'hdfs://wdp.xxxx.cn:8020/nifi/finance2/optd/';

 

 

NIFI 中國社區 QQ羣:595034369apache

相關文章
相關標籤/搜索