使用 ES-Hadoop 將 Spark Streaming 流數據寫入 ES

本文將詳細介紹利用 ES-Hadoop 將 Spark 處理的數據寫入到 ES 中。html

1、開發環境

一、組件版本

  • CDH 集羣版本:6.0.1
  • Spark 版本:2.2.0
  • Kafka 版本:1.0.1
  • ES 版本:6.5.1

二、Maven 依賴

<!-- scala -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.8</version>
</dependency>

<!-- spark 基礎依賴 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming 相關依賴 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming-kafka 相關依賴 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- zookeeper 相關依賴 -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.5-cdh6.0.1</version>
</dependency>

<!-- Spark-ES 相關依賴 -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.5.4</version>
</dependency>

<!-- Spark-ES 依賴的 HTTP 傳輸組件 -->
<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>
複製代碼

三、注意事項

若是使用 CDH 版本的 Spark,則在調試及實際部署運行的時候會出現下面的錯誤:java

java.lang.ClassNotFoundException: org.apache.commons.httpclient.protocol.Protocol
複製代碼

很顯然是缺乏 httpclient 相關依賴形成的,對比開源版本與 CDH 版本的 Spark,發現開源版本多出了 commons-httpclient-3.1.jar,所以上述 Maven 的 pom 文件添加上對其依賴便可。node

2、ES-Hadoop

一、簡介

ES-Hadoop 實現了 Hadoop 生態(Hive、Spark、Pig、Storm 等)與 ElasticSearch 之間的數據交互,藉助該組件能夠將 Hadoop 生態的數據寫入到 ES 中,而後藉助 ES 對數據快速進行搜索、過濾、聚合等分析,進一步能夠經過 Kibana 來實現數據的可視化。apache

同時,也能夠藉助 ES 做爲數據存儲層(相似數倉的 Stage 層或者 ODS 層),而後藉助 Hadoop 生態的數據處理工具(Hive、MR、Spark 等)將處理後的數據寫入到 HDFS 中。編程

使用 ES 作爲原始數據的存儲層,能夠很好的進行數據去重、數據質量分析,還能夠提供一些即時的數據服務,例如趨勢展現、彙總分析等。json

對 Hadoop 數據進行交互分析

二、組成

ES-Hadoop 是一個整合性質的組件,它封裝了 Hadoop 生態的多種組件與 ES 交互的 API,若是你只須要部分功能,可使用細分的組件:bash

  • elasticsearch-hadoop-mr
  • elasticsearch-hadoop-hive
  • elasticsearch-hadoop-pig
  • elasticsearch-spark-20_2.10
  • elasticsearch-hadoop-cascading
  • elasticsearch-storm

3、elasticsearch-spark

一、配置

es-hadoop 核心是經過 es 提供的 restful 接口來進行數據交互,下面是幾個重要配置項,更多配置信息請參閱官方說明服務器

  • es.nodes:須要鏈接的 es 節點(不須要配置所有節點,默認會自動發現其餘可用節點);
  • es.port:節點 http 通信端口;
  • es.nodes.discovery:默認爲 true,表示自動發現集羣可用節點;
  • es.nodes.wan.only:默認爲 false,設置爲 true 以後,會關閉節點的自動 discovery,只使用 es.nodes 聲明的節點進行數據讀寫操做;若是你須要經過域名進行數據訪問,則設置該選項爲 true,不然請務必設置爲 false;
  • es.index.auto.create:是否自動建立不存在的索引,默認爲 true;
  • es.net.http.auth.user:Basic 認證的用戶名;
  • es.net.http.auth.pass:Basic 認證的密碼。
val conf = new SparkConf().setIfMissing("spark.app.name","rt-data-loader").setIfMissing("spark.master", "local[5]")
conf.set(ConfigurationOptions.ES_NODES, esNodes)
conf.set(ConfigurationOptions.ES_PORT, esPort)
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")
複製代碼

特別須要注意的配置項爲 es.nodes.wan.only,因爲在雲服務器環境中,配置文件使用的通常爲內網地址,而本地調試的時候通常使用外網地址,這樣將 es.nodes 配置爲外網地址後,最後會出現節點找不到的問題(因爲會使用節點配置的內網地址去進行鏈接):restful

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available; 
node discovery is disabled and none of nodes specified fit the criterion [xxx.xx.x.xx:9200]
複製代碼

此時將 es.nodes.wan.only 設置爲 true 便可。推薦開發測試時使用域名,集羣部署的時候將該選項置爲 falseapp

二、屏蔽寫入衝突

若是數據存在重複,寫入 ES 時每每會出現數據寫入衝突的錯誤,此時有兩種解決方法。

方法一:設置 es.write.operation 爲 upsert,這樣達到的效果爲若是存在則更新,不存在則進行插入,該配置項默認值爲 index。

方法二:自定義衝突處理類,相似上述配置中設置了自定義的 error.handlers,經過自定義類來處理相關錯誤,例如忽略衝突等:

public class IgnoreConflictsHandler extends BulkWriteErrorHandler {
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) throws Exception {
        if (entry.getResponseCode() == 409) {
            StaticLog.warn("Encountered conflict response. Ignoring old data.");
            return HandlerResult.HANDLED;
        }
        return collector.pass("Not a conflict response code.");
    }
}
複製代碼

方法二能夠屏蔽寫入版本比預期的小之類的版本衝突問題。

三、RDD 寫入 ES

EsSpark 提供了兩種主要方法來實現數據寫入:

  • saveToEs :RDD 內容爲 Seq[Map],即一個 Map 對象集合,每一個 Map 對應一個文檔;
  • saveJsonToEs:RDD 內容爲 Seq[String],即一個 String 集合,每一個 String 是一個 JSON 字符串,表明一條記錄(對應 ES 的 _source)。

數據寫入能夠指定不少配置信息,例如:

  • es.resource:設置寫入的索引和類型,索引和類型名均支持動態變量
  • es.mapping.id:設置文檔 _id 對應的字段名;
  • es.mapping.exclude:設置寫入時忽略的字段,支持通配符。
val itemRdd = rdd.flatMap(line => {
    val topic = line.topic()
    println("正在處理:" + topic + " - " + line.partition() + " : " + line.offset())
    val jsonArray = JSON.parseArray(line.value()).toJavaList(classOf[JSONObject]).asScala
    val resultMap = jsonArray.map(jsonObj =>{
      var tmpId = "xxx"
      var tmpIndex = "xxxxxx"
      jsonObj.put("myTmpId", tmpId)
      jsonObj.put("myTmpIndex", tmpIndex)
      jsonObj.getInnerMap
    })
    resultMap
})
val mapConf = Map(
    ("es.resource" , "{myTmpIndex}/doc"),
    ("es.write.operation" , "upsert"),
    ("es.mapping.id" , "myTmpId"),
    ("es.mapping.exclude" , "myTmp*")
)
EsSpark.saveToEs(itemRdd, mapConf)
複製代碼

es.mapping.exclude 只支持 RDD 爲 Map 集合(saveToEs),當爲 Json 字符串集合時(saveJsonToEs)會提示不支持的錯誤信息;這個配置項很是有用,例如 myTmpId 做爲文檔 id,所以沒有必要重複存儲到 _source 裏面了,能夠配置到這個配置項,將其從 _source 中排除。


Any Code,Code Any!

掃碼關注『AnyCode』,編程路上,一塊兒前行。

相關文章
相關標籤/搜索