Spark讀寫ES

本文主要介紹spark sql讀寫es、structured streaming寫入es以及一些參數的配置html

ES官方提供了對spark的支持,能夠直接經過spark讀寫es,具體能夠參考ES Spark Support文檔(文末有地址)。node

如下是pom依賴,具體版本能夠根據本身的es和spark版本進行選擇:sql

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-spark-20_2.11</artifactId>
  <version>6.0.0</version>
</dependency>

Spark SQL - ES

主要提供了兩種讀寫方式:一種是經過DataFrameReader/Writer傳入ES Source實現;另外一種是直接讀寫DataFrame實現。在實現前,還要列一些相關的配置:bootstrap

配置

參數 描述
es.nodes.wan.only true or false,在此模式下,鏈接器禁用發現,而且全部操做經過聲明的es.nodes鏈接
es.nodes ES節點
es.port ES端口
es.index.auto.create true or false,是否自動建立index
es.resource 資源路徑
es.mapping.id es會爲每一個文檔分配一個全局id。若是不指定此參數將隨機生成;若是指定的話按指定的來
es.batch.size.bytes es批量API的批量寫入的大小(以字節爲單位)
es.batch.write.refresh 批量更新完成後是否調用索引刷新
es.read.field.as.array.include 讀es的時候,指定將哪些字段做爲數組類型

列了一些經常使用的配置,更多配置查看ES Spark Configuration文檔數組

DataFrameReader讀ES

import org.elasticsearch.spark.sql._
val options = Map(
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200",
  "es.read.field.as.array.include" -> "arr1, arr2"
)
val df = spark
    .read
    .format("es")
    .options(options)
    .load("index1/info")
df.show()

DataFrameWriter寫ES

import org.elasticsearch.spark.sql._
val options = Map(
  "es.index.auto.create" -> "true",
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200",
  "es.mapping.id" -> "id"
)

val sourceDF = spark.table("hive_table")
sourceDF
  .write
  .format("org.elasticsearch.spark.sql")
  .options(options)
  .mode(SaveMode.Append)
  .save("hive_table/docs")

讀DataFrame

jar包中提供了esDF()方法能夠直接讀es數據爲DataFrame,如下是源碼截圖。
在這裏插入圖片描述
簡單說一下各個參數:app

resource:資源路徑,例如hive_table/docselasticsearch

cfg:一些es的配置,和上面代碼中的options差很少ide

query:指定DSL查詢語句來過濾要讀的數據,例如"?q=user_group_id:3"表示讀user_group_id爲3的數據oop

val options = Map(
  "pushdown" -> "true",
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200"
)

val df = spark.esDF("hive_table/docs", "?q=user_group_id:3", options)
df.show()

寫DataFrame

jar包中提供了saveToEs()方法能夠將DataFrame寫入ES,如下是源碼截圖。
在這裏插入圖片描述
resource:資源路徑,例如hive_table/docsui

cfg:一些es的配置,和上面代碼中的options差很少

import org.elasticsearch.spark.sql._ 
val options = Map(
  "es.index.auto.create" -> "true",
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200",
  "es.mapping.id" -> "zip_record_id"
)
val df = spark.table("hive_table")
df.saveToEs("hive_table/docs", options)

Structured Streaming - ES

es也提供了對Structured Streaming的集成,使用Structured Streaming能夠實時的寫入ES。

import org.elasticsearch.spark.sql._
val options = Map(
  "es.index.auto.create" -> "true",
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200",
  "es.mapping.id" -> "zip_record_id"
)
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "a:9092,b:9092,c:9092")
  .option("subscribe", "test")
  .option("failOnDataLoss", "false")
  .load()
df
  .writeStream
  .outputMode(OutputMode.Append())
  .format("es")
  .option("checkpointLocation", s"hdfs://hadoop:8020/checkpoint/test01")
  .options(options)
  .start("test_streaming/docs")
  .awaitTermination()

可能遇到的問題

數組類型轉換錯誤

報錯信息:type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type

由於es的mapping只會記錄字段的類型,不會記錄是不是數組,也就是說若是是int數組,es的mapping只是記錄成int。

能夠在option中加一個es.read.field.as.array.include,標明數組字段

es.read.field.as.array.include" -> "數組字段的名字"

若是是object裏的某個字段,寫成"object名字.數組字段名字",若是是多個字段,字段名之間用逗號分隔

Timestamp被轉爲Long

DataFrame的Timestamp類型數據寫入ES後,就變成了Number類型。

這可能不算個問題,時間戳本質上就是Long類型的毫秒值;可是在Hive中Timestamp是"yyyy-MM-dd HH:mm:ss"的類型,我的以爲很彆扭。

嘗試將Timestamp類型字段轉成Date類型,寫入ES後仍是Number類型。網上搜了一圈也沒有什麼好的辦法,你們有什麼解決辦法歡迎交流。

References

ES Spark Support文檔:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark

ES Spark Configuration: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

end.


我的公衆號:碼農峯,定時推送行業資訊,持續發佈原創技術文章,歡迎你們關注。

相關文章
相關標籤/搜索