五、Hive的自定義UDF函數

一、pom.xml引入依賴及打包

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- 配置java插件,指定版本 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <encoding>UTF-8</encoding>
                <source>1.8</source>
                <target>1.8</target>
                <showWarnings>true</showWarnings>
            </configuration>
        </plugin>
    </plugins>
</build>

二、對單個字段,或者多個字段進行處理

import utils.CommonUtils;
import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * [@Author](https://my.oschina.net/arthor) liufu
 * @CreateTime 2017/5/4 14:13
 * @Descrition
 */
public class AllTracksUDF extends UDF {

    // 重載方法

    // 處理Int類型字段
    // 及時要插入的表中字段爲int、bigint類型等,均可以用string類型插入進去
    // int類型數據,在傳入參數的時候直接傳遞數字便可,好比:evaluate(power, 1)
    public Integer evaluate(String column, int columnType) {
        String longValue = getStringValue(column);
        if(longValue != null){
            return Integer.parseInt(longValue);
        }
        return null;
    }

    // 處理Long類型字段,包括時間
    // long類型參數,傳遞columnType的時候要加上"L", 好比:evaluate(startTime, 1L)
    public Long evaluate(String column, long columnType) {
        String longValue = getStringValue(column);
        if(longValue != null){
            // 1表示是時間,而時間爲秒,要轉化爲毫秒,*1000
            if(columnType == 1){
                return Long.parseLong(longValue) * 1000;
            }
            return Long.parseLong(longValue);
        }
        return null;
    }

    // 處理String類型字段
    public String evaluate(String column) {
        return getStringValue(column);
    }

    // 處理兩個字段,好比xpoint 和 ypoing的轉換,判空和拼接
    public String evaluate(String column1, String column2) {
        return convertLatLon(column1, column2);
    }

    /**
     * [@param](https://my.oschina.net/u/2303379) value
     * [@return](https://my.oschina.net/u/556800)
     * 獲取string類型的字段,判空處理
     */
    private String getStringValue(String value) {
        if (value != null && !"MULL".equalsIgnoreCase(value) && !"NULL".equalsIgnoreCase(value) && value.trim().length() != 0) {
            return value;
        }
        return null;
    }

    /**
     * @param lat
     * @param lon
     * @return
     * 將經度、維度拼接
     */
    private String convertLatLon(String lat, String lon) {
        if (lat == null | lon == null || "MULL".equalsIgnoreCase(lat) || "MULL".equalsIgnoreCase(lon) || "NULL".equalsIgnoreCase(lat) || "NULL".equalsIgnoreCase(lon) || "0".equalsIgnoreCase(lat) || "0".equalsIgnoreCase(lon)) {
            return "0,0";
        }
        // 經緯度轉換
        if (CommonUtils.parseDouble(lat) > CommonUtils.parseDouble(lon)) {
            return lon + "," + lat;
        } else {
            return lat + "," + lon;
        }
    }
}

三、利用map函數,將一條數據組裝成Map,而後傳遞進來

/**
 * 讀取hive的數據,而後將每條數據組合成一個json字符串,經過下面udf函數方法發送到kafka
 * <p>
 * 經過測試驗證,Hive2KafkaUDF類在每次mr任務中,只會建立一次,因此producer能夠作成單例
 *
 * @Author liufu
 * @E-mail: 1151224929@qq.com
 * @CreateTime 2019/6/5  18:06
 */
@Description(name = "hive2kafka", value = "_FUNC_(string, topic, map<string,string>) - Return ret ")
public class Hive2KafkaUDF extends UDF {
    private static Gson gson = new GsonBuilder().serializeNulls().create();
    private KafkaProducer<String, String> producer;

    public boolean evaluate(String kafkaParams, String topic, Map<String, String> dataMap) {
        KafkaProducer producerTemp = getProducer(kafkaParams);
        producerTemp.send(new ProducerRecord(topic, null, gson.toJson(dataMap)));
        return true;
    }

    private KafkaProducer getProducer(String kafkaParams) {
        if (producer == null) {
            synchronized ("getProducer") {
                if (producer == null) {
                    Properties props = gson.fromJson(kafkaParams, Properties.class);
                    producer = new KafkaProducer<>(props);
                }
            }
        }
        return producer;
    }
}
  • 3.二、 如何使用這個UDFjava

    利用map函數將數據組裝成一個Map對象
      select hive2kafka(
    
      "{'bootstrap.servers': 'gawh243:9092', 'acks': 'all', 'key.serializer': 'org.apache.kafka.common.serialization.StringSerializer', 'value.serializer': 'org.apache.kafka.common.serialization.StringSerializer'}", 
    
      'together001', 
    
      // map函數,左邊的name是最終的字段值,功能等同於username as name
      map('name',username,'age',age)
    
      ) from qwrenzixing.visual_deduction_kinship_relation

四、建立臨時函數

  • 4.一、打包成jar包,能夠放在任何可以訪問到的地方,好比hdfs://,本地文件系統file://apache

  • 4.二、加載jarjson

    hive> add jar /root/hive2kafka.udf-1.0.jar;
    
      Added [/root/elasticsearce-hadoop/hive2kafka.udf-1.0.jar] to class path
      Added resources: [/root/elasticsearce-hadoop/hive2kafka.udf-1.0.jar]
    
      hive> create temporary function hive2kafka as 'com.study.Hive2KafkaUDF';
    
      hive> create temporary function allTracksudf as 'com.study.AllTracksUDF';
    
      或者直接使用遠端jar來建立,不必定須要先add  jar
      hive> create temporary function hive2kafka as 'com.study.Hive2KafkaUDF' 
      	  using jar 'hdfs://rsb:8082/udf/hive2es.udf-1.0.jar'

五、使用臨時函數

  • 5.一、第一個函數bootstrap

    select allTracksudf(create_time, 1L) as create_time from t_a;maven

  • 5.二、第二個函數函數

    利用map函數將數據組裝成一個Map對象
      select hive2kafka(
    
      "{'bootstrap.servers': 'gawh243:9092', 'acks': 'all', 'key.serializer': 'org.apache.kafka.common.serialization.StringSerializer', 'value.serializer': 'org.apache.kafka.common.serialization.StringSerializer'}", 
    
      'together001', 
    
      // map函數,左邊的name是最終的字段值,功能等同於username as name
      map('name',username,'age',age)
    
      ) from testDb.t_b;
相關文章
相關標籤/搜索