Spark2.0流式處理讀Kafka並寫ES

maven依賴:node

       <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
            <scope>provided</scope>            
        </dependency>sql

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
            <scope>provided</scope>
        </dependency>
        
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
            <scope>provided</scope>
        </dependency>
        
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.2.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>log4j-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>            
         </dependency>apache

代碼:bootstrap

package com.suning.sevs.bussinessapp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import org.slf4j.LoggerFactory
import org.elasticsearch.spark._elasticsearch

//測試kafka
object testKafka {
  def main(args: Array[String]): Unit = {
    val logger = LoggerFactory.getLogger(testKafka.getClass)maven

    val sparkconf = new SparkConf().setAppName("testKafka ")
      .set("HADOOP_USER_NAME", 「user」)
      .set("HADOOP_GROUP_NAME", "user")
            .set("es.nodes", "10.10.2.1,10.10.2.2")
            .set("es.port", "9900")
    ide

    //     val spark = SparkSession
    //                 .builder
    //                 .appName("testKafka")
    //                 .config(sparkconf)
    //                 .getOrCreate()
    //       import spark.implicits._
    //        val topic = spark.readStream.format("kafka")
    //     .option("kafka.bootstrap.servers", "10.10.1.245:9092,10.10.1.246:9092")
    //    .option("subscribe", "mytopic")     
    //    .option("startingOffsets", "latest")  
    //    .option("minPartitions", "2") 
    //    .load()
    //    
    //    val query=topic.writeStream.format("console").outputMode(OutputMode.Append()).start()測試

    val ssc = new StreamingContext(sparkconf, Seconds(1))ui

    val topicsSet = "mytopic".split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "10.10.1.245:9092,10.10.1.246:9092")
    val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    val lines = directKafkaStream.map(_._2)

    lines.foreachRDD(rdd=>{
      
      val esRdd=rdd.map(line=>{
        Map("sys"->line,"mycode" -> "1")
      }
      )
      esRdd.saveToEs("indexName/typeName")
        
    })
  
    // Start the computation
    ssc.start()
    ssc.awaitTermination()

  }

}

相關文章
相關標籤/搜索