本身動手寫Impala UDF

概述

出於對可擴展性和性能的考慮,UDF已變成大數據生態圈查詢引擎的必備功能之一,不管是Calcite、Hive、Impala都對其進行支持,可是UDF的支持有利也有弊,好處在於它提供了對某些用戶獨有需求的支持,例如某些產品須要將表中的某字段使用自定義的方式解析成可讀字段,例如須要實現特殊的聚合函數;它的弊端在於它對用戶開發,這樣對於惡意的用戶可能執行非正常的邏輯,例如在函數中刪除或者拷貝其它文件內容,從而對非受權數據形成破壞,所以對於一個SQL引擎來講,咱們須要UDF的集中管理,全部用戶自定義的UDF都須要管理員審查源代碼,不容許普通用戶本身上傳UDF,從而避免意外的發生。html

對於一般UDF的需求,我的以爲有兩方面的需求:一、系統提供的函數完成不了的需求,或者須要使用系統函數進行拼湊才能完成的需求。二、使用當前系統提供的函數性能太差,須要作一些特別的優化。另外,對於UDF還分爲兩類:自定義函數(UDF)和自定義聚合函數(UDAF),前者會處理每一條輸入的記錄,轉換成處理後的結果,相似於map的功能,後者對於多條記錄進行聚合,輸出聚合以後的值,相似於reduce的功能。node

衆所周知Impala使用了Java和C++實現(雖然大多數時候咱們都說Impala是C++實現的,因此性能更好,可是它的SQL解析部分的確是Java實現的),Impala一樣也支持兩種語言的UDF,可是UDAF目前只能支持C++實現,本文分別介紹這些方法如何在Impala中使用的。c++

目標

咱們這裏的example實現一個UDF和一個UDAF,分別實現以下的需求:
咱們須要實現的UDF爲int level(int),功能爲根據value的值計算出距離該值最近的2的冪數的冪值,若是有多個值則取最大的。例如15,距離它最近的2的冪數爲16,則level(15)=4,level(9)=3, level(12)=3或4則level(12)=4.
咱們須要實現的UDAF爲int sum_str(string),功能爲計算name中出現的第一個整數的和值,若是該字符串不出現整數則爲0,例如」abcd123ef」和」efdg23sd24」的和值爲123+23=146.
這兩個需求算是比較奇葩了吧,看看如何在impala中利用UDF和UDAF實現它們。git

實現

Java版UDF

瞭解Impala的都知道,Impala能夠直接使用Hive的metestore做爲元數據庫,一樣Impala也能夠直接使用Hive的UDF,因此對於以前奮鬥在Hive第一線的同窗們使用Impala有了很多親切感,那麼在這裏就順便溫習一遍Hive的UDF使用流程吧。github

使用Java實現的UDF必須繼承org.apache.Hadoop.hive.ql.exec.UDF類,而後在該類中實現名爲evaluate的函數,猜想Hive/Impala是根據函數名來找到具體的實現的,固然一個類裏面也能夠重載多個evaluate方法。該方法實現以下:package com.netease.hive.udf;shell

import org.apache.hadoop.hive.ql.exec.UDF;

public class LevelUDF extends UDF {
    public Integer evaluate(Integer value) {
        if(value == null)
            return null;

        double temp = value;
        int cnt = 0;
        int max = 1;
        while(temp > 1) {
            cnt ++;
            temp /= 2;
            max *= 2;
        }
        if(max - value > (value - max / 2))
            cnt --;

        return cnt;
    }

    public static void main(String[] args) {
          System.out.println(new LevelUDF().evaluate(9));
    }
}

而後編譯成jar上傳到HDFS上,Impala的自定義函數,不管是Java實現仍是C++實現的都須要手動放到HDFS上,而非像Hive那樣直接能夠add jar命令,而後在impala shell中執行create function函數。數據庫

> hadoop fs -put ./udf.jar hdfs://namenode-or-nameservice/tmp/nrpt/
> create function level(int) returns int location 'hdfs://namenode-or-nameservice/tmp/nrpt/udf.jar' symbol='com.netease.hive.udf.LevelUDF';

在impala中建立UDF必須指定參數和返回值,而且執行symbol爲類名。根據參數和返回值在運行時查找該函數,若是參數和返回值和類中實現有差異則會出現運行時錯誤(create function仍是可以成功的)。apache

> select level(15);
+-------------------+
| default.level(15) |
+-------------------+
| 4                 |
+-------------------+
> select level(9);
+------------------+
| default.level(9) |
+------------------+
| 3                |
+------------------+

使用Java實現UDF是很是簡單的 ,那麼爲何還須要使用C++的實現呢,主要是性能的考慮,畢竟相同的邏輯使用C++實現不管是性能仍是資源消耗都會比JAVA好得多,因此Impala官方是很是推崇使用C++實現UDF,而且聲稱性能會有10倍的提高。ubuntu

C++實現UDF

使用C++實現UDF就不那麼輕鬆了,首先要面臨的就是系統庫的差異,因此通常要求UDF開發機和Impala運行的機器使用相同的Linux發行版,最好在其中的一臺Impalad機器上開發,避免出現不可預測的問題,C++開發UDF須要首先下載impala udf devel開發包和一下其餘依賴的工具:緩存

