上篇談到:elasticsearch自己是一個完整的後臺系統,對其的操做使用是經過終端api進行的。elasticsearch自己提供了多種編程語言的api,包括java的esjava。而elastic4s是一套基於esjava之上的scala api。java
先看看scala 終端 ElasticClient的構建過程:node
import com.sksamuel.elastic4s.ElasticDsl._ val esjava = JavaClient(ElasticProperties("http://localhost:9200")) val client = ElasticClient(esjava)
先構建JavaClient,JavaClient包嵌了個esjava的RestClient進行具體的操做:apache
class JavaClient(client: RestClient) extends HttpClient { ... //send request to elasticsearch
override def send(req: ElasticRequest, callback: Either[Throwable, HttpResponse] => Unit): Unit = { if (logger.isDebugEnabled) { logger.debug("Executing elastic request {}", Show[ElasticRequest].show(req)) } val l = new ResponseListener { override def onSuccess(r: org.elasticsearch.client.Response): Unit = callback(Right(fromResponse(r))) override def onFailure(e: Exception): Unit = e match { case re: ResponseException => callback(Right(fromResponse(re.getResponse))) case t => callback(Left(JavaClientExceptionWrapper(t))) } } val request = new Request(req.method, req.endpoint) req.params.foreach { case (key, value) => request.addParameter(key, value) } req.entity.map(apacheEntity).foreach(request.setEntity) //perform actual request sending
client.performRequestAsync(request, l) } ... }
上面這個RestClient便是elasticsearch提供的javaClient。而elastic4s的具體操做是經過RestClient.performRequestAsync進行的,以下:編程
public class RestClient implements Closeable { ... /** * Sends a request to the Elasticsearch cluster that the client points to. * The request is executed asynchronously and the provided * {@link ResponseListener} gets notified upon request completion or * failure. Selects a host out of the provided ones in a round-robin * fashion. Failing hosts are marked dead and retried after a certain * amount of time (minimum 1 minute, maximum 30 minutes), depending on how * many times they previously failed (the more failures, the later they * will be retried). In case of failures all of the alive nodes (or dead * nodes that deserve a retry) are retried until one responds or none of * them does, in which case an {@link IOException} will be thrown. * * @param request the request to perform * @param responseListener the {@link ResponseListener} to notify when the * request is completed or fails */
public void performRequestAsync(Request request, ResponseListener responseListener) { try { FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener); InternalRequest internalRequest = new InternalRequest(request); performRequestAsync(nextNodes(), internalRequest, failureTrackingResponseListener); } catch (Exception e) { responseListener.onFailure(e); } } ... }
另外,ElasticProperties是一個javaClient與ES鏈接的參數結構,包括IP地址:json
/** * Contains the endpoints of the nodes to connect to, as well as connection properties. */
case class ElasticProperties(endpoints: Seq[ElasticNodeEndpoint], options: Map[String, String] = Map.empty)
ElasticProperties包含了ES地址ElasticNodeEndPoint及其它鏈接參數(若是須要的話),以下:api
it should "support prefix path with trailing slash" in { ElasticProperties("https://host1:1234,host2:2345/prefix/path/") shouldBe ElasticProperties(Seq(ElasticNodeEndpoint("https", "host1", 1234, Some("/prefix/path")), ElasticNodeEndpoint("https", "host2", 2345, Some("/prefix/path")))) }
當elastic4s完成了與elasticsearch的鏈接以後,就能夠把按ES要求組合的Json指令發送到後臺ES去執行了。elastic4s提供了一套DSL, 一種嵌入式語言,能夠幫助用戶更方便的用編程模式來組合ES的指令Json。固然,用戶也能夠直接把字符類的Json直接經過ElasticClient發送到後臺ES。下面是一個簡單能夠運行的elastic4s示範:app
import com.sksamuel.elastic4s.http.JavaClient import com.sksamuel.elastic4s.requests.common.RefreshPolicy import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties} object HttpClientExampleApp extends App { // you must import the DSL to use the syntax helpers
import com.sksamuel.elastic4s.ElasticDsl._ val esjava = JavaClient(ElasticProperties("http://localhost:9200")) val client = ElasticClient(esjava) client.execute { bulk( indexInto("books" ).fields("title" -> "重慶火鍋的十種吃法", "content" -> "在這部書裏描述了火鍋的各類烹飪方式"), indexInto("books" ).fields("title" -> "中國火鍋大全", "content" -> "本書是全國中式烹飪中有關火鍋的各類介紹") ).refresh(RefreshPolicy.WaitFor) }.await val json =
""" |{ | "query" : { | "match" : {"title" : "火鍋"} | } |} |""".stripMargin
val response = client.execute { search("books").source(json) // .matchQuery("title", "火鍋")
}.await
// prints out the original json
println(response.result.hits.hits.head.sourceAsString) client.close() }