最近要升級Elasticsearch，準備從5.2.2升級到最新的7.4.2。
之所以敢這麼大膽，一個重要的原因是這部分業務的Elasticsearch集群數據量不大，不到200G。
瞭解了一下elasticdump，最後放棄了，決定自己寫代碼來遷移。
因為高版本的rest client與低版本的transport client存在兼容問題，最後決定：讀數據使用transport client；向高版本寫數據則自己封裝請求，然後以HTTP方式執行。
bulk可以包含索引(index)、創建(create)、刪除(delete)、更新(update)四種操作。
index和create的區別是：index操作一定會被執行，id相同的時候會覆蓋原文檔並增加文檔的版本號；而create操作在id已經存在的時候不會執行（該條操作會返回版本衝突錯誤）。
建議一般使用index操作，甚至在更新內容比較多的時候，也可以考慮使用index來代替update。
bulk的語法格式如下。
POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
每個操作的第一行是要執行的動作（index、create、delete、update），必須包含索引名稱；對於delete和update操作，id也是必須的。
接下來一行就是文檔的具體內容（delete操作沒有這一行）。4個操作中update稍微有點不一樣，文檔需要用doc包裝一下。
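有2點需要注意：
1. 動作行和文檔內容都必須各自佔一行，每一行（包括最後一行）都必須以換行符結尾，換行符可以是\n或者\r\n，所以文檔JSON本身不能跨行；
2. 請求的Content-Type應為application/x-ndjson。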
import java.io.IOException; /** * elasticsearch bulk 請求 * https://www.elastic.co/guide/en/elasticsearch/reference/7.4/docs-bulk.html */ public class BulkBuilder { private static final String INDEX_TEMPLATE_WITH_ID = "{ \"%s\" : { \"_index\" : \"%s\", \"_id\" : \"%s\" } }"; private static final String INDEX_TEMPLATE = "{ \"%s\" : { \"_index\" : \"%s\"} }"; private static final String DOC_TEMPLATE = "{ \"doc\" : %s }"; private static final String INDEX = "index"; private static final String CREATE = "create"; private static final String DELETE = "delete"; private static final String UPDATE = "update"; /** * 換行、容許是\n或者\r\n */ private static final String NEWLINE = "\n"; /** * bulk請求的content-type */ public static final String CONTENT_TYPE = "application/x-ndjson"; String BULK_URL = "http://%s/_bulk"; private StringBuffer body; private BulkBuilder(){ body = new StringBuffer(); } public static BulkBuilder getBuilder(){ return new BulkBuilder(); } public BulkBuilder index(String index, String id, String docJson){ String head = String.format(INDEX_TEMPLATE_WITH_ID, INDEX, index, id); body.append(head).append(NEWLINE); body.append(docJson).append(NEWLINE); return this; } public BulkBuilder index(String index, String docJson){ String head = String.format(INDEX_TEMPLATE, INDEX, index); body.append(head).append(NEWLINE); body.append(docJson).append(NEWLINE); return this; } public BulkBuilder update(String index, String id, String docJson){ String head = String.format(INDEX_TEMPLATE_WITH_ID, UPDATE, index, id); body.append(head).append(NEWLINE); String updateBody = String.format(DOC_TEMPLATE, docJson); body.append(updateBody).append(NEWLINE); return this; } public BulkBuilder create(String index, String id, String docJson){ String head = String.format(INDEX_TEMPLATE_WITH_ID, CREATE, index, id); body.append(head).append(NEWLINE); body.append(docJson).append(NEWLINE); return this; } public BulkBuilder create(String index, String docJson){ String head = 
String.format(INDEX_TEMPLATE, CREATE, index); body.append(head).append(NEWLINE); body.append(docJson).append(NEWLINE); return this; } public BulkBuilder delete(String index, String id){ String head = String.format(INDEX_TEMPLATE_WITH_ID, DELETE, index, id); body.append(head).append(NEWLINE); return this; } public String build(){ return body.toString(); } public String postBulk(String host) throws IOException { String url = String.format(BULK_URL, host); if(body.length() > 15){//body爲空的時候不執行請求 return HttpUtil.postContent(url,body.toString(),CONTENT_TYPE); }else { return ""; } } }
上面的HttpUtil可以使用後面給出的Http工具類，也可以使用Spring的RestTemplate，或者JDK自帶的HttpURLConnection。
使用起來也非常方便，下面給出一個測試：
/**
 * Smoke test against a local Elasticsearch node at 127.0.0.1:9200.
 *
 * <p>In one bulk request it indexes document "1" into index "test" and then partially updates
 * that document's "age" field. It prints the raw bulk response and requires a running cluster.
 */
@Test
void update() throws IOException {
    BulkBuilder builder = BulkBuilder.getBuilder();

    JSONObject doc = new JSONObject();
    doc.put("name", "youyou");
    doc.put("age", 20);
    builder.index("test", "1", doc.toJSONString());

    // Keep "age" numeric, matching the value indexed above, instead of sending the string "200".
    JSONObject partial = new JSONObject();
    partial.put("age", 200);
    builder.update("test", "1", partial.toJSONString());

    String response = builder.postBulk("127.0.0.1:9200");
    System.out.println(response);
}
import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.*; import org.apache.http.config.ConnectionConfig; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.Charset; public class HttpUtil { private static Logger logger = LoggerFactory.getLogger(HttpUtil.class); private static final CloseableHttpClient commonHttpClient ; static { HttpClientBuilder httpClientBuilder = HttpClients.custom(); RequestConfig.Builder requestConfigBuilder = RequestConfig.custom(); requestConfigBuilder.setSocketTimeout(3000); requestConfigBuilder.setConnectTimeout(3000); requestConfigBuilder.setConnectionRequestTimeout(3000); httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build()); PoolingHttpClientConnectionManager poolingHttpClientConnectionManager = new PoolingHttpClientConnectionManager(); ConnectionConfig.Builder connectionConfigBuilder = ConnectionConfig.custom(); ConnectionConfig connectionConfig = connectionConfigBuilder.setCharset(Charset.defaultCharset()).setBufferSize(4096).build(); httpClientBuilder.setDefaultConnectionConfig(connectionConfig); httpClientBuilder.setUserAgent("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:70.0) Gecko/20100101 Firefox/70.0"); httpClientBuilder.setConnectionManager(poolingHttpClientConnectionManager); commonHttpClient = httpClientBuilder.build(); } public static String getContent(String url) throws IOException { HttpGet method = new HttpGet(url); CloseableHttpResponse response = commonHttpClient.execute(method); return EntityUtils.toString(response.getEntity()); } public static String 
postContent(String url,String body) throws IOException { HttpPost method = new HttpPost(url); method.setEntity(new StringEntity(body, ContentType.DEFAULT_TEXT)); CloseableHttpResponse response = commonHttpClient.execute(method); return EntityUtils.toString(response.getEntity()); } public static String postJsonContent(String url,String body) throws IOException { HttpPost method = new HttpPost(url); method.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON)); CloseableHttpResponse response = commonHttpClient.execute(method); return EntityUtils.toString(response.getEntity()); } public static String putJsonContent(String url,String body) throws IOException { HttpPut method = new HttpPut(url); method.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON)); CloseableHttpResponse response = commonHttpClient.execute(method); return EntityUtils.toString(response.getEntity()); } public static String delete(String url) throws IOException { HttpDelete method = new HttpDelete(url); CloseableHttpResponse response = commonHttpClient.execute(method); return EntityUtils.toString(response.getEntity()); } public static String postContent(String url,String body,String contentType) throws IOException { HttpPost method = new HttpPost(url); method.setEntity(new StringEntity(body, ContentType.create(contentType))); CloseableHttpResponse response = commonHttpClient.execute(method); return EntityUtils.toString(response.getEntity()); } }