Elasticsearch 學習: Java API (一)

最近在學習 Elasticsearch,這是一個分佈式的大數據搜索引擎,其實也能夠看做是一個分佈式的數據庫。我使用的 Elasticsearch 的版本是 2.4.1,鑑於網上相關的中文資料較少,因此本身看官方文檔學習一下。java

使用 Maven 工程,個人 pom 文件以下所示:git

<dependencies>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.6.2</version>
        </dependency>
    </dependencies>

鏈接機器

TransportClient client = TransportClient.builder()
    .build()
    .addTransportAddress(new InetSocketTransportAddress(InetAddress
    .getByName("localhost"), 9300));

Index API 建立 Index 而且插入 Document

建立索引有不少種方法,這裏列舉經常使用的 2 種:github

HashMap<String, Object> json = new HashMap<String, Object>();
json.put("first_name","Shuang");
json.put("last_name", "Peng");
json.put("age", 24);
json.put("about", "I love coding");
IndexResponse response = client
    .prepareIndex("tseg","students","1")
    .setSource(json).get();

IndexResponse response = client.prepareIndex("tseg","students","1")
   .setSource(jsonBuilder()
   .startObject()
   .field("first_name", "Shuang")
   .field("first_name", "Peng")
   .field("age", 24)
   .field("about", "I love coding")
   .endObject())
   .get();

注意:Index API 只能用於建立 index,相似於關係型數據庫裏面的 create table,他不能對已有的數據庫進行添加。追加操做能夠用後面會提到的 Update 或者 Bulk 來完成。數據庫

Get API 獲取 Document

GetResponse response2 = client.prepareGet("tseg", "students", "1").get();
Map<String, Object> res = response2.getSource();
for (Map.Entry<String, Object> entry: res.entrySet()){
     System.out.println(entry.getKey() + " : " + entry.getValue());
     }

Delete API 刪除 Index 或者 Document

// 用來刪除對應的 document 
DeleteResponse response3 = 
    client.prepareDelete("tesg","students","1").get();
// 用來刪除對應的 index
DeleteIndexResponse response4 = 
    client.admin().indices().prepareDelete("facebook").execute().actionGet();

Update API 更新操做

更新操做也有兩種方法。建議使用第一種,第二種太複雜了。。。看看就好。apache

第一種json

client.prepareUpdate("tseg", "students", "1")
    .setDoc(jsonBuilder()
    .startObject().field("age", 32)
    .endObject())
    .get();

第二種api

IndexRequest indexRequest = new IndexRequest("tseg", "students", "1")
    .source(jsonBuilder()
    .startObject()
    .field("first_name", "Shuang")
    .field("last_name", "Peng")
    .field("age", 32)
    .field("about", "I loving coding")
    .endObject());

UpdateRequest updateRequest = new UpdateRequest("tseg","students", "1")
    .doc(jsonBuilder()
    .startObject().field("age", 32)
    .endObject())
    .upsert(indexRequest);
 client.update(updateRequest).get();

不過這裏提一下第二種方法,若是對應的 field 不存在的話,則更新操做自動變爲插入操做,不然,就是正常的修改操做。elasticsearch

Multi Get API 多查找

MultiGetResponse API 能夠一次返回多個要查找的值。下面介紹了兩種方法,一種是返回一個 Map,咱們能夠按照不一樣的 field 取值;第二種方法是直接返回一個字符串(Json格式)。分佈式

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("tseg", "students", "1", "2", "3").get();

for (MultiGetItemResponse itemResponses : multiGetItemResponses) {
    GetResponse response5 = itemResponses.getResponse();
    if (response5.isExists()) {
    
// 第一種用法
    Map<String, Object> fields = response5.getSource();
    System.out.println(fields.get("first_name"));

// 第二種用法
    String json2 = response5.getSourceAsString();
    System.out.println(json2);
}

Bulk API 批量操做

Bulk API容許批量提交index和delete請求, 以下:ide

BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.add(client.prepareIndex("tseg", "students", "1")
           .setSource(jsonBuilder()
           .startObject()
           .field("first_name", "Allen")
           .field("last_name", "Peng")
           .field("age", "22")
           .endObject()))
           .get();
           
bulkRequest.add(client.prepareIndex("tseg", "students", "2"))
            .setSource(jsonBuilder()
            .startObject()
            .field("first_name", "Hou")
            .field("last_name", "Xue")
            .field("age", "30")
            .endObject()))
            .get();

HashMap<String, Object> json2 = new HashMap<String, Object>();
List<String> list = new ArrayList<String>();
list.add("music");
list.add("football");
json2.put("first_name", "Peng");
json2.put("last_name", "Peng");
json2.put("interests", list);
BulkRequestBuilder bulkRequest2 = client.prepareBulk();

// 兩種執行方法,我的傾向於第一種
bulkRequest2.add(client.prepareIndex("facebook", "info", 
    "3").setSource(json2)).get();
// 第二種方法
bulkRequest2.add(client.prepareIndex("facebook", 
    "info","1").setSource(json2)).execute().actionGet();

還能夠這樣作:

BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.add(client.prepareIndex("index1", "type1", "id1")
    .setSource(source);
bulkRequest.add(client.prepareIndex("index2", "type2", "id2")
    .setSource(source);
BulkResponse bulkResponse = bulkRequest.execute().actionGet();

Bulk Processor API 可在批量操做完成以前和以後進行相應的操做

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                  BulkRequest request) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } 
        })
        .setBulkActions(10000) 
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) 
        .setFlushInterval(TimeValue.timeValueSeconds(5)) 
        .setConcurrentRequests(1) 
         .build();
         
bulkProcessor.add(new IndexRequest("index1", "type1", "id1").source(source1));  
bulkProcessor.add(new DeleteRequest("index2", "type2", "id2");
  1. beforeBulk 會在批量提交以前執行,能夠從 BulkRequest 中獲取請求信息request.requests() 或者請求數量 request.numberOfActions()。

  2. 第一個 afterBulk 會在批量成功後執行,能夠跟 beforeBulk 配合計算批量所需時間。

  3. 第二個 afterBulk 會在批量失敗後執行。

  4. 在例子中,當請求超過 10000 個(default=1000)或者總大小超過1GB(default=5MB)時,觸發批量提交動做。

後記

項目代碼已經共享至 GitHub

相關文章
相關標籤/搜索