sudo yum install gcc-c++ cmake boost-devel
sudo yum install impala-udf-devel

g++、cmake工具通常開發機上都是有的,boost開發包須要手動安裝,固然若是你的函數實現不須要boost也不須要安裝這個,不過Impala自己都是使用boost開發的,不過使用boost庫能夠提升開發效率和性能,impala-udf-devel這個包在安裝的時候發現ubuntu和debain上根本找不到,不過不用擔憂,其實安裝這些包主要就是把這個包下面的.h文件放到系統的include目錄下,把.so文件放到系統的lib下,只要有源碼本身也能夠編譯。能夠在https://github.com/laserson/impala-udf-devel下載它們的源碼(就兩個.h文件和一個.cc文件)。對於UDF開發更推薦這種方式,省的本身寫CMakeList.txt文件了,clone到本地以後能夠直接運行cmake生成Makefile文件。

> git clone https://github.com/laserson/impala-udf-devel.git
> cd impala-udf-devel/
> cmake .

此時會發現出現了錯誤:

CMake Error at CMakeLists.txt:46 (add_library):
  Cannot find source file:

  my-udf-file-1.cc

打開CMakeList.txt文件會發現add_library(myudf SHARED my-udf-file-1.cc my-udf-file-2.cc)這一行,它表示根據my-udf-file-1.cc和my-udf-file-2.cc文件生成一個動態連接庫myudf,咱們能夠經過該一下這個配置而後編寫本身的udf文件,咱們把CMakeList.txt文件的最後幾行改成以下:

# Build the UDA/UDFs into a shared library.  You can have multiple UDFs per
# file, and/or specify multiple files here.
add_library(level-udf SHARED level-udf.cc)

# The resulting LLVM IR module will have the same name as the .cc file
if (CLANG_EXECUTABLE)
  COMPILE_TO_IR(level-udf.cc)
endif(CLANG_EXECUTABLE)

而後編輯代碼level-udf.h(建立):

#ifndef LEVEL_UDF_H
#define LEVEL_UDF_H

#include "udf/udf.h"

using namespace impala_udf;

IntVal level(FunctionContext* context, const IntVal& value);

#endif

編輯level-udf.cc文件(建立):

#include "level-udf.h"

using namespace std;

IntVal level(FunctionContext* context, const IntVal& value) {
    if(value.is_null)
        return IntVal::null();

    int original = value.val;
    double temp = (double) original;
    int cnt = 0;
    int max = 1;
    while(temp > 1) {
        cnt ++;
        temp /= 2;
        max *= 2;
    }

    if((max - original) > (original - max / 2))
        cnt --;
    return IntVal(cnt);
}

這裏須要說明一下的是FunctionContext對象,這個對象是調用UDF函數以前由Impala本身建立的,它包含當前查詢的id、查詢執行的用戶名、使用該對象分配內存,而且能夠向Impala日誌系統輸出warning日誌,或者直接輸出一條error日誌以結束該查詢。而每個函數的輸入和輸出是IntVal類型,這個是在udf.h中定義的一個類型,它實際上就是包含null的int類型,除了該類型以外還有AnyVal; BooleanVal; TinyIntVal; SmallIntVal; IntVal; BigIntVal; StringVal; TimestampVal這幾種類型,和Impala中的類型一一對應,其中AnyVal是其餘類型的父類,它能夠標識該對象是否null,其餘的基本類型能夠經過val成員獲取原始值(在不是null的狀況下),StringVal能夠經過ptr和len獲取首地址和長度,TimestampVal則能夠獲取date和time_of_date分別獲取日期和納秒數(這樣的話TimestampVal就能夠包含Date了,何況Impala還不支持Date類型)。

這裏面沒有使用boost庫(大多數狀況下是須要使用的),而後編譯,連接生成動態連接庫:

> cmake .
> make

執行完成以後在當前目錄下產生了build目錄(相似於maven構建以後的target目錄),該目錄下存在連接完成的liblevel-udf.so文件,庫名爲以前在CMakeList.txt文件中修改以後的名字。

而後按照以前的方式首先上傳動態連接庫到HDFS,而後create function。

> hadoop fs -put ./liblevel-udf.so hdfs://namenode-or-nameservice/tmp/nrpt/
> create function level_c(INT) returns int location 'hdfs://namenode-or-nameservice/tmp/nrpt/liblevel-udf.so' symbol='level';

使用C++函數時symbol執行函數名,經過該函數名能夠在liblevel-udf.so中找到該函數。

> select level_c(12);
+---------------------+
| default.level_c(12) |
+---------------------+
| 4                   |
+---------------------+
> select level_c(9);
+--------------------+
| default.level_c(9) |
+--------------------+
| 3                  |
+--------------------+

