First, open the official Spark website and pick the version you actually use — I chose 1.6.3. Then go to the Spark Streaming page and search there for Kafka.
Clicking through brings you to a snippet of Scala code:
import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext,
    [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
If you want to look at the createStream method itself, you can reach the API docs through the "Where to go from here" links on the Spark Streaming page; there are Java, Scala, and Python documents, so pick the language you code in. I use Scala here. Clicking into KafkaUtils, you will see that the class has quite a few methods; the one we want is createStream, so take a look at which overloads it has. I have copied the documentation for this method at the end of this post.
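For reference, the overload used in the code below has roughly this shape in the 1.6.x Scala API (reconstructed from the scaladoc, so treat it as a sketch rather than an exact quote):

def createStream(
    ssc: StreamingContext,
    zkQuorum: String,
    groupId: String,
    topics: Map[String, Int],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[(String, String)]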
Finally, we write the Scala code in IDEA that pulls data from Kafka:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]): Unit = {
  // Spark 1.6.x has no SparkSession (that arrives in 2.0), so build a SparkConf directly
  val conf = new SparkConf().setAppName(Constants.SPARK_APP_NAME_PRODUCT)
  val ssc = new StreamingContext(conf, Seconds(5))
  // topic name -> number of receiver threads consuming that topic
  val topics = Map("topic" -> 1)
  // createStream expects the ZooKeeper quorum (usually port 2181), not the Kafka broker list
  val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(
    ssc, "hadoop01:2181,hadoop02:2181,hadoop03:2181", "groupId", topics, StorageLevel.MEMORY_AND_DISK_SER)
  // each record is (message key, message value); keep only the value
  val values: DStream[String] = stream.map(_._2)
  ssc.start(); ssc.awaitTermination()
}
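One thing the post does not show: for this to compile, the Kafka connector has to be on the classpath. A minimal build.sbt sketch, assuming Scala 2.10 and Spark 1.6.3 (adjust to your cluster's versions):

// Assumption: Scala 2.10 / Spark 1.6.3; these coordinates are not from the original post
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.6.3" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
)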
That is the simple code walk-through. Since there is still follow-up work to do, I have only written a bit of code for fetching data from Kafka, along with the process of finding it on the official site. I am sharing this in a spirit of learning and exchange, and I hope the experts will offer plenty of pointers.
For completeness, here is the createStream documentation copied from the API docs:

Create an input stream that pulls messages from Kafka Brokers. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- JavaStreamingContext object
- Zookeeper quorum (hostname:port,hostname:port,..)
- The group id for this consumer
- Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
- Returns: DStream of (Kafka message key, Kafka message value)
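One more note on the topics map, as an illustration of my own (not from the copied docs): despite the name numPartitions, the number in each entry controls how many threads the receiver uses for that topic, not how many partitions the topic actually has. For example:

// Hypothetical topics map: 2 receiver threads for "logs", 1 for "metrics"
val topics = Map("logs" -> 2, "metrics" -> 1)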