從 MapReduce 到 Hive —— 一次遷移過程小記

一、背景介紹

早先的工做中,有不少比較複雜的分析工做,當時對hive還不熟悉,可是java比較熟悉,因此在進行處理的時候,優先選擇了MR.
可是隨着工做的數據內容愈來愈多,愈來愈複雜,對應的調整也愈來愈多,愈來愈複雜.純使用MR方式整個流程就比較複雜,若是須要修改某個部分,那首先須要修改代碼中的邏輯,而後把代碼打包上傳到某個可訪問路徑上(通常就是hdfs),而後在調度平臺內執行.若是改動較大的狀況,可能還會須要在測試環境中屢次調試. 總之就是會花比較多的時間在非業務邏輯改動的工做上.
考慮到維護的成本的增大,慢慢的開始準備將MR的做業,逐漸的移植到一些腳本平臺上去,hive成了咱們的首選。

二、實戰場景

先看這樣一個場景. 每個用戶在登陸到網站上的時候會帶有一個ip地址,屢次登陸可能會有多個不一樣的ip地址.
假設,咱們已經有一個 用戶->ip地址這樣的一份數據.咱們須要對此進行分析,獲得一份來自相同ip的用戶的關係表,數據格式相似
用戶->用戶,具體的ip咱們不保留了。

第1步 用udf取最頻繁ip

咱們先看一下原始數據的字段,是user_id,ips,咱們再來看看ips內容的格式,咱們執行
Select * from iptable limit 100
你會發現,雖然咱們limit了100並且是沒有任何複雜條件的查詢,hive居然也會去掃描全部的數據,這很是奇怪也很浪費。原來hive的limit在默認的狀況下的執行過程就是把全部數據都跑出來,而後再一個reduce上,進行limit。這是爲了保證在某些狀況下篩選條件對結果的影響。
可是咱們能夠經過打開一個hive.limit.optimize.enable=true來簡化這個查詢,當這個選項打開之後hive會讀取hive.limit.row.max.size,hive.limit.optimize.limit.file的默認值來進行小數據量的計算。
咱們看到ips的原始數據的格式是ip,ip,… 用逗號分隔的多個ip字符串。
咱們要從用戶->[ip地址] 這樣的數據中獲得一個用戶使用最多的ip地址做爲用戶的最經常使用ip。這裏咱們會使用hive的自定義udf來完成這一步的工做。
那麼udf是什麼呢,udf就是user define function的縮寫.經過它咱們能夠對hive進行擴展,hive自己已經帶了不少的基本的udf了,好比length(),sin(),unix_timestamp(),regexp_replace()等等.
這些都是一些比較通用的處理,若是有的時候咱們要在字段上作一些特殊的邏輯就要本身動手寫了.
下面就是咱們用來實現這個功能的udf代碼
@Description(name = 「freq
ips」, value = 「find most frequence ips from all login ip」, extended = 「」)
public class FindFreqIps extends UDF {
    public String evaluate(String content, int limit) {
       // 計算最經常使用ip的代碼邏輯,並返回結果
       Return result;
    }
}
裏面的邏輯主要就是找到前limit個最長使用的Ip,咱們看到咱們的類須要繼承自hive包中的UDF類,而後任意的定義輸入類型和返回類型,可是方法的名字必定要叫evaluate,hive會使用反射來獲得這個方法的輸入輸出。當咱們要在hive中使用它的時候,咱們要首先把這個類打成jar包,而後讓hive能夠訪問到。通常能夠直接放在hdfs上,而後使用
Add jar hdfs_path/myjar.jar;
Create temporary function FindFreqIps as ‘FindFreqIps’
Select user_id, FindFreqIps(ips) as freqIps from tablexxx
另外還有一種是繼承自genericUDF,這種方式能夠自由的控制輸入和返回類型處理,比起UDF來講更加的靈活些。可是咱們這裏普通的udf就足夠了。

