最近在學習 Elasticsearch,這是一個分佈式的大數據搜索引擎,其實也能夠看做是一個分佈式的數據庫。我使用的 Elasticsearch 的版本是 2.4.1,鑑於網上相關的中文資料較少,因此本身看官方文檔學習一下。java
使用 Maven 工程,個人 pom 文件以下所示:git
<dependencies> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>2.4.1</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>2.6.2</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.6.2</version> </dependency> </dependencies>
TransportClient client = TransportClient.builder() .build() .addTransportAddress(new InetSocketTransportAddress(InetAddress .getByName("localhost"), 9300));
建立索引有不少種方法,這裏列舉經常使用的 2 種:github
HashMap<String, Object> json = new HashMap<String, Object>(); json.put("first_name","Shuang"); json.put("last_name", "Peng"); json.put("age", 24); json.put("about", "I love coding"); IndexResponse response = client .prepareIndex("tseg","students","1") .setSource(json).get(); IndexResponse response = client.prepareIndex("tseg","students","1") .setSource(jsonBuilder() .startObject() .field("first_name", "Shuang") .field("first_name", "Peng") .field("age", 24) .field("about", "I love coding") .endObject()) .get();
注意:Index API 只能用於建立 index,相似於關係型數據庫裏面的 create table,他不能對已有的數據庫進行添加。追加操做能夠用後面會提到的 Update 或者 Bulk 來完成。數據庫
GetResponse response2 = client.prepareGet("tseg", "students", "1").get(); Map<String, Object> res = response2.getSource(); for (Map.Entry<String, Object> entry: res.entrySet()){ System.out.println(entry.getKey() + " : " + entry.getValue()); }
// 用來刪除對應的 document DeleteResponse response3 = client.prepareDelete("tesg","students","1").get(); // 用來刪除對應的 index DeleteIndexResponse response4 = client.admin().indices().prepareDelete("facebook").execute().actionGet();
更新操做也有兩種方法。建議使用第一種,第二種太複雜了。。。看看就好。apache
第一種json
client.prepareUpdate("tseg", "students", "1") .setDoc(jsonBuilder() .startObject().field("age", 32) .endObject()) .get();
第二種api
IndexRequest indexRequest = new IndexRequest("tseg", "students", "1") .source(jsonBuilder() .startObject() .field("first_name", "Shuang") .field("last_name", "Peng") .field("age", 32) .field("about", "I loving coding") .endObject()); UpdateRequest updateRequest = new UpdateRequest("tseg","students", "1") .doc(jsonBuilder() .startObject().field("age", 32) .endObject()) .upsert(indexRequest); client.update(updateRequest).get();
不過這裏提一下第二種方法,若是對應的 field 不存在的話,則更新操做自動變爲插入操做,不然,就是正常的修改操做。elasticsearch
MultiGetResponse API 能夠一次返回多個要查找的值。下面介紹了兩種方法,一種是返回一個 Map,咱們能夠按照不一樣的 field 取值;第二種方法是直接返回一個字符串(Json格式)。分佈式
MultiGetResponse multiGetItemResponses = client.prepareMultiGet() .add("tseg", "students", "1", "2", "3").get(); for (MultiGetItemResponse itemResponses : multiGetItemResponses) { GetResponse response5 = itemResponses.getResponse(); if (response5.isExists()) { // 第一種用法 Map<String, Object> fields = response5.getSource(); System.out.println(fields.get("first_name")); // 第二種用法 String json2 = response5.getSourceAsString(); System.out.println(json2); }
Bulk API容許批量提交index和delete請求, 以下:ide
BulkRequestBuilder bulkRequest = client.prepareBulk(); bulkRequest.add(client.prepareIndex("tseg", "students", "1") .setSource(jsonBuilder() .startObject() .field("first_name", "Allen") .field("last_name", "Peng") .field("age", "22") .endObject())) .get(); bulkRequest.add(client.prepareIndex("tseg", "students", "2")) .setSource(jsonBuilder() .startObject() .field("first_name", "Hou") .field("last_name", "Xue") .field("age", "30") .endObject())) .get(); HashMap<String, Object> json2 = new HashMap<String, Object>(); List<String> list = new ArrayList<String>(); list.add("music"); list.add("football"); json2.put("first_name", "Peng"); json2.put("last_name", "Peng"); json2.put("interests", list); BulkRequestBuilder bulkRequest2 = client.prepareBulk(); // 兩種執行方法,我的傾向於第一種 bulkRequest2.add(client.prepareIndex("facebook", "info", "3").setSource(json2)).get(); // 第二種方法 bulkRequest2.add(client.prepareIndex("facebook", "info","1").setSource(json2)).execute().actionGet();
還能夠這樣作:
BulkRequestBuilder bulkRequest = client.prepareBulk(); bulkRequest.add(client.prepareIndex("index1", "type1", "id1") .setSource(source); bulkRequest.add(client.prepareIndex("index2", "type2", "id2") .setSource(source); BulkResponse bulkResponse = bulkRequest.execute().actionGet();
BulkProcessor bulkProcessor = BulkProcessor.builder( client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { ... } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { ... } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { ... } }) .setBulkActions(10000) .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) .setFlushInterval(TimeValue.timeValueSeconds(5)) .setConcurrentRequests(1) .build(); bulkProcessor.add(new IndexRequest("index1", "type1", "id1").source(source1)); bulkProcessor.add(new DeleteRequest("index2", "type2", "id2");
beforeBulk 會在批量提交以前執行,能夠從 BulkRequest 中獲取請求信息request.requests() 或者請求數量 request.numberOfActions()。
第一個 afterBulk 會在批量成功後執行,能夠跟 beforeBulk 配合計算批量所需時間。
第二個 afterBulk 會在批量失敗後執行。
在例子中,當請求超過 10000 個(default=1000)或者總大小超過1GB(default=5MB)時,觸發批量提交動做。
項目代碼已經共享至 GitHub。