Today I was studying Scala and Spark. I had previously written a WordCount program in Eclipse, but exporting the jar was still confusing, so I decided to learn how to build Scala projects with sbt.
There is plenty of material about sbt online, so I won't repeat it here. See: http://wiki.jikexueyuan.com/project/sbt-getting-started/install-sbt.html
To install sbt on Linux (CentOS), run the following commands in order:
curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
sudo mv bintray-sbt-rpm.repo /etc/yum.repos.d/
sudo yum install sbt
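To verify the installation, a quick check (the first run downloads sbt's own dependencies, so it may take a while):

sbt about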
sbt requires the project to follow its standard directory layout:
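A minimal layout looks like this (the file names follow this article; sbt itself only mandates the src/main/scala convention):

kafka
├── build.sbt
└── src
    └── main
        └── scala
            └── KafkaWordCount.scala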
Here kafka is the project root directory, and build.sbt contains the following:
name := "test" version := "1.0" scalaVersion := "2.10.5" libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2" % "provided" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2" % "provided" libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.2" % "provided" libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.2" //注意版本
scalaVersion specifies the Scala version used to compile the program; since Spark 1.6.2 is used here, the matching Scala version is 2.10.5.
libraryDependencies lists the program's library dependencies. The trailing "provided" means Spark already ships these libraries, so they do not need to be bundled when packaging.
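For reference, the %% operator appends the Scala binary version to the artifact name automatically, so with scalaVersion set to 2.10.5 the two lines below are equivalent:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.2"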
The project's source code lives under src. KafkaWordCount.scala contains the following:
import java.util.HashMap

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // checkpointing is required by the stateful reduceByKeyAndWindow below
    ssc.checkpoint("checkpoint")

    // one entry per topic, each consumed with numThreads receiver threads
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // createStream yields (key, message) pairs; keep only the message
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    // count words over a 10-minute window, sliding every 2 seconds
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
From the project root, i.e. the kafka directory, run sbt compile to compile the project and sbt package to export the jar.
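For example (assuming the project sits at /root/kafka, matching the spark-submit command below):

cd /root/kafka
sbt compile
sbt package    # writes target/scala-2.10/test_2.10-1.0.jar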
Then, from the Spark installation directory, run the following (the last four arguments are <zkQuorum> <group> <topics> <numThreads>):
./bin/spark-submit --master spark://192.168.1.241:7077 --class KafkaWordCount /root/kafka/target/scala-2.10/test_2.10-1.0.jar 127.0.0.1:2181 2 2 2
Running this fails with an error saying the KafkaUtils class cannot be found. Searching online shows the cause: sbt package does not bundle the dependency jars into the output jar, so the sbt-assembly plugin is needed, as sketched below.
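A minimal sketch of the assembly setup (the plugin version and merge rules here are assumptions for the Spark 1.6 era; adjust as needed, see the links at the end). First, declare the plugin in project/plugins.sbt:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Then add a merge strategy to build.sbt, since assembly commonly fails with deduplicate errors on conflicting files such as those under META-INF:

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard  // drop conflicting metadata
  case _                             => MergeStrategy.first    // otherwise keep the first copy
}

Now run sbt assembly instead of sbt package. The "provided" Spark dependencies are left out of the fat jar by default, and the bundled jar lands in target/scala-2.10/ with an -assembly suffix.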
In addition, at runtime you may see an error like:
Exception in thread "main" org.apache.spark.SparkException:
Checkpoint RDD CheckpointRDD[85] at foreachRDD at WebPagePopularityValueCalculator.scala:68(0)
has different number of partitions than original RDD MapPartitionsRDD[81] at updateStateByKey at WebPagePopularityValueCalculator.scala:62(2)
This is because, when running in cluster mode, the checkpoint directory must be on a shared filesystem such as HDFS. The fix is to use an HDFS path, e.g. hdfs://ip:9000/data.
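In the code this is a one-line change (the namenode address below is only an illustration, reusing the master's IP from the spark-submit command; substitute your own):

// instead of ssc.checkpoint("checkpoint")
ssc.checkpoint("hdfs://192.168.1.241:9000/checkpoint")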
For an introduction to sbt-assembly: http://blog.csdn.net/beautygao/article/details/32306637
References: http://stackoverflow.com/questions/27198216/sbt-assembly-deduplicate-error-exclude-error
http://blog.csdn.net/ldds_520/article/details/51443606
http://www.cnblogs.com/scnu-ly/p/5106726.html