第2步 列轉行,進行join

從第一步,咱們獲得了用戶最經常使用的N個ip,咱們這裏假設值3個。而後咱們要找到這些用戶之間的關聯,即相同的ip的關係。
那麼很是直接的方式,咱們直接對用戶的ip進行join,可是如今ip是3個連在一塊兒字符串的形式,沒法直接join。那麼咱們就先把ip都分解開。
咱們把這個ips的字段進行一個列轉行的轉換,以下
Select user_id,ip from tablexxx
Lateral view explode(split(ips, 「,」))
subview as ip
這樣就會獲得 user->ip的單條的記錄。這裏的
這下要join就方便了,假設上面的結果表是singleIP咱們
Select a.user_id as fromid, b.user_id as
toid
SingleIP a
Join  SingleIP b
On a.ip = b.ip and a.user_id <>
b.user_id;
什麼,報錯了!
a.user_id <> b.user_id這個部分會報錯,由於hive中join的時候,是隻能指定等式來進行匹配的,不支持不等式的條件。若是使用了不等式,會使join的數量變的很是大。
因而,咱們就只能曲線救國了。
Select * from
(Select a.user_id as fromid, b.user_id as
toid
SingleIP a  Join
SingleIP b
On a.ip = b.ip) m
Where m.fromid <> m.toid;
你會發現,執行了1次join,2次select使用的mr的步驟仍是一步。通常總感受嵌套了一次select之後也會對應的產生2次mr,難道是hive本身進行了優化嗎?那麼咱們藉助hive的分析工具來看看hive是如何執行的呢。
咱們在剛纔的語句前加上explain,來看看這個select的執行計劃。
Hive會經過antlr來對輸入的sql語句進行語法分析,產生一個執行計劃。
執行計劃會有三個部分

第一部分是ABSTRACT SYNTAX

TREE抽象語法樹
這裏面顯示了hive把這個sql解析成什麼樣的各個token。
相似這樣(TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))表示

第二部分是STAGE DEPENDENCIES

一個hive的執行過程會包含多個stage,他們之間有互相依賴的關係。好比下面
Stage-1 is a root
stage

Stage-0 depends on stages: Stage-1

Stage-3 depends on
stages: Stage-0
這裏的stage-1是root stage。而0依賴於1,3依賴於0。

第三個部分是STAGE PLANS, 就是每一個stage中的具體執行的步驟。

在stage plans裏面每個stage和普通的hadoop程序同樣會有map和reduce的過程。咱們截取一段map過程的計劃看下。
Stage: Stage-1
    Map Reduce
      Alias ->
Map Operator Tree:
        a
          TableScan
            alias:
a
            Reduce
Output Operator
              key expressions:
expr: ip
type: string

             sort

order: +
Map-reduce partition columns:
expr: ip
type: string
              tag:
0
              value expressions:
expr: user_id
type: string
這裏是對a表也就是SingleIP表的一個map階段的操做。Reduce output operator這裏會顯示使用ip做爲key,自增排序,由於是string的因此是字典序的自增。Partition使用ip做爲分發字段。tag指的是相似一個來源的概念,由於這裏的join採用的是reduce join的方式,每個從不一樣的map來的數據最後在reduce進行匯合,他們會被打上一個標記,表明他們的來源。而後就是value的內容,user_id。

而後再來看看reduce過程的計劃
Reduce Operator Tree:

        Join
Operator

          condition
map:

Inner Join 0 to 1

          condition
expressions:

            0
{VALUE._col0}

            1
{VALUE._col0}

handleSkewJoin: false

