hadoop核心組件(二)

一、Hive概念:java

hive是數據倉庫,由解釋器、優化器和編譯器組成;運行時,元數據存儲在關係型數據庫中。node

二、Hive的架構:sql

(1)用戶接口主要有三個:CLi、Client和WUI。其中最經常使用的是CLi,CLi啓動時候,會啓動一個Hive副本。Client是hive的客戶端,用戶鏈接至Hive Server。在啓動Client模式的時候,須要指出Hive Server所在的節點,而且在該節點啓動Hive Server;WEBUI是經過瀏覽器訪問hive;數據庫

(2)Hive元數據存儲在數據庫中,如MySQL、Derby。Hive中的元數據包括表的名字,表的列和分區及其屬性,表的屬性(是否爲外部表等),表的數據所在的目錄等;apache

(3)解釋器、編譯器、優化器完成HQL查詢語句從詞法解析、語法分析、編譯、優化以及查詢計劃的產生。生成的查詢計劃存儲在HDFS中,並在隨後有MapReduce調用執行。瀏覽器

(4)Hive數據存儲在HDFS中,大部分的查詢、計算由Mapreduce完成(包含 * 的查詢,好比select * from table不會產生mapreduce任務)服務器

三、Hive的三種模式網絡

(1)local模式,通常用於Unit Test架構

(2)單用戶模式,經過網絡鏈接到一個數據庫中,是最常用的模式jvm

(3)多用戶模式,用於非java客戶端訪問元數據庫,在服務器端啓動MetaStoreServer,客戶端利用Thrift協議經過MetaStoreServer訪問元數據庫

四、Hive建表

建表語句見官網:https://cwiki.apache.org/confluence/display/Hive/LanguageManual

建立內部表:

create table psn(
id int,
name string,
likes ARRAY <string>,
address MAP <string,string>
) 
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '-'
MAP KEYS TERMINATED BY ':';

注:插入數據不建議用insert語句,由於它會轉成MapReduce,執行時間較長

LOAD DATA [LOCAL] INPATH '/data_path' INTO TABLE psn

 

建立外部表

