ElasticSearch版本控制--java實現

1、前言

最近工做中有這樣一個ElasticSearch(如下簡稱ES)寫入的場景,Flink處理完數據實時寫入ES。如今須要將一批歷史數據經過Flink加載到到ES,有兩個點須要保證:git

  1. 對於歷史數據,ES已有文檔,則捨棄舊數據,ES沒有則插入歷史數據。
  2. 對於新數據,能對現有的ES數據進行更新。

參考ElasticSearch進階篇(一)--版本控制,能夠使用ES的版本實現該需求的開發。shell

2、代碼實現及驗證

代碼實現

請求寫數據時加入version和version_type參數,主要代碼以下:segmentfault

IndexRequest indexRequest = Requests.indexRequest()
                .index(indexName)
                .id("1")
                // 指定版本比較的業務字段,具體業務具體分析,通常取時間戳較爲合適
                .version(Long.parseLong(dataMap1.get("create").toString()))
                // 指定使用外部版本號
                .versionType(VersionType.EXTERNAL)
                .source(dataMap);

驗證

驗證demo可以使用當前時間的時間戳做爲版本比較依據。驗證思路以下:elasticsearch

  1. 運行demo程序,在當前時間戳下,插入一條數據,經過kibana等工具檢驗數據是否插入成功。並記錄當前的時間戳。
  2. 更改某些字段值對數據進行更新,再次運行程序,檢驗數據是否更新成功。
  3. 將時間版本比較的字段值固定爲第一次執行程序的時間戳,檢驗數據是否更新成功。

驗證結果以下圖:


工具

3、總結

由截圖可看到,第一步和第二步都能執行成功,第三步執行會出現版本衝突的異常,根據提示很方便能識別出緣由,即ElasticSearch進階篇(一)--版本控制中得出的結論,使用version和version_type=EXTERNAL進行版本控制時,只有要寫入文檔的版本號大於已有文檔的版本號才能更新成功。url

案例代碼參考:elasticsearch_demospa

相關文章
相關標籤/搜索