Kafka: ZK + Kafka + Spark Streaming Cluster Setup (Part 20): ES 6.2.2 Client API

Scala version: 2.11

Java version: 1.8

Spark version: 2.2.1

ES version: 6.2.2

Hadoop version: 2.9.0

Elasticsearch nodes:

192.168.0.120
192.168.0.121
192.168.0.122

Contents:

1) First, how to use the elasticsearch client API to create (and delete, modify, query) an index, type, and mapping, and to perform CRUD operations on documents.

2) Then, how to write to elasticsearch from Spark.

3) Finally, how to read data from Kafka and write that data stream into ES.

Using the elasticsearch client API

Client

Client is the class through which you perform operations against the ES cluster: index/get/delete/search, as well as cluster administration tasks.

A Client is built on top of a TransportClient.

TransportClient

TransportClient connects to a remote ES cluster through the transport module. It does not hold a permanent connection to the cluster; it only records one or more initial transport addresses, and actually connects to the cluster on each request.

Settings

The Settings class configures properties before the Client is started, chiefly the cluster name (cluster.name). Other parameters:

client.transport.sniff: whether to enable sniffing on the transport client (automatic discovery of the other cluster nodes);

client.transport.ignore_cluster_name: if true, skip cluster-name validation of the nodes connected to;

client.transport.ping_timeout: timeout for pinging a node; defaults to 5s;

client.transport.nodes_sampler_interval: how often to sample/ping the listed and connected nodes; defaults to 5s.
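Pulled together, these settings are supplied through Settings.builder(). A fragment only, with illustrative values (the real values come from conf.properties below); it requires the elasticsearch client jars on the classpath:

```java
// Fragment, not a runnable program: values here are examples.
Settings settings = Settings.builder()
        .put("cluster.name", "es-application")                 // must match the cluster's name
        .put("client.transport.sniff", true)                   // discover the rest of the cluster
        .put("client.transport.ignore_cluster_name", false)    // keep cluster-name validation
        .put("client.transport.ping_timeout", "5s")            // default, shown for clarity
        .put("client.transport.nodes_sampler_interval", "5s")  // default, shown for clarity
        .build();
```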

A full client-initialization example:

1) ClientTools.java (provides the TransportClient as a singleton; for how to create a client see https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html)

package com.dx.es;

import java.net.InetAddress;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class ClientTools {
    private static ClientTools instance=null;
    private TransportClient client=null;
    
    private ClientTools(){
        this.client=null;
        init();
    }
        
    public static synchronized ClientTools getInstance(){
        if(instance==null){
            instance=new ClientTools();
        }
        return instance;
    }
    
    public TransportClient get(){
        return client;
    }
    
    public void close(){
        if(null != client){
            try {
                client.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    private void init() {
        if(null != this.client){
            return;
        }
        
        try {
            Settings settings = Settings.builder()
                    .put("cluster.name",Config.getInstance().get("cluster.name"))
                    .put("client.transport.sniff", Boolean.valueOf(Config.getInstance().get("client.transport.sniff")))
                    .build();
            
            @SuppressWarnings("unchecked")
            PreBuiltTransportClient preBuiltTransportClient = new PreBuiltTransportClient(settings);
            
            this.client = preBuiltTransportClient;
            this.client.addTransportAddress(new TransportAddress(InetAddress.getByName(Config.getInstance().get("host1")), 9300));
            this.client.addTransportAddress(new TransportAddress(InetAddress.getByName(Config.getInstance().get("host2")), 9300));

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2) Config.java (manages the ES configuration)

package com.dx.es;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

public class Config {
    private static Config instance=null;
    private Map<String, String> confItems=null;
    
    private Config(){
        this.confItems=new HashMap<String, String>();
    
        init();
    }
    
    public static synchronized Config getInstance(){
        if(instance==null){
            instance=new Config();
        }
        return instance;
    }

    public String get(String key){
        if(!this.confItems.containsKey(key))
            return null;
        
        return this.confItems.get(key);
    }
    
    private void init() {        
        Properties prop = new Properties();     
        try{
            // Read the properties file conf.properties
            InputStream in = new BufferedInputStream (new FileInputStream("E:\\spark_hadoop_cdh\\workspace\\ES_Client_API\\src\\main\\resources\\conf.properties"));
            // Load the property list
            prop.load(in);     
            Iterator<String> it=prop.stringPropertyNames().iterator();
            while(it.hasNext()){
                String key=it.next();
                System.out.println(key+":"+prop.getProperty(key));
                this.confItems.put(key, prop.getProperty(key));
            }
            in.close();          
        }
        catch(Exception e){
            System.out.println(e);
        }
    }
}

The contents of conf.properties:

cluster.name=es-application
client.transport.sniff=true
es_ip=192.168.0.120
host1=slave1
host2=slave2
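The Config class above is a thin wrapper around java.util.Properties. A minimal, self-contained sketch of the same loading logic, using an in-memory string as a stand-in for the hard-coded file path in Config.init():

```java
import java.io.StringReader;
import java.util.Properties;

public class ConfDemo {
    public static void main(String[] args) throws Exception {
        // Stand-in for conf.properties so the demo needs no filesystem access.
        String conf = "cluster.name=es-application\n"
                    + "client.transport.sniff=true\n"
                    + "host1=slave1\n"
                    + "host2=slave2\n";
        Properties prop = new Properties();
        prop.load(new StringReader(conf)); // same call Config.init() makes on the file stream
        System.out.println(prop.getProperty("cluster.name"));           // es-application
        System.out.println(prop.getProperty("client.transport.sniff")); // true
    }
}
```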

Index API

Reference: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-index.html#java-docs-index

package com.dx.es;

import java.io.IOException;
import java.util.Date;

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;

public class ClientAPITest {    
    public static void main(String[] args) {
        TransportClient client =    ClientTools.getInstance().get();
        
        XContentBuilder jsonBuilder=null;
        try {
            jsonBuilder = XContentFactory.jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject();
        } catch (IOException e) {
            e.printStackTrace();
        }
        
        IndexResponse response = client.prepareIndex("twitter","tweet","1")
            .setSource(jsonBuilder)
            .get();
        
        // Index name
        String _index = response.getIndex();
        // Type name
        String _type = response.getType();
        // Document ID (generated or not)
        String _id = response.getId();
        // Version (if it's the first time you index this document, you will get: 1)
        long _version = response.getVersion();
        // status has stored current instance statement.
        RestStatus status = response.status();
        if(status==RestStatus.CREATED){
            System.out.println("success !!!");
        }
        
        client.close();
    }
}

After running this, the index, the type, and one document have been created.

Get API

Reference: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-get.html

package com.dx.es;

import java.util.Map;

import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.transport.TransportClient;

public class ClientAPITest {
    public static void main(String[] args) {
        TransportClient client = ClientTools.getInstance().get();
        
        GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
        
        Map<String, Object> fields = response.getSource();
        for(Map.Entry<String, Object> kvEntry : fields.entrySet()){
            System.out.println(kvEntry.getKey()+":"+kvEntry.getValue());
        }
        
        client.close();
    }
}

Output:

postDate:2018-08-05T06:48:18.334Z
message:trying out Elasticsearch
user:kimchy

Delete API

Reference: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-delete.html

package com.dx.es;

import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;

public class ClientAPITest {
    public static void main(String[] args) {
        TransportClient client = ClientTools.getInstance().get();
        
        DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
        
        if(RestStatus.OK==    response.status()){
            System.out.println("Success ...");
        }
        
        client.close();
    }
}

Viewed through the es-head plugin, the index and type still exist; only the data is gone.

Delete By Query API

Reference: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-delete-by-query.html#java-docs-delete-by-query

package com.dx.es;

import java.io.IOException;
import java.util.Date;

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;

public class ClientAPITest {
    public static void main(String[] args) {
        TransportClient client = ClientTools.getInstance().get();
        create(client);
        
        BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                .filter(QueryBuilders.matchQuery("gender", "male")) 
                .source("twitter")                                  
                .get();                                             
        long deleted = response.getDeleted();   
        System.out.println(deleted);
        
        client.close();
    }

    private static void create(TransportClient client) {
        XContentBuilder jsonBuilder = null;
        for (int i = 1; i <= 10; i++) {
            try {
                jsonBuilder = XContentFactory.jsonBuilder()
                        .startObject()
                        .field("user", "kimchy"+i)
                        .field("gender", ((i%2==0) ? "male" : "famale"))
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                        .endObject();
            } catch (IOException e) {
                e.printStackTrace();
            }

            IndexResponse response = client.prepareIndex("twitter", "tweet", String.valueOf(i)).setSource(jsonBuilder).get();
        }
    }
}

After the insert, all ten records are visible; after the delete-by-query, the documents whose gender is "male" are gone (screenshots omitted).

For a long-running delete, the request can be executed asynchronously: replace get with execute and supply a listener.

package com.dx.es;

import java.io.IOException;
import java.util.Date;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;

public class ClientAPITest {
    public static void main(String[] args) {
        TransportClient client = ClientTools.getInstance().get();
        // create(client);
        
        DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
            .filter(QueryBuilders.matchQuery("gender", "male"))
            .source("twitter")
            .execute(new ActionListener<BulkByScrollResponse>() {
                public void onResponse(BulkByScrollResponse response) {
                    long deleted = response.getDeleted();
                    System.out.println(deleted);
                }

                public void onFailure(Exception e) {
                    // Handle the exception
                    e.printStackTrace();
                }
            });

        try {
            Thread.sleep(60000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        
        client.close();
    }

    private static void create(TransportClient client) {
        XContentBuilder jsonBuilder = null;
        for (int i = 1; i <= 10; i++) {
            try {
                jsonBuilder = XContentFactory.jsonBuilder()
                        .startObject()
                        .field("user", "kimchy"+i)
                        .field("gender", ((i%2==0) ? "male" : "famale"))
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                        .endObject();
            } catch (IOException e) {
                e.printStackTrace();
            }

            IndexResponse response = client.prepareIndex("twitter", "tweet", String.valueOf(i)).setSource(jsonBuilder).get();
        }
    }
}

Update API

Create an UpdateRequest and send it to the client:

package com.dx.es;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentFactory;

public class ClientAPITest {
    public static void main(String[] args)  {
        TransportClient client = ClientTools.getInstance().get();

        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("twitter");
        updateRequest.type("tweet");
        updateRequest.id("1");
        try {
            updateRequest.doc(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("gender", "male")
                        .endObject());
        } catch (IOException e) {
            e.printStackTrace();
        }
        
        try {
            client.update(updateRequest).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }        
        
        client.close();
    }
}

Or use the prepareUpdate() method:

Option 1:

package com.dx.es;

import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.script.Script;

public class ClientAPITest {
    public static void main(String[] args) {
        TransportClient client = ClientTools.getInstance().get();

        UpdateRequestBuilder updateRequestBuilder = client.prepareUpdate("twitter", "tweet", "1");
        Map<String, Object> params = new HashMap<String, Object>();
        updateRequestBuilder.setScript(new Script(Script.DEFAULT_SCRIPT_TYPE, Script.DEFAULT_SCRIPT_LANG, "ctx._source.gender = \"female\"",params));
        updateRequestBuilder.get();

        client.close();
    }
}

Option 2:

package com.dx.es;

import java.io.IOException;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentFactory;

public class ClientAPITest {
    public static void main(String[] args) {
        TransportClient client = ClientTools.getInstance().get();

        try {
            client.prepareUpdate("twitter", "tweet", "1")
                    .setDoc(XContentFactory.jsonBuilder().startObject().field("gender", "male").endObject()).get();
        } catch (IOException e) {
            e.printStackTrace();
        }

        client.close();
    }
}

Update by script

package com.dx.es;

import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.script.Script;

public class ClientAPITest {
    public static void main(String[] args) {
        TransportClient client = ClientTools.getInstance().get();

        UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
                .script(new Script("ctx._source.gender = \"female\""));
        try {
            client.update(updateRequest).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        
        client.close();
    }
}

Update by merging documents

package com.dx.es;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentFactory;

public class ClientAPITest {
    public static void main(String[] args) {
        TransportClient client = ClientTools.getInstance().get();

        UpdateRequest updateRequest = null;
        try {
            updateRequest = new UpdateRequest("twitter", "tweet", "1")
                    .doc(
                            XContentFactory.jsonBuilder().startObject()
                                .field("gender", "male")
                                .endObject()
                    );
        } catch (IOException e) {
            e.printStackTrace();
        }
        
        try {
            client.update(updateRequest).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        client.close();
    }
}

Upsert

package com.dx.es;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentFactory;

public class ClientAPITest {
    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        TransportClient client = ClientTools.getInstance().get();

        IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "11")
                .source(XContentFactory.jsonBuilder()                
                        .startObject()
                        .field("user", "Joe Smith")
                        .field("gender", "male")
                        .endObject()
                );
        
        UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "11")
                .doc(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("user", "Joe Dalton")
                        .field("gender", "male")
                        .endObject()
                )
                .upsert(indexRequest);
        
        client.update(updateRequest).get();
        
        client.close();
    }
}

Note: if a document with that id already exists, the update doc is applied; otherwise the index request is executed.
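That contract can be paraphrased without a cluster: the index request's source is inserted when the id is absent, and the update doc is merged when it is present. A toy sketch against an in-memory map (purely illustrative, not the ES implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class UpsertDemo {
    // Mimics UpdateRequest.doc(...).upsert(indexRequest) against an in-memory "index".
    static void upsert(Map<String, Map<String, String>> index, String id,
                       Map<String, String> doc, Map<String, String> upsertSource) {
        if (index.containsKey(id)) {
            index.get(id).putAll(doc);                  // id exists: merge the update doc
        } else {
            index.put(id, new HashMap<>(upsertSource)); // id absent: index the upsert source
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> index = new HashMap<>();
        Map<String, String> insert = Map.of("user", "Joe Smith", "gender", "male");
        Map<String, String> doc = Map.of("user", "Joe Dalton");

        upsert(index, "11", doc, insert);
        System.out.println(index.get("11").get("user")); // Joe Smith: document was absent
        upsert(index, "11", doc, insert);
        System.out.println(index.get("11").get("user")); // Joe Dalton: update doc merged
    }
}
```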

Multi Get API

package com.dx.es;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.client.transport.TransportClient;

public class ClientAPITest {
    public static void main(String[] args)  {
        TransportClient client = ClientTools.getInstance().get();

        MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
                .add("twitter", "tweet", "1")           
                .add("twitter", "tweet", "2", "3", "4") 
                .add("twitter", "tweet", "11")          
                .get();

            for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 
                GetResponse response = itemResponse.getResponse();
                if (response.isExists()) {                      
                    String json = response.getSourceAsString(); 
                    System.out.println(json);
                }
            }
        
        client.close();
    }
}

Output:

{"user":"kimchy1","gender":"male","postDate":"2018-08-05T10:04:26.631Z","message":"trying out Elasticsearch"}
{"user":"kimchy2","gender":"male","postDate":"2018-08-05T10:04:26.673Z","message":"trying out Elasticsearch"}
{"user":"kimchy3","gender":"famale","postDate":"2018-08-05T10:04:26.720Z","message":"trying out Elasticsearch"}
{"user":"kimchy4","gender":"male","postDate":"2018-08-05T10:04:26.730Z","message":"trying out Elasticsearch"}
{"user":"Joe Dalton","gender":"male"}

Bulk API

package com.dx.es;

import java.io.IOException;
import java.util.Date;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentFactory;

public class ClientAPITest {
    public static void main(String[] args) throws IOException  {
        TransportClient client = ClientTools.getInstance().get();
        BulkRequestBuilder bulkRequest = client.prepareBulk();

        // either use client#prepare, or use Requests# to directly build index/delete requests
        bulkRequest.add(client.prepareIndex("twitter", "tweet", "12")
                .setSource(XContentFactory.jsonBuilder()
                            .startObject()
                                .field("user", "auth")
                                .field("postDate", new Date())
                                .field("message", "trying out Elasticsearch")
                            .endObject()
                          )
                );

        bulkRequest.add(client.prepareIndex("twitter", "tweet", "13")
                .setSource(XContentFactory.jsonBuilder()
                            .startObject()
                                .field("user", "judy")
                                .field("postDate", new Date())
                                .field("message", "another post")
                            .endObject()
                          )
                );

        BulkResponse bulkResponse = bulkRequest.get();        
        if (bulkResponse.hasFailures()) {
            // process failures by iterating through each bulk response item
            System.out.println(    bulkResponse.buildFailureMessage());
        }
                
        MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
                .add("twitter", "tweet", "12", "13") 
                .get();

            for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 
                GetResponse response = itemResponse.getResponse();
                if (response.isExists()) {                      
                    String json = response.getSourceAsString(); 
                    System.out.println(json);
                }
            }
        
        client.close();
    }
}

Using Bulk Processor

Reference: https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.2/java-docs-bulk-processor.html#java-docs-bulk-processor

package com.dx.es;

import java.io.IOException;
import java.util.Date;

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;

public class ClientAPITest {
    public static void main(String[] args) throws IOException  {
        TransportClient client = ClientTools.getInstance().get();
        
        BulkProcessor bulkProcessor = BulkProcessor.builder(
                client,  
                new BulkProcessor.Listener() {
                    public void beforeBulk(long executionId, BulkRequest request) {
                        
                    }

                    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                        
                    }

                    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                        
                    } 
                })
                .setBulkActions(10000) 
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) 
                .setFlushInterval(TimeValue.timeValueSeconds(5)) 
                .setConcurrentRequests(1) 
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 
                .build();

        bulkProcessor.add(new DeleteRequest("twitter", "tweet", "1"));
        bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

        bulkProcessor.add(new IndexRequest("twitter", "tweet", "12")
                .source(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("user", "auth")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                        .endObject()
                        )
            );
        bulkProcessor.add(new IndexRequest("twitter", "tweet", "13")
                .source(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("user", "judy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                        .endObject()
                      )
                );

        // Flush any remaining requests
        bulkProcessor.flush();
        // Or close the bulkProcessor if you don't need it anymore
        bulkProcessor.close();

        // Refresh your indices
        client.admin().indices().prepareRefresh().get();
                
        MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
                .add("twitter", "tweet", "1", "2", "12", "13") 
                .get();

            for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 
                GetResponse response = itemResponse.getResponse();
                if (response.isExists()) {                      
                    String json = response.getSourceAsString(); 
                    System.out.println(json);
                }
            }
        
        client.close();
    }
}

 

When does an index need to be rebuilt? (See "Elasticsearch index management: rebuilding an index with reindex".) An index must be rebuilt when a field's type changes.

 

Writing to elasticsearch from Spark

To use Spark classes such as SparkConf you need the spark-core dependency; to write RDD data into ES you need elasticsearch-spark-20_2.11.

The Maven dependencies:

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20 -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.2.2</version>
        </dependency>        

Implementation:

package com.dx.es;

import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

public class JavaEsSpark_Test {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf();
        conf.setMaster("local[*]"); // run mode
        conf.setAppName("spark to es");// application name
        conf.set("es.index.auto.create", "true");// create indices automatically
        conf.set("es.nodes", "192.168.0.120,192.168.0.121,192.168.0.122");// ES nodes, comma-separated
        conf.set("es.port", "9200");// HTTP port
        JavaSparkContext jsc = new JavaSparkContext(conf);

        Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
        Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

        JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
        JavaEsSpark.saveToEs(javaRDD, "spark/docs");
        
        jsc.close();
    }
}

After running, use the head plugin to check that the documents were inserted.

 

References:

ES Client API:

https://www.sojson.com/blog/87.html

https://www.sojson.com/blog/88.html

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html

How ES stores indices:

https://blog.csdn.net/cyony/article/details/65437708?locationNum=9&fps=1

Example of writing to ES:

http://qindongliang.iteye.com/blog/2372853
