<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- Configure the Java compiler plugin and pin the source/target version -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <encoding>UTF-8</encoding>
                <source>1.8</source>
                <target>1.8</target>
                <showWarnings>true</showWarnings>
            </configuration>
        </plugin>
    </plugins>
</build>
import utils.CommonUtils;
import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * @Author liufu
 * @CreateTime 2017/5/4 14:13
 * @Description
 */
public class AllTracksUDF extends UDF {

    // Overloaded evaluate methods.

    // Handle int columns.
    // Even if the target table column is int, bigint, etc., the value can still be passed in as a string.
    // For int columns, pass a plain number as the second argument, e.g. evaluate(power, 1)
    public Integer evaluate(String column, int columnType) {
        String intValue = getStringValue(column);
        if (intValue != null) {
            return Integer.parseInt(intValue);
        }
        return null;
    }

    // Handle long columns, including timestamps.
    // For long columns, add an "L" suffix to columnType, e.g. evaluate(startTime, 1L)
    public Long evaluate(String column, long columnType) {
        String longValue = getStringValue(column);
        if (longValue != null) {
            // columnType == 1 means the value is a timestamp in seconds; convert to milliseconds (* 1000)
            if (columnType == 1) {
                return Long.parseLong(longValue) * 1000;
            }
            return Long.parseLong(longValue);
        }
        return null;
    }

    // Handle string columns.
    public String evaluate(String column) {
        return getStringValue(column);
    }

    // Handle two columns at once, e.g. null-checking and concatenating xpoint and ypoint.
    public String evaluate(String column1, String column2) {
        return convertLatLon(column1, column2);
    }

    /**
     * @param value
     * @return
     * Get a string column value, with null/garbage handling.
     */
    private String getStringValue(String value) {
        if (value != null && !"MULL".equalsIgnoreCase(value) && !"NULL".equalsIgnoreCase(value)
                && value.trim().length() != 0) {
            return value;
        }
        return null;
    }

    /**
     * @param lat
     * @param lon
     * @return
     * Concatenate latitude and longitude.
     */
    private String convertLatLon(String lat, String lon) {
        if (lat == null || lon == null
                || "MULL".equalsIgnoreCase(lat) || "MULL".equalsIgnoreCase(lon)
                || "NULL".equalsIgnoreCase(lat) || "NULL".equalsIgnoreCase(lon)
                || "0".equalsIgnoreCase(lat) || "0".equalsIgnoreCase(lon)) {
            return "0,0";
        }
        // Lat/lon conversion: if the two values appear swapped (lat > lon), swap them back.
        if (CommonUtils.parseDouble(lat) > CommonUtils.parseDouble(lon)) {
            return lon + "," + lat;
        } else {
            return lat + "," + lon;
        }
    }
}
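The CommonUtils.parseDouble helper called in convertLatLon is not shown in the post. A minimal sketch of what such a helper might look like (the class body below is an assumption, not the author's actual implementation):

// Hypothetical sketch of utils.CommonUtils; the real class is not included in the post.
package utils;

public class CommonUtils {

    // Parse a string to double, falling back to 0 on missing or malformed input,
    // so that convertLatLon never throws on dirty data.
    public static double parseDouble(String value) {
        if (value == null || value.trim().isEmpty()) {
            return 0d;
        }
        try {
            return Double.parseDouble(value.trim());
        } catch (NumberFormatException e) {
            return 0d;
        }
    }
}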
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Map;
import java.util.Properties;

/**
 * Read data from Hive, assemble each row into a JSON string, and send it to Kafka through this UDF.
 * <p>
 * Testing shows that the Hive2KafkaUDF class is only instantiated once per MR task,
 * so the producer can be kept as a singleton.
 *
 * @Author liufu
 * @E-mail: 1151224929@qq.com
 * @CreateTime 2019/6/5 18:06
 */
@Description(name = "hive2kafka", value = "_FUNC_(string, topic, map<string,string>) - Return ret ")
public class Hive2KafkaUDF extends UDF {
    private static Gson gson = new GsonBuilder().serializeNulls().create();
    // volatile so the double-checked locking in getProducer() is safe
    private volatile KafkaProducer<String, String> producer;

    public boolean evaluate(String kafkaParams, String topic, Map<String, String> dataMap) {
        KafkaProducer<String, String> producerTemp = getProducer(kafkaParams);
        producerTemp.send(new ProducerRecord<>(topic, null, gson.toJson(dataMap)));
        return true;
    }

    private KafkaProducer<String, String> getProducer(String kafkaParams) {
        if (producer == null) {
            synchronized (Hive2KafkaUDF.class) {
                if (producer == null) {
                    // The producer config arrives as a JSON string and is parsed into Properties
                    Properties props = gson.fromJson(kafkaParams, Properties.class);
                    producer = new KafkaProducer<>(props);
                }
            }
        }
        return producer;
    }
}
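One thing the UDF above leaves open is when the producer's buffered records get flushed: KafkaProducer.send is asynchronous, so records still sitting in the client buffer when the task JVM exits could be lost. A hedged sketch of one way to guard against that; the shutdown hook is my addition, not part of the original code:

    // Sketch: close (and thereby flush) the singleton producer when the task JVM shuts down.
    private KafkaProducer<String, String> getProducer(String kafkaParams) {
        if (producer == null) {
            synchronized (Hive2KafkaUDF.class) {
                if (producer == null) {
                    Properties props = gson.fromJson(kafkaParams, Properties.class);
                    final KafkaProducer<String, String> created = new KafkaProducer<>(props);
                    // close() flushes buffered records before the JVM exits, so late rows are not dropped.
                    Runtime.getRuntime().addShutdownHook(new Thread(created::close));
                    producer = created;
                }
            }
        }
        return producer;
    }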
3.2 How to use this UDF
Use the map function to assemble the row into a Map object:

select hive2kafka(
    "{'bootstrap.servers': 'gawh243:9092', 'acks': 'all', 'key.serializer': 'org.apache.kafka.common.serialization.StringSerializer', 'value.serializer': 'org.apache.kafka.common.serialization.StringSerializer'}",
    'together001',
    -- map function: the key on the left ('name') becomes the field name, equivalent to username as name
    map('name', username, 'age', age)
) from qwrenzixing.visual_deduction_kinship_relation
4.1 Package the code into a jar. The jar can be placed anywhere Hive can reach, e.g. HDFS (hdfs://) or the local filesystem (file://).
4.2 Load the jar
hive> add jar /root/hive2kafka.udf-1.0.jar;
Added [/root/elasticsearce-hadoop/hive2kafka.udf-1.0.jar] to class path
Added resources: [/root/elasticsearce-hadoop/hive2kafka.udf-1.0.jar]

hive> create temporary function hive2kafka as 'com.study.Hive2KafkaUDF';
hive> create temporary function allTracksudf as 'com.study.AllTracksUDF';

Alternatively, create the function directly from a remote jar; a prior add jar is not required:

hive> create temporary function hive2kafka as 'com.study.Hive2KafkaUDF' using jar 'hdfs://rsb:8082/udf/hive2es.udf-1.0.jar';
5.1 The first function
select allTracksudf(create_time, 1L) as create_time from t_a;
5.2 The second function
Again, use the map function to assemble the row into a Map object:

select hive2kafka(
    "{'bootstrap.servers': 'gawh243:9092', 'acks': 'all', 'key.serializer': 'org.apache.kafka.common.serialization.StringSerializer', 'value.serializer': 'org.apache.kafka.common.serialization.StringSerializer'}",
    'together001',
    -- map function: the key on the left ('name') becomes the field name, equivalent to username as name
    map('name', username, 'age', age)
) from testDb.t_b;
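For reference, the record value that lands on the Kafka topic is simply the Gson serialization of the map built in the query above. A small standalone sketch of the payload format (the field values here are invented for illustration, not taken from the real tables):

// Standalone illustration of the JSON payload the UDF produces per row.
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.LinkedHashMap;
import java.util.Map;

public class PayloadExample {
    public static void main(String[] args) {
        Map<String, String> dataMap = new LinkedHashMap<>();
        dataMap.put("name", "zhangsan");   // from map('name', username, ...)
        dataMap.put("age", "25");          // from map(..., 'age', age)

        Gson gson = new GsonBuilder().serializeNulls().create();
        // Prints: {"name":"zhangsan","age":"25"} - this string is the Kafka record value
        System.out.println(gson.toJson(dataMap));
    }
}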