ElasticSearch 的使用度愈來愈普及了,不少公司都在使用。有作日誌搜索的,有作商品搜索的,有作訂單搜索的。html
大部分使用場景都是經過程序按期去導入數據到 ElasticSearch 中,或者經過 CDC 的方式來構建索引。在這種場景下,更新數據都是單條更新,好比 ID=1 的數據發生了修改操做,那麼就會把 ElasticSearch 中 ID=1 的這條數據更新下。java
但有些場景下須要根據條件同時更新多條數據,就像 Mysql 中咱們使用 Update Table Set Name=XXX where Age=18 去更新一批數據同樣。sql
正好有同窗微信問我怎麼批量更新,接下來就看看在 ElasticSearch 中是如何去進行按條件更新的操做。數組
ElasticSearch 的客戶端官方推薦使用 elasticsearch-rest-high-level-client。因此本文也是基於 elasticsearch-rest-high-level-client 來構建代碼。微信
首先來回顧下單條數據的更新是怎麼作的,代碼以下:app
UpdateRequest updateRequest = new UpdateRequest(index, type, id); updateRequest.doc(documentJson, XContentType.JSON); restHighLevelClient.update(updateRequest, options);
構建 UpdateRequest 的時候就指定了索引,類型,ID 三個字段,也就精確到了某一條數據,因此更新的天然也是這一條數據。elasticsearch
首先咱們準備幾條測試數據,以下:ide
{ id: 1, title: "Java怎麼學", type: 1, userId: 1, tags: [ "java" ], textContent: "我要學Java", status: 1, heat: 100 } { id: 2, title: "Java怎麼學", type: 1, userId: 1, tags: [ "java" ], textContent: "我要學Java", status: 1, heat: 100 }
假如咱們的需求是將 userId=1 的全部文檔數據改爲無效,也就是 status=0。若是不用按條件更新,你就得查詢出 userId=1 的全部數據,而後一條條更新,這就太慢了。微服務
下面看看按條件更新是如何使用的,以下:測試
POST http://47.105.66.210:9200/article_v1/doc/_update_by_query { "script": { "source":"ctx._source['status']=0;" }, "query": { "term": { "userId": 1 } } }
按條件更新須要使用_update_by_query 來進行,query 用於指定更新數據的匹配條件,script 用於更新的邏輯。
詳細使用文檔:
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html[1]
https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html[2]
UpdateByQueryRequest request = new UpdateByQueryRequest("article_v1"); request.setQuery(new TermQueryBuilder("userId", 1)); request.setScript(new Script("ctx._source['status']=0;")); restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
是否是也很簡單,跟單條數據更新差很少,使用 UpdateByQueryRequest 構建更新對象,而後設置 Query 和 Script 就能夠了。
好比咱們的需求是要移除 tags 中的 java,以下:
POST http://47.105.66.210:9200/article_v1/doc/_update_by_query { "script": { "source":"ctx._source['tags'].removeIf(item -> item == 'java');" }, "query": { "term": { "userId": 1 } } }
新增的話只須要將 removeIf 改爲 add 就能夠了。
ctx._source['tags'].add('java');
若是有特殊的業務邏輯,Script 中還能夠寫判斷來判斷是否須要修改。
POST http://47.105.66.210:9200/article_v1/doc/_update_by_query { "script": { "source":"if(ctx._source.type == 11) {ctx._source['tags'].add('java');}" }, "query": { "term": { "userId": 1 } } }
大部分場景下的更新都比較簡單,根據某個字段去更新某個值,或者去更新多個值。在 Java 中若是每一個地方都去寫腳本,就重複了,最好是抽一個比較通用的方法來更新。
下面是簡單的示列,其中還有不少須要考慮的點,像數據類型我只處理了數字,字符串,和 List,其餘的你們須要本身去擴展。
public BulkByScrollResponse updateByQuery(String index, QueryBuilder query, Map<String, Object> document) { UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(index); updateByQueryRequest.setQuery(query); StringBuilder script = new StringBuilder(); Set<String> keys = document.keySet(); for (String key : keys) { String appendValue = ""; Object value = document.get(key); if (value instanceof Number) { appendValue = value.toString(); } else if (value instanceof String) { appendValue = "'" + value.toString() + "'"; } else if (value instanceof List){ appendValue = JsonUtils.toJson(value); } else { appendValue = value.toString(); } script.append("ctx._source.").append(key).append("=").append(appendValue).append(";"); } updateByQueryRequest.setScript(new Script(script.toString())); return updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT); } public BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) { Map<String, Object> catData = new HashMap<>(1); catData.put(ElasticSearchConstant.UPDATE_BY_QUERY_REQUEST, updateByQueryRequest.toString()); return CatTransactionManager.newTransaction(() -> { try { return restHighLevelClient.updateByQuery(updateByQueryRequest, options); }catch (IOException e) { throw new RuntimeException(e); } }, ElasticSearchConstant.ES_CAT_TYPE, ElasticSearchConstant.UPDATE, catData); }
若是有了這麼一個方法,那麼使用方式以下:
@Test public void testUpdate5() { Map<String, Object> document = new HashMap<>(); document.put("title", "Java"); document.put("status", 0); document.put("tags", Lists.newArrayList("JS", "CSS")); kittyRestHighLevelClient.updateByQuery(elasticSearchIndexConfig.getArticleSaveIndexName(), new TermQueryBuilder("userId", 1), document); }
關於做者:尹吉歡,簡單的技術愛好者,《Spring Cloud 微服務-全棧技術與案例解析》, 《Spring Cloud 微服務 入門 實戰與進階》做者, 公衆號 猿天地 發起人。
參考資料
[1]
docs-update-by-query.html: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html
[2]
modules-scripting-using.html: https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html