Hive Streaming

1. Introduction to Hive Streaming

In the previous posts we saw that implementing a UDF, UDTF, or UDAF is not exactly simple, and it also requires a fair amount of Java knowledge, while Hive was designed precisely so that people who are not Java programmers could use it. Hive therefore offers another way to process data: Streaming, which removes the need to write Java code and in fact supports many languages. The trade-off is that Streaming usually runs slower than an equivalent UDF or a custom InputFormat, because serializing and deserializing data through a pipe is generally inefficient, and a streaming program is also harder to debug as a whole.

Hive provides several syntaxes for using Streaming, including:

  • MAP()
  • REDUCE()
  • TRANSFORM()

Note, however, that MAP() does not actually force the Streaming script to run in the mapper phase, just as REDUCE() does not force it to run in the reducer phase. For the same functionality, TRANSFORM() is therefore the usual recommendation, since it avoids this confusion.
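In other words, MAP() and REDUCE() are parsed as plain aliases of TRANSFORM(), with no effect on which phase actually runs the script, so the cat query from example one below could be written either way. A minimal sketch, assuming the employee table introduced in the next section:

SELECT MAP(e.name, e.salary)
USING '/bin/cat' AS name, salary
FROM employee e;

-- identical in effect to:
SELECT TRANSFORM(e.name, e.salary)
USING '/bin/cat' AS name, salary
FROM employee e;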

 

2. Writing and Using Streaming

A Streaming query is built from the TRANSFORM() function and the USING keyword: TRANSFORM() takes the table's column names as arguments, and USING specifies the script to execute. This section keeps using the employee table from Hive UDF tutorial (part one).

Example 1: Streaming with a Linux command

First, let Streaming run the Linux cat command directly to query the table. cat.q is the HiveQL file, with the following content:

SELECT TRANSFORM(e.name, e.salary)
USING '/bin/cat' AS name, salary
FROM employee e;
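For context on what the script actually sees: Hive serializes each selected row to the script's stdin as one line of tab-separated column values, and splits each line the script writes to stdout on tabs to produce the output columns. So a Python equivalent of /bin/cat would be the following sketch (identity.py is a hypothetical name):

#!/usr/bin/env python
# identity.py: pass rows through unchanged, like /bin/cat.
# Hive delivers each row as one line of tab-separated columns on stdin
# and reads tab-separated output columns back from stdout.
import sys

for line in sys.stdin:
    sys.stdout.write(line)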

Execution result:

hive (mydb)> SOURCE cat.q;
OK
Time taken: 0.044 seconds
Query ID = root_20160120000909_2de2d4f9-b50c-4ed1-a876-768c0127f067
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1453275977382_0001, Tracking URL = http://master:8088/proxy/application_1453275977382_0001/
Kill Command = /root/install/hadoop-2.4.1/bin/hadoop job  -kill job_1453275977382_0001
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2016-01-20 00:10:16,258 Stage-1 map = 0%,  reduce = 0%
2016-01-20 00:10:22,942 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.12 sec
MapReduce Total cumulative CPU time: 1 seconds 120 msec
Ended Job = job_1453275977382_0001
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1   Cumulative CPU: 1.12 sec   HDFS Read: 1040 HDFS Write: 139 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 120 msec
OK
John Doe	100000.0
Mary Smith	80000.0
Todd Jones	70000.0
Bill King	60000.0
Boss Man	200000.0
Fred Finance	150000.0
Stacy Accountant	60000.0
Time taken: 24.758 seconds, Fetched: 7 row(s)

Example 2: Streaming with a Python script

Next, compare Hive's built-in sum() function against a sum.py Python script doing the same job. First, the built-in sum():

hive (mydb)> SELECT sum(salary) FROM employee;
Query ID = root_20160120012525_1abf156b-d44b-4f1c-b2c2-3604e4c1bba0
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1453281391968_0002, Tracking URL = http://master:8088/proxy/application_1453281391968_0002/
Kill Command = /root/install/hadoop-2.4.1/bin/hadoop job  -kill job_1453281391968_0002
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2016-01-20 01:25:20,364 Stage-1 map = 0%,  reduce = 0%
2016-01-20 01:25:31,620 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.55 sec
2016-01-20 01:25:42,394 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 2.73 sec
MapReduce Total cumulative CPU time: 2 seconds 730 msec
Ended Job = job_1453281391968_0002
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 2.73 sec   HDFS Read: 1040 HDFS Write: 9 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 730 msec
OK
720000.0
Time taken: 33.891 seconds, Fetched: 1 row(s)

Then run the same aggregation via Streaming. The sum.py script:

#!/usr/bin/env python

# sum.py: sum the numbers Hive streams in on stdin (one salary per
# line) and print the grand total on stdout as the single output row.
import sys

def sum(arg):            # note: shadows the built-in sum()
    global total
    total += arg

if __name__ == "__main__":
    total = 0.0
    for arg in sys.stdin:
        sum(float(arg))  # float() tolerates the trailing newline
    print(total)         # works on both Python 2 and 3
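Before wiring the script into Hive, it is worth a quick local sanity check; for example, printf '1.0\n2.5\n' | python sum.py should print 3.5.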

The HiveQL script, sum.q:

SELECT TRANSFORM(salary)
USING 'python /root/experiment/hive/sum.py' AS total
FROM employee;
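Note that the script is referenced here by an absolute path, so it must exist at that path on every node that runs a map task. The more portable pattern, used again at the end of this post, is to ship the script with the job via ADD FILE and then refer to it by bare name. A sketch, assuming the same local path:

ADD FILE /root/experiment/hive/sum.py;

SELECT TRANSFORM(salary)
USING 'python sum.py' AS total
FROM employee;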

Finally, the execution result. Note that this Streaming query runs as a map-only job, and because it uses a single mapper the script sees every row and prints one grand total; with more than one mapper, each mapper would emit its own partial sum:

hive> source sum.q;
OK
Time taken: 0.022 seconds
Query ID = root_20160120002626_0ced0b93-e4e8-4f3a-91d0-f2aaa06b5f11
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1453278047512_0002, Tracking URL = http://master:8088/proxy/application_1453278047512_0002/
Kill Command = /root/install/hadoop-2.4.1/bin/hadoop job  -kill job_1453278047512_0002
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2016-01-20 00:26:28,341 Stage-1 map = 0%,  reduce = 0%
2016-01-20 00:26:36,185 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.4 sec
MapReduce Total cumulative CPU time: 1 seconds 400 msec
Ended Job = job_1453278047512_0002
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1   Cumulative CPU: 1.4 sec   HDFS Read: 1040 HDFS Write: 9 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 400 msec
OK
720000.0
Time taken: 17.048 seconds, Fetched: 1 row(s)

When using TRANSFORM you cannot select other columns in the same query; if you also need some column a, just pass it to TRANSFORM as well and have the script output it without processing:

add file /home/xxx/udf/python/xxx.py;

select transform(m, p, consume, cnt)
using 'python xxx.py' as (mid, pid, trans_at, total_cnt)
from xxx_table;
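A matching pass-through script might look like the sketch below (the file name and column names are placeholders from the query above; the first two fields are echoed untouched, which is exactly the "output it without processing" trick):

#!/usr/bin/env python
# xxx.py (sketch): receive tab-separated rows (m, p, consume, cnt);
# emit m and p unchanged and pass consume/cnt through as the
# transformed columns.
import sys

for line in sys.stdin:
    m, p, consume, cnt = line.rstrip('\n').split('\t')
    print('\t'.join([m, p, consume, cnt]))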