Elasticsearch筆記五之java操做es

Java操做es集羣步驟1:配置集羣對象信息;2:建立客戶端;3:查看集羣信息java

1:集羣名稱node

      默認集羣名爲elasticsearch,若是集羣名稱和指定的不一致則在使用節點資源時會報錯。json

2:嗅探功能服務器

       經過client.transport.sniff啓動嗅探功能,這樣只須要指定集羣中的某一個節點(不必定是主節點),而後會加載集羣中的其餘節點,這樣只要程序不停即便此節點宕機仍然能夠鏈接到其餘節點。app

3:查詢類型SearchType.QUERY_THEN_FETCHcurl

      Es中一共有四種查詢類型。elasticsearch

      QUERY_AND_FETCH:maven

      主節點將查詢請求分發到全部的分片中,各個分片按照本身的查詢規則即詞頻文檔頻率進行打分排序,而後將結果返回給主節點,主節點對全部數據進行彙總排序而後再返回給客戶端,此種方式只須要和es交互一次。tcp

      這種查詢方式存在數據量和排序問題,主節點會彙總全部分片返回的數據這樣數據量會比較大,二是各個分片上的規則可能不一致。ide

QUERY_THEN_FETCH:

      主節點將請求分發給全部分片,各個分片打分排序後將數據的id和分值返回給主節點,主節點收到後進行彙總排序再根據排序後的id到對應的節點讀取對應的數據再返回給客戶端,此種方式須要和es交互兩次。

      這種方式解決了數據量問題可是排序問題依然存在並且是es的默認查詢方式。

DFS_QUERY_AND_FETCH和DFS_QUERY_THEN_FETCH:

      這兩種方式和前面兩種的區別在於將各個分片的規則統一塊兒來進行打分。解決了排序問題可是DFS_QUERY_AND_FETCH仍然存在數據量問題,DFS_QUERY_THEN_FETCH兩種噢乖你問題都解決可是效率是最差的。

特色:

     一個交互兩次,一個交互一次;一個統一打分規則一個不統一;一個分片返回詳細數據一個分片返回id。

4:分頁壓力

       咱們經過curl和java查詢時均可以指定分頁,可是頁數越日後服務器的壓力會越大。大多數搜索引擎都不會提供很是大的頁數搜索,緣由有兩個一是用戶習慣通常不會看頁數大的搜索結果由於越日後越不許確,二是服務器壓力。

       好比分片是5分頁單位是10查詢第10000到10010條記錄,es須要在全部分片上進行查詢,每一個分片會產生10010條排序後的數據而後返回給主節點,主節點接收5個分片的數據一共是50050條而後再進行彙總最後再取其中的10000到10010條數據返回給客戶端,這樣一來看似只請求了10條數據但實際上es要彙總5萬多條數據,因此頁碼越大服務器的壓力就越大。

5:超時timeout

       查詢時若是數據量很大,能夠指定超時時間即到達此時間後不管查詢的結果是什麼都會返回而且關閉鏈接,這樣用戶體驗較好缺點是查詢出的數據可能不完整,Java和curl均可以指定超時時間。

6:maven依賴

[java]  view plain  copy
  1. <dependency>  
  2.             <groupId>org.elasticsearch</groupId>  
  3.             <artifactId>elasticsearch</artifactId>  
  4.             <version>1.4.4</version>  
  5.         </dependency>    
  6.         <dependency>  
  7.             <groupId>com.fasterxml.jackson.core</groupId>  
  8.             <artifactId>jackson-databind</artifactId>  
  9.             <version>2.1.3</version>  
  10.         </dependency>  


如下是java代碼

    

