Multi Get API json
public static void multiGet() { // 批量查詢 MultiGetResponse response = getClient().prepareMultiGet() .add("my_person", "my_index", "1")// 查詢id爲1的文檔 .add("my_person", "my_index", "2", "3", "4")// ids,id列表 .add("telegraph", "msg", "2")// 能夠查詢其餘索引裏面的數據 .get(); // 獲取相應結果 for (MultiGetItemResponse multiGetItemResponse : response) { // 遍歷結果集 GetResponse getResponse = multiGetItemResponse.getResponse(); if (getResponse.isExists()) {// 判斷文檔是否存在 String json = getResponse.getSourceAsString(); System.out.println(json); } } }
測試併發
public static void main(String[] args) { multiGet(); }
執行結果測試
{ "name":"sean", "age":22, "salary":6000 } { "name":"sim", "age":20, "salary":5000 } { "name":"duck", "age":28, "salary":8000 } { "name":"lily", "age":20, "salary":4000 } {"title":"被更新以後title","content":"測試添加內容"}
Bulk APIui
/** * 批量操做 * @throws Exception */ public static void bulk() throws Exception { BulkRequestBuilder bulkRequest = getClient().prepareBulk(); bulkRequest.add(getClient().prepareIndex("telegraph", "msg", "3") .setSource(XContentFactory.jsonBuilder().startObject().field("title", "控股股東涉嫌內幕交易 被證監會立案調查") .field("content", "財聯社7月23日訊,嘉欣絲綢晚間公告,控股股東、董事長周國建因其涉嫌內幕交易,收到中國證監會的《調查通知書》,對其進行立案調查") .endObject())); bulkRequest.add(getClient().prepareIndex("telegraph", "msg", "4") .setSource(XContentFactory.jsonBuilder().startObject().field("title", "泛海控股股價13連陽 控股股東今日再增持213萬股") .field("content", "財聯社7月23日訊,泛海控股晚間公告,控股股東中國泛海於7月23日增持了213.16萬股公司股份,約佔公司股份總數的0.0410%,成交均價爲6.798 元/股") .endObject())); // 批量執行 BulkResponse bulkResponse = bulkRequest.get(); System.out.println(bulkResponse.status()); // 判斷是否存在失敗操做 if (bulkResponse.hasFailures()) { System.out.println("存在失敗操做"); } //遍歷每一個操做的執行結果 for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) { System.out.println(bulkItemResponse.getResponse().toString()); } }
測試操做spa
public static void main(String[] args) { try { bulk(); } catch (Exception e) { e.printStackTrace(); } }
BulkProcessor類提供了一個簡單接口,能夠根據請求的數量或大小自動刷新批量操做,也能夠在給定的時間段以後自動刷新批量操做。線程
/** * 批量處理器 */ public static void bulkProcessor() { BulkProcessor.Listener listener = new BulkProcessor.Listener() { public void beforeBulk(long executionId, BulkRequest request) { // 執行批量操做以前 System.out.println(request.numberOfActions()); } public void afterBulk(long executionId, BulkRequest request, Throwable failure) { // 執行批量操做以後,異常 System.out.println("執行錯誤:" + request.toString() + ",失敗:" + failure.getMessage()); } public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { // 執行批量操做以後 for (BulkItemResponse bulkItemResponse : response.getItems()) { System.out.println("執行成功"+bulkItemResponse.getResponse().toString()); } } }; // 設置執行器,包含執行時執行過程的監聽,以及執行屬性配置 BulkProcessor bulkProcessor = BulkProcessor.builder(getClient(), listener).setBulkActions(500) // 設置批量處理數量的閥值 .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))// 設置批量執執行處理請求大小閥值 .setFlushInterval(TimeValue.timeValueSeconds(5))// 設置刷新索引時間間隔 .setConcurrentRequests(1)// 設置併發處理線程個數 .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100), 3))// 設置回滾策略,等待時間100,重試次數3 .build(); // 添加須要執行的請求 bulkProcessor.add(new DeleteRequest("telegraph", "msg", "3")); bulkProcessor.add(new DeleteRequest("telegraph", "msg", "4")); // 刷新請求 bulkProcessor.flush(); // 關閉執行器 bulkProcessor.close(); //刷新索引(沒有這一步不執行) getClient().admin().indices().prepareRefresh().get(); }
測試code
public static void main(String[] args) { try { bulkProcessor(); } catch (Exception e) { e.printStackTrace(); } }
根據查詢條件,刪除知足條件的文檔索引
/** * 根據查詢條件刪除文檔 */ public static void deleteQuery() { //根據查詢條件刪除文檔 BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(getClient()) .filter(QueryBuilders.matchQuery("title", "長生生物")).source("telegraph").get(); System.out.println(response.getDeleted());// 刪除文檔數量 }
測試接口
public static void main(String[] args) { try { deleteQuery(); } catch (Exception e) { e.printStackTrace(); } }