create EXTERNAL table psn2(
id int,
name string,
likes ARRAY <string>,
address MAP <string,string>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '-'
MAP KEYS TERMINATED BY ':'
LOCATION '/user/psn2';
EXTERNAL 表示建立外部表
LOCATION 表示數據存放在hdfs上的路徑

刪除表
drop table psn;               #內表
drop table psn2;               #外表

注:外部表不會刪除數據

 

建立分區表

create table psn3(
id int,
name string,
likes ARRAY <string>,
address MAP <string,string>
)
PARTITIONED BY(sex string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '-'
MAP KEYS TERMINATED BY ':';

注:分區表的字段不能是表中的字段,分區字段不存值

分區表插入數據

LOAD DATA LOCAL INPATH '/root/data_path' INTO TABLE psn3 partition (sex='boy');

導入數據第二種方式

from psn insert into table psn2 select id,name,likes,address;

 

動態分區

靜態分區:固定的,經過路徑(hdfs來標識)

動態分區:不固定的,可變的(對於一個文件中分區字段有不一樣的值,導入表的同時分區)

動態分區參數配置
set hive.exec.dynamic.partition=true;               #是否開啓動態分區 默認false   
set hive.exec.dynamic.partition.mode=nostrict;      #設置非嚴格模式 默認strict(至少有一個是靜態分區)
set hive.exec.max.dynamic.partitions.pernode;       #每一個執行的mr節點上,容許建立的動態分區的最大數量(100set hive.exec.max.dynamic.partitions;               #全部執行的mr節點上,容許建立的全部動態分區的最大數量(1000set hive.exec.max.created.files;                    #全部的mr job容許建立的文件的最大數量(100000

注:動態分區導入數據不使用load data...

        建議使用 from tablle2 insert overwrite table table1 partition(age, sex)  select id, name, age, sex, likes, address distribute by age, sex;

五、建立自定義函數

官網網址:https://cwiki.apache.org/confluence/display/Hive/HivePlugins

package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
 
public final class Lower extends UDF {
  public Text evaluate(final Text s) {
    if (s == null) { return null; }
    return new Text(s.toString().toLowerCase());
  }
}
重點:寫完以後打成jar上傳至hiverserver上,而不是客戶端,進入hive客戶端添加jar包
           add jar地址 path:hiverserver主機的地址,而不是hdfs地址;
實踐證實上傳至客戶端有效,只要建立成功,只要不退出本次會話,即便刪除本地的jar,建立的臨時函數也是有效的
hive> add jar /root/example.jar                                #後面爲jar包上傳的路徑
hive> create  temporary function example as 'com.example.hive.udf.Lower';       #建立臨時函數 example爲hive sql執行的函數名;字符串爲類路徑
hive> select id,name,example(name),likes from psn1;                    #運行
hive> drop temporary function example;                            #刪除臨時函數

六、分桶
-----分桶表是對列值取hash的方式,將不一樣數據放到不一樣文件中存儲
-----對於hive表中每個表、分區均可以進一步進行分桶
-----由列的hash值除以桶的個數來決定每條數據劃分在哪一個桶中
使用場景:數據抽樣(simaple)、mapjoin

首先要開啓支持分桶
set hive.enforce.bucketing=true;      #默認爲flase;設置以後,MR運行時會根據桶的個數自動分配reduce task個數

注:一次做業產生的桶(文件數量)和reduce task個數一致

建立分桶表

CREATE TABLE psnbucket( id INT, name STRING, sex string)
CLUSTERED BY (id) INTO 4 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

查看建立的分桶表

加載數據

1.insert [into/overwrite] table psnbucket select id, name, sex from psn6;
2.from psn6 insert into psnbucket select id,name,sex;

桶的數量對應執行了4個reduce任務 

桶表抽樣查詢

TABLESAMPLE(BUCKET x OUT OF y)
x:表示從哪一個bucket開始抽取數據
y:必須爲該表總bucket數的倍數或因子
 
example:
select id,name,age from bucket_table tablesample(bucket 1 out of 4 on id);

計算抽取哪些數據:

當表總bucket數爲32(total)時
      TABLESAMPLE(BUCKET 2 OUT OF 16),抽取哪些數據?
      共抽取2(32/16)個bucket的數據,抽取第二、第18(16+2)個bucket的數據
if(total /y >0){
   抽取第 x、y+ ( total /y )桶的數據
}else if(total /y < 0){
   抽取第x桶的( total /y )的數據  
}else{
   x、y數量有問題 
}
七、Hive索引
hive建立索引後如需使用的話必須先重建索引,索引才能生效

八、Hive優化
核心思想:把Hive SQL當作MapReduce程序去優化
一下SQL不會轉成MapReduce來執行
  • select僅查詢本表字段 
  • where僅對本表字段作條件過濾 

(1)Explain顯示執行計劃

 

EXPLAIN [EXTENDED] sql語句

 

(2)Hive的運行方式:本地模式和集羣模式

  開啓本地模式:

set hive.exec.mode.local.auto=true;

注意:hive.exec.mode.local.auto.inputbytes.max默認值爲128M ,表示加載文件的最大值,若大於該配置仍會以集羣方式來運行! 

(3)並行計算

set hive.exec.parallel=true;

注意:hive.exec.parallel.thread.number (一次SQL計算中容許並行執行的job個數的最大值) 

(4)嚴格模式

set hive.mapred.mode=strict;      #(默認爲:nonstrict非嚴格模式) 
查詢限制:
           一、對於分區表,必須添加where對於分區字段的條件過濾;
           二、order by語句必須包含limit輸出限制;
           三、限制執行笛卡爾積的查詢。
(5)Hive排序
   Order By - 對於查詢結果作全排序,只容許有一個reduce處理
          (當數據量較大時,應慎用。嚴格模式下,必須結合limit來使用)
        Sort By - 對於單個reduce的數據進行排序
        Distribute By - 分區排序,常常和Sort By結合使用
        Cluster By - 至關於 Sort By + Distribute By
          (Cluster By不能經過asc、desc的方式指定排序規則;
          可經過 distribute by column sort by column asc|desc 的方式)
(6)Hive Join
  Join計算時,將小表(驅動表)放在join的左邊,由於先加載左邊(讓它優先加載小表) 
  Map Join:在Map端完成Join 
  兩種實現方式: 
    一、SQL方式,在SQL語句中添加MapJoin標記(mapjoin hint)
               語法:
                     SELECT  /*+ MAPJOIN(smallTable) */  smallTable.key,  bigTable.value
                     FROM  smallTable  JOIN  bigTable  ON  smallTable.key  =  bigTable.key;
               二、開啓自動的MapJoin
set hive.auto.convert.join = true;      #該參數爲true時,Hive自動對左邊的表統計量,若是是小表就加入內存,即對小表使用Map join

相關配置參數: 

hive.mapjoin.smalltable.filesize;              #大表小表判斷的閾值,若是表的大小小於該值則會被加載到內存中運行 
hive.ignore.mapjoin.hint;                  #默認值:true;是否忽略mapjoin hint 即mapjoin標記 
hive.auto.convert.join.noconditionaltask;         #默認值:true;將普通的join轉化爲普通的mapjoin時,是否將多個mapjoin轉化爲一個mapjoin 
hive.auto.convert.join.noconditionaltask.size;      #將多個mapjoin轉化爲一個mapjoin時,其表的最大值 

(7)Map-Side聚合

set hive.map.aggr=true;
hive.groupby.mapaggr.checkinterval              #map端group by執行聚合時處理的多少行數據(默認:100000)
hive.map.aggr.hash.min.reduction                #進行聚合的最小比例(預先對100000條數據作聚合,若聚合以後的數據量/100000的值大於該配置0.5,則不會聚合)
hive.map.aggr.hash.percentmemory                #map端聚合使用的內存的最大值
hive.map.aggr.hash.force.flush.memory.threshold      #map端作聚合操做是hash表的最大可用內容,大於該值則會觸發flush
hive.groupby.skewindata                     #是否對GroupBy產生的數據傾斜作優化,默認爲false

(8)控制hive中的Map以及Reduce數量

Map數量相關的參數 

mapred.max.split.size        #一個split的最大值,即每一個map處理文件的最大值
mapred.min.split.size.per.node   #一個節點上split的最小值
mapred.min.split.size.per.rack   #一個機架上split的最小值

Reduce數量相關的參數 

mapred.reduce.tasks                #強制指定reduce任務的數量
hive.exec.reducers.bytes.per.reducer      #每一個reduce任務處理的數據量
hive.exec.reducers.max                     #每一個任務最大的reduce數

(9)Hive--JVM重用

適用場景: 一、小文件個數過多    二、task個數過多
set mapred.job.reuse.jvm.num.tasks=n    # 來設置(n爲task插槽個數)
缺點:設置開啓以後,task插槽會一直佔用資源,不管是否有task運行,直到全部的task即整個job所有執行完成時,纔會釋放全部的task插槽資源!
相關文章
相關標籤/搜索