至於C++實現和Java實現的性能對比,能夠在實際的場景下使用不一樣的語言實現相同的邏輯,只要你的C++代碼寫的不是太挫,那麼至少會有兩倍以上的性能差異的,這裏也推薦使用C++實現UDF,不過必定要注意內存泄漏、段錯誤等狀況,若是實現不當可能到impalad掛掉。對於C++實現UDF的詳細說明能夠參考Impala官方文檔:http://www.cloudera.com/documentation/archive/impala/2-x/2-1-x/topics/impala_udf.html,更多的C++實現的UDF實例能夠參考https://github.com/cloudera/impala-udf-samples

C++實現UDAF

實現一個聚合函數須要不像簡簡單單的實現一個UDF同樣,畢竟須要初始化環境,對於每個輸入記錄進行聚合,這就要求它在整個執行過程當中保持一個狀態,例如計算SUM時須要保持一個sum變量記錄已經處理的記錄的和,這樣的方式和reducer有所差異,它是經過對輸入的值循環調用aggr函數,而reducer則是將這個列表做爲輸入處理。在Impala中實現一個UDAF須要實現下面這些函數:

  • INIT_FN:初始化操做,在查詢執行時執行一次,例如清理計數器,分配緩存等。例如SUM時sum=0。
  • UPDATE_FN:在單機上執行的merge操做,對於每個節點的每一條記錄調用一次該函數,例如SUM時執行sum+= value。
  • MERGE_FN:節點之間的merge,一般邏輯上和UPDATE_FN操做相似,例如SUM時執行sum+=value。
  • SERIALIZE_FN:對於INIT、UPDATE、MERGE階段的結果進行序列化,例如須要對包含指針的值使用它所執行的內容序列化,而後釋放該指針。通常狀況下不須要設置該函數。
  • FINALIZE_FN:最終獲取結果的 函數,只調用一次。

咱們這裏須要實現的是統計某一個string列中每個成員第一次出現的整數的和,咱們定義爲int sum_first_int(string),這裏不牽扯到指針的序列化,因此不須要實現SERIALIZE_FN函數。

該需求的實現sum-udaf.h(新建):

#ifndef _SUM_UDAF_H_
#define _SUM_UDAF_H_

#include "udf/udf.h"

using namespace impala_udf;

void init_func(FunctionContext* context, BigIntVal* result);

void update_func(FunctionContext* context, const StringVal& input, BigIntVal* result);

void merge_func(FunctionContext* context, const StringVal& input, BigIntVal* result);

BigIntVal finalize_func(FunctionContext* context, const BigIntVal& val);

#endif

實現了四個函數的定義,其中init_func函數傳遞了一個BigIntVal函數,它是由函數的返回值決定的,result參數做爲保存聚合結果,update_func函數對於每個輸入的記錄進行處理,input參數爲該記錄中該列的值,merge_func對於不一樣節點的聚合結果進行再聚合,finalize_func則是返回最終結果。能夠看出每個聚合組的狀態信息保存在result參數中。

實現文件sum-udaf.cc(新建):

#include "sum-udaf.h"
#include <ctype.h>

void init_func(FunctionContext* context, BigIntVal* result) {
    result->is_null = false;
    result->val = 0;
}

void update_func(FunctionContext* context, const StringVal& input, BigIntVal* result) {
    if(input.is_null) return;
    int cur = 0;
    uint8_t *ptr = input.ptr;
    int len = input.len;
    int i = 0;
    for(i = 0 ; i < len && !isdigit(ptr[i]); ++ i);
    while(i < len && isdigit(ptr[i])) {
        cur = cur * 10 + (ptr[i] - '0');
        ++ i;
    }
    result->val += cur;
}

void merge_func(FunctionContext* context, const BigIntVal& input, BigIntVal* result) {
    result->val += input.val;
}

BigIntVal finalize_func(FunctionContext* context, const BigIntVal& val) {
    return val;
}

完成以後再修改CMakeList.txt文件,增長對sum-udaf.cc的編譯和連接,而後執行cmake和make(同上),就能夠生成新的.so連接庫了。使用相同的方式將該庫上傳到HDFS而後再impala shell上建立聚合函數。

> create aggregate function sum_first_int(string) returns bigint
> location 'hdfs://namenode-or-nameservice/tmp/nrpt/liblevel-udf.so' init_fn='init_func' update_fn='update_func' merge_fn='merge_func' finalize_fn='finalize_func';

此時可使用該聚合函數了,以下:

> CREATE TABLE default.test (id string);
> insert into test values("abc1234edf"), ("12shd45"), ("dhhf"), ("qwe3"), ("2016-09-01");
> select sum_first_int(id) from test;
+---------------------------+
| default.sum_first_int(id) |
+---------------------------+
| 3265                      |
+---------------------------+

總結

本文詳細講述了在Impala中實現自定義函數和自定義聚合函數的方式並給出實現實例以供參考,使用UDF和UDAF咱們能夠很輕易的實現特定需求的計算邏輯,也能夠用性能更好的方式實現一些相對固定的需求,例如咱們可使用UDAF函數實現更好性能的distinct count計算(利用hyoerloglog實現有偏差的去重計數),可是開放的接口也可能存在一些安全性上的問題,給系統運維的也帶來了必定的負擔,因此對於自定義函數的引入也須要謹慎。

相關文章
相關標籤/搜索