hive 使用技巧筆記

 

例子:python

INSERT OVERWRITE TABLE prices_collected_${hiveconf:wid_version}正則表達式

select sql

pc.collect_id as product_id ,數據庫

regexp_extract(pc.price,'(\\d*\\.?\\d+)',1) as price ,ide

pc.region,oop

'' as location_area_code,spa

'' as city_code,unix

from_unixtime(unix_timestamp() , 'yyyy-MM-dd hh:mm:ss') as created_at,code

from_unixtime(unix_timestamp() , 'yyyy-MM-dd hh:mm:ss') as updated_atregexp

from products_compared_${hiveconf:wid_version} as pc

 

1.根據hive執行的參數來動態的設置表名稱 prices_collected_${hiveconf:wid_version}

hive -hiveconf wid_version='4' 

則能夠經過${hiveconft:wid_version}來接收參數,生成prices_collected_4這張表

 

2. 使用正則表達式獲取須要的信息,如:獲取一段字符串中的數字

regexp_extract(pc.price,'(\\d*\\.?\\d+)',1) as price

注意hive中須要使用雙斜槓來處理正則表達式

 

3. 獲取系統時間

from_unixtime(unix_timestamp() , 'yyyy-MM-dd hh:mm:ss') as created_a

使用from_unixtime(unix_timestamp() , 'yyyy-MM-dd hh:mm:ss') 獲取系統時間,格式能夠根據須要調整

 

4. 多個表進行join的時候,可能會報錯

使用set hive.auto.convert.join=false;解決

 

5. 建立表

create table if not exists brands (

  name string,

  created_at string,

  updated_at string

)

ROW FORMAT DELIMITED

FIELDS TERMINATED BY '\t'

ESCAPED BY '\\'

STORED AS TEXTFILE;

以文本方式進行存儲,"\\"進行轉義,"\t"做爲換行符

 

6.處處hive中的某個表中的數據到本地,執行hive命令以下:

hive 

-hiveconf local_path=/home/hive/hive_data/products_24_1 

-hiveconf hive_table=products_24_1 

-hiveconf columnstr=' name , created_at,  updated_at,   "released" as status ' 

-f /home/hive/export_hive_table_to_local.sql

 

須要執行的參數依次是

1.導出到本地的位置local_path

2.導出hive中的哪一個表 hive_table

3. 導出products_24_1 表中的哪些字段 colunmstr

4. 根據上面的參數,在本地建立products_24_1 表,使用-f來指定調用的文件

 

/home/hive/export_hive_table_to_local.sql 文件內容以下:

insert overwrite local directory '${hiveconf:local_path}' 

ROW FORMAT DELIMITED

FIELDS TERMINATED BY '\t'

ESCAPED BY '\\'

STORED AS TEXTFILE

select ${hiveconf:columnstr}

from ${hiveconf:hive_table};

 

7.將本地文件導入到psql數據庫中, hive對pg的支持很差,不能用sqoop來進行數據的導入,能夠先將hive中的數據讀到本地,在使用python腳原本進行文件的寫入

Python代碼  收藏代碼

  1. def insert_to_pg(conn , table_name , file_path , insert_columns=None):  

  2.   conn = psycopg2.connect(conn)  

  3.   cursor = conn.cursor()  

  4.   if os.path.isfile( file_path ):  

  5.     datafile=ReadFileProgress(file_path)  

  6.     cursor.copy_from(file=datafile, table=table_name, sep='\t', null='\\N', size=81920, columns=insert_columns)  

  7.     datafile.close()  

 

Python代碼  收藏代碼

  1. #!/usr/bin/python  

  2. # #_*_ coding: utf-8 _*_  

  3.   

  4. import os , sys  

  5. import psycopg2  

  6.   

  7. class ReadFileProgress:  

  8.   

  9.   def __init__(self, filename):  

  10.     self.datafile = open(filename)  

  11.     self.totalRecords = 0  

  12.     self.totalBytes = os.stat(filename).st_size  

  13.     self.readBytes = 0  

  14.     self.datafile.readline()  

  15.     i = 0  

  16.     for i, l in enumerate(self.datafile):  

  17.       pass  

  18.     self.totalRecords = i + 1  

  19.     sys.stderr.write("Number of records: %d\n" % (self.totalRecords))  

  20.     self.datafile.seek(0)  

  21.     self.datafile.readline()  

  22.     self.perc5 = self.totalBytes / 20.0  

  23.     self.perc5count = 0  

  24.     self.lastPerc5 = 0  

  25.     sys.stderr.write("Writing records: 0%")  

  26.   

  27.   def countBytes(self, size=0):  

  28.     self.readBytes += size  

  29.     if (self.readBytes - self.lastPerc5 >= self.perc5):  

  30.       self.lastPerc5 = self.readBytes  

  31.       if (int(self.readBytes / self.perc5) == 5):  

  32.         sys.stderr.write("25%")  

  33.       elif (int(self.readBytes / self.perc5) == 10):  

  34.         sys.stderr.write("50%")  

  35.       elif (int(self.readBytes / self.perc5) == 15):  

  36.         sys.stderr.write("75%")  

  37.       else:  

  38.         sys.stderr.write(".")  

  39.   

  40.       sys.stderr.flush()  

  41.   

  42.   def readline(self, size=None):  

  43.     countBytes(size)  

  44.     return self.datafile.readline(size)  

  45.    

  46.   def read(self, size=None):  

  47.     self.countBytes(size)  

  48.     return self.datafile.read(size)  

  49.   

  50.   def close(self):  

  51.     sys.stderr.write("100%\n")  

  52.     self.datafile.close()  

 

8. 從pg上導出指定表

Python代碼  收藏代碼

  1. def do_export(conn , table_name , file_path , columns=None):  

  2.   conn = psycopg2.connect(conn)  

  3.   cursor = conn.cursor()  

  4.   cursor.copy_to(file=file(file_path , 'w'), table=table_name, sep='\t', null='\\N', columns=columns)  

  5.   cursor.close()  

  6.   conn.commit()  

  7.   sys.stdout.write("Transaction finished successfully.\n")  

 

9. 則select語句中也能夠經過hiveconf來傳遞參數,執行hive命令

hive -hiveconf name='hello hive'

INSERT OVERWRITE TABLE companies

select 

  '${hiveconf:name}' as name 

from companies_old

相關文章
相關標籤/搜索