Hive 與 ElasticSearch 的數據交互

本文將詳細介紹利用 ES 與 Hive 直接的數據交互;經過 Hive 外部表的方式,能夠快速將 ES 索引數據映射到 Hive 中,使用易於上手的 Hive SQL 實現對數據的進一步加工。java

1、開發環境

一、組件版本

  • CDH 集羣版本:6.0.1
  • ES 版本:6.5.1
  • Hive 版本:2.1.1
  • ES-Hadoop 版本:6.5.1

二、Hive 簡介

Hive 在 Hadoop 生態系統中扮演着數據倉庫的角色,藉助 Hive 能夠方便地進行數據彙總、即席查詢以及分析存儲在 Hadoop 文件系統中的大型數據集。node

Hive 經過類 SQL 語言(HSQL)對 Hadoop 上的數據進行抽象,這樣用戶能夠經過 SQL 語句對數據進行定義、組織、操做和分析;在 Hive 中,數據集是經過表(定義了數據類型相關信息)進行定義的,用戶能夠經過內置運算符或用戶自定義函數(UDF)對數據進行加載、查詢和轉換。sql

三、Hive 安裝 ES-Hadoop

官方推薦的安裝方式:apache

使用 add jar

add jar /path/elasticsearch-hadoop.jar
複製代碼

使用 hive.aux.jars.path

$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar
複製代碼

修改配置(hive-site.xml)

<property>
  <name>hive.aux.jars.path</name>
  <value>/path/elasticsearch-hadoop.jar</value>
  <description>A comma separated list (with no spaces) of the jar files</description>
</property>
複製代碼

CDH6.X 推薦的安裝方法

elasticsearch-hadoop.jar 複製到 Hive 的 auxlib 目錄中,而後重啓 Hive 便可。編程

cp elasticsearch-hadoop.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/
複製代碼

2、Hive 與 ElasticSearch 的數據交互

一、數據類型對照表

請務必注意,ES 中的類型是 index/_mapping 中對應的數據類型,非 _source 裏面數據的類型。json

Hive type Elasticsearch type
void null
boolean boolean
tinyint byte
smallint short
int int
bigint long
double double
float float
string string
binary binary
timestamp date
struct map
map map
array array
union not supported (yet)
decimal string
date date
varchar string
char string

二、創建 Hive 外部表

CREATE EXTERNAL TABLE default.surface(
    water_type STRING,
    water_level STRING,
    monitor_time TIMESTAMP,
    sitecode STRING,
    p492 DOUBLE,
    p311 DOUBLE,
    status STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource'='ods_data_day_surface*/doc',
    'es.query'='?q=status:001'
    'es.nodes'='sky-01','es.port'='9200',
    'es.net.http.auth.user'='sky',
    'es.net.http.auth.pass'='jointsky',
    'es.date.format'='yyyy-MM-dd HH:mm:ss',
    'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader'
    'es.mapping.names'='waterType:water_type,monitortime:monitor_time'
);
複製代碼

三、配置項說明

es.resource

es.resource 用於設置 ES 資源的位置,默認該配置項同時設置了讀和寫的索引,固然也能夠分別設置讀寫索引名稱:bash

  • es.resource.read:設置讀取位置;
  • es.resource.write:設置寫入位置。

es.query

es.query 設置查詢過濾條件,目前支持 uri queryquery dslexternal resource 三種設置方式。app

# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json
複製代碼

es.mapping.names

es.mapping.names 用於設置 Hive 與 ES 的字段映射關係,若是不設置,則默認字段名不發生變化(即爲 data type 區域定義的字段名);此外該部分還用於定義 Hive 到 ES 的數據映射類型。elasticsearch

'es.mapping.names' = 'date:@timestamp , url:url_123 ')
複製代碼

其餘通用字段的說明請參考文章:使用 ES-Hadoop 將 Spark Streaming 流數據寫入 ESide

四、自定義日期類型解析

目前將 ES 的 date 類型映射到 Hive 的 TIMESTAMP 類型時,ES-Hadoop 組件只能識別時間戳格式或者標準的 XSD 格式的日期字符串:

@Override
protected Object parseDate(Long value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(value)) : processLong(value));
}

