Spark Streaming和Kafka整合之路(最新版本)

最近完成了Spark Streaming和Kafka的整合工做,耗時雖然不長,可是當中仍是遇到了很多的坑,記錄下來,你們方便繞行。html

先說一下環境:git

Spark 2.0.0    kafka_2.11-0.10.0.0github

以前的項目當中,已經在pom當中添加了須要的Spark Streaming的依賴,此次只須要添加Spark Streaming Kafka的以來就好了,問題來了。首先是我以前添加的Spark Streaming的依賴:apache

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.0.0</version>
    </dependency>bootstrap

而後是找到的spark streaming對kafka的支持依賴:測試

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <version>1.6.2</version>
</dependency>ui

請注意2個version部分,好像差的有點多。無論了,照着例子寫寫看,果真報了各類class not found的錯誤。基本能夠判斷是版本差別形成的問題。spa

但是,在http://mvnrepository.com上找不到更高版本的依賴怎麼辦呢?scala

考慮了一下,只有一個辦法了,下載spark源碼,自行編譯打包須要的jar包。server

在github上找到spark項目,clone下來,懶病又犯了,也沒仔細看當中的說明,直接就clean compile等等。結果又是各類報錯。好吧,好好看看吧,github上給了個地址:http://spark.apache.org/docs/latest/building-spark.html,照着作就沒問題了。

而後把項目當中pom裏面對streaming kafka的依賴刪掉,引入咱們本身生成的jar包:

spark-streaming-kafka-0-10_2.11-2.1.0-SNAPSHOT.jar

 

而後貼上代碼:

    val conf = new SparkConf().setAppName("kafkastream").setMaster("spark://master:7077").
      set("spark.driver.host", "192.168.1.142").
      setJars(List("/src/git/msgstream/out/artifacts/msgstream_jar/msgstream.jar",
        "/src/git/msgstream/lib/kafka-clients-0.10.0.0.jar",
        "/src/git/msgstream/lib/kafka_2.11-0.10.0.0.jar",
        "/src/git/msgstream/lib/spark-streaming-kafka-0-10_2.11-2.1.0-SNAPSHOT.jar"))
    val ssc = new StreamingContext(conf, Seconds(2))

    val topics = List("woozoom")
    val kafkaParams = Map(("bootstrap.servers", "master:9092,slave01:9092,slave02:9092"),
      ("group.id", "sparkstreaming"), ("key.deserializer", classOf[StringDeserializer]),
      ("value.deserializer", classOf[StringDeserializer]))
    val preferredHosts = LocationStrategies.PreferConsistent
    val offsets = Map(new TopicPartition("woozoom", 0) -> 2L)

    val lines = KafkaUtils.createDirectStream[String, String](
      ssc,
      preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets))

    lines.foreachRDD(rdd => {
      rdd.foreach(x => {
        println(x)
      })
    })

    ssc.start()
    ssc.awaitTermination()

上面標紅的部分,是須要注意的,而這些原本我也是不會寫的,後來去到spark源碼找到test代碼

/src/git/spark/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala

測試,經過!!!

 

總結:

一、spark項目不少時候,資源不是很充分,想找例子的話,2個途徑,一個spark安裝包當中的example可是這個不少時候,版本是比較老的,不是很理想。更好地是從spark源碼當中找他的測試用例,這個基本上和你用的最新版本是徹底匹配的。

二、編譯過不少開源項目,通常大的項目都會有相應的build說明,照着那個作,會爲你節省不少時間。

三、從最開始遇到的版本號的問題來看,不少時候咱們遇到的問題並不必定是咱們本身的問題,不迷信,大膽的相信本身的推測,很是有助於問題的解決。

相關文章
相關標籤/搜索