最近工做中有這樣一個ElasticSearch(如下簡稱ES)寫入的場景,Flink處理完數據實時寫入ES。如今須要將一批歷史數據經過Flink加載到到ES,有兩個點須要保證:git
參考ElasticSearch進階篇(一)--版本控制,能夠使用ES的版本實現該需求的開發。shell
請求寫數據時加入version和version_type參數,主要代碼以下:segmentfault
IndexRequest indexRequest = Requests.indexRequest() .index(indexName) .id("1") // 指定版本比較的業務字段,具體業務具體分析,通常取時間戳較爲合適 .version(Long.parseLong(dataMap1.get("create").toString())) // 指定使用外部版本號 .versionType(VersionType.EXTERNAL) .source(dataMap);
驗證demo可以使用當前時間的時間戳做爲版本比較依據。驗證思路以下:elasticsearch
驗證結果以下圖:工具
由截圖可看到,第一步和第二步都能執行成功,第三步執行會出現版本衝突的異常,根據提示很方便能識別出緣由,即ElasticSearch進階篇(一)--版本控制中得出的結論,使用version和version_type=EXTERNAL進行版本控制時,只有要寫入文檔的版本號大於已有文檔的版本號才能更新成功。url
案例代碼參考:elasticsearch_demospa