Contents of this session:
1. Spark Streaming combined with Broadcast and Accumulator
2. Hands-on online blacklist filtering and counting
The content of a broadcast variable can be user-defined, and by using Broadcast and Accumulator together you can implement complex business logic.
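The pattern at its smallest: the driver broadcasts read-only data once to every executor, and executors report results back through an accumulator that only the driver reads. Below is a minimal sketch of this, using the same pre-Spark-2.0 Accumulator API as the lesson code; the names and sample values are illustrative only.

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastAccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Sketch"))
    val blackList = sc.broadcast(List("Hadoop", "Hive")) // read-only copy on every executor
    val hits = sc.accumulator(0, "hits")                 // executors add, only the driver reads
    sc.parallelize(Seq("Hadoop", "Spark", "Hive"))
      .foreach(w => if (blackList.value.contains(w)) hits.add(1))
    println("blacklist hits: " + hits.value)             // prints 2 (Hadoop and Hive matched)
    sc.stop()
  }
}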
The following code listens on local port 9999 and sends words to every connected client, including the blacklisted words Hadoop, Mahout, and Hive.
package org.scala.opt

import java.io.{PrintWriter, IOException}
import java.net.{Socket, SocketException, ServerSocket}

case class ServerThread(socket: Socket) extends Thread("ServerThread") {
  override def run(): Unit = {
    val ptWriter = new PrintWriter(socket.getOutputStream)
    try {
      var count = 0
      var totalCount = 0
      var isThreadRunning: Boolean = true
      val batchCount = 1
      val words = List("Java Scala C C++ C# Python JavaScript",
                       "Hadoop Spark Ngix MFC Net Mahout Hive")
      while (isThreadRunning) {
        words.foreach(ptWriter.println)
        count += 1
        if (count >= batchCount) {
          totalCount += count
          count = 0
          println("batch " + batchCount + " totalCount => " + totalCount)
          Thread.sleep(1000)
        }
        // PrintWriter methods never throw I/O exceptions, although some of its
        // constructors may. Clients must call checkError() to find out whether
        // an error occurred; checkError() also flushes the stream.
        if (ptWriter.checkError()) {
          isThreadRunning = false
          println("ptWriter error then close socket")
        }
      }
    } catch {
      case e: SocketException => println("SocketException : " + e)
      case e: IOException     => e.printStackTrace()
    } finally {
      if (ptWriter != null) ptWriter.close()
      println("Client " + socket.getInetAddress + " disconnected")
      if (socket != null) socket.close()
    }
    println(Thread.currentThread().getName + " Exit")
  }
}

object SocketServer {
  def main(args: Array[String]): Unit = {
    try {
      val listener = new ServerSocket(9999)
      println("Server is started, waiting for client connect...")
      while (true) {
        val socket = listener.accept()
        println("Client : " + socket.getLocalAddress + " connected")
        new ServerThread(socket).start()
      }
      listener.close()
    } catch {
      case e: IOException =>
        System.err.println("Could not listen on port: 9999.")
        System.exit(-1)
    }
  }
}
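Before wiring up Spark, the server can be sanity-checked with any line-based TCP client. Assuming netcat is installed, running nc localhost 9999 should print the two word lines roughly once per second, matching the one-second sleep per batch above.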
The following code receives the words sent on local port 9999 and counts how many times blacklisted words appear.
package com.dt.spark.streaming_scala

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, Accumulator}
import org.apache.spark.broadcast.Broadcast

/**
 * Lesson 103: Hands-on: combining Spark Streaming, Broadcast and Accumulator
 * to implement online blacklist filtering and counting.
 *
 * Contents of this session:
 * 1. Spark Streaming combined with Broadcast and Accumulator
 * 2. Hands-on online blacklist filtering and counting
 */
object _103SparkStreamingBroadcastAccumulator {

  @volatile private var broadcastList: Broadcast[List[String]] = null
  @volatile private var accumulator: Accumulator[Int] = null

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[5]").setAppName("_103SparkStreamingBroadcastAccumulator")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    // Use Broadcast to ship the blacklist to every Executor.
    broadcastList = ssc.sparkContext.broadcast(Array("Hadoop", "Mahout", "Hive").toList)

    // Global counter that records how many blacklisted words were filtered online.
    accumulator = ssc.sparkContext.accumulator(0, "OnlineBlackListCounter")

    ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        rdd.filter(wordPair => {
          if (broadcastList.value.contains(wordPair._1)) {
            println("BlackList word %s appeared".format(wordPair._1))
            accumulator.add(wordPair._2)
            false
          } else {
            true
          }
        }).collect()
        println("BlackList appeared : %d times".format(accumulator.value))
      }
    }

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
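Two details are worth noting here. First, filter is lazy, so the trailing collect() is what actually forces the blacklist check to run on each batch. Second, since the accumulator lives on the driver and is never reset, the printed count is cumulative across all batches; and as with any accumulator updated inside a transformation rather than an action, the count can over-state if Spark retries a task.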
On the server (sender) side, the log continuously prints the running total of batches sent.
On the Spark Streaming side, the log prints the blacklisted words and the number of times they appeared.
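Neither log is reproduced here, but from the println calls in the code above the output has roughly the following shape (counts, shown as <N>, grow as the job runs):

Server side:
batch 1 totalCount => <N>

Spark Streaming side:
BlackList word Hadoop appeared
BlackList word Hive appeared
BlackList appeared : <N> times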