Maven dependencies:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.2.0</version>
    <exclusions>
        <exclusion>
            <artifactId>log4j-over-slf4j</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
Code:
package com.suning.sevs.bussinessapp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import org.slf4j.LoggerFactory
import org.elasticsearch.spark._ // brings saveToEs into scope for RDDs
// Kafka test
object testKafka {
  def main(args: Array[String]): Unit = {
    val logger = LoggerFactory.getLogger(testKafka.getClass)
    val sparkconf = new SparkConf().setAppName("testKafka")
      .set("HADOOP_USER_NAME", "user")
      .set("HADOOP_GROUP_NAME", "user")
      // Elasticsearch connector settings: cluster nodes and HTTP port
      .set("es.nodes", "10.10.2.1,10.10.2.2")
      .set("es.port", "9900")
    // val spark = SparkSession
    //   .builder
    //   .appName("testKafka")
    //   .config(sparkconf)
    //   .getOrCreate()
    // import spark.implicits._
    // val topic = spark.readStream.format("kafka")
    //   .option("kafka.bootstrap.servers", "10.10.1.245:9092,10.10.1.246:9092")
    //   .option("subscribe", "mytopic")
    //   .option("startingOffsets", "latest")
    //   .option("minPartitions", "2")
    //   .load()
    //
    // val query = topic.writeStream.format("console").outputMode(OutputMode.Append()).start()
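    // The commented-out block above is the Structured Streaming way of reading the same topic
    // (kafka source + console sink); the code below uses the DStream API instead.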
    val ssc = new StreamingContext(sparkconf, Seconds(1))
    val topicsSet = "mytopic".split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "10.10.1.245:9092,10.10.1.246:9092")
    // Direct (receiver-less) stream of (key, value) pairs from Kafka
    val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    // Keep only the message value
    val lines = directKafkaStream.map(_._2)
    lines.foreachRDD(rdd => {
      // Turn each message into a document and index it into Elasticsearch
      val esRdd = rdd.map(line => Map("sys" -> line, "mycode" -> "1"))
      esRdd.saveToEs("indexName/typeName")
    })
    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
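As a simpler variant (a sketch only, not part of the original code), the explicit foreachRDD can be dropped: elasticsearch-spark-20 also ships Spark Streaming support, so importing org.elasticsearch.spark.streaming._ is assumed to add saveToEs directly on the DStream. The object name testKafkaToEs is made up for illustration; the broker/ES addresses and indexName/typeName are the same placeholders used above:

package com.suning.sevs.bussinessapp

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import org.elasticsearch.spark.streaming._ // assumed available in ES-Hadoop 5.x+: saveToEs on DStreams

// Sketch: same pipeline, letting the connector write each micro-batch itself
object testKafkaToEs {
  def main(args: Array[String]): Unit = {
    val sparkconf = new SparkConf().setAppName("testKafkaToEs")
      .set("es.nodes", "10.10.2.1,10.10.2.2")
      .set("es.port", "9900")
    val ssc = new StreamingContext(sparkconf, Seconds(1))

    val kafkaParams = Map[String, String]("metadata.broker.list" -> "10.10.1.245:9092,10.10.1.246:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("mytopic"))

    // Map each Kafka value to a document; saveToEs indexes every batch into indexName/typeName
    stream.map { case (_, value) => Map("sys" -> value, "mycode" -> "1") }
      .saveToEs("indexName/typeName")

    ssc.start()
    ssc.awaitTermination()
  }
}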