outputColumnNames: _col0, _col2
這裏顯示一個join的操做。這裏表示把0的內容加到1上。後面有一個handleSkewJoin,這個是hive的一個應對數據傾斜的一種處理方式,默認是關閉的,咱們後面再來詳細看。
這裏也能夠用explain extended,輸出的信息會更加詳細。那麼看了這個咱們再比較一下咱們以前的第二個查詢計劃,咱們來看看加上了嵌套查詢之後的執行計劃有什麼變化呢?會發現hive在reduce的執行計劃裏面會加上一段
Filter
Operator
predicate:
expr: (_col0 <> _col2)
type: boolean
在reduce最後輸出以前,進行了一個過濾的操做,過濾的條件就是外部的查詢的where條件。正如咱們所料,hive發現這個過程是能夠一次性完成的,因此進行了優化,放在了reduce階段來做了。
另外若是hive中有多張表進行join,若是他們的join key是同樣的,那麼hive就會把他們都放在一次mr中完成。

第3步 數據傾斜

上一步中,咱們計算出了全部的相同ip的人的點對點關係。可是這個結果集會有很多問題,好比若是某個ip是一個公共出口,那麼就會出現同一個ip有上萬人都在使用,他們互相join展開之後,結果的數據量會很是大,時間上很慢不說,最終獲得的數據實際上不少咱們也用不上(這個是基於業務上得考慮),甚至有可能,在展開的時候會出現各類問題,致使計算時間過長,算不出來。這種狀況,咱們在hive裏面稱之爲數據傾斜。

在group by的時候,若是出現某一個reduce上得數據量過大的狀況,hive有一個默認的hive.groupby.skewindata選項,當把它設置爲true的時候,hive會將原來的一次MR變成2次,第一次,數據在reduce的時候會隨機分發到每一個reduce,作部分的聚合,而後第二次的時候再按照group by的key進行分發。這樣能夠有效的處理通常的傾斜狀況。

而在join的時候,若是join的其中某個key的值很是的多,也會致使傾斜。有的時候,若是有null值,在hive看來null和null是相等的,它也會對他們進行join,也會錯誤的傾斜。因爲join的時候,hive會把第一張表的內容放到一個內容map中,而後不斷的讀取後表的內容來進行join,因此若是左邊的表示小表這個過程就會很是的高效。固然使用mapjoin也一種有效的方式,直接把一張足夠小的表徹底放到內存來後另外一張表進行join。相似這樣
SELECT /*+ MAPJOIN(b) */
a.key, a.value FROM a join b on a.key = b.key;
咱們的ip計算使用的是本身join本身,全部也沒有大小表之分,同時單表的數據量也大到沒法徹底放進內存,那麼是否是就要進行硬算呢?在實際中,由於ip的分佈沒有傾斜到太過火的程度,硬算也確實能夠,可是這裏咱們換一種方式來稍稍優化一下。
首先咱們採用bucket的方式來保存之間的用戶->ip的數據。使用ip來做爲分桶鍵。
CREATE TABLE userip(user_id bigint, ip STRING)
CLUSTERED BY (ip) INTO 128 BUCKETS;
而後set hive.enforce.bucketing  = true;開啓bucket計算
from tableaaa
insert overwrite table tablebbb
select user_id, ip;
結果將會被保存到128個不一樣的桶中,默認根據ip的hashcode來取模。這樣每一個桶內的數據基本大概是原數據量的1/100。固然若是原始數據量太大,還能夠分桶更加多一些。
這個地方若是咱們不開啓enforce.bucketing的話,也能夠經過設置
set mapred.reduce.tasks=128.而後在查詢中cluster by來強制指定進行分桶。這步完成以後,咱們再來進行設置
set hive.optimize.bucketmapjoin=true;
set  hive.optimize.bucketmapjoin.sortedmerge=true;
而後hive就能對每一個分塊數據進行mapjoin。

第4步 用udaf取top N

