本文主要介紹spark sql讀寫es、structured streaming寫入es以及一些參數的配置html
ES官方提供了對spark的支持,能夠直接經過spark讀寫es,具體能夠參考ES Spark Support文檔(文末有地址)。node
如下是pom依賴,具體版本能夠根據本身的es和spark版本進行選擇:sql
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-20_2.11</artifactId> <version>6.0.0</version> </dependency>
主要提供了兩種讀寫方式:一種是經過DataFrameReader/Writer傳入ES Source實現;另外一種是直接讀寫DataFrame實現。在實現前,還要列一些相關的配置:bootstrap
參數 | 描述 |
---|---|
es.nodes.wan.only | true or false,在此模式下,鏈接器禁用發現,而且全部操做經過聲明的es.nodes鏈接 |
es.nodes | ES節點 |
es.port | ES端口 |
es.index.auto.create | true or false,是否自動建立index |
es.resource | 資源路徑 |
es.mapping.id | es會爲每一個文檔分配一個全局id。若是不指定此參數將隨機生成;若是指定的話按指定的來 |
es.batch.size.bytes | es批量API的批量寫入的大小(以字節爲單位) |
es.batch.write.refresh | 批量更新完成後是否調用索引刷新 |
es.read.field.as.array.include | 讀es的時候,指定將哪些字段做爲數組類型 |
列了一些經常使用的配置,更多配置查看ES Spark Configuration文檔數組
import org.elasticsearch.spark.sql._ val options = Map( "es.nodes.wan.only" -> "true", "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009", "es.port" -> "9200", "es.read.field.as.array.include" -> "arr1, arr2" ) val df = spark .read .format("es") .options(options) .load("index1/info") df.show()
import org.elasticsearch.spark.sql._ val options = Map( "es.index.auto.create" -> "true", "es.nodes.wan.only" -> "true", "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009", "es.port" -> "9200", "es.mapping.id" -> "id" ) val sourceDF = spark.table("hive_table") sourceDF .write .format("org.elasticsearch.spark.sql") .options(options) .mode(SaveMode.Append) .save("hive_table/docs")
jar包中提供了esDF()方法能夠直接讀es數據爲DataFrame,如下是源碼截圖。
簡單說一下各個參數:app
resource:資源路徑,例如hive_table/docselasticsearch
cfg:一些es的配置,和上面代碼中的options差很少ide
query:指定DSL查詢語句來過濾要讀的數據,例如"?q=user_group_id:3"表示讀user_group_id爲3的數據oop
val options = Map( "pushdown" -> "true", "es.nodes.wan.only" -> "true", "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009", "es.port" -> "9200" ) val df = spark.esDF("hive_table/docs", "?q=user_group_id:3", options) df.show()
jar包中提供了saveToEs()方法能夠將DataFrame寫入ES,如下是源碼截圖。
resource:資源路徑,例如hive_table/docsui
cfg:一些es的配置,和上面代碼中的options差很少
import org.elasticsearch.spark.sql._ val options = Map( "es.index.auto.create" -> "true", "es.nodes.wan.only" -> "true", "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009", "es.port" -> "9200", "es.mapping.id" -> "zip_record_id" ) val df = spark.table("hive_table") df.saveToEs("hive_table/docs", options)
es也提供了對Structured Streaming的集成,使用Structured Streaming能夠實時的寫入ES。
import org.elasticsearch.spark.sql._ val options = Map( "es.index.auto.create" -> "true", "es.nodes.wan.only" -> "true", "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009", "es.port" -> "9200", "es.mapping.id" -> "zip_record_id" ) val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "a:9092,b:9092,c:9092") .option("subscribe", "test") .option("failOnDataLoss", "false") .load() df .writeStream .outputMode(OutputMode.Append()) .format("es") .option("checkpointLocation", s"hdfs://hadoop:8020/checkpoint/test01") .options(options) .start("test_streaming/docs") .awaitTermination()
報錯信息:type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type
由於es的mapping只會記錄字段的類型,不會記錄是不是數組,也就是說若是是int數組,es的mapping只是記錄成int。
能夠在option中加一個es.read.field.as.array.include,標明數組字段
es.read.field.as.array.include" -> "數組字段的名字"
若是是object裏的某個字段,寫成"object名字.數組字段名字",若是是多個字段,字段名之間用逗號分隔
DataFrame的Timestamp類型數據寫入ES後,就變成了Number類型。
這可能不算個問題,時間戳本質上就是Long類型的毫秒值;可是在Hive中Timestamp是"yyyy-MM-dd HH:mm:ss"的類型,我的以爲很彆扭。
嘗試將Timestamp類型字段轉成Date類型,寫入ES後仍是Number類型。網上搜了一圈也沒有什麼好的辦法,你們有什麼解決辦法歡迎交流。
ES Spark Support文檔:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark
ES Spark Configuration: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
end.
我的公衆號:碼農峯,定時推送行業資訊,持續發佈原創技術文章,歡迎你們關注。