1、簡介html
ElasticSearch和Solr都是基於Lucene的搜索引擎,不過ElasticSearch天生支持分佈式,而Solr是4.0版本後的SolrCloud纔是分佈式版本,Solr的分佈式支持須要ZooKeeper的支持。java
這裏有一個詳細的ElasticSearch和Solr的對比:http://solr-vs-elasticsearch.com/node
2、基本用法git
集羣(Cluster): ES是一個分佈式的搜索引擎,通常由多臺物理機組成。這些物理機,經過配置一個相同的cluster name,互相發現,把本身組織成一個集羣。github
節點(Node):同一個集羣中的一個Elasticearch主機。sql
主分片(Primary shard):索引(下文介紹)的一個物理子集。同一個索引在物理上能夠切多個分片,分佈到不一樣的節點上。分片的實現是Lucene 中的索引。數據庫
注意:ES中一個索引的分片個數是創建索引時就要指定的,創建後不可再改變。因此開始建一個索引時,就要預計數據規模,將分片的個數分配在一個合理的範圍。json
副本分片(Replica shard):每一個主分片能夠有一個或者多個副本,個數是用戶本身配置的。ES會盡可能將同一索引的不一樣分片分佈到不一樣的節點上,提升容錯性。對一個索引,只要不是全部shards所在的機器都掛了,就還能用。api
索引(Index):邏輯概念,一個可檢索的文檔對象的集合。相似與DB中的database概念。同一個集羣中可創建多個索引。好比,生產環境常見的一種方法,對每月產生的數據建索引,以保證單個索引的量級可控。網絡
類型(Type):索引的下一級概念,大概至關於數據庫中的table。同一個索引裏能夠包含多個 Type。
文檔(Document):即搜索引擎中的文檔概念,也是ES中一個能夠被檢索的基本單位,至關於數據庫中的row,一條記錄。
字段(Field):至關於數據庫中的column。ES中,每一個文檔,實際上是以json形式存儲的。而一個文檔能夠被視爲多個字段的集合。好比一篇文章,可能包括了主題、摘要、正文、做者、時間等信息,每一個信息都是一個字段,最後被整合成一個json串,落地到磁盤。
映射(Mapping):至關於數據庫中的schema,用來約束字段的類型,不過 Elasticsearch 的 mapping 能夠不顯示地指定、自動根據文檔數據建立。
Elasticsearch集羣能夠包含多個索引(indices),每個索引能夠包含多個類型(types),每個類型包含多個文檔(documents),而後每一個文檔包含多個字段(Fields),這種面向文檔型的儲存,也算是NoSQL的一種吧。
ES比傳統關係型數據庫,對一些概念上的理解:
Relational DB -> Databases -> Tables -> Rows -> Columns Elasticsearch -> Indices -> Types -> Documents -> Fields
從建立一個Client到添加、刪除、查詢等基本用法:
一、建立Client
public ElasticSearchService(String ipAddress, int port) { client = new TransportClient() .addTransportAddress(new InetSocketTransportAddress(ipAddress, port)); }
這裏是一個TransportClient。
ES下兩種客戶端對比:
TransportClient:輕量級的Client,使用Netty線程池,Socket鏈接到ES集羣。自己不加入到集羣,只做爲請求的處理。
Node Client:客戶端節點自己也是ES節點,加入到集羣,和其餘ElasticSearch節點同樣。頻繁的開啓和關閉這類Node Clients會在集羣中產生「噪音」。
二、建立/刪除Index和Type信息
// 建立索引 public void createIndex() { client.admin().indices().create(new CreateIndexRequest(IndexName)) .actionGet(); } // 清除全部索引 public void deleteIndex() { IndicesExistsResponse indicesExistsResponse = client.admin().indices() .exists(new IndicesExistsRequest(new String[] { IndexName })) .actionGet(); if (indicesExistsResponse.isExists()) { client.admin().indices().delete(new DeleteIndexRequest(IndexName)) .actionGet(); } } // 刪除Index下的某個Type public void deleteType(){ client.prepareDelete().setIndex(IndexName).setType(TypeName).execute().actionGet(); } // 定義索引的映射類型 public void defineIndexTypeMapping() { try { XContentBuilder mapBuilder = XContentFactory.jsonBuilder(); mapBuilder.startObject() .startObject(TypeName) .startObject("properties") .startObject(IDFieldName).field("type", "long").field("store", "yes").endObject() .startObject(SeqNumFieldName).field("type", "long").field("store", "yes").endObject() .startObject(IMSIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject() .startObject(IMEIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject() .startObject(DeviceIDFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject() .startObject(OwnAreaFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject() .startObject(TeleOperFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject() .startObject(TimeFieldName).field("type", "date").field("store", "yes").endObject() .endObject() .endObject() .endObject(); PutMappingRequest putMappingRequest = Requests .putMappingRequest(IndexName).type(TypeName) .source(mapBuilder); client.admin().indices().putMapping(putMappingRequest).actionGet(); } catch (IOException e) { log.error(e.toString()); } }
這裏自定義了某個Type的索引映射(Mapping),默認ES會自動處理數據類型的映射:針對整型映射爲long,浮點數爲double,字符串映射爲string,時間爲date,true或false爲boolean。
注意:針對字符串,ES默認會作「analyzed」處理,即先作分詞、去掉stop words等處理再index。若是你須要把一個字符串作爲總體被索引到,須要把這個字段這樣設置:field("index", "not_analyzed")。
詳情參考:https://www.elastic.co/guide/en/elasticsearch/guide/current/mapping-intro.html
三、索引數據
// 批量索引數據 public void indexHotSpotDataList(List<Hotspotdata> dataList) { if (dataList != null) { int size = dataList.size(); if (size > 0) { BulkRequestBuilder bulkRequest = client.prepareBulk(); for (int i = 0; i < size; ++i) { Hotspotdata data = dataList.get(i); String jsonSource = getIndexDataFromHotspotData(data); if (jsonSource != null) { bulkRequest.add(client .prepareIndex(IndexName, TypeName, data.getId().toString()) .setRefresh(true).setSource(jsonSource)); } } BulkResponse bulkResponse = bulkRequest.execute().actionGet(); if (bulkResponse.hasFailures()) { Iterator<BulkItemResponse> iter = bulkResponse.iterator(); while (iter.hasNext()) { BulkItemResponse itemResponse = iter.next(); if (itemResponse.isFailed()) { log.error(itemResponse.getFailureMessage()); } } } } } } // 索引數據 public boolean indexHotspotData(Hotspotdata data) { String jsonSource = getIndexDataFromHotspotData(data); if (jsonSource != null) { IndexRequestBuilder requestBuilder = client.prepareIndex(IndexName, TypeName).setRefresh(true); requestBuilder.setSource(jsonSource) .execute().actionGet(); return true; } return false; } // 獲得索引字符串 public String getIndexDataFromHotspotData(Hotspotdata data) { String jsonString = null; if (data != null) { try { XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); jsonBuilder.startObject().field(IDFieldName, data.getId()) .field(SeqNumFieldName, data.getSeqNum()) .field(IMSIFieldName, data.getImsi()) .field(IMEIFieldName, data.getImei()) .field(DeviceIDFieldName, data.getDeviceID()) .field(OwnAreaFieldName, data.getOwnArea()) .field(TeleOperFieldName, data.getTeleOper()) .field(TimeFieldName, data.getCollectTime()) .endObject(); jsonString = jsonBuilder.string(); } catch (IOException e) { log.equals(e); } } return jsonString; }
ES支持批量和單個數據索引。
四、查詢獲取數據
// 獲取少許數據100個 private List<Integer> getSearchData(QueryBuilder queryBuilder) { List<Integer> ids = new ArrayList<>(); SearchResponse searchResponse = client.prepareSearch(IndexName) .setTypes(TypeName).setQuery(queryBuilder).setSize(100) .execute().actionGet(); SearchHits searchHits = searchResponse.getHits(); for (SearchHit searchHit : searchHits) { Integer id = (Integer) searchHit.getSource().get("id"); ids.add(id); } return ids; } // 獲取大量數據 private List<Integer> getSearchDataByScrolls(QueryBuilder queryBuilder) { List<Integer> ids = new ArrayList<>(); // 一次獲取100000數據 SearchResponse scrollResp = client.prepareSearch(IndexName) .setSearchType(SearchType.SCAN).setScroll(new TimeValue(60000)) .setQuery(queryBuilder).setSize(100000).execute().actionGet(); while (true) { for (SearchHit searchHit : scrollResp.getHits().getHits()) { Integer id = (Integer) searchHit.getSource().get(IDFieldName); ids.add(id); } scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()) .setScroll(new TimeValue(600000)).execute().actionGet(); if (scrollResp.getHits().getHits().length == 0) { break; } } return ids; }
這裏的QueryBuilder是一個查詢條件,ES支持分頁查詢獲取數據,也能夠一次性獲取大量數據,須要使用Scroll Search。
五、聚合(Aggregation Facet)查詢
// 獲得某段時間內設備列表上每一個設備的數據分佈狀況<設備ID,數量> public Map<String, String> getDeviceDistributedInfo(String startTime, String endTime, List<String> deviceList) { Map<String, String> resultsMap = new HashMap<>(); QueryBuilder deviceQueryBuilder = getDeviceQueryBuilder(deviceList); QueryBuilder rangeBuilder = getDateRangeQueryBuilder(startTime, endTime); QueryBuilder queryBuilder = QueryBuilders.boolQuery() .must(deviceQueryBuilder).must(rangeBuilder); TermsBuilder termsBuilder = AggregationBuilders.terms("DeviceIDAgg").size(Integer.MAX_VALUE) .field(DeviceIDFieldName); SearchResponse searchResponse = client.prepareSearch(IndexName) .setQuery(queryBuilder).addAggregation(termsBuilder) .execute().actionGet(); Terms terms = searchResponse.getAggregations().get("DeviceIDAgg"); if (terms != null) { for (Terms.Bucket entry : terms.getBuckets()) { resultsMap.put(entry.getKey(), String.valueOf(entry.getDocCount())); } } return resultsMap; }
Aggregation查詢能夠查詢相似統計分析這樣的功能:如某個月的數據分佈狀況,某類數據的最大、最小、總和、平均值等。
詳情參考:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-aggs.html
3、集羣配置
配置文件elasticsearch.yml
集羣名和節點名:
#cluster.name: elasticsearch
#node.name: "Franz Kafka"
是否參與master選舉和是否存儲數據
#node.master: true
#node.data: true
分片數和副本數
#index.number_of_shards: 5
#index.number_of_replicas: 1
master選舉最少的節點數,這個必定要設置爲整個集羣節點個數的一半加1,即N/2+1
#discovery.zen.minimum_master_nodes: 1
discovery ping的超時時間,擁塞網絡,網絡狀態不佳的狀況下設置高一點
#discovery.zen.ping.timeout: 3s
注意,分佈式系統整個集羣節點個數N要爲奇數個!!
如何避免ElasticSearch發生腦裂(brain split):http://blog.trifork.com/2013/10/24/how-to-avoid-the-split-brain-problem-in-elasticsearch/
即便集羣節點個數爲奇數,minimum_master_nodes爲整個集羣節點個數一半加1,也難以免腦裂的發生,詳情看討論:https://github.com/elastic/elasticsearch/issues/2488
4、Elasticsearch插件
一、elasticsearch-head是一個elasticsearch的集羣管理工具:./elasticsearch-1.7.1/bin/plugin -install mobz/elasticsearch-head
二、elasticsearch-sql:使用SQL語法查詢elasticsearch:./bin/plugin -u https://github.com/NLPchina/elasticsearch-sql/releases/download/1.3.5/elasticsearch-sql-1.3.5.zip --install sql
github地址:https://github.com/NLPchina/elasticsearch-sql
三、elasticsearch-bigdesk是elasticsearch的一個集羣監控工具,能夠經過它來查看ES集羣的各類狀態。
安裝:./bin/plugin -install lukas-vlcek/bigdesk
訪問:http://192.103.101.203:9200/_plugin/bigdesk/,
四、elasticsearch-servicewrapper插件是ElasticSearch的服務化插件,
在https://github.com/elasticsearch/elasticsearch-servicewrapper下載該插件後,解壓縮,將service目錄拷貝到elasticsearch目錄的bin目錄下。
然後,能夠經過執行如下語句安裝、啓動、中止ElasticSearch:
sh elasticsearch install
sh elasticsearch start
sh elasticsearch stop
參考:
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html
http://stackoverflow.com/questions/10213009/solr-vs-elasticsearch