@Override
protected Object parseDate(String value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(DatatypeConverter.parseDateTime(value).getTimeInMillis())) : parseString(value));
}
複製代碼

關於 XSD(XML Schema Date/Time Datatypes)可用參考文章:www.w3schools.com/xml/schema_…

爲了兼容自定義的日期格式,須要編寫自定義的日期讀取類:

import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.hive.HiveValueReader;

import java.sql.Timestamp;
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EsValueReader extends HiveValueReader {
    private String dateFormat;
    private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String DEFAULT_DATE_FORMAT_MIN = "yyyy-MM-dd HH:mm";
    private static final String DEFAULT_DATE_FORMAT_HOUR = "yyyy-MM-dd HH";
    private static final String DEFAULT_DATE_FORMAT_DAY = "yyyy-MM-dd";

    @Override
    public void setSettings(Settings settings) {
        super.setSettings(settings);
        dateFormat = settings.getProperty("es.date.format");
    }

    @Override
    protected Object parseDate(String value, boolean richDate) {
        if (value != null && value.trim().length() > 0 && DEFAULT_DATE_FORMAT.equalsIgnoreCase(dateFormat)) {
            if (richDate){
                if (value.length() == 16){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_MIN).getTime()));
                }
                if (value.length() == 13){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_HOUR).getTime()));
                }
                if (value.length() == 10){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_DAY).getTime()));
                }
                return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT).getTime()));
            }
            return parseString(value);
        }
        return super.parseDate(value, richDate);
    }

    /**
     * 解析日期,根據指定的格式進行解析.<br>
     * 若是解析錯誤,則返回null
     * @param stringDate 日期字串
     * @param format 日期格式
     * @return 日期型別
     */
    private static Date parseDate(String stringDate, String format) {
        if (stringDate == null) {
            return null;
        }
        try {
            return parseDate(stringDate, new String[] { format });
        } catch (ParseException e) {
            return null;
        }
    }

    public static Date parseDate(String str, String... parsePatterns) throws ParseException {
        return parseDateWithLeniency(str, parsePatterns, true);
    }

    private static Date parseDateWithLeniency(
            String str, String[] parsePatterns, boolean lenient) throws ParseException {
        if (str == null || parsePatterns == null) {
            throw new IllegalArgumentException("Date and Patterns must not be null");
        }

        SimpleDateFormat parser = new SimpleDateFormat();
        parser.setLenient(lenient);
        ParsePosition pos = new ParsePosition(0);
        for (String parsePattern : parsePatterns) {
            String pattern = parsePattern;
            if (parsePattern.endsWith("ZZ")) {
                pattern = pattern.substring(0, pattern.length() - 1);
            }
            parser.applyPattern(pattern);
            pos.setIndex(0);
            String str2 = str;
            if (parsePattern.endsWith("ZZ")) {
                str2 = str.replaceAll("([-+][0-9][0-9]):([0-9][0-9])$", "$1$2");
            }
            Date date = parser.parse(str2, pos);
            if (date != null && pos.getIndex() == str2.length()) {
                return date;
            }
        }
        throw new ParseException("Unable to parse the date: " + str, -1);
    }
}
複製代碼

上述代碼的 Maven 依賴

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.5.4</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
複製代碼

自定義日期解析包的部署

代碼編寫完成後,將代碼進行打包,而後將打包好的 jar 包放置到 Hive 的 auxlib 目錄中,而後重啓 Hive 便可;該步驟與 ES-Hadoop 的安裝步驟同樣。

在編寫 Spark 程序從 Hive 中讀取數據的時候,須要添加對該包的依賴以及對 ES-Hadoop 的依賴。

3、總結

通過上述的步驟,Hive 與 ES 的映射已經不成問題,若是想從 ES 中導出數據,可用藉助 HSQL insert into table XXX select * from XXXXX; 的方式從 ES 中讀取數據寫入到 HDFS;固然經過更爲複雜的 HSQL 能夠將數據進行處理,並將數據從新寫入到 ES 或者存儲到 HDFS。

充分利用 ES 的查詢、過濾和聚合,能夠很好的去服務數據標準化、數據清洗、數據分佈狀況等 ETL 流程。


Any Code,Code Any!

掃碼關注『AnyCode』,編程路上,一塊兒前行。

相關文章
相關標籤/搜索