hive.apache.orgjava
Hive是什麼?node
Hive是Facebook開源的用於解決海量結構化日誌的數據統計,是基於Hadoop的一個數據倉庫工具,能夠將結構化的數據文件映射爲一張表,而且提供類SQL查詢功能,本質是將HQL轉化成MapReduce程序。mysql
數據存儲在HDFS,分析數據底層實現默認是MapReduce,執行程序運行在Yarn上。sql
若是沒有Hiveshell
想象一下數據統計的時候寫大量的MapReduce程序,那會是多麼痛苦。若是是寫SQL就開心多了,尤爲是離線數據倉庫方面普遍應用。數據庫
因爲 Hive 採用了相似SQL 的查詢語言 HQL(hive query language),所以很容易將 Hive 理解爲數據庫。其實從結構上來看,Hive 和數據庫除了擁有相似的查詢語言,再無相似之處。express
Hive經過給用戶提供的一系列交互接口,接收到用戶的指令(SQL),使用本身的Driver,結合元數據(MetaStore),將這些指令翻譯成MapReduce,提交到Hadoop中執行,最後,將執行返回的結果輸出到用戶交互接口。apache
mysql字段類型 | hive:ods字段類型 | hive:dwd字段類型 |
---|---|---|
tinyint | tinyint | tinyint |
int | int | int |
bigint | bigint | bigint |
varchar | string | string |
datetime | bigint | string |
bit | boolean | int |
double | double | double |
hiveserver2
beeline
!connect jdbc:hive2://cdh01.cm:10000
CREATE DATABASE 數據庫名字
show databases
drop database 數據庫名字 drop database 數據庫名字 cascade
desc database 數據庫名字 desc database extended 數據庫名字
use 數據庫名字
show tables
desc formatted 表名
建表語法瀏覽器
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name [(col_name data_type [COMMENT col_comment], ...)] [COMMENT table_comment] [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] [CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] [ROW FORMAT row_format] [STORED AS file_format] [LOCATION hdfs_path]
建表經常使用參數安全
EXTERNAL:外部表關鍵字,Hive份內部表(元數據和數據會被一塊兒刪除,通常不過重要的表如:中間臨時表)和外部表(只刪除元數據,不刪除數據,通常而言都採用外部表,由於數據安全些),通常而言採用外部表。
PARTITIONED BY:分區指定字段爲'dt'。
PARTITIONED BY ( `dt` String COMMENT 'partition' )
row format delimited fields terminated by '\t' :數據使用什麼作切分,這裏使用製表符。
stored as parquet:文件存儲格式,推薦parquet(spark自然支持parquet)。
location '/warehouse/層名/庫名/表名' :文件存儲位置。
tblproperties ("parquet.compression"="snappy") :文件壓縮策略,推薦snappy。
CLUSTERED BY建立分桶表(抽樣場景使用)
SORTED BY排序(通常狀況查詢語句都有排序,故不經常使用)
建表例子
#刪除表 drop table if exists dwd.student #建立表(庫名.表名) CREATE EXTERNAL TABLE `dwd.student`( `ID` bigint COMMENT '', `CreatedBy` string COMMENT '建立人', `CreatedTime` string COMMENT '建立時間', `UpdatedBy` string COMMENT '更新人', `UpdatedTime` string COMMENT '更新時間', `Version` int COMMENT '版本號', `name` string COMMENT '姓名' ) COMMENT '學生表' PARTITIONED BY ( `dt` String COMMENT 'partition' ) row format delimited fields terminated by '\t' location '/warehouse/dwd/test/student/' stored as parquet tblproperties ("parquet.compression"="snappy")
建立內部表就是去掉EXTERNAL關鍵字,使用插入數據案例測試一下刪除表後從新創建的效果吧。
INSERT INTO TABLE dwd.student partition(dt='2020-04-05') VALUES(1,"heaton","2020-04-05","","","1","zhangsan") INSERT INTO TABLE dwd.student partition(dt='2020-04-06') VALUES(2,"heaton","2020-04-06","","","1","lisi")
SELECT * FROM dwd.student SELECT * FROM dwd.student WHERE dt="2020-04-05" SELECT * FROM dwd.student WHERE dt="2020-04-06"
LOAD DATA INPATH '/warehouse/dwd/test/student/dt=2020-04-05/000000_0' INTO TABLE dwd.student1 partition(dt='2020-04-05')
TRUNCATE TABLE dwd.student1
insert into table dwd.student1 partition(dt='2020-04-05') select id,createdby,createdtime,updatedby,updatedtime,version,name from dwd.student where dt="2020-04-06"
因爲student1表2020-04-05分區中已經有一條張三數據,這時插入李四數據,查詢結果爲2條數據
insert overwrite table dwd.student1 partition(dt='2020-04-05') select id,createdby,createdtime,updatedby,updatedtime,version,name from dwd.student where dt="2020-04-06"
由上面的例子使student1表2020-04-05分區中有2條數據,這時候在插入一條,發現只有一條,由於數據被複寫。
with s1 as ( select id,createdby,createdtime,version,name from dwd.student where dt="2020-04-05" ), s2 as ( select id,updatedby,updatedtime,version,name from dwd.student where dt="2020-04-06" ) insert overwrite table dwd.student1 partition(dt='2020-04-06') select s1.id as id, s1.createdby as createdby, s1.createdtime as createdtime, "" as updatedby, "0" as updatedtime, s1.version as version, s1.name as name from s1 union all select s2.id as id, "" as createdby, "0" as createdtime, s2.updatedby as updatedby, s2.updatedtime as updatedtime, s2.version as version, s2.name as name from s2
hive -e 「select * from xx」:hive命令行SQL語句
hive -f xx.hql:hive執行sql文件
CLUSTERED BY(字段名) into 分桶數 buckets
指定分桶條件。set hive.enforce.bucketing=true; set mapreduce.job.reduces=-1;
y必須是table總bucket數的倍數或者因子,根據Y值決定抽樣的比例,如:table共4個桶,當y=2時,抽取4/2=2個桶數據,等y=8時,抽取4/8=1/2的bucket數據。
x是從哪一個桶開始抽,而後依次抽取x+y的桶數據,如:table共4個桶,當y=2,x=1時,抽取4/2=2個桶數據,桶數爲1號,1+2=3號。
#建表sql CREATE EXTERNAL TABLE `dwd.student_buck`( `ID` bigint COMMENT '', `name` string COMMENT '姓名' ) COMMENT '學生表' CLUSTERED BY(ID) into 4 buckets row format delimited fields terminated by '\t' stored as parquet location '/warehouse/dwd/test/student_buck/' tblproperties ("parquet.compression"="snappy") #插入數據 INSERT INTO TABLE dwd.student_buck VALUES(1,"zhangsan"),(2,"lisi") ,(3,"wangwu") ,(4,"zhaoliu") ,(5,"haqi") ,(6,"xiba") ,(7,"heijiu") ,(8,"washi") #抽樣查詢 SELECT * from dwd.student_buck TABLESAMPLE(BUCKET 1 OUT OF 4 ON id)
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select
Order By 全局排序(全局一個Reducer)
Group By 分組(會根據跟的字段,發到不一樣的Reducer中)
Distribute By 相似MR中的partition進行分區,需結合Sort By使用,並且要寫在前面,如:根據日期分區,在根據年齡排序
#多reduce纔有效果 select * from student distribute by createtime sort by age descSort By每一個Reducer內部排序,對全局結果來講不是排序,結合Distribute By使用。(多Reducer區內排序)
Cluster By 當分區字段和區內排序字段相同,也就是distribute by和sort by須要字段相同可使用其代替,可是隻能升序排列。
NVL(string,替換值),若是string爲NULL,則返回替換值,不然返回string值
# 1 格式化時間-> 2020-04-05 00:00:00 SELECT date_format("2020-04-05","yyyy-MM-dd HH:mm:ss") # 2 時間天數相加-> 2020-04-05 ->2020-04-01 SELECT date_add("2020-04-05",4) SELECT date_add("2020-04-05",-4) # 3 時間天數相減->2020-04-01(通常直接用date_add了,這個不怎麼用) SELECT date_sub("2020-04-05",4) # 4 兩個時間相減-> 4 SELECT datediff("2020-04-09","2020-04-05") # 5 時間轉時間戳-> 1586016000 SELECT unix_timestamp("2020-04-05","yyyy-MM-dd") # 6 時間戳轉時間-> 2020-04-05 SELECT from_unixtime(cast("1586016000" as BIGINT),"yyyy-MM-dd") # 7 將日期轉爲星期-> 1 (星期一) SELECT pmod(datediff("2020-04-06","1920-01-01")-3,7)
財務部 | 男 |
---|---|
財務部 | 男 |
財務部 | 女 |
科技部 | 女 |
人事部 | 女 |
人事部 | 男 |
如上圖假設表 emp 中有6我的,求每一個部門下面的男女個數。
#使用CASE WHEN select department, sum(case sex when '男' then 1 else 0 end) man_count, sum(case sex when '女' then 1 else 0 end) woman_count, from emp group by department #還可使用IF select department, sum(if(sex='男',1,0)) man_count, sum(if(sex='女',1,0)) woman_count, from emp group by department
# 1 concat拼接-> 2020-04-05 SELECT concat("2020","-","04","-","05") # 2 concat_ws拼接-> 2020-04-05 注意 concat_ws能夠傳collect_set,須要字段爲string SELECT concat_ws("-","2020","04","05") # 3 彙總成Array類型字段-> [8,4,5,1,6,2,7,3] SELECT collect_set(id) from dwd.student_buck
EXPLODE(字段名):將一列複雜的array或者map結構拆分紅多行
炸裂函數必須配合側寫視圖函數:LATERAL VIEW udtf(expression) 側寫視圖別名 AS 側寫結果列別名
# 建表 CREATE EXTERNAL TABLE `dwd.hobby`( `id` bigint COMMENT '', `name` string COMMENT '姓名', `hobby_name` array<string> COMMENT '愛好名字' ) COMMENT '愛好表' PARTITIONED BY ( `dt` String COMMENT 'partition' ) row format delimited fields terminated by '\t' collection items terminated by ',' stored as parquet location '/warehouse/dwd/test/hobby/' tblproperties ("parquet.compression"="snappy") # 插入3條數據 INSERT INTO TABLE dwd.hobby partition(dt='2020-04-05') SELECT 1,"zhangsan",array("籃球","足球","羽毛球","惡做劇") INSERT INTO TABLE dwd.hobby partition(dt='2020-04-05') SELECT 2,"lisi",array("足球","惡做劇") INSERT INTO TABLE dwd.hobby partition(dt='2020-04-05') SELECT 3,"wangwu",array("羽毛球","惡做劇") # 使用側寫視圖將炸裂結果做爲臨時視圖,聚合須要的結果 select name,hobby_name_col from dwd.hobby lateral view explode(hobby_name) hobby_tmp as hobby_name_col
主要解決1行和n行數據沒法聚合在一塊兒展現的問題。
#建表 CREATE EXTERNAL TABLE `dwd.order`( `name` string COMMENT '姓名', `order_date` string COMMENT '購買日期', `price` bigint COMMENT '消費金額' ) COMMENT '訂單表' PARTITIONED BY ( `dt` String COMMENT 'partition' ) row format delimited fields terminated by '\t' stored as parquet location '/warehouse/dwd/test/order/' tblproperties ("parquet.compression"="snappy") #插入數據 INSERT INTO TABLE `dwd.order` partition(dt='2020-04-05') VALUES("zhangsan","2020-01-01",15),("lisi","2020-01-02",22),("zhangsan","2020-04-01",34),("lisi","2020-04-01",15),("zhangsan","2020-04-04",42),("zhangsan","2020-03-01",24),("lisi","2020-02-01",65),("wangwu","2020-04-01",33),("zhangsan","2020-05-01",43),("zhangsan","2020-07-01",12),("wangwu","2020-02-05",32),("zhangsan","2020-03-06",22),("lisi","2020-04-07",14)
SELECT name,count(*) OVER() FROM dwd.`order` WHERE order_date BETWEEN "2020-04-01" AND "2020-04-30" GROUP BY name
SELECT *,sum(price) OVER() FROM dwd.`order`
SELECT *,sum(price) OVER(ORDER BY order_date) FROM dwd.`order` SELECT *,sum(price) OVER(ORDER BY order_date DESC) FROM dwd.`order`
over函數傳參,仍是給全部結果集進行開窗,可是根據參數限定窗口大小,上面sql的意思爲:
1號zhangsan數據窗口內只含有 1號張三
2號lisi數據窗口內含有 1號張三,2號李四
3號lisi數據窗口內含有 1號張三,2號李四,3號李四
依次類推
SELECT *,sum(price) OVER(distribute by name) FROM dwd.`order`
SELECT *,sum(price) OVER(distribute by name sort by order_date) FROM dwd.`order`
SELECT *,sum(price) OVER(distribute by name rows between UNBOUNDED PRECEDING and CURRENT ROW) FROM dwd.`order`
SELECT *,lag(order_date,1,"1970-01-01") OVER(distribute by name sort by order_date) FROM dwd.`order`
SELECT *,ntile(5) OVER() FROM dwd.`order`
SELECT * FROM (SELECT *,ntile(5) OVER(ORDER BY order_date) ntile_5 FROM dwd.`order`) t1 WHERE t1.ntile_5=1
SELECT *,row_number() OVER(order by order_date) FROM dwd.`order`
SELECT *,rank() OVER(order by order_date) FROM dwd.`order`
SELECT *,dense_rank() OVER(order by order_date) FROM dwd.`order`
https://cwiki.apache.org/confluence/display/Hive/HivePlugins
必須有返回值,能夠返回null,可是類型不能是void。
須要繼承UDF類,而且方法名爲evaluate
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>2.1.1</version> </dependency>
import org.apache.hadoop.hive.ql.exec.UDF; //模擬concat public class MyUDF extends UDF { public String evaluate(String... args) { StringBuilder s = new StringBuilder(); for (String arg : args) { s.append(arg); } return s.toString(); } }
# 1 添加jar包,注意服務器本地路徑(hive客戶端關閉則消失,須要從新添加,若是不想從新添加,能夠直接使用4將jar包放入hive/lib下) add jar /root/test/function-1.0-SNAPSHOT.jar # 2 添加函數,注意hive中的函數別名,還有jar包全類名,以下爲永久建立和臨時建立temporary(hive客戶端關閉則消失) create function myconcat as "com.hive.function.udf.MyUDF" create temporary function myconcat as "com.hive.function.udf.MyUDF" # 3 加載jar包的第二種方式,上傳jar包至hdfs集羣 # create temporary function myconcat as "com.hive.function.udf.MyUDF" using jar 'hdfs:///hive_jar/function-1.0-SNAPSHOT.jar'; # 4 加載jar包的第三種方式,直接放在hive的lib目錄下,啓動hive,使用2添加函數便可。下面是cdh環境的jars包路徑,配置軟鏈接到hive/lib下 # ln -s /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/jars/function-1.0-SNAPSHOT.jar /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hive/lib # 5 刪除函數 # drop function myconcat
SELECT myconcat("a","-","b","-","c")
用戶自定義UDAF必須繼承UDAF,必須提供一個實現了UDAFEvaluator接口的內部類
import org.apache.hadoop.hive.ql.exec.UDAF; import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; //模擬avg public class MyUDAF extends UDAF { public static class AvgState { private long mCount; private double mSum; } public static class AvgEvaluator implements UDAFEvaluator { AvgState state; public AvgEvaluator() { super(); state = new AvgState(); init(); } /** * init函數相似於構造函數,用於UDAF的初始化 */ public void init() { state.mSum = 0; state.mCount = 0; } /** * iterate接收傳入的參數,並進行內部的輪轉。其返回類型爲boolean * * @param o * @return */ public boolean iterate(Double o) { if (o != null) { state.mSum += o; state.mCount++; } return true; } /** * terminatePartial無參數,其爲iterate函數遍歷結束後,返回輪轉數據, * terminatePartial相似於hadoop的Combiner * * @return */ public AvgState terminatePartial() { // combiner return state.mCount == 0 ? null : state; } /** * merge接收terminatePartial的返回結果,進行數據merge操做,其返回類型爲boolean * * @param o * @return */ public boolean merge(AvgState avgState) { if (avgState != null) { state.mCount += avgState.mCount; state.mSum += avgState.mSum; } return true; } /** * terminate返回最終的彙集函數結果 * * @return */ public Double terminate() { return state.mCount == 0 ? null : Double.valueOf(state.mSum / state.mCount); } } }
用戶自定義UDAF必須繼承GenericUDTF,重寫initialize(),process(),close()方法。
import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import java.util.ArrayList; import java.util.List; //模擬EXPLODE和split public class MyUDTF extends GenericUDTF { private List<String> dataList = new ArrayList<>(); //初始化方法,返回對象結構校驗器 @Override public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException { //列名,會被用戶傳遞的覆蓋 List<String> fieldNames = new ArrayList<>(); fieldNames.add("word1"); //返回列以什麼格式輸出,這裏是string,添加幾個就是幾個列,和上面的名字個數對應個數。 List<ObjectInspector> fieldOIs = new ArrayList<>(); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } @Override public void process(Object[] objects) throws HiveException { //獲取數據 String data = objects[0].toString(); //獲取分隔符 String splitKey = objects[1].toString(); //切分數據 String[] words = data.split(splitKey); //遍歷寫出 for (String word : words) { //將數據放入集合 dataList.clear(); dataList.add(word); //寫出數據到緩衝區 forward(dataList); } } @Override public void close() throws HiveException { //沒有流操做 } }
hive.execution.engine配置爲spark
Fetch抓取修改成more,可使全局查找,字段查找,limit查找等都不走計算引擎,而是直接讀取表對應儲存目錄下的文件,大大普通查詢速度。
hive.fetch.task.conversion配置爲more
hive能夠經過本地模式在單臺機器上處理全部的任務,對於小的數據集,執行時間能夠明顯被縮短。
hive-site.xml調整下面3個參數,開啓本地模式,文件不超過50M,個數不超過10個。
hive.exec.mode.local.auto=true
hive.exec.mode.local.auto.inputbytes.max=50000000 (50M左右,默認128M->134217728,機器資源足建議使用默認值)
hive.exec.mode.local.auto.input.files.max=10 (模式4個)
<property><name>hive.exec.mode.local.auto</name><value>true</value></property> <property><name>hive.exec.mode.local.auto.inputbytes.max</name><value>50000000</value></property> <property><name>hive.exec.mode.local.auto.input.files.max</name><value>10</value></property>
在join問題上,讓小表放在左邊 去左連接(left join)大表,這樣能夠有效的減小內存溢出錯誤發生的概率。
hive.auto.convert.join開啓。(默認開啓)
hive.mapjoin.smalltable.filesize(默認25000000->接近24M,若是機器內存足能夠適當調大,需在hive-site.xml中設置,如7.3)
大表和大表join,空key會打到同一個reduce上,形成數據傾斜,任務緩慢,內存泄漏。(reduce任務某個很是慢,其餘很快,及發生數據傾斜)
- 空key過濾:使用子查詢過濾掉空key,能夠有效的提高查詢速率。
- 空key轉換:附隨機值,使其能夠隨機均勻的分佈在不一樣的reduce上,使用case when then或if判斷null值,賦予rand()函數。
默認狀況下map階段同一個key發送給一個reduce,當一個key數據過大時就發生數據傾斜。
那麼把某些聚合操做提到Map端進行部分聚合,最後在reduce端得出最終結果,也能夠有效的提高執行效率。
hive.map.aggr開啓。(默認開啓)
hive.groupby.mapaggr.checkinterval=100000(默認100000條,在map端進行聚合操做的條目數目,需在hive-site.xml中設置,如7.3)
hive.groupby.skewindata=true(默認false,有數據傾斜時進行負載均衡,需在hive-site.xml中設置,如7.3)
hive.groupby.skewindata當選項設置爲true時,生成的查詢計劃會有兩個MR Job,第一個MR Job會將key加隨機數均勻的分佈到Reduce中,作部分聚合操做(預處理),第二個MR Job在根據預處理結果還原原始key,按照Group By Key分佈到Reduce中進行聚合運算,完成最終操做。
數據量小的時候不要緊,大數據量下,因爲Count Distinct操做須要用一個Reduce任務來完成,這一個Reduce須要處理的數據量太大,會致使Job緩慢,可使用子查詢Group By再Count的方式替換。
列處理:不使用Select *,使用什麼字段就寫什麼字段,就算是全部字段,也要一一列出,養成好習慣。
行處理:在join操做中,不要直接關聯表後使用where條件,這樣會使全表關聯後在過濾,使用子查詢過濾後在join來替換,使查詢效率提升。以下
#錯誤寫法 select id from student s left join class c on s.cid=c.id where c.id<=10 #正確寫法 select id from student s left join (select id from class where id<=10 ) c on s.cid=c.id
當數據會根據狀況變化的時候,先有數據,再想分區的狀況。
設計表時儘可能避免動態分區,速度比靜態分區(也就是直接指定分區慢不少)。
開啓動態分區參數
必要參數
相關參數
建表方式同靜態分區,插入方式要注意,查詢結果最後一個字段即爲分區字段(無論名字是否同樣,會將最後一個字段的值,直接拿來分區)
#錯誤寫法:會將name值直接插入dt分區字段。 insert into table dwd.student1 partition(dt) select id,createdby,createdtime,updatedby,updatedtime,version,name from dwd.student #正確寫法 insert into table dwd.student1 partition(dt) select id,createdby,createdtime,updatedby,updatedtime,version,name,dt from dwd.student
hive.exec.parallel=true;默認false。需在hive-site.xml中設置,如7.3
hive.exec.parallel.thread.number=16;默認8,同一個sql容許的最大並行度,針對集羣資源適當增長。需在hive-site.xml中設置,如7.3
在執行的查詢sql前加上Explain指令,查詢分析sql執行過程。