Example:
INSERT OVERWRITE TABLE prices_collected_${hiveconf:wid_version}
select
pc.collect_id as product_id ,
regexp_extract(pc.price,'(\\d*\\.?\\d+)',1) as price ,
pc.region,
'' as location_area_code,
'' as city_code,
from_unixtime(unix_timestamp() , 'yyyy-MM-dd hh:mm:ss') as created_at,
from_unixtime(unix_timestamp() , 'yyyy-MM-dd hh:mm:ss') as updated_at
from products_compared_${hiveconf:wid_version} as pc
1. Set the table name dynamically from a Hive execution parameter: prices_collected_${hiveconf:wid_version}
hive -hiveconf wid_version='4'
The script can then pick the parameter up through ${hiveconf:wid_version}, producing the table prices_collected_4. A minimal end-to-end sketch follows.
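A sketch of the full invocation (the script name load_prices.sql is hypothetical); every ${hiveconf:...} reference is substituted before the statement runs:
hive -hiveconf wid_version='4' -f load_prices.sql
-- inside load_prices.sql, this statement now targets prices_collected_4:
INSERT OVERWRITE TABLE prices_collected_${hiveconf:wid_version}
select * from products_compared_${hiveconf:wid_version};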
2. Use a regular expression to pull out the information you need, e.g. extract the number from a string:
regexp_extract(pc.price,'(\\d*\\.?\\d+)',1) as price
Note that in Hive the backslashes in a regular expression must be doubled.
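A quick sanity check, assuming an input string like 'USD 12.99' (on older Hive versions, run it against a one-row table instead of a bare select):
select regexp_extract('USD 12.99', '(\\d*\\.?\\d+)', 1);
-- returns '12.99'; the third argument picks the first capture group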
3. Get the system time
from_unixtime(unix_timestamp() , 'yyyy-MM-dd hh:mm:ss') as created_at
from_unixtime(unix_timestamp() , 'yyyy-MM-dd hh:mm:ss') returns the current system time; adjust the format string as needed.
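One caveat: the pattern follows Java's SimpleDateFormat, where lowercase hh is the 12-hour clock, so afternoon hours come out as 01-11; for a 24-hour timestamp use HH:
select from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss');
-- e.g. '2014-07-21 18:05:32' rather than '2014-07-21 06:05:32'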
4. Joining several tables in one query can fail
Work around it with set hive.auto.convert.join=false;, which disables the automatic conversion to map-side joins.
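A sketch of where the setting goes (the table names are placeholders):
set hive.auto.convert.join=false;
select a.id, b.name, c.price
from t_a a
join t_b b on a.id = b.a_id
join t_c c on a.id = c.a_id;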
5. Create a table
create table if not exists brands (
name string,
created_at string,
updated_at string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
ESCAPED BY '\\'
STORED AS TEXTFILE;
The table is stored as plain text, with "\\" as the escape character and "\t" as the field delimiter; rows are terminated by the default newline.
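For reference, a made-up row in the underlying text file would look like this, where \t stands for a literal tab character:
Acme\t2014-07-21 18:05:32\t2014-07-21 18:05:32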
6. Export the data of a Hive table to the local filesystem; run the hive command as follows:
hive \
-hiveconf local_path=/home/hive/hive_data/products_24_1 \
-hiveconf hive_table=products_24_1 \
-hiveconf columnstr=' name , created_at, updated_at, "released" as status ' \
-f /home/hive/export_hive_table_to_local.sql
The parameters, in order, are:
1. local_path, the local destination directory for the export
2. hive_table, which Hive table to export
3. columnstr, which columns of the products_24_1 table to export
4. With the parameters above, the export for products_24_1 is written locally; -f points at the script file to run
The contents of /home/hive/export_hive_table_to_local.sql:
insert overwrite local directory '${hiveconf:local_path}'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
ESCAPED BY '\\'
STORED AS TEXTFILE
select ${hiveconf:columnstr}
from ${hiveconf:hive_table};
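When the command finishes, the target directory holds one or more plain-text files (typically named like 000000_0, one per writer task); assuming the parameters above, a quick check might be:
head /home/hive/hive_data/products_24_1/000000_0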
7. Import a local file into a PostgreSQL database. Hive's support for pg is poor and Sqoop cannot be used for the import, so first read the Hive data out to a local file, then use a Python script to write the file into the database.
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import sys
import psycopg2

class ReadFileProgress:
    """File-like wrapper that reports progress to stderr while being read."""
    def __init__(self, filename):
        self.datafile = open(filename)
        self.totalBytes = os.stat(filename).st_size
        self.readBytes = 0
        # skip the first line (treated as a header), then count the records
        self.datafile.readline()
        i = 0
        for i, l in enumerate(self.datafile):
            pass
        self.totalRecords = i + 1
        sys.stderr.write("Number of records: %d\n" % self.totalRecords)
        # rewind and skip the header again so the copy starts at record 1
        self.datafile.seek(0)
        self.datafile.readline()
        self.perc5 = self.totalBytes / 20.0  # one 5% slice of the file
        self.lastPerc5 = 0
        sys.stderr.write("Writing records: 0%")

    def countBytes(self, size=0):
        self.readBytes += size
        if self.readBytes - self.lastPerc5 >= self.perc5:
            self.lastPerc5 = self.readBytes
            # print a marker at every 25%, a dot for the 5% steps in between
            if int(self.readBytes / self.perc5) == 5:
                sys.stderr.write("25%")
            elif int(self.readBytes / self.perc5) == 10:
                sys.stderr.write("50%")
            elif int(self.readBytes / self.perc5) == 15:
                sys.stderr.write("75%")
            else:
                sys.stderr.write(".")
            sys.stderr.flush()

    def readline(self, size=-1):
        self.countBytes(max(size, 0))  # was countBytes(size): self was missing
        return self.datafile.readline(size)

    def read(self, size=-1):
        self.countBytes(max(size, 0))
        return self.datafile.read(size)

    def close(self):
        sys.stderr.write("100%\n")
        self.datafile.close()

def insert_to_pg(conn, table_name, file_path, insert_columns=None):
    conn = psycopg2.connect(conn)
    cursor = conn.cursor()
    if os.path.isfile(file_path):
        datafile = ReadFileProgress(file_path)
        cursor.copy_from(file=datafile, table=table_name, sep='\t',
                         null='\\N', size=81920, columns=insert_columns)
        datafile.close()
    conn.commit()  # without a commit the copied rows are rolled back
    cursor.close()
    conn.close()
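A usage sketch; the DSN, file path, and column list below are made up for illustration:
dsn = "host=localhost dbname=warehouse user=hive password=secret"
insert_to_pg(dsn, 'products_24_1',
             '/home/hive/hive_data/products_24_1/000000_0',
             insert_columns=('name', 'created_at', 'updated_at', 'status'))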
8. Export a given table from pg
def do_export(conn, table_name, file_path, columns=None):
    conn = psycopg2.connect(conn)
    cursor = conn.cursor()
    # stream the table into a local tab-separated file
    with open(file_path, 'w') as outfile:
        cursor.copy_to(file=outfile, table=table_name, sep='\t',
                       null='\\N', columns=columns)
    cursor.close()
    conn.commit()
    sys.stdout.write("Transaction finished successfully.\n")
9. A select statement can also receive parameters through hiveconf; run the hive command
hive -hiveconf name='hello hive'
INSERT OVERWRITE TABLE companies
select
'${hiveconf:name}' as name
from companies_old
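With the command above, every row copied from companies_old into companies gets the literal string hello hive as its name.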