hive做爲一個sql查詢引擎,自帶了一些基本的函數,好比count
(計數),sum
(求和),有時候這些基本函數知足不了咱們的需求,這時候就要寫hive hdf(user defined funation)
,又叫用戶自定義函數。java
org.apache.hadoop.hive.ql.exec.UDF
類,實現evaluate方法;jar
包上傳到集羣,經過create temporary function
建立臨時函數,不加temporary
就建立了一個永久函數;下面是一個判斷hive表字段是否包含’100’
這個子串的簡單udf
:mysql
package com.js.dataclean.hive.udf.hm2 import org.apache.hadoop.hive.ql.exec.UDF; public class IsContains100 extends UDF{ public String evaluate(String s){ if(s == null || s.length() == 0){ return "0"; } return s.contains("100") ? "1" : "0"; } }
使用maven將其打包,進入hive cli
,輸入命令:sql
add jar /home/hadoop/codejar/flash_format.jar; create temporary function isContains100 as 'com.js.dataclean.hive.udf.hm2.IsContains100';
建立完臨時函數,便可使用這個函數了:shell
select isContains100('abc100def') from table limit 1; 1
經過讀取mysql數據庫中的規則,爲hive中的workflow返回對應的,類型:數據庫
type workflow a 1 a 2 b 11 b 22 b 33
需求:咱們但願,將hive的workflow字段取值爲,1,2的變爲類型(type)a
,取值爲11,22,33的所有變爲b
,就是歸類的意思。apache
這個udf能夠這麼實現:maven
package com.js.dataclean.hive.udf.hm2.workflow; import org.apache.hadoop.hive.ql.exec.UDF; import java.sql.*; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @ Author: keguang * @ Date: 2018/12/13 16:24 * @ version: v1.0.0 * @ description: */ public class GetWorkflow extends UDF{ private static final String host = "0.0.0.0"; private static final String port = "3306"; private static final String database = "root"; private static final String userName = "root"; private static final String password = "123456"; private static String url = ""; private static final String driver = "com.mysql.jdbc.Driver"; private static Connection conn = null; private static Map<String, List<String>> workflowType = null; static { url = "jdbc:mysql://" + host + ":" + port + "/" + database; try { // Class.forName(driver); conn = DriverManager.getConnection(url, userName, password); workflowType = getWorkflowType(conn); } catch (Exception e) { e.printStackTrace(); } } private static Map<String, List<String>> getWorkflowType(Connection conn){ Map<String, List<String>> workflowType = new HashMap<>(); String sql = "select * from flash_player_workflow"; PreparedStatement ps = null; try { ps = conn.prepareStatement(sql); ResultSet rs = ps.executeQuery(); while (rs.next()){ String workflow = rs.getString("workflow"); String type = rs.getString("flag"); List<String> workflows = workflowType.get(type); if(workflows == null){ workflows = new ArrayList<>(); } workflows.add(workflow); workflowType.put(type, workflows); } } catch (SQLException e) { e.printStackTrace(); }finally { // 關閉連接 if(conn != null){ try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } return workflowType; } public String evaluate(String s){ assert workflowType != null; for(String type:workflowType.keySet()){ List<String> workflows = workflowType.get(type); if(workflows.contains(s)){ return type; } } return s; } }
打好jar
包,建立函數: workflow2type(省略),而後使用:ide
select workflow2type(workflow) from table; a a b b b
這樣就把不少取值歸爲幾個大類了。函數
查month 相關的函數oop
show functions like '*month*';
查看 add_months 函數的用法
desc function add_months;
查看 add_months 函數的詳細說明並舉例
desc function extended add_months;
能夠看出,udf就是一個輸入一個輸出,輸入一個性別,返回’男’或者’女’,若是咱們想實現select date,count(1) from table
,統計天天的流量呢?這就是一個分組統計,顯然是多個輸入,一個輸出,這時候udf已經不能知足咱們的須要,就須要寫udaf,user defined aggregare function
(用戶自定義聚合函數)。
這裏寫一個字符串鏈接函數,至關於concat
的功能,將多行輸入,合併爲一個字符串,固然了hive中有字符串鏈接函數,這裏是舉例說明UDAF
的用法:
package com.js.dataclean.hive.udaf.hm2; import com.js.dataclean.utils.StringUtil; import org.apache.hadoop.hive.ql.exec.UDAF; import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; /** * 實現字符串鏈接聚合的UDAF * @version v1.0.0 * @Author:keguang * @Date:2018/10/22 14:36 */ public class MutiStringConcat extends UDAF{ public static class SumState{ private String sumStr; } public static class SumEvaluator implements UDAFEvaluator{ SumState sumState; public SumEvaluator(){ super(); sumState = new SumState(); init(); } @Override public void init() { sumState.sumStr = ""; } /** * 來了一行數據 * @param s * @return */ public boolean iterate(String s){ if(!StringUtil.isNull(s)){ sumState.sumStr += s; } return true; } /** * 狀態傳遞 * @return */ public SumState terminatePartial() { return sumState; } /** * 子任務合併 * @param state * @return */ public boolean merge(SumState state){ if(state != null){ sumState.sumStr += state.sumStr; } return true; } /** * 返回最終結果 * @return */ public String terminate(){ return sumState.sumStr; } } }
用法,與udf同樣,仍是須要打包而且到hive cli中註冊使用。
關於UDAF開發注意點:
import org.apache.hadoop.hive.ql.exec.UDAF
以及org.apache.hadoop.hive.ql.exec.UDAFEvaluator
,這兩個包都是必須的Hive
自定義函數分爲臨時與永久函數,顧名思義,分別是臨時使用和永久有效使用的意思。
臨時函數,關閉會話就結束了生命週期,下次要想使用,須要從新註冊。
add jar /path/xx.jar(存儲在本地磁盤) // 臨時註冊UDF函數(hive會話生效) create temporary function 函數名 as '包名.類名';
刪除臨時函數:
永久函數一旦註冊,能夠在hive cli,遠程鏈接hiveserver2等地方永久使用,步驟爲:
先上傳jar包到HDFS
永久註冊:
CREATE FUNCTION 函數名 AS '包名.類名' USING JAR 'hdfs:///path/xxxx.jar';
注意:指定jar包路徑須要是hdfs
路徑。
drop function 數據庫名.函數名字;
新增的永久函數,好比在hive cli命令行註冊的,可能會在beeline或者hiveserver2遠程鏈接時,提示不存在該函數。解決辦法是,在沒法使用UDF的HiveServer2上,執行reload function
命令,將MetaStore
中新增的UDF信息同步到HiveServer2內存中。
UDF在hive中使用場景普遍,這裏列舉經常使用的使用場景。