不使用 es-hadoop 的 saveToES：它與 Scala 版本的衝突問題太多。
不使用 BulkProcessor：它是異步提交，ES 容易 OOM，速度反而不快。
改用 BulkRequestBuilder 同步提交。
主要代碼：
public static void main(String[] args){ System.setProperty("hadoop.home.dir", "D:\\hadoop"); System.setProperty("es.set.netty.runtime.available.processors", "false"); SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SendRecord"); conf.set("spark.streaming.backpressure.enabled", "true"); conf.set("spark.streaming.receiver.maxRate", "1000"); conf.set("spark.streaming.kafka.maxRatePerPartition", "1000"); conf.set("es.nodes", "eshost"); conf.set("es.port", "9200"); JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(2)); Map<String, Object> kafkaParams = new HashMap<>(); kafkaParams.put("bootstrap.servers", "kafkahost:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "sparkGroup4"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false); Collection<String> topics = Arrays.asList("users"); JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream (ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)); JavaDStream<User> kafkaDStream = stream.map(new Function<ConsumerRecord<String, String>, User>() { @Override public User call(ConsumerRecord<String, String> record) throws Exception { Gson gson = new Gson(); return gson.fromJson(record.value(), User.class); } }); kafkaDStream.foreachRDD(new VoidFunction<JavaRDD<User>>() { @Override public void call(JavaRDD<User> userJavaRDD) throws Exception { userJavaRDD.foreachPartition(new VoidFunction<Iterator<User>>() { @Override public void call(Iterator<User> userIterator) throws Exception { TransportClient client = ESClient.getClient(); BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); Map<String, Object> map = new HashMap<>(); while(userIterator.hasNext()){ User user = userIterator.next(); map.put("name", user.getName()); map.put("age", 
user.getAge()); map.put("desc", user.getDescription()); IndexRequest request = client.prepareIndex("users", "info").setSource(map).request(); bulkRequestBuilder.add(request); } if(bulkRequestBuilder.numberOfActions() > 0){ BulkResponse bulkItemResponses = bulkRequestBuilder.execute().actionGet(); } } }); } }); ssc.start(); try { // Wait for the computation to terminate. ssc.awaitTermination(); } catch (InterruptedException e) { e.printStackTrace(); } }
ESClient：
/**
 * Provides a single Elasticsearch {@link TransportClient} shared by all callers in this JVM.
 *
 * <p>The client is created on the first call to {@link #getClient()}. It joins cluster
 * {@code es} through node {@code eshost:9300}, with sniffing disabled and 60s ping-timeout and
 * node-sampler intervals.
 */
public class ESClient {

    /**
     * Returns the shared transport client, creating it on first use.
     *
     * <p>The client is shared, so callers must not close it.
     *
     * @return the shared client, never {@code null}
     * @throws ExceptionInInitializerError on the first call if the client could not be created
     *     (the cause is attached); later calls then throw {@link NoClassDefFoundError}
     */
    public static TransportClient getClient() {
        return Holder.client;
    }

    // Holder idiom: the JVM initializes this class (and therefore the client) once, on the first
    // access from getClient(). Class initialization is thread-safe.
    private static class Holder {
        private static final TransportClient client = createClient();

        /** Builds and connects the client; fails loudly instead of leaving it null or unconnected. */
        private static TransportClient createClient() {
            TransportClient created = null;
            try {
                Settings setting = Settings.builder()
                        .put("cluster.name", "es")
                        .put("client.transport.sniff", false)
                        .put("client.transport.ping_timeout", "60s")
                        .put("client.transport.nodes_sampler_interval", "60s")
                        .build();
                created = new PreBuiltTransportClient(setting);
                created.addTransportAddress(new TransportAddress(new InetSocketAddress("eshost", 9300)));
                return created;
            } catch (Exception e) {
                // Release a partially set-up client rather than leaking it.
                if (created != null) {
                    try {
                        created.close();
                    } catch (Exception closeFailure) {
                        e.addSuppressed(closeFailure);
                    }
                }
                throw new IllegalStateException(
                        "Failed to create Elasticsearch transport client for eshost:9300", e);
            }
        }
    }
}