Elasticsearch升級數據遷移：批量流式操作

1、簡介

最近Elasticsearch要升級，準備從5.2.2升級到最新的7.4.2。

膽子敢這麼肥，一個重要的原因是這個業務部分的Elasticsearch集羣數據量不大，不到200G。

瞭解了一下elasticdump，最後放棄了，決定自己寫代碼來遷移。

由於高版本的REST client與低版本的transport client之間存在兼容問題，最後決定：讀數據使用低版本的transport client，往高版本寫數據則自己封裝請求，然後通過HTTP方式去執行。

2、bulk請求

bulk可以包含索引添加(index)、創建(create)、刪除(delete)、更新(update)四種操作。

index和create的區別是：index操作一定會被執行，id相同的時候會覆蓋原文檔並增加文檔的版本號；而create操作在id已經存在的時候不會執行，該條操作會在bulk響應中返回版本衝突（409）錯誤。

建議一般使用index操作，甚至在更新內容比較多的時候，都可以考慮使用index來代替update（注意：index會用新內容整體替換原文檔，而update只合併doc中給出的字段）。

bulk的語法格式如下：

POST _bulk

{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

每個操作的第一行是要執行的操作（index、create、delete、update），必須包含索引名稱；對於delete和update操作，id也是必須的。

接下來一行就是文檔的具體內容（delete操作沒有這一行）。4個操作中update稍微有點不一樣，文檔需要使用doc包裹一下。

有2點需要注意：

  1. 請求體的最後一行也必須以換行符（\n）結尾
  2. Content-Type應設置爲application/x-ndjson

3、流式bulk操作

import java.io.IOException;

/**
 * Builds the NDJSON body of an Elasticsearch {@code _bulk} request and optionally posts it.
 *
 * <p>Every operation appends one action/metadata line and, except for {@code delete}, one source
 * line. Each line is terminated with a newline, so a non-empty body always ends with the trailing
 * newline the bulk API requires.
 *
 * <p>Index names and ids are JSON-escaped before being embedded in the action line, so values
 * containing quotes or backslashes cannot corrupt the request. Document JSON is copied as given,
 * except that raw CR/LF characters are replaced with spaces. NDJSON uses the newline as its record
 * separator. In valid JSON a raw line break can only be insignificant whitespace, so pretty-printed
 * documents keep their meaning while staying on one line.
 *
 * <p>All arguments are validated before anything is appended, so a rejected call never leaves a
 * half-written operation in the body. Instances are not thread-safe; use one builder per batch.
 *
 * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/7.4/docs-bulk.html">Bulk API (7.4)</a>
 */
public class BulkBuilder {

    private static final String INDEX_TEMPLATE_WITH_ID = "{ \"%s\" : { \"_index\" : \"%s\", \"_id\" : \"%s\" } }";
    private static final String INDEX_TEMPLATE = "{ \"%s\" : { \"_index\" : \"%s\"} }";
    /** Wraps the partial document of an {@code update} operation. */
    private static final String DOC_TEMPLATE = "{ \"doc\" : %s }";

    private static final String INDEX = "index";
    private static final String CREATE = "create";
    private static final String DELETE = "delete";
    private static final String UPDATE = "update";

    /**
     * Record separator. Elasticsearch accepts either {@code \n} or {@code \r\n}.
     */
    private static final String NEWLINE = "\n";

    /**
     * Content-Type header value for bulk requests.
     */
    public static final String CONTENT_TYPE = "application/x-ndjson";

    /** Bulk endpoint URL template; {@code %s} is replaced by {@code host[:port]}. */
    static final String BULK_URL = "http://%s/_bulk";

    /** Accumulated request body. The builder is used by one thread, hence StringBuilder. */
    private final StringBuilder body;

    private BulkBuilder() {
        body = new StringBuilder();
    }

    /** Returns a new, empty builder. */
    public static BulkBuilder getBuilder() {
        return new BulkBuilder();
    }

    /**
     * Appends an {@code index} operation with an explicit id (creates or replaces the document).
     *
     * @param index   target index name
     * @param id      document id
     * @param docJson full document source as JSON
     * @return this builder
     * @throws NullPointerException if any argument is null
     */
    public BulkBuilder index(String index, String id, String docJson) {
        return appendLines(actionLine(INDEX, index, id), sourceLine(docJson));
    }

    /**
     * Appends an {@code index} operation without an {@code _id}, so Elasticsearch assigns the id.
     *
     * @param index   target index name
     * @param docJson full document source as JSON
     * @return this builder
     * @throws NullPointerException if any argument is null
     */
    public BulkBuilder index(String index, String docJson) {
        return appendLines(actionLine(INDEX, index), sourceLine(docJson));
    }

    /**
     * Appends an {@code update} operation. {@code docJson} is wrapped as {@code { "doc" : ... }},
     * which is a partial update of the given fields.
     *
     * @param index   target index name
     * @param id      id of the document to update
     * @param docJson partial document as JSON
     * @return this builder
     * @throws NullPointerException if any argument is null
     */
    public BulkBuilder update(String index, String id, String docJson) {
        String action = actionLine(UPDATE, index, id);
        return appendLines(action, String.format(DOC_TEMPLATE, sourceLine(docJson)));
    }

    /**
     * Appends a {@code create} operation with an explicit id. The item fails if the id already exists.
     *
     * @param index   target index name
     * @param id      document id
     * @param docJson full document source as JSON
     * @return this builder
     * @throws NullPointerException if any argument is null
     */
    public BulkBuilder create(String index, String id, String docJson) {
        return appendLines(actionLine(CREATE, index, id), sourceLine(docJson));
    }

    /**
     * Appends a {@code create} operation without an {@code _id}.
     *
     * @param index   target index name
     * @param docJson full document source as JSON
     * @return this builder
     * @throws NullPointerException if any argument is null
     */
    public BulkBuilder create(String index, String docJson) {
        return appendLines(actionLine(CREATE, index), sourceLine(docJson));
    }

    /**
     * Appends a {@code delete} operation. Delete has an action line only, with no source line.
     *
     * @param index target index name
     * @param id    id of the document to delete
     * @return this builder
     * @throws NullPointerException if any argument is null
     */
    public BulkBuilder delete(String index, String id) {
        return appendLines(actionLine(DELETE, index, id));
    }

    /** Returns the NDJSON body built so far (an empty string if nothing has been added). */
    public String build() {
        return body.toString();
    }

    /**
     * POSTs the accumulated body to {@code http://<host>/_bulk}.
     *
     * <p>The body is not cleared afterwards, so calling this again re-sends the same operations.
     *
     * @param host {@code host[:port]} of the target cluster, e.g. {@code "127.0.0.1:9200"}
     * @return the raw response body from {@code HttpUtil.postContent}, or {@code ""} when no
     *     operation has been added (no request is sent in that case)
     * @throws IOException if the HTTP request fails
     */
    public String postBulk(String host) throws IOException {
        if (body.length() == 0) {
            // Nothing queued: skip the round trip.
            return "";
        }
        String url = String.format(BULK_URL, host);
        return HttpUtil.postContent(url, body.toString(), CONTENT_TYPE);
    }

    /** Appends each fully built line followed by the record separator. */
    private BulkBuilder appendLines(String... lines) {
        for (String line : lines) {
            body.append(line).append(NEWLINE);
        }
        return this;
    }

    /** Formats an action line that carries no {@code _id}. */
    private static String actionLine(String op, String index) {
        return String.format(INDEX_TEMPLATE, op, escapeJson(requireArg(index, "index")));
    }

    /** Formats an action line that targets an explicit document id. */
    private static String actionLine(String op, String index, String id) {
        return String.format(INDEX_TEMPLATE_WITH_ID, op,
                escapeJson(requireArg(index, "index")), escapeJson(requireArg(id, "id")));
    }

    /** Validates document JSON and flattens raw line breaks so it occupies exactly one NDJSON line. */
    private static String sourceLine(String docJson) {
        return requireArg(docJson, "docJson").replace('\r', ' ').replace('\n', ' ');
    }

    /** Throws NullPointerException naming the argument instead of silently emitting "null". */
    private static String requireArg(String value, String name) {
        if (value == null) {
            throw new NullPointerException(name + " must not be null");
        }
        return value;
    }

    /** Escapes a value for use inside a JSON string literal (quotes, backslash, control chars). */
    private static String escapeJson(String value) {
        StringBuilder out = new StringBuilder(value.length() + 8);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                case '\n':
                    out.append("\\n");
                    break;
                case '\r':
                    out.append("\\r");
                    break;
                case '\t':
                    out.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }
}

上面的HttpUtil可以使用後面給出的Http工具，也可以使用Spring的RestTemplate，或者JDK自帶的HttpURLConnection。

使用起來也非常方便，下面給一個測試：

@Test
void update() throws IOException {
    // Document indexed in full under id "1" of index "test".
    JSONObject source = new JSONObject();
    source.put("name","youyou");
    source.put("age",20);

    // Partial document sent with the update operation (wrapped as {"doc": ...} by the builder).
    // NOTE(review): "age" is sent as the string "200" while the indexed value is the number 20;
    // this relies on Elasticsearch coercing numeric strings for a numeric mapping — confirm.
    JSONObject partial = new JSONObject();
    partial.put("age","200");

    // Both operations go out in a single bulk request; the raw response is printed as is.
    String response = BulkBuilder.getBuilder()
            .index("test", "1", source.toString())
            .update("test", "1", partial.toJSONString())
            .postBulk("127.0.0.1:9200");
    System.out.println(response);
}

4、http工具

import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;


/**
 * Static HTTP helpers backed by one shared, pooled Apache {@link CloseableHttpClient}.
 *
 * <p>Each method executes a single request and returns the whole response body as a string. The
 * response is closed in a try-with-resources block, so the pooled connection is released even
 * when reading the body fails. The HTTP status code is not checked: error responses are returned
 * like any other body. A response without an entity yields {@code ""}.
 */
public class HttpUtil {

    // NOTE(review): not used by any method in this class.
    private static final Logger logger = LoggerFactory.getLogger(HttpUtil.class);

    private static final CloseableHttpClient commonHttpClient;

    static {
        HttpClientBuilder httpClientBuilder = HttpClients.custom();

        // 3 s each for socket reads, TCP connect and leasing a connection from the pool.
        // NOTE(review): a large _bulk request may take longer than 3 s to process; consider a
        // longer socket timeout for migration workloads.
        RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
        requestConfigBuilder.setSocketTimeout(3000);
        requestConfigBuilder.setConnectTimeout(3000);
        requestConfigBuilder.setConnectionRequestTimeout(3000);
        httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());

        // NOTE(review): the pool keeps the library's default per-route/total connection limits;
        // raise them if requests are issued from many threads.
        PoolingHttpClientConnectionManager poolingHttpClientConnectionManager = new PoolingHttpClientConnectionManager();
        ConnectionConfig.Builder connectionConfigBuilder = ConnectionConfig.custom();
        ConnectionConfig connectionConfig = connectionConfigBuilder.setCharset(Charset.defaultCharset()).setBufferSize(4096).build();
        // The builder's setDefaultConnectionConfig is ignored once a custom connection manager is
        // supplied, so the config is applied to the manager itself.
        poolingHttpClientConnectionManager.setDefaultConnectionConfig(connectionConfig);
        httpClientBuilder.setUserAgent("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:70.0) Gecko/20100101 Firefox/70.0");
        httpClientBuilder.setConnectionManager(poolingHttpClientConnectionManager);
        commonHttpClient = httpClientBuilder.build();
    }

    /** Sends a GET request to {@code url} and returns the response body. */
    public static String getContent(String url) throws IOException {
        return execute(new HttpGet(url));
    }

    /**
     * POSTs {@code body} as {@code ContentType.DEFAULT_TEXT} and returns the response body.
     *
     * <p>NOTE(review): DEFAULT_TEXT declares ISO-8859-1, so characters outside Latin-1 are not
     * transmitted faithfully. Use the {@code contentType} overload (UTF-8) for such text.
     */
    public static String postContent(String url, String body) throws IOException {
        HttpPost method = new HttpPost(url);
        method.setEntity(new StringEntity(body, ContentType.DEFAULT_TEXT));
        return execute(method);
    }

    /** POSTs {@code body} as {@code ContentType.APPLICATION_JSON} and returns the response body. */
    public static String postJsonContent(String url, String body) throws IOException {
        HttpPost method = new HttpPost(url);
        method.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON));
        return execute(method);
    }

    /** PUTs {@code body} as {@code ContentType.APPLICATION_JSON} and returns the response body. */
    public static String putJsonContent(String url, String body) throws IOException {
        HttpPut method = new HttpPut(url);
        method.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON));
        return execute(method);
    }

    /** Sends a DELETE request to {@code url} and returns the response body. */
    public static String delete(String url) throws IOException {
        return execute(new HttpDelete(url));
    }

    /**
     * POSTs {@code body} encoded as UTF-8 with the given MIME type and returns the response body.
     *
     * <p>{@code ContentType.create(contentType)} carries no charset. A {@code StringEntity} built
     * from it would fall back to ISO-8859-1 and turn non-Latin-1 text (e.g. Chinese) into '?'.
     * The body is therefore encoded to UTF-8 bytes explicitly, and the Content-Type header is sent
     * exactly as given (e.g. {@code application/x-ndjson} for bulk requests).
     *
     * @param url         target URL
     * @param body        request body
     * @param contentType bare MIME type without parameters
     * @return the response body
     * @throws IOException if the request fails
     */
    public static String postContent(String url, String body, String contentType) throws IOException {
        if (body == null) {
            throw new IllegalArgumentException("Source string may not be null");
        }
        HttpPost method = new HttpPost(url);
        method.setEntity(new ByteArrayEntity(body.getBytes(StandardCharsets.UTF_8), ContentType.create(contentType)));
        return execute(method);
    }

    /**
     * Executes {@code request} on the shared client and returns the response body.
     *
     * <p>The body is decoded as UTF-8 when the response declares no charset, and {@code ""} is
     * returned when there is no entity. Closing the response releases the pooled connection.
     */
    private static String execute(HttpUriRequest request) throws IOException {
        try (CloseableHttpResponse response = commonHttpClient.execute(request)) {
            return response.getEntity() == null
                    ? ""
                    : EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
        }
    }
}
相關文章
相關標籤/搜索