本文將詳細介紹利用 ES 與 Hive 直接的數據交互;經過 Hive 外部表的方式,能夠快速將 ES 索引數據映射到 Hive 中,使用易於上手的 Hive SQL 實現對數據的進一步加工。java
Hive 在 Hadoop 生態系統中扮演着數據倉庫的角色,藉助 Hive 能夠方便地進行數據彙總、即席查詢以及分析存儲在 Hadoop 文件系統中的大型數據集。node
Hive 經過類 SQL 語言(HSQL)對 Hadoop 上的數據進行抽象,這樣用戶能夠經過 SQL 語句對數據進行定義、組織、操做和分析;在 Hive 中,數據集是經過表(定義了數據類型相關信息)進行定義的,用戶能夠經過內置運算符或用戶自定義函數(UDF)對數據進行加載、查詢和轉換。sql
官方推薦的安裝方式:apache
add jar
add jar /path/elasticsearch-hadoop.jar
複製代碼
hive.aux.jars.path
$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar
複製代碼
hive-site.xml
)<property>
<name>hive.aux.jars.path</name>
<value>/path/elasticsearch-hadoop.jar</value>
<description>A comma separated list (with no spaces) of the jar files</description>
</property>
複製代碼
將 elasticsearch-hadoop.jar
複製到 Hive 的 auxlib 目錄中,而後重啓 Hive 便可。編程
cp elasticsearch-hadoop.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/
複製代碼
請務必注意,ES 中的類型是
index/_mapping
中對應的數據類型,非_source
裏面數據的類型。json
Hive type | Elasticsearch type |
---|---|
void | null |
boolean | boolean |
tinyint | byte |
smallint | short |
int | int |
bigint | long |
double | double |
float | float |
string | string |
binary | binary |
timestamp | date |
struct | map |
map | map |
array | array |
union | not supported (yet) |
decimal | string |
date | date |
varchar | string |
char | string |
CREATE EXTERNAL TABLE default.surface(
water_type STRING,
water_level STRING,
monitor_time TIMESTAMP,
sitecode STRING,
p492 DOUBLE,
p311 DOUBLE,
status STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource'='ods_data_day_surface*/doc',
'es.query'='?q=status:001'
'es.nodes'='sky-01','es.port'='9200',
'es.net.http.auth.user'='sky',
'es.net.http.auth.pass'='jointsky',
'es.date.format'='yyyy-MM-dd HH:mm:ss',
'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader'
'es.mapping.names'='waterType:water_type,monitortime:monitor_time'
);
複製代碼
es.resource
用於設置 ES 資源的位置,默認該配置項同時設置了讀和寫的索引,固然也能夠分別設置讀寫索引名稱:bash
es.resource.read
:設置讀取位置;es.resource.write
:設置寫入位置。es.query
設置查詢過濾條件,目前支持 uri query
、query dsl
、external resource
三種設置方式。app
# uri (or parameter) query
es.query = ?q=costinl
# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }
# external resource
es.query = org/mypackage/myquery.json
複製代碼
es.mapping.names
用於設置 Hive 與 ES 的字段映射關係,若是不設置,則默認字段名不發生變化(即爲 data type 區域定義的字段名);此外該部分還用於定義 Hive 到 ES 的數據映射類型。elasticsearch
'es.mapping.names' = 'date:@timestamp , url:url_123 ')
複製代碼
其餘通用字段的說明請參考文章:使用 ES-Hadoop 將 Spark Streaming 流數據寫入 ESide
目前將 ES 的 date 類型映射到 Hive 的 TIMESTAMP 類型時,ES-Hadoop 組件只能識別時間戳格式或者標準的 XSD 格式的日期字符串:
@Override
protected Object parseDate(Long value, boolean richDate) {
return (richDate ? new TimestampWritable(new Timestamp(value)) : processLong(value));
}
@Override
protected Object parseDate(String value, boolean richDate) {
return (richDate ? new TimestampWritable(new Timestamp(DatatypeConverter.parseDateTime(value).getTimeInMillis())) : parseString(value));
}
複製代碼
關於 XSD(XML Schema Date/Time Datatypes)可用參考文章:www.w3schools.com/xml/schema_…
爲了兼容自定義的日期格式,須要編寫自定義的日期讀取類:
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.hive.HiveValueReader;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;
public class EsValueReader extends HiveValueReader {
private String dateFormat;
private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
private static final String DEFAULT_DATE_FORMAT_MIN = "yyyy-MM-dd HH:mm";
private static final String DEFAULT_DATE_FORMAT_HOUR = "yyyy-MM-dd HH";
private static final String DEFAULT_DATE_FORMAT_DAY = "yyyy-MM-dd";
@Override
public void setSettings(Settings settings) {
super.setSettings(settings);
dateFormat = settings.getProperty("es.date.format");
}
@Override
protected Object parseDate(String value, boolean richDate) {
if (value != null && value.trim().length() > 0 && DEFAULT_DATE_FORMAT.equalsIgnoreCase(dateFormat)) {
if (richDate){
if (value.length() == 16){
return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_MIN).getTime()));
}
if (value.length() == 13){
return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_HOUR).getTime()));
}
if (value.length() == 10){
return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_DAY).getTime()));
}
return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT).getTime()));
}
return parseString(value);
}
return super.parseDate(value, richDate);
}
/**
* 解析日期,根據指定的格式進行解析.<br>
* 若是解析錯誤,則返回null
* @param stringDate 日期字串
* @param format 日期格式
* @return 日期型別
*/
private static Date parseDate(String stringDate, String format) {
if (stringDate == null) {
return null;
}
try {
return parseDate(stringDate, new String[] { format });
} catch (ParseException e) {
return null;
}
}
public static Date parseDate(String str, String... parsePatterns) throws ParseException {
return parseDateWithLeniency(str, parsePatterns, true);
}
private static Date parseDateWithLeniency(
String str, String[] parsePatterns, boolean lenient) throws ParseException {
if (str == null || parsePatterns == null) {
throw new IllegalArgumentException("Date and Patterns must not be null");
}
SimpleDateFormat parser = new SimpleDateFormat();
parser.setLenient(lenient);
ParsePosition pos = new ParsePosition(0);
for (String parsePattern : parsePatterns) {
String pattern = parsePattern;
if (parsePattern.endsWith("ZZ")) {
pattern = pattern.substring(0, pattern.length() - 1);
}
parser.applyPattern(pattern);
pos.setIndex(0);
String str2 = str;
if (parsePattern.endsWith("ZZ")) {
str2 = str.replaceAll("([-+][0-9][0-9]):([0-9][0-9])$", "$1$2");
}
Date date = parser.parse(str2, pos);
if (date != null && pos.getIndex() == str2.length()) {
return date;
}
}
throw new ParseException("Unable to parse the date: " + str, -1);
}
}
複製代碼
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>6.5.4</version>
<scope>provided</scope>
</dependency>
</dependencies>
複製代碼
代碼編寫完成後,將代碼進行打包,而後將打包好的 jar 包放置到 Hive 的 auxlib 目錄中,而後重啓 Hive 便可;該步驟與 ES-Hadoop 的安裝步驟同樣。
在編寫 Spark 程序從 Hive 中讀取數據的時候,須要添加對該包的依賴以及對 ES-Hadoop 的依賴。
通過上述的步驟,Hive 與 ES 的映射已經不成問題,若是想從 ES 中導出數據,可用藉助 HSQL insert into table XXX select * from XXXXX;
的方式從 ES 中讀取數據寫入到 HDFS;固然經過更爲複雜的 HSQL 能夠將數據進行處理,並將數據從新寫入到 ES 或者存儲到 HDFS。
充分利用 ES 的查詢、過濾和聚合,能夠很好的去服務數據標準化、數據清洗、數據分佈狀況等 ETL 流程。
Any Code,Code Any!
掃碼關注『AnyCode』,編程路上,一塊兒前行。