Building Spark Streaming Code in Scala to Consume Kafka Data


  First, open the official Spark website and pick the version you are using (I chose 1.6.3). Then go into the Spark Streaming documentation and search there for the Kafka integration guide.

  

    Clicking through, you will find this snippet of Scala code:

     import org.apache.spark.streaming.kafka._

     val kafkaStream = KafkaUtils.createStream(streamingContext, 
       [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
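
     Before this snippet will compile, the project needs the Kafka integration artifact on its classpath. A minimal sketch of the sbt dependency, assuming Spark 1.6.3 as above (Maven coordinates org.apache.spark:spark-streaming-kafka_2.10:1.6.3):

     // sbt: %% appends the Scala binary version (e.g. _2.10) to the artifact name
     libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"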

 

     If you want to look at the createStream method, you can get there directly through the "Where to go from here" section of the Spark Streaming guide, which links to the Java, Scala, and Python API documents; pick the one for the language you are coding in. I'm using Scala here. Clicking into KafkaUtils, you will see that the class has quite a few methods; the one we want is createStream, and it is worth looking at its overloads. I've copied the description of the method below.

    

  def createStream(jssc: JavaStreamingContext, zkQuorum: String, groupId: String, topics: Map[String, Integer]): JavaPairReceiverInputDStream[String, String]

       Create an input stream that pulls messages from Kafka Brokers. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.

       jssc
           JavaStreamingContext object
       zkQuorum
           Zookeeper quorum (hostname:port,hostname:port,..)
       groupId
           The group id for this consumer
       topics
           Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
       returns
           DStream of (Kafka message key, Kafka message value)
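
     Note that the overload copied above is the Java-facing one (it takes a JavaStreamingContext). Since we are writing Scala, the overload we actually call is the one below; this is the signature as it appears in the org.apache.spark.streaming.kafka.KafkaUtils Scaladoc for 1.6.x:

     def createStream(
         ssc: StreamingContext,
         zkQuorum: String,
         groupId: String,
         topics: Map[String, Int],
         storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
       ): ReceiverInputDStream[(String, String)]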

 

 

    Finally, we write the Scala code in IDEA to consume Kafka:

    

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  import org.apache.spark.streaming.kafka.KafkaUtils

  def main(args: Array[String]): Unit = {
     // SparkSession requires Spark 2.x; on 1.6.x, build the StreamingContext from a SparkConf instead
     val spark = SparkSession.builder()
       .appName(Constants.SPARK_APP_NAME_PRODUCT)
       .getOrCreate()
     val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
     // (topic name -> number of partitions to consume); each partition is read in its own thread
     val topics = Map("topic" -> 1)
     // createStream expects the ZooKeeper quorum (usually port 2181), not the Kafka broker list
     val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(
       ssc, "hadoop01:2181,hadoop02:2181,hadoop03:2181", "groupId", topics,
       StorageLevel.MEMORY_AND_DISK_SER)
     // keep only the message values from the (key, value) pairs
     val values: DStream[String] = kafkaStream.map(_._2)
  }
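
    As a quick sanity check, here is a minimal follow-up sketch (my addition, not part of the original post): print a few records from each 5-second batch and start the context, without which nothing actually runs.

     // continues from the block above: `values` is the DStream of message values
     values.print()          // print the first 10 elements of each batch to the driver's stdout
     ssc.start()             // start receiving and processing data
     ssc.awaitTermination()  // block until the streaming job is stopped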

  

    

     That is the simple code walkthrough. There is still follow-up work to do, so I have only written the basic code for fetching data from Kafka, along with the process of finding it on the official site. I share this in the spirit of learning and exchanging ideas, and I hope the experts will offer plenty of pointers.

 

            I want to take you to travel; this is my current mood.
