# Start the Spark standalone master daemon.
./sbin/start-master.sh
# Start a worker and register it with the master at spark://hadoop000:7077.
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://hadoop000:7077
package com.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming word count over socket data.
 *
 * Test by feeding lines with: `nc -lk 6789`
 */
object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    // FIX: "local" provides a single thread, which the socket receiver
    // occupies entirely, so no batches would ever be processed.
    // A receiver-based stream needs at least two local threads: local[2].
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    // A StreamingContext needs two arguments: a SparkConf and a batch interval.
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 6789)
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
2.生成jar包
3.上傳jar包
# Open a netcat server on port 6789 to feed the streaming job (run in a separate terminal).
nc -lk 6789
# Submit the packaged job; local[2] gives the receiver and the processing one thread each.
./spark-submit --master local[2] --class com.spark.NetworkWordCount --name NetworkWordCount /home/hadoop/tmp/spark.jar
運行程序,出現下面的錯誤:
<!-- jackson-module-scala: Scala support for the Jackson JSON library (Scala 2.11 build). -->
<dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-scala_2.11</artifactId> <version>2.6.5</version> </dependency>
去maven repository查找對應的依賴:
在這裏,使用1.3.0版本的。
<!-- net.jpountz.lz4 1.3.0: LZ4 compression library (added to resolve the runtime error above). -->
<dependency> <groupId>net.jpountz.lz4</groupId> <artifactId>lz4</artifactId> <version>1.3.0</version> </dependency>
接收數據:
// local[2]: at least two threads are required — one is taken by the socket
// receiver, the other actually processes the received batches.
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
/**
 * Creates an input DStream over the files in `directory`, read with the
 * given Hadoop input format.
 *
 * @param directory directory to read files from
 * @tparam K key type produced by the input format
 * @tparam V value type produced by the input format
 * @tparam F the Hadoop `InputFormat` used to read the files
 */
def fileStream[
    K: ClassTag,
    V: ClassTag,
    F <: NewInputFormat[K, V]: ClassTag
](directory: String): InputDStream[(K, V)] =
  new FileInputDStream[K, V, F](this, directory)
/**
 * Creates a ReceiverInputDStream of UTF-8 text lines read from a TCP socket.
 *
 * Delegates to `socketStream` with `SocketReceiver.bytesToLines` as the
 * byte-stream-to-lines converter.
 *
 * @param hostname     host to connect to
 * @param port         port to connect to
 * @param storageLevel storage level for the received blocks
 *                     (defaults to MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}