[java]  view plain  copy
  1. package elasticsearch;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.HashMap;  
  5. import java.util.List;  
  6. import java.util.Map;  
  7. import java.util.concurrent.ExecutionException;  
  8.   
  9. import online.elasticsearch.bean.Student;  
  10.   
  11. import org.elasticsearch.ElasticsearchException;  
  12. import org.elasticsearch.action.bulk.BulkItemResponse;  
  13. import org.elasticsearch.action.bulk.BulkRequestBuilder;  
  14. import org.elasticsearch.action.bulk.BulkResponse;  
  15. import org.elasticsearch.action.delete.DeleteRequest;  
  16. import org.elasticsearch.action.delete.DeleteResponse;  
  17. import org.elasticsearch.action.get.GetResponse;  
  18. import org.elasticsearch.action.index.IndexRequest;  
  19. import org.elasticsearch.action.index.IndexResponse;  
  20. import org.elasticsearch.action.search.SearchResponse;  
  21. import org.elasticsearch.action.search.SearchType;  
  22. import org.elasticsearch.action.update.UpdateRequest;  
  23. import org.elasticsearch.action.update.UpdateResponse;  
  24. import org.elasticsearch.client.transport.TransportClient;  
  25. import org.elasticsearch.cluster.node.DiscoveryNode;  
  26. import org.elasticsearch.common.collect.ImmutableList;  
  27. import org.elasticsearch.common.settings.ImmutableSettings;  
  28. import org.elasticsearch.common.settings.Settings;  
  29. import org.elasticsearch.common.text.Text;  
  30. import org.elasticsearch.common.transport.InetSocketTransportAddress;  
  31. import org.elasticsearch.common.transport.TransportAddress;  
  32. import org.elasticsearch.common.xcontent.XContentBuilder;  
  33. import org.elasticsearch.common.xcontent.XContentFactory;  
  34. import org.elasticsearch.index.query.FilterBuilders;  
  35. import org.elasticsearch.index.query.MatchQueryBuilder.Operator;  
  36. import org.elasticsearch.index.query.QueryBuilders;  
  37. import org.elasticsearch.search.SearchHit;  
  38. import org.elasticsearch.search.SearchHits;  
  39. import org.elasticsearch.search.aggregations.Aggregation;  
  40. import org.elasticsearch.search.aggregations.AggregationBuilders;  
  41. import org.elasticsearch.search.aggregations.Aggregations;  
  42. import org.elasticsearch.search.aggregations.bucket.terms.Terms;  
  43. import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;  
  44. import org.elasticsearch.search.aggregations.metrics.sum.Sum;  
  45. import org.elasticsearch.search.highlight.HighlightField;  
  46. import org.elasticsearch.search.sort.SortOrder;  
  47. import org.junit.Before;  
  48. import org.junit.Test;  
  49.   
  50. import com.fasterxml.jackson.core.JsonProcessingException;  
  51. import com.fasterxml.jackson.databind.ObjectMapper;  
  52.   
  53. public class elastaicTest {  
  54.   
  55.     TransportClient transportClient;  
  56.     //索引庫名  
  57.     String index = "shb01";  
  58.     //類型名稱  
  59.     String type = "stu";  
  60.       
  61.     @Before  
  62.     public void before()  
  63.     {  
  64.         /** 
  65.          * 1:經過 setting對象來指定集羣配置信息 
  66.          */  
  67.         Settings setting = ImmutableSettings.settingsBuilder()  
  68.             .put("cluster.name""shb01")//指定集羣名稱  
  69.             .put("client.transport.sniff"true)//啓動嗅探功能  
  70.             .build();  
  71.           
  72.         /** 
  73.          * 2:建立客戶端 
  74.          * 經過setting來建立,若不指定則默認連接的集羣名爲elasticsearch 
  75.          * 連接使用tcp協議即9300 
  76.          */  
  77.         transportClient = new TransportClient(setting);                          
  78.         TransportAddress transportAddress = new InetSocketTransportAddress("192.168.79.131"9300);  
  79.         transportClient.addTransportAddresses(transportAddress);  
  80.           
  81.         /** 
  82.          * 3:查看集羣信息 
  83.          * 注意個人集羣結構是: 
  84.          *   131的elasticsearch.yml中指定爲主節點不能存儲數據, 
  85.          *   128的elasticsearch.yml中指定不爲主節點只能存儲數據。 
  86.          * 全部控制檯只打印了192.168.79.128,只能獲取數據節點 
  87.          *  
  88.          */  
  89.         ImmutableList<DiscoveryNode> connectedNodes = transportClient.connectedNodes();  
  90.         for(DiscoveryNode node : connectedNodes)  
  91.         {  
  92.             System.out.println(node.getHostAddress());  
  93.         }  
  94.           
  95.     }  
  96.       
  97.     /** 
  98.      * 經過prepareGet方法獲取指定文檔信息 
  99.      */  
  100.     @Test  
  101.     public void testGet() {  
  102.         GetResponse getResponse = transportClient.prepareGet(index, type, "1").get();  
  103.         System.out.println(getResponse.getSourceAsString());  
  104.     }  
  105.       
  106.     /** 
  107.      * prepareUpdate更新索引庫中文檔,若是文檔不存在則會報錯 
  108.      * @throws IOException 
  109.      *  
  110.      */  
  111.     @Test  
  112.     public void testUpdate() throws IOException  
  113.     {  
  114.         XContentBuilder source = XContentFactory.jsonBuilder()  
  115.             .startObject()  
  116.             .field("name""will")  
  117.             .endObject();  
  118.           
  119.         UpdateResponse updateResponse = transportClient  
  120.                 .prepareUpdate(index, type, "6").setDoc(source).get();  
  121.           
  122.         System.out.println(updateResponse.getVersion());  
  123.     }  
  124.   
  125.     /** 
  126.      * 經過prepareIndex增長文檔,參數爲json字符串 
  127.      */  
  128.     @Test  
  129.     public void testIndexJson()  
  130.     {  
  131.         String source = "{\"name\":\"will\",\"age\":18}";  
  132.         IndexResponse indexResponse = transportClient  
  133.                 .prepareIndex(index, type, "3").setSource(source).get();  
  134.         System.out.println(indexResponse.getVersion());  
  135.     }  
  136.       
  137.     /** 
  138.      * 經過prepareIndex增長文檔,參數爲Map<String,Object> 
  139.      */  
  140.     @Test  
  141.     public void testIndexMap()  
  142.     {  
  143.         Map<String, Object> source = new HashMap<String, Object>(2);  
  144.         source.put("name""Alice");  
  145.         source.put("age"16);  
  146.         IndexResponse indexResponse = transportClient  
  147.                 .prepareIndex(index, type, "4").setSource(source).get();  
  148.         System.out.println(indexResponse.getVersion());  
  149.     }  
  150.       
  151.     /** 
  152.      * 經過prepareIndex增長文檔,參數爲javaBean 
  153.      *  
  154.      * @throws ElasticsearchException 
  155.      * @throws JsonProcessingException 
  156.      */  
  157.     @Test  
  158.     public void testIndexBean() throws ElasticsearchException, JsonProcessingException  
  159.     {  
  160.         Student stu = new Student();  
  161.         stu.setName("Fresh");  
  162.         stu.setAge(22);  
  163.           
  164.         ObjectMapper mapper = new ObjectMapper();  
  165.         IndexResponse indexResponse = transportClient  
  166.                 .prepareIndex(index, type, "5").setSource(mapper.writeValueAsString(stu)).get();  
  167.         System.out.println(indexResponse.getVersion());  
  168.     }  
  169.       
  170.     /** 
  171.      * 經過prepareIndex增長文檔,參數爲XContentBuilder 
  172.      *  
  173.      * @throws IOException 
  174.      * @throws InterruptedException 
  175.      * @throws ExecutionException 
  176.      */  
  177.     @Test  
  178.     public void testIndexXContentBuilder() throws IOException, InterruptedException, ExecutionException  
  179.     {  
  180.         XContentBuilder builder = XContentFactory.jsonBuilder()  
  181.                 .startObject()  
  182.                 .field("name""Avivi")  
  183.                 .field("age"30)  
  184.                 .endObject();  
  185.         IndexResponse indexResponse = transportClient  
  186.                 .prepareIndex(index, type, "6")  
  187.                 .setSource(builder)  
  188.                 .execute().get();  
  189.         //.execute().get();和get()效果同樣  
  190.         System.out.println(indexResponse.getVersion());  
  191.     }  
  192.       
  193.     /** 
  194.      * 經過prepareDelete刪除文檔 
  195.      *  
  196.      */  
  197.     @Test  
  198.     public void testDelete()  
  199.     {  
  200.         String id = "9";  
  201.         DeleteResponse deleteResponse = transportClient.prepareDelete(index,  
  202.                 type, id).get();  
  203.           
  204.         System.out.println(deleteResponse.getVersion());  
  205.           
  206.         //刪除全部記錄  
  207.         transportClient.prepareDeleteByQuery(index).setTypes(type)  
  208.                 .setQuery(QueryBuilders.matchAllQuery()).get();  
  209.     }  
  210.       
  211.     /** 
  212.      * 刪除索引庫,不可逆慎用 
  213.      */  
  214.     @Test  
  215.     public void testDeleteeIndex()  
  216.     {  
  217.         transportClient.admin().indices().prepareDelete("shb01","shb02").get();  
  218.     }  
  219.       
  220.     /** 
  221.      * 求索引庫文檔總數 
  222.      */  
  223.     @Test  
  224.     public void testCount()  
  225.     {  
  226.         long count = transportClient.prepareCount(index).get().getCount();  
  227.         System.out.println(count);  
  228.     }  
  229.       
  230.     /** 
  231.      * 經過prepareBulk執行批處理 
  232.      *  
  233.      * @throws IOException  
  234.      */  
  235.     @Test  
  236.     public void testBulk() throws IOException  
  237.     {  
  238.         //1:生成bulk  
  239.         BulkRequestBuilder bulk = transportClient.prepareBulk();  
  240.           
  241.         //2:新增  
  242.         IndexRequest add = new IndexRequest(index, type, "10");  
  243.         add.source(XContentFactory.jsonBuilder()  
  244.                     .startObject()  
  245.                     .field("name""Henrry").field("age"30)  
  246.                     .endObject());  
  247.           
  248.         //3:刪除  
  249.         DeleteRequest del = new DeleteRequest(index, type, "1");  
  250.           
  251.         //4:修改  
  252.         XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("name""jack_1").field("age"19).endObject();  
  253.         UpdateRequest update = new UpdateRequest(index, type, "2");  
  254.         update.doc(source);  
  255.           
  256.         bulk.add(del);  
  257.         bulk.add(add);  
  258.         bulk.add(update);  
  259.         //5:執行批處理  
  260.         BulkResponse bulkResponse = bulk.get();  
  261.         if(bulkResponse.hasFailures())  
  262.         {  
  263.             BulkItemResponse[] items = bulkResponse.getItems();  
  264.             for(BulkItemResponse item : items)  
  265.             {  
  266.                 System.out.println(item.getFailureMessage());  
  267.             }  
  268.         }  
  269.         else  
  270.         {  
  271.             System.out.println("所有執行成功!");  
  272.         }  
  273.     }  
  274.       
  275.     /** 
  276.      * 經過prepareSearch查詢索引庫 
  277.      * setQuery(QueryBuilders.matchQuery("name", "jack")) 
  278.      * setSearchType(SearchType.QUERY_THEN_FETCH) 
  279.      *  
  280.      */  
  281.     @Test  
  282.     public void testSearch()  
  283.     {  
  284.         SearchResponse searchResponse = transportClient.prepareSearch(index)  
  285.                 .setTypes(type)  
  286.                 .setQuery(QueryBuilders.matchAllQuery()) //查詢全部  
  287.                 //.setQuery(QueryBuilders.matchQuery("name", "tom").operator(Operator.AND)) //根據tom分詞查詢name,默認or  
  288.                 //.setQuery(QueryBuilders.multiMatchQuery("tom", "name", "age")) //指定查詢的字段  
  289.                 //.setQuery(QueryBuilders.queryString("name:to* AND age:[0 TO 19]")) //根據條件查詢,支持通配符大於等於0小於等於19  
  290.                 //.setQuery(QueryBuilders.termQuery("name", "tom"))//查詢時不分詞  
  291.                 .setSearchType(SearchType.QUERY_THEN_FETCH)  
  292.                 .setFrom(0).setSize(10)//分頁  
  293.                 .addSort("age", SortOrder.DESC)//排序  
  294.                 .get();  
  295.           
  296.         SearchHits hits = searchResponse.getHits();  
  297.         long total = hits.getTotalHits();  
  298.         System.out.println(total);  
  299.         SearchHit[] searchHits = hits.hits();  
  300.         for(SearchHit s : searchHits)  
  301.         {  
  302.             System.out.println(s.getSourceAsString());  
  303.         }  
  304.     }  
  305.       
  306.     /** 
  307.      * 多索引,多類型查詢 
  308.      * timeout 
  309.      */  
  310.     @Test  
  311.     public void testSearchsAndTimeout()  
  312.     {  
  313.         SearchResponse searchResponse = transportClient.prepareSearch("shb01","shb02").setTypes("stu","tea")  
  314.             .setQuery(QueryBuilders.matchAllQuery())  
  315.             .setSearchType(SearchType.QUERY_THEN_FETCH)  
  316.             .setTimeout("3")  
  317.             .get();  
  318.           
  319.         SearchHits hits = searchResponse.getHits();  
  320.         long totalHits = hits.getTotalHits();  
  321.         System.out.println(totalHits);  
  322.         SearchHit[] hits2 = hits.getHits();  
  323.         for(SearchHit h : hits2)  
  324.         {  
  325.             System.out.println(h.getSourceAsString());  
  326.         }  
  327.     }  
  328.       
  329.     /** 
  330.      * 過濾, 
  331.      * lt 小於 
  332.      * gt 大於 
  333.      * lte 小於等於 
  334.      * gte 大於等於 
  335.      *  
  336.      */  
  337.     @Test  
  338.     public void testFilter()  
  339.     {  
  340.         SearchResponse searchResponse = transportClient.prepareSearch(index)  
  341.                 .setTypes(type)  
  342.                 .setQuery(QueryBuilders.matchAllQuery()) //查詢全部  
  343.                 .setSearchType(SearchType.QUERY_THEN_FETCH)  
  344. //              .setPostFilter(FilterBuilders.rangeFilter("age").from(0).to(19)  
  345. //                      .includeLower(true).includeUpper(true))  
  346.                 .setPostFilter(FilterBuilders.rangeFilter("age").gte(18).lte(22))  
  347.                 .setExplain(true//explain爲true表示根據數據相關度排序,和關鍵字匹配最高的排在前面  
  348.                 .get();  
  349.       
  350.           
  351.         SearchHits hits = searchResponse.getHits();  
  352.         long total = hits.getTotalHits();  
  353.         System.out.println(total);  
  354.         SearchHit[] searchHits = hits.hits();  
  355.         for(SearchHit s : searchHits)  
  356.         {  
  357.             System.out.println(s.getSourceAsString());  
  358.         }  
  359.     }  
  360.       
  361.     /** 
  362.      * 高亮 
  363.      */  
  364.     @Test  
  365.     public void testHighLight()  
  366.     {  
  367.         SearchResponse searchResponse = transportClient.prepareSearch(index)  
  368.                 .setTypes(type)  
  369.                 //.setQuery(QueryBuilders.matchQuery("name", "Fresh")) //查詢全部  
  370.                 .setQuery(QueryBuilders.queryString("name:F*"))  
  371.                 .setSearchType(SearchType.QUERY_THEN_FETCH)  
  372.                 .addHighlightedField("name")  
  373.                 .setHighlighterPreTags("<font color='red'>")  
  374.                 .setHighlighterPostTags("</font>")  
  375.                 .get();  
  376.       
  377.           
  378.         SearchHits hits = searchResponse.getHits();  
  379.         System.out.println("sum:" + hits.getTotalHits());  
  380.           
  381.         SearchHit[] hits2 = hits.getHits();  
  382.         for(SearchHit s : hits2)  
  383.         {  
  384.             Map<String, HighlightField> highlightFields = s.getHighlightFields();  
  385.             HighlightField highlightField = highlightFields.get("name");  
  386.             if(null != highlightField)  
  387.             {  
  388.                 Text[] fragments = highlightField.fragments();  
  389.                 System.out.println(fragments[0]);  
  390.             }  
  391.             System.out.println(s.getSourceAsString());  
  392.         }  
  393.     }  
  394.       
  395.     /** 
  396.      * 分組 
  397.      */  
  398.     @Test  
  399.     public void testGroupBy()  
  400.     {  
  401.         SearchResponse searchResponse = transportClient.prepareSearch(index).setTypes(type)  
  402.                 .setQuery(QueryBuilders.matchAllQuery())  
  403.                 .setSearchType(SearchType.QUERY_THEN_FETCH)  
  404.                 .addAggregation(AggregationBuilders.terms("group_age")  
  405.                         .field("age").size(0))//根據age分組,默認返回10,size(0)也是10  
  406.                 .get();  
  407.           
  408.         Terms terms = searchResponse.getAggregations().get("group_age");  
  409.         List<Bucket> buckets = terms.getBuckets();  
  410.         for(Bucket bt : buckets)  
  411.         {  
  412.             System.out.println(bt.getKey() + " " + bt.getDocCount());  
  413.         }  
  414.     }  
  415.       
  416.     /** 
  417.      * 聚合函數,本例之編寫了sum,其餘的聚合函數也能夠實現 
  418.      *  
  419.      */  
  420.     @Test  
  421.     public void testMethod()  
  422.     {  
  423.         SearchResponse searchResponse = transportClient.prepareSearch(index).setTypes(type)  
  424.                 .setQuery(QueryBuilders.matchAllQuery())  
  425.                 .setSearchType(SearchType.QUERY_THEN_FETCH)  
  426.                 .addAggregation(AggregationBuilders.terms("group_name").field("name")  
  427.                         .subAggregation(AggregationBuilders.sum("sum_age").field("age")))  
  428.                 .get();  
  429.           
  430.         Terms terms = searchResponse.getAggregations().get("group_name");  
  431.         List<Bucket> buckets = terms.getBuckets();  
  432.         for(Bucket bt : buckets)  
  433.         {  
  434.             Sum sum = bt.getAggregations().get("sum_age");  
  435.             System.out.println(bt.getKey() + "  " + bt.getDocCount() + " "+ sum.getValue());  
  436.         }  
  437.           
  438.     }  
  439.       
  440.       
  441.       
  442. }  
相關文章
相關標籤/搜索