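Spark Machine Learning
With online learning, the model updates itself continuously as new messages arrive, instead of being retrained from scratch each time as in offline (batch) training.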
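The idea of updating a model one message at a time can be sketched in plain Scala. This is a minimal illustration using a stochastic gradient descent step on squared error, not the MLlib streaming API; the names `OnlineUpdateSketch`, `Model`, `update`, and `train`, as well as the synthetic data and the step size, are assumptions made for the example.

```scala
// Hypothetical sketch: a linear model that is updated once per incoming message.
object OnlineUpdateSketch {

  /** Model state: one weight per feature plus an intercept. */
  final case class Model(weights: Vector[Double], intercept: Double) {
    def predict(x: Vector[Double]): Double =
      weights.zip(x).map { case (w, xi) => w * xi }.sum + intercept
  }

  /** One SGD step on squared error for a single (features, label) message. */
  def update(model: Model, x: Vector[Double], y: Double, stepSize: Double): Model = {
    val error = model.predict(x) - y
    Model(
      model.weights.zip(x).map { case (w, xi) => w - stepSize * error * xi },
      model.intercept - stepSize * error
    )
  }

  /** Fold the update over a sequence of messages, starting from an all-zero model. */
  def train(messages: Seq[(Vector[Double], Double)], stepSize: Double): Model = {
    val dim = messages.headOption.map(_._1.size).getOrElse(0)
    messages.foldLeft(Model(Vector.fill(dim)(0.0), 0.0)) {
      case (m, (x, y)) => update(m, x, y, stepSize)
    }
  }

  def main(args: Array[String]): Unit = {
    // Made-up, noise-free data drawn from y = 2x + 1, for illustration only
    val messages = (1 to 5000).map { i =>
      val x = (i % 10) / 10.0
      (Vector(x), 2.0 * x + 1.0)
    }
    println(train(messages, stepSize = 0.2))
  }
}
```

Because the model is the only state carried between messages, the same `update` step could in principle be applied to each record of a stream as it arrives, which is the contrast with offline retraining drawn above.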
Input sources: Akka actors, message queues, Flume, Kafka, ...
http://spark.apache.org/docs/latest/streaming-programming-guide.html
Lineage: the set of transformations and actions applied to an RDD. Spark uses it to recompute lost partitions.
Dependencies: Spark MLlib and Spark Streaming
    name := "scala-spark-streaming-app"

    version := "1.0"

    scalaVersion := "2.11.7"

    libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.5.1"

    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.5.1"
Use a mirror repository hosted in mainland China by listing it in:
~/.sbt/repositories
    [repositories]
      local
      osc: http://maven.oschina.net/content/groups/public/
      typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
      sonatype-oss-releases
      maven-central
      sonatype-oss-snapshots
    import java.io.PrintWriter
    import java.net.ServerSocket

    import scala.util.Random

    object StreamingProducer {

      def main(args: Array[String]): Unit = {
        val random = new Random()

        // Exclusive upper bound on the number of events per second (0 to 5 are generated)
        val MaxEvents = 6

        // Read the list of possible names from the first line of names.csv on the classpath
        val namesResource = this.getClass.getResourceAsStream("/names.csv")
        val names = scala.io.Source.fromInputStream(namesResource)
          .getLines()
          .toList
          .head
          .split(",")
          .toSeq

        // Generate a sequence of possible products
        val products = Seq(
          "iPhone Cover" -> 9.99,
          "Headphones" -> 5.49,
          "Samsung Galaxy Cover" -> 8.95,
          "iPad Cover" -> 7.49
        )

        /** Generate a number of random product events as (user, product, price) */
        def generateProductEvents(n: Int) = {
          (1 to n).map { i =>
            val (product, price) = products(random.nextInt(products.size))
            val user = random.shuffle(names).head
            (user, product, price)
          }
        }

        // Create a network producer
        val listener = new ServerSocket(9999)
        println("Listening on port: 9999")

        while (true) {
          val socket = listener.accept()
          // Serve each client on its own thread
          new Thread() {
            override def run(): Unit = {
              println("Got client connected from: " + socket.getInetAddress)
              val out = new PrintWriter(socket.getOutputStream(), true)

              while (true) {
                Thread.sleep(1000)
                val num = random.nextInt(MaxEvents)
                val productEvents = generateProductEvents(num)
                // Write one comma-separated line per event
                productEvents.foreach { event =>
                  out.write(event.productIterator.mkString(","))
                  out.write("\n")
                }
                out.flush()
                println(s"Created $num events...")
              }
              socket.close()
            }
          }.start()
        }
      }
    }
    sbt run

    Multiple main classes detected, select one to run:

     [1] MonitoringStreamingModel
     [2] SimpleStreamingApp
     [3] SimpleStreamingModel
     [4] StreamingAnalyticsApp
     [5] StreamingModelProducer
     [6] StreamingProducer
     [7] StreamingStateApp

    Enter number: 6
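Each line the producer writes has the form `user,product,price`. As a rough sketch of how a consumer might interpret one batch of such lines, the following plain Scala code parses them and totals revenue per product. The names `EventParsingSketch`, `Purchase`, `parse`, and `revenueByProduct` are hypothetical and do not appear in the original project; malformed lines are simply skipped, which is an assumption about the desired behavior.

```scala
import scala.util.Try

// Hypothetical sketch: interpreting the "user,product,price" lines sent by the producer.
object EventParsingSketch {

  final case class Purchase(user: String, product: String, price: Double)

  /** Parse one comma-separated line; returns None if it is malformed. */
  def parse(line: String): Option[Purchase] = line.split(",") match {
    case Array(user, product, price) =>
      Try(Purchase(user, product, price.toDouble)).toOption
    case _ => None
  }

  /** Total revenue per product for one batch of lines. */
  def revenueByProduct(lines: Seq[String]): Map[String, Double] =
    lines
      .flatMap(line => parse(line).toList)
      .groupBy(_.product)
      .map { case (product, purchases) => product -> purchases.map(_.price).sum }

  def main(args: Array[String]): Unit = {
    // Made-up sample lines in the producer's output format
    val batch = Seq(
      "Alice,iPhone Cover,9.99",
      "Bob,Headphones,5.49",
      "Carol,iPhone Cover,9.99"
    )
    revenueByProduct(batch).foreach { case (product, revenue) =>
      println(s"$product: $revenue")
    }
  }
}
```

The same parsing function could be applied inside a `map` over a text stream, since each record arrives as a single line of text.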
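    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object SimpleStreamingApp {

      def main(args: Array[String]): Unit = {
        // Two local threads, with a batch interval of 10 seconds
        val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
        // Connect to the producer listening on localhost:9999
        val stream = ssc.socketTextStream("localhost", 9999)

        // here we simply print out the first few elements of each batch
        stream.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }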
    sbt run

    Enter number: 2
Read the full article: http://click.aliyun.com/m/8713/