ES18-JAVA API 批量操做

1.批量查詢

Multi Get API json

public static void multiGet() {
		// 批量查詢
		MultiGetResponse response = getClient().prepareMultiGet()
				.add("my_person", "my_index", "1")// 查詢id爲1的文檔
				.add("my_person", "my_index", "2", "3", "4")// ids,id列表
				.add("telegraph", "msg", "2")// 能夠查詢其餘索引裏面的數據
				.get();
		// 獲取相應結果
		for (MultiGetItemResponse multiGetItemResponse : response) { // 遍歷結果集
			GetResponse getResponse = multiGetItemResponse.getResponse();
			if (getResponse.isExists()) {// 判斷文檔是否存在
				String json = getResponse.getSourceAsString();
				System.out.println(json);
			}
		}

	}

測試併發

public static void main(String[] args) {
		multiGet();
	}

執行結果測試

{
  "name":"sean",
  "age":22,
  "salary":6000
}

{
  "name":"sim",
  "age":20,
  "salary":5000
}

{
  "name":"duck",
  "age":28,
  "salary":8000
}

{
  "name":"lily",
  "age":20,
  "salary":4000
}

{"title":"被更新以後title","content":"測試添加內容"}

2.批量操做

Bulk APIui

/**
	 * 批量操做
	 * @throws Exception
	 */
	public static void bulk() throws Exception {
		BulkRequestBuilder bulkRequest = getClient().prepareBulk();

		bulkRequest.add(getClient().prepareIndex("telegraph", "msg", "3")
				.setSource(XContentFactory.jsonBuilder().startObject().field("title", "控股股東涉嫌內幕交易 被證監會立案調查")
						.field("content", "財聯社7月23日訊,嘉欣絲綢晚間公告,控股股東、董事長周國建因其涉嫌內幕交易,收到中國證監會的《調查通知書》,對其進行立案調查")
						.endObject()));
		bulkRequest.add(getClient().prepareIndex("telegraph", "msg", "4")
				.setSource(XContentFactory.jsonBuilder().startObject().field("title", "泛海控股股價13連陽 控股股東今日再增持213萬股")
						.field("content",
								"財聯社7月23日訊,泛海控股晚間公告,控股股東中國泛海於7月23日增持了213.16萬股公司股份,約佔公司股份總數的0.0410%,成交均價爲6.798 元/股")
						.endObject()));
		// 批量執行
		BulkResponse bulkResponse = bulkRequest.get();
		System.out.println(bulkResponse.status());
		// 判斷是否存在失敗操做
		if (bulkResponse.hasFailures()) {
			System.out.println("存在失敗操做");
		}
		//遍歷每一個操做的執行結果
		for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
			System.out.println(bulkItemResponse.getResponse().toString());
		}
	}

測試操做spa

public static void main(String[] args) {
		try {
			bulk();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

3.批量處理器(Bulk Processor)

BulkProcessor類提供了一個簡單接口,能夠根據請求的數量或大小自動刷新批量操做,也能夠在給定的時間段以後自動刷新批量操做。線程

/**
	 * 批量處理器
	 */
	public static void bulkProcessor() {

		BulkProcessor.Listener listener = new BulkProcessor.Listener() {

			public void beforeBulk(long executionId, BulkRequest request) {
				// 執行批量操做以前
				System.out.println(request.numberOfActions());
			}

			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
				// 執行批量操做以後,異常
				System.out.println("執行錯誤:" + request.toString() + ",失敗:" + failure.getMessage());
			}

			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
				// 執行批量操做以後
				for (BulkItemResponse bulkItemResponse : response.getItems()) {
					System.out.println("執行成功"+bulkItemResponse.getResponse().toString());
				}
			}
		};

		// 設置執行器,包含執行時執行過程的監聽,以及執行屬性配置
		BulkProcessor bulkProcessor = BulkProcessor.builder(getClient(), listener).setBulkActions(500) // 設置批量處理數量的閥值
				.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))// 設置批量執執行處理請求大小閥值
				.setFlushInterval(TimeValue.timeValueSeconds(5))// 設置刷新索引時間間隔
				.setConcurrentRequests(1)// 設置併發處理線程個數
				.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100), 3))// 設置回滾策略,等待時間100,重試次數3
				.build();

		// 添加須要執行的請求
		bulkProcessor.add(new DeleteRequest("telegraph", "msg", "3"));
		bulkProcessor.add(new DeleteRequest("telegraph", "msg", "4"));
		// 刷新請求
		bulkProcessor.flush();
		// 關閉執行器
		bulkProcessor.close();
		//刷新索引(沒有這一步不執行)
		getClient().admin().indices().prepareRefresh().get();
	}

測試code

public static void main(String[] args) {
		try {
			bulkProcessor();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

4.查詢刪除

根據查詢條件,刪除知足條件的文檔索引

/**
	 * 根據查詢條件刪除文檔
	 */
	public static void deleteQuery() {

		//根據查詢條件刪除文檔
		BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(getClient())
				.filter(QueryBuilders.matchQuery("title", "長生生物")).source("telegraph").get();

		System.out.println(response.getDeleted());// 刪除文檔數量

	}

測試接口

public static void main(String[] args) {
		try {
			deleteQuery();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
相關文章
相關標籤/搜索