Writing Data to ElasticSearch with Elastic4s

Elastic4s is a Scala library for working with ElasticSearch. GitHub: elastic4s/samples/elastic4s-tcp-client-maven at master · sksamuel/elastic4s · GitHub; documentation: Elastic4s. This post records the problems I ran into while writing data to ElasticSearch with Elastic4s, and how I solved them.

1. Ways to write

1.1 Over HTTP

import com.sksamuel.elastic4s.ElasticsearchClientUri
import com.sksamuel.elastic4s.http.HttpClient
import com.sksamuel.elastic4s.http.ElasticDsl._

// ExecutionContext needed by onComplete (async callback)
import scala.concurrent.ExecutionContext.Implicits.global

// An `object` (not a `class`) extending App, so it can be run directly
object HTTPDemo extends App {
    val url = "localhost"
    val port = 9200
    val client = HttpClient(ElasticsearchClientUri(url, port))
    client.execute {
        bulk(
          indexInto("myindex" / "mytype").fields("country" -> "Mongolia", "capital" -> "Ulaanbaatar"),
          indexInto("myindex" / "mytype").fields("country" -> "Namibia", "capital" -> "Windhoek")
        )
    }.onComplete { _ =>
        // Release the client only after the request has finished
        client.close()
    }
}

1.2 Over TCP

import com.sksamuel.elastic4s.ElasticsearchClientUri
import com.sksamuel.elastic4s.TcpClient
// Note that this import differs from the HTTP version (no `.http` package)
import com.sksamuel.elastic4s.ElasticDsl._

// ExecutionContext needed by onComplete (async callback)
import scala.concurrent.ExecutionContext.Implicits.global

object TCPDemo extends App {
    val url = "localhost"
    // TCP (transport) port, 9300 by default; 9200 is the HTTP port
    val port = 9300
    val clusterName = "mycluster"

    // The main difference is here
    val client = TcpClient.transport(s"elasticsearch://${url}:${port}?cluster.name=${clusterName}")

    client.execute {
        bulk(
          indexInto("myindex" / "mytype").fields("country" -> "Mongolia", "capital" -> "Ulaanbaatar"),
          indexInto("myindex" / "mytype").fields("country" -> "Namibia", "capital" -> "Windhoek")
        )
    }.onComplete { _ =>
        // Release the client only after the request has finished
        client.close()
    }
}

2. Problems and solutions

2.1 available.processors error when connecting over TCP

Error message:

java.lang.IllegalStateException: availableProcessors is already set to [n], rejecting [n]

Solution:

Before creating the TCP connection, run:

System.setProperty("es.set.netty.runtime.available.processors", "false")
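The property has to be set before the first transport client (and with it Netty) is initialized, so the ordering matters. One way to make that ordering explicit is to route client creation through a small helper. This is a sketch: NettyProcessorsFix and createClient are hypothetical names, and the create function stands in for the actual TcpClient.transport(...) call.

```scala
object NettyProcessorsFix {
  // Property name from the workaround above
  val Key = "es.set.netty.runtime.available.processors"

  // Sets the property first, then runs the client factory.
  // `create` is a stand-in for something like TcpClient.transport(...).
  def createClient[C](create: () => C): C = {
    sys.props(Key) = "false"
    create()
  }
}
```

Because the property is set inside the helper, any code that obtains its client through createClient cannot accidentally create the client first.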

2.2 TCP connections keep timing out

Solution:

By default, ElasticSearch opens an HTTP port on 9200 and a TCP (transport) port in the range 9300-9400.

For remote connections, HTTP needs the setting network.host: 0.0.0.0, and TCP needs the setting transport.host: 0.0.0.0.

For details, see the official documentation: [Transport | Elasticsearch Reference [5.6] | Elastic](https://www.elastic.co/guide/...)

For the difference between publish_host and bind_host, see: networking - What's the difference between bind_host and publish_host in ElasticSearch? - Stack Overflow
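When the TCP connection keeps timing out, it can help to first confirm from the client machine that the transport port is reachable at all, independently of Elastic4s. The following is a minimal sketch using only java.net; PortCheck and reachable are hypothetical names.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

object PortCheck {
  /** True if a plain TCP connection to host:port succeeds within timeoutMs. */
  def reachable(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    // Any failure (refused, timeout, unknown host) counts as unreachable
    try Try(socket.connect(new InetSocketAddress(host, port), timeoutMs)).isSuccess
    finally socket.close()
  }
}
```

For example, PortCheck.reachable("es-host", 9300), where "es-host" is a placeholder. If 9200 is reachable but 9300 is not, the transport.host setting or a firewall is a likely suspect.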

2.3 NoNodeAvailableException when connecting over TCP

Error message:

NoNodeAvailableException : None of the configured nodes are available

Solution:

If the cluster keeps its default name, i.e. the cluster.name: xxx setting has not been changed, a TCP connection can be created as follows:

val url = "localhost"
val port = 9300

val client = TcpClient.transport(ElasticsearchClientUri(url, port))

If the cluster name has been changed, however, the connection has to include it:

val url = "localhost"
val port = 9300
val clusterName = "mycluster"

val client = TcpClient.transport(s"elasticsearch://${url}:${port}?cluster.name=${clusterName}")

If the cluster has several nodes, they can all be listed (listing just one of them also works):

elasticsearch://host1:9300,host2:9300,host3:9300?cluster.name=my-cluster
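When the node list comes from configuration, the URI above can be assembled programmatically. A minimal sketch: TransportUri.build is a hypothetical helper that only produces the string, which would then be passed to TcpClient.transport as in section 1.2.

```scala
object TransportUri {
  /** Builds "elasticsearch://h1:p1,h2:p2?cluster.name=..." from (host, port) pairs. */
  def build(hosts: Seq[(String, Int)], clusterName: String): String = {
    require(hosts.nonEmpty, "at least one node is required")
    val nodes = hosts.map { case (h, p) => s"$h:$p" }.mkString(",")
    s"elasticsearch://$nodes?cluster.name=$clusterName"
  }
}
```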

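2.4 Missing Jackson method

Error message:

Exception in thread "main" java.lang.NoSuchMethodError: com.fasterxml.jackson.core.JsonFactory.xxx

Solution:

Among the various dependencies pulled in, the versions of the individual Jackson components may not match. In that case, declare the version of each component explicitly in pom.xml: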

<jackson.version>2.8.8</jackson.version>

...

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>${jackson.version}</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>${jackson.version}</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>${jackson.version}</version>
</dependency>
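Before pinning versions, it can help to check which jar each Jackson class is actually loaded from at runtime. The sketch below uses only reflection; ClassOrigin and locate are hypothetical names, and the version is only reported if the jar's manifest declares an Implementation-Version.

```scala
import scala.util.Try

object ClassOrigin {
  /** Returns (jar location, Implementation-Version) for a class,
    * or None if the class cannot be loaded. */
  def locate(className: String): Option[(String, Option[String])] =
    Try(Class.forName(className)).toOption.map { cls =>
      // CodeSource is null for JDK bootstrap classes
      val location = Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
        .getOrElse("<bootstrap or unknown>")
      // Only present if the jar's manifest declares Implementation-Version
      val version = Option(cls.getPackage).flatMap(p => Option(p.getImplementationVersion))
      (location, version)
    }
}
```

For example, printing ClassOrigin.locate for com.fasterxml.jackson.core.JsonFactory, com.fasterxml.jackson.databind.ObjectMapper and com.fasterxml.jackson.annotation.JsonProperty at startup shows whether the three components come from jars of the same version.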

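2.5 Synchronous vs. asynchronous calls

In Elastic4s, whether the connection is HTTP or TCP, a call can be made either synchronously or asynchronously:

// Synchronous: blocks until the response arrives
client.execute { bulk(...) }.await

// Asynchronous: returns a Future immediately
client.execute { bulk(...) }

The choice of calling style can lead to the following problems: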
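To see the difference between the two styles without an ElasticSearch instance, the sketch below replaces client.execute with a hypothetical execute that returns a scala.concurrent.Future. The synchronous variant blocks with Await.result; as an assumption, Elastic4s's .await behaves similarly, blocking until the Future completes or a timeout expires. SyncVsAsync and its methods are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object SyncVsAsync {
  // Hypothetical stand-in for client.execute { ... }: completes asynchronously
  def execute(docs: Seq[String]): Future[Int] = Future { docs.size }

  // Synchronous style: the caller blocks until the result is available
  def writeSync(docs: Seq[String]): Int =
    Await.result(execute(docs), 10.seconds)

  // Asynchronous style: returns at once; the caller reacts when it completes
  def writeAsync(docs: Seq[String]): Future[Int] = execute(docs)
}
```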

2.5.1 Async calls report a missing import

Error message:

Cannot find an implicit ExecutionContext. You might pass an (implicit ec: ExecutionContext) parameter to your method or import scala.concurrent.ExecutionContext.Implicits.global.

Solution:

import scala.concurrent.ExecutionContext.Implicits.global
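The compiler message also points to the other option: instead of importing the global context inside a method, declare an implicit ExecutionContext parameter and let the caller choose the thread pool. A minimal sketch; EcParam, totalLength and demo are hypothetical names, and the Future body stands in for a real request.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object EcParam {
  // The caller supplies the ExecutionContext, as the compiler message suggests
  def totalLength(docs: Seq[String])(implicit ec: ExecutionContext): Future[Int] =
    Future(docs.map(_.length).sum)

  // Usage with a dedicated pool instead of the global context
  def demo(): Int = {
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try Await.result(totalLength(Seq("ab", "c")), 5.seconds)
    finally pool.shutdown()
  }
}
```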

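2.5.2 Closing the connection right after an async call leaves the data unwritten

If, with the asynchronous style, we do this:

client.execute { bulk(...) }
client.close()

the call is interrupted before it finishes: no error or warning appears, but the data is never written to ES.

Solution:

As with an AJAX callback, release the client in a completion callback, after the call has finished: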

client.execute { bulk(...) }
      .onComplete { _ =>
          client.close()
      }
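The two patterns can be compared with a hypothetical FakeClient whose execute runs asynchronously and fails if the client was closed in the meantime; this only models the symptom, since the real client may drop the request silently instead. writeThenClose uses andThen, which calls client.close() once the request has finished and returns a Future that completes only after that. Waiting on that Future also matters in short-lived programs: the global ExecutionContext uses daemon threads, so an onComplete callback alone is not guaranteed to run before the JVM exits. FakeClient and CloseAfterWrite are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for the Elastic4s client
class FakeClient {
  val closed = new AtomicBoolean(false)
  @volatile var written = 0
  def execute(n: Int): Future[Int] = Future {
    Thread.sleep(50) // simulate network latency
    if (closed.get) throw new IllegalStateException("client closed")
    written += n
    n
  }
  def close(): Unit = closed.set(true)
}

object CloseAfterWrite {
  // close() runs after the request finishes (success or failure);
  // the returned Future completes only after close() has been called
  def writeThenClose(client: FakeClient, n: Int): Future[Int] =
    client.execute(n).andThen { case _ => client.close() }
}
```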

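2.6 Date formats

According to the material I consulted, strings in the formats yyyy-MM-dd HH:mm:ss Z and yyyy-MM-dd Z are automatically recognized as dates when indexed. In my experiments on version 5.6, however, the strings recognized as dates by default were those in the formats yyyy-MM-dd HH:mm:ss and yyyy-MM-dd.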
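Since automatic detection depends on the exact shape of the string, it can help to render dates in one fixed format on the client before indexing. The sketch below uses java.time and the yyyy-MM-dd HH:mm:ss pattern that was detected in the 5.6 experiment above; EsDates, format and matches are hypothetical names. Declaring the field as a date with an explicit format in the index mapping avoids depending on detection at all.

```scala
import java.time.{Instant, LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter
import scala.util.Try

object EsDates {
  // Pattern observed to be auto-detected as a date on 5.6 (see above)
  val Pattern: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Render an epoch-millis timestamp in that pattern for the given zone
  def format(epochMillis: Long, zone: ZoneId): String =
    Instant.ofEpochMilli(epochMillis).atZone(zone).format(Pattern)

  // True if the string already has exactly that shape
  def matches(s: String): Boolean = Try(LocalDateTime.parse(s, Pattern)).isSuccess
}
```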

References

  1. [Getting availableProcessors is already set to [1], rejecting [1] IllegalStateException exception - Elasticsearch - Discuss the Elastic Stack](https://discuss.elastic.co/t/...)
  2. Elasticsearch 5.4.1 - availableProcessors is already set - Elasticsearch - Discuss the Elastic Stack
  3. How do I enable remote access/request in Elasticsearch 2.0? - Stack Overflow
  4. networking - What's the difference between bind_host and publish_host in ElasticSearch? - Stack Overflow
  5. NoNodeAvailableException None of the configured nodes are available: {127.0.0.1:9300} · Issue 972 · sksamuel/elastic4s · GitHub
  6. java - NoNodeAvailableException : None of the configured nodes are available - Stack Overflow
  7. Exception when initializing TransportClient with elasticsearch 5.5 - CSDN Blog
  8. Exception in thread "main" java.lang.NoSuchMethodError: com.fasterxml.jackson.core.JsonFactory.requiresPropertyOrdering()Z - Stack Overflow
  9. Date datatype | Elasticsearch Reference [5.6] | Elastic