好了,如今咱們已經有全部的user->user的數據,咱們但願要一個user->[users]的一對多的記錄,可是這個數據量有點大,實際上每一個用戶大概關聯1000個已經足夠了。首先對數據進行排序,排序的依據就是按照用戶的相同的ip的數量。而後去最前面的1000個,不足的按實際數量取。
這個地方比較容易想到的就是,先group by fromid,toid,而後count一個總數做爲新字段,以下
這裏想到一種作法是用淘寶的一個類sql的row_number實現,而後用row_number來對fromid作主鍵,給按照count從大到小寫上序列編號seq。最後作一個嵌套查詢,只取seq<=1000的數據。Row_number的話標準的hive中沒有。那麼這裏就可讓自定義udaf上場了。
Udaf顧名思義就是一個Aggregate的udf,和以前的udf的區別就是他通常是用來group by的場合中。
@Description(name = 「myudaf」, value = 「calc users has most same ips 」 )

public class GenericUDAFCollect extends AbstractGenericUDAFResolver {
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
           throws SemanticException {
       return new MyUDAFEvaluator();
    }
}

本身定義一個evaluator,而且實現其中的一些方法。 java

public static class MyUDAFEvaluator extends GenericUDAFEvaluator {
    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
	
    }
    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
	
    }
    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
	
    }
    // Mapside
    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
	
    }
    // Mapside
    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
	
    }
    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
	
    }
	// Reduceside
    @Override
	public Object terminate(AggregationBuffer agg) throws HiveException {
	
    }
}
在init階段會傳一個Mode進來,這個Mode中定義瞭如下的幾個階段
PARTIAL1: 這個是map階段,這個階段會調用iterate(),和terminatePartial()
PARTIAL2:  這個是map段得combiner階段,會將map端的數據進行合併,也可能沒有這個階段。會執行merge()和terminatePartial()

FINAL: 這個是reduce階段,會調用merge()和terminate() 程序員

COMPLETE: 這是純map處理,無reduce的狀況出現的階段,它會調用iterate()和terminate()
而從函數方面來講,init是初始化,他會傳入mode做爲參數。能夠根據不一樣的階段採起不一樣的處理。getNewAggregationBuffer的處理是hive爲了內存的複用,減小gc,他並非每一次處理一條記錄都會新申請空間,而是在處理一批數據的時候重複使用一批內存。Terminate就是最終的輸出了。
Ok,瞭解了udaf,那麼能夠動手了。Sql以下
Select fromid, getTopN(toid,n) from tablexx3
Group by fromid
其中的getTopN首先在map端,將每個fromid的關聯的toid的次數都記錄下來,記錄條數表明重複的ip數量,而後按照這個次數進行倒序排序,截取前n個。
在reduce端,將各個map端的結果再按照次數倒序排序,再進行截取n個並進行合併。最終輸出的就是每一個fromid對應的toid的列表了。
從此次從mr轉換到hive的過程當中,對咱們目前的mr和hive進行了一些比較

三、mr和hive比較

1. 運算資源消耗

不管從時間,數據量,計算量上來看,通常狀況下mr都是優於或者等於hive的。mr的靈活性是毋庸置疑的。在轉換到hive的過程當中,會有一些爲了實現某些場景的需求而不得不用多步hive來實現的時候。

2. 開發成本/維護成本

毫無疑問,hive的開發成本是遠低於mr的。若是能熟練的運用udf和transform會更加提升hvie開發的效率。另外對於數據的操做也很是的直觀,對於全世界程序員都喜聞樂見的sql語法的繼承也讓它更加的容易上手。
   hive獨有的分區管理,方便進行數據的管理。
   代碼的管理也很方便,就是直接的文本。
   邏輯的修改和生效很方便。
   可是當出現異常錯誤的時候,hive的調試會比較麻煩。特別是在大的生產集羣上面的時候。

3. 底層相關性

在使用hive之後,讀取文件的時候,不再用關心文件的格式,文件的分隔符,只要指定一次,hive就會保存好。相比mr來講方便了不少。
當側重關心與業務相關的內容的時候,用hive會比較有優點。而在一些性能要求高,算法研究的時候,mr會更加適合。
相關文章
相關標籤/搜索