Spark Streaming筆記整理(二):案例、SSC、數據源與自定義Receiver

實時WordCount案例

主要是監聽網絡端口中的數據,並實時進行wc的計算。javascript

Java版

測試代碼以下:php

package cn.xpleaf.bigdata.spark.java.streaming.p1; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.util.Arrays; /** * 使用Java開發SparkStreaming的第一個應用程序 * * 用於監聽網絡socket中的一個端口,實時獲取對應的文本內容 * 計算文本內容中的每個單詞出現的次數 */ public class _01SparkStreamingNetWorkWCOps { public static void main(String[] args) { if(args == null || args.length < 2) { System.err.println("Parameter Errors! Usage: <hostname> <port>"); System.exit(-1); } Logger.getLogger("org.apache.spark").setLevel(Level.OFF); SparkConf conf = new SparkConf() .setAppName(_01SparkStreamingNetWorkWCOps.class.getSimpleName()) /* * 設置爲local是沒法計算數據,可是可以接收數據 * 設置爲local[2]是既能夠計算數據,也能夠接收數據 * 當master被設置爲local的時候,只有一個線程,且只能被用來接收外部的數據,因此不可以進行計算,如此便不會作對應的輸出 * 因此在使用的本地模式時,同時是監聽網絡socket數據,線程個數必須大於等於2 */ .setMaster("local[2]"); /** * 第二個參數:Duration是SparkStreaming用於進行採集多長時間段內的數據將其拆分紅一個個batch * 該例表示每隔2秒採集一次數據,將數據打散成一個個batch(其實就是SparkCore中的一個個RDD) */ JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(2)); String hostname = args[0].trim(); int port = Integer.valueOf(args[1].trim()); JavaReceiverInputDStream<String> lineDStream = jsc.socketTextStream(hostname, port);// 默認的持久化級別StorageLevel.MEMORY_AND_DISK_SER_2 JavaDStream<String> wordsDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); JavaPairDStream<String, Integer> pairsDStream = wordsDStream.mapToPair(word -> { return new Tuple2<String, Integer>(word, 1); }); JavaPairDStream<String, Integer> retDStream = pairsDStream.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); retDStream.print(); // 啓動流式計算 jsc.start(); // 等待執行結束 jsc.awaitTermination(); System.out.println("結束了沒有呀,哈哈哈~"); jsc.close(); } }

啓動程序,同時在主機上使用nc命令進行操做:java

[uplooking@uplooking01 ~]$ nc -lk 4893
hello youe hello he hello me

輸出結果以下:shell

------------------------------------------- Time: 1525929096000 ms ------------------------------------------- (youe,1) (hello,3) (me,1) (he,1)

同時也能夠在Spark UI上查看相應的做業執行狀況:apache

Spark Streaming筆記整理(二):案例、SSC、數據源與自定義Receiver

能夠看到,每2秒就會執行一次計算,即每隔2秒採集一次數據,將數據打散成一個個batch(其實就是SparkCore中的一個個RDD)。api

Scala版

測試代碼以下:網絡

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object _01SparkStreamingNetWorkOps {
    def main(args: Array[String]): Unit = { if (args == null || args.length < 2) { System.err.println( """Parameter Errors! Usage: <hostname> <port> |hostname: 監聽的網絡socket的主機名或ip地址 |port: 監聽的網絡socket的端口 """.stripMargin) System.exit(-1) } Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_01SparkStreamingNetWorkOps.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(2)) val hostname = args(0).trim val port = args(1).trim.toInt val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port) val wordsDStream:DStream[String] = linesDStream.flatMap({case line => line.split(" ")}) val pairsDStream:DStream[(String, Integer)] = wordsDStream.map({case word => (word, 1)}) val retDStream:DStream[(String, Integer)] = pairsDStream.reduceByKey{case (v1, v2) => v1 + v2} retDStream.print() ssc.start() ssc.awaitTermination() ssc.stop() // stop中的boolean參數,設置爲true,關閉該ssc對應的SparkContext,默認爲false,只關閉自身 } }

啓動程序,同時在主機上使用nc命令進行操做:app

[uplooking@uplooking01 ~]$ nc -lk 4893
hello youe hello he hello me

輸出結果以下:socket

------------------------------------------- Time: 1525929574000 ms ------------------------------------------- (youe,1) (hello,3) (me,1) (he,1)

StreamingContext和DStream詳解

Spark Streaming筆記整理(二):案例、SSC、數據源與自定義Receiver

StreamingContext的建立方式

一、在Spark中有兩種建立StreamingContext的方式tcp

1)根據SparkConf進行建立

val conf = new SparkConf().setAppName(appname).setMaster(master); val ssc = new StreamingContext(conf, Seconds(10));

2)還能夠根據SparkContext進行建立

val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(10));

appname,是用來在Spark UI上顯示的應用名稱。master,是一個Spark、Mesos或者Yarn集羣的URL,或者是local[*]。

二、batch interval:Seconds(10)能夠根據咱們本身應用程序的狀況進行不一樣的設置。

StreamingContext的建立、啓動和銷燬

1、一個StreamingContext定義以後,必須執行如下程序進行實時計算的執行

一、建立輸入DStream來建立輸入不一樣的數據源。

二、對DStream定義transformation和output等各類算子操做,來定義咱們須要的各類實時計算邏輯。

三、調用StreamingContext的start()方法,進行啓動咱們的實時處理數據。

四、調用StreamingContext的awaitTermination()方法,來等待應用程序的終止。可使用CTRL+C手動中止,或者就是讓它持續不斷的運行進行計算。

五、也能夠經過調用StreamingContext的stop()方法,來中止應用程序。

2、備註(十分重要)

一、只要咱們一個StreamingContext啓動以後,咱們就不能再往這個Application其中添加任何計算邏輯了。好比執行start()方法以後,還給某個DStream執行一個算子,這是不容許的。

二、一個StreamingContext中止以後,是確定不可以重啓的。調用stop()以後,不能再調用start()

三、必須保證一個JVM同時只能有一個StreamingContext啓動。在你的應用程序中,不能建立兩個StreamingContext。

四、調用stop()方法時,會同時中止內部的SparkContext,若是不但願如此,還但願後面繼續使用SparkContext建立其餘類型的Context,好比SQLContext,那麼就用stop(false)。

五、一個SparkContext能夠建立多個StreamingContext,只要上一個先用stop(false)中止,再建立下一個便可。(注意與第2點的區別,這裏是再建立了一個StreamingContext)

輸入DStream和Receiver

Spark Streaming筆記整理(二):案例、SSC、數據源與自定義Receiver

輸入DStream表明了來自數據源的輸入數據流。咱們以前作過了一些例子,好比從文件讀取、從TCP、從HDFS讀取等。每一個DSteam都會綁定一個Receiver對象,該對象是一個關鍵的核心組件,用來從咱們的各類數據源接受數據,並將其存儲在Spark的內存當中,這個內存的StorageLevel,咱們能夠本身進行指定,老師在之後的例子中會講解這部分。

Spark Streaming提供了兩種內置的數據源支持:

一、基礎數據源:SSC API中直接提供了對這些數據源的支持,好比文件、tcp socket、Akka Actor等。

二、高級數據源:好比Kafka、Flume、Kinesis和Twitter等數據源,要引入第三方的JAR來完成咱們的工做。

三、自定義數據源:好比咱們的ZMQ、RabbitMQ、ActiveMQ等任何格式的自定義數據源。關於自定義數據源,老師在講解最後一個項目的時候,會使用此自定義數據源若是從ZMQ中讀取數據。官方提供的Spark-ZMQ是基於zmq2.0版本的,由於如今的 生產環境都是基於ZMQ4以上的版本了,因此必須本身定義並實現了一個自定義的receiver機制。

Spark Streaming的運行機制local[*]分析

一、若是咱們想要在咱們的Spark Streaming應用中並行讀取N多數據的話,咱們能夠啓動建立多個DStream。這樣子就會建立多個Receiver,老師最多的一個案例是啓動了128個Receive,每一個receiver每秒的數據是1000W以上。

二、可是要注意的是,咱們Spark Streaming Application的Executor進程,是個長時間運行的一個進程,所以它會獨佔分給他的cpu core。因此它只能本身處理這件事情了,不能再幹其餘活了。

三、使用本地模式local運行咱們的Spark Streaming程序時,絕對不能使用local或者 local[1]的模式。由於Spark Streaming運行的時候,必需要至少要有2個線程。若是隻給了一條的話,Spark Streaming Application程序會直接hang在哪兒。 兩條線程的一條用來分配給Receiver接收數據,另一條線程用來處理接受到的數據。所以咱們想要進行本地測試的話,必須知足local[N],這個N必定要大於2

四、若是咱們想要在咱們的Spark進羣上運行的話,那麼首先,必需要求咱們的集羣每一個節點上,有>1個cpu core。其次,給Spark Streaming的每一個executor分配的core,必須>1,這樣,才能保證分配到executor上運行的輸入DStream,兩條線程並行,一條運行Receiver,接收數據;一條處理數據。不然的話,只會接收數據,不會處理數據。

DStream與HDFS集成

輸入DFStream基礎數據源

基於HDFS文件的實時計算,其實就是監控咱們的一個HDFS目錄,只要其中有新文件出現,就實時處理。至關於處理實時的文件流。

===》Spark Streaming會監視指定的HDFS目錄,而且處理出如今目錄中的文件。

1)在HDFS中的全部目錄下的文件,必須知足相同的格式,否則的話,不容易處理。必須使用移動或者重命名的方式,將文件移入目錄。一旦處理以後,文件的內容及時改變,也不會再處理了。

2)基於HDFS的數據結源讀取是沒有receiver的,所以不會佔用一個cpu core。

3)實際上在下面的測試案例中,一直也沒有效果,也就是監聽不到HDFS中的文件,本地文件也沒有效果;

基於HDFS的實時WordCounter案例實戰

測試代碼以下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext} /** * SparkStreaming監聽hdfs的某一個目錄的變化(新增文件) */ object _02SparkStreamingHDFSOps { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_02SparkStreamingHDFSOps.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(5)) val linesDStream:DStream[String] = ssc.textFileStream("hdfs://ns1/input/spark/streaming/") // val linesDStream:DStream[String] = ssc.textFileStream("D:/data/spark/streaming") linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).print() ssc.start() ssc.awaitTermination() ssc.stop() } }

DStream與Kafka集成(基於Receiver方式)

Spark與Kafka集成的方式

一、利用Kafka的Receiver方式進行集成

二、利用Kafka的Direct方式進行集成

Spark-Streaming獲取kafka數據的兩種方式-Receiver與Direct的方式,能夠從代碼中簡單理解成Receiver方式是經過zookeeper來鏈接kafka隊列,Direct方式是直接鏈接到kafka的節點上獲取數據了。

基於Kafka的Receiver方式集成

這種方式使用Receiver來獲取數據。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,而後Spark Streaming啓動的job會去處理那些數據。然而,在默認的配置下,這種方式可能會由於底層的失敗而丟失數據。若是要啓用高可靠機制,讓數據零丟失,就必須啓用Spark Streaming的預寫日誌機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分佈式文件系統(好比HDFS)上的預寫日誌中。因此,即便底層節點出現了失敗,也可使用預寫日誌中的數據進行恢復。

補充說明:

(1)、Kafka中的topic的partition,與Spark中的RDD的partition是沒有關係的。因此,在KafkaUtils.createStream()中,提升partition的數量,只會增長一個Receiver中,讀取partition的線程的數量。不會增長Spark處理數據的並行度。

(2)、能夠建立多個Kafka輸入DStream,使用不一樣的consumer group和topic,來經過多個receiver並行接收數據。

(3)、若是基於容錯的文件系統,好比HDFS,啓用了預寫日誌機制,接收到的數據都會被複制一份到預寫日誌中。所以,在KafkaUtils.createStream()中,設置的持久化級別是StorageLevel.MEMORY_AND_DISK_SER。

與Kafka的集成--Maven

<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.6.2</version> </dependency>

Kafka啓動、驗證和測試

啓動kafka服務

kafka-server-start.sh -daemon config/server.properties

建立topic

kafka-topics.sh --create --topic spark-kafka --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 3

列舉kafka中已經建立的topic

kafka-topics.sh --list --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181

列舉每一個節點都保護那些topic、Partition

kafka-topics.sh --describe --zookeeper uplooking01:2181, uplooking02:2181, uplooking03:21821 --topic spark-kafka
  leader:負責處理消息的讀和寫,leader是從全部節點中隨機選擇的.
  replicas:列出了全部的副本節點,無論節點是否在服務中.
  isr:是正在服務中的節點.

產生數據

kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092

消費數據

kafka-console-consumer.sh --topic spark-kafka --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181

案例

測試代碼以下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Kafka和SparkStreaming基於Receiver的模式集成 */ object _03SparkStreamingKafkaReceiverOps { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_03SparkStreamingKafkaReceiverOps.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(5)) // ssc.checkpoint("hdfs://ns1/checkpoint/streaming/kafka") // checkpoint文件保存到hdfs中 ssc.checkpoint("file:///D:/data/spark/streaming/checkpoint/streaming/kafka") // checkpoint文件保存到本地文件系統 /** * 使用Kafka Receiver的方式,來建立的輸入DStream,須要使用SparkStreaming提供的Kafka整合API * KafkaUtils */ val zkQuorum = "uplooking01:2181,uplooking02:2181,uplooking03:2181" val groupId = "kafka-receiver-group-id" val topics:Map[String, Int] = Map("spark-kafka"->3) // ReceiverInputDStream中的key就是當前一條數據在kafka中的key,value就是該條數據對應的value val linesDStream:ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics) val retDStream = linesDStream.map(t => t._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) retDStream.print() ssc.start() ssc.awaitTermination() ssc.stop() } }

在kafka中生產數據:

[uplooking@uplooking02 kafka]$ kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092
hello you hello he hello me

輸出結果以下:

------------------------------------------- Time: 1525965130000 ms ------------------------------------------- (hello,3) (me,1) (you,1) (he,1)

在上面的代碼中,還啓用了Spark Streaming的預寫日誌機制(Write Ahead Log,WAL)。

若是數據保存在本地文件系統,則以下:

Spark Streaming筆記整理(二):案例、SSC、數據源與自定義Receiver

若是數據保存在HDFS中,則以下:

Spark Streaming筆記整理(二):案例、SSC、數據源與自定義Receiver

DStream與Kafka集成(基於Direct方式)

Spark和Kafka集成Direct的特色

(1)Direct的方式是會直接操做kafka底層的元數據信息,這樣若是計算失敗了,能夠把數據從新讀一下,從新處理。即數據必定會被處理。拉數據,是RDD在執行的時候直接去拉數據。

(2)因爲直接操做的是kafka,kafka就至關於你底層的文件系統。這個時候能保證嚴格的事務一致性,即必定會被處理,並且只會被處理一次。而Receiver的方式則不能保證,由於Receiver和ZK中的數據可能不一樣步,Spark Streaming可能會重複消費數據,這個調優能夠解決,但顯然沒有Direct方便。而Direct api直接是操做kafka的,spark streaming本身負責追蹤消費這個數據的偏移量或者offset,而且本身保存到checkpoint,因此它的數據必定是同步的,必定不會被重複。即便重啓也不會重複,由於checkpoint了,可是程序升級的時候,不能讀取原先的checkpoint,面對升級checkpoint無效這個問題,怎麼解決呢?升級的時候讀取我指定的備份就能夠了,即手動的指定checkpoint也是能夠的,這就再次完美的確保了事務性,有且僅有一次的事務機制。那麼怎麼手動checkpoint呢?構建SparkStreaming的時候,有getorCreate這個api,它就會獲取checkpoint的內容,具體指定下這個checkpoint在哪就行了。

(3)因爲底層是直接讀數據,沒有所謂的Receiver,直接是週期性(Batch Intervel)的查詢kafka,處理數據的時候,咱們會使用基於kafka原生的Consumer api來獲取kafka中特定範圍(offset範圍)中的數據。這個時候,Direct Api訪問kafka帶來的一個顯而易見的性能上的好處就是,若是你要讀取多個partition,Spark也會建立RDD的partition,這個時候RDD的partition和kafka的partition是一致的。而Receiver的方式,這2個partition是沒任何關係的。這個優點是你的RDD,其實本質上講在底層讀取kafka的時候,kafka的partition就至關於原先hdfs上的一個block。這就符合了數據本地性。RDD和kafka數據都在這邊。因此讀數據的地方,處理數據的地方和驅動數據處理的程序都在一樣的機器上,這樣就能夠極大的提升性能。不足之處是因爲RDD和kafka的patition是一對一的,想提升並行度就會比較麻煩。提升並行度仍是repartition,即從新分區,由於產生shuffle,很耗時。這個問題,之後也許新版本能夠自由配置比例,不是一對一。由於提升並行度,能夠更好的利用集羣的計算資源,這是頗有意義的。

(4)不須要開啓wal機制,從數據零丟失的角度來看,極大的提高了效率,還至少能節省一倍的磁盤空間。從kafka獲取數據,比從hdfs獲取數據,由於zero copy的方式,速度確定更快。

Kafka Direct VS Receiver

從高層次的角度看,以前的和Kafka集成方案(reciever方法)使用WAL工做方式以下:

1)運行在Spark workers/executors上的Kafka Receivers接二連三地從Kafka中讀取數據,其中用到了Kafka中高層次的消費者API。

2)接收到的數據被存儲在Spark workers/executors中的內存,同時也被寫入到WAL中。只有接收到的數據被持久化到log中,Kafka Receivers纔會去更新Zookeeper中Kafka的偏移量。

3)接收到的數據和WAL存儲位置信息被可靠地存儲,若是期間出現故障,這些信息被用來從錯誤中恢復,並繼續處理數據。

Spark Streaming筆記整理(二):案例、SSC、數據源與自定義Receiver

  • 這個方法能夠保證從Kafka接收的數據不被丟失。可是在失敗的狀況下,有些數據頗有可能會被處理不止一次!這種狀況在一些接收到的數據被可靠地保存到WAL中,可是尚未來得及更新Zookeeper中Kafka偏移量,系統出現故障的狀況下發生。這致使數據出現不一致性:Spark Streaming知道數據被接收,可是Kafka那邊認爲數據尚未被接收,這樣在系統恢復正常時,Kafka會再一次發送這些數據
  • 這種不一致產生的緣由是由於兩個系統沒法對那些已經接收到的數據信息保存進行原子操做。爲了解決這個問題,只須要一個系統來維護那些已經發送或接收的一致性視圖,並且,這個系統須要擁有從失敗中恢復的一切控制權利。基於這些考慮,社區決定將全部的消費偏移量信息只存儲在Spark Streaming中,而且使用Kafka的低層次消費者API來從任意位置恢復數據

爲了構建這個系統,新引入的Direct API採用徹底不一樣於Receivers和WALs的處理方式。它不是啓動一個Receivers來接二連三地從Kafka中接收數據並寫入到WAL中,而是簡單地給出每一個batch區間須要讀取的偏移量位置,最後,每一個batch的Job被運行,那些對應偏移量的數據在Kafka中已經準備好了。這些偏移量信息也被可靠地存儲(checkpoint),在從失敗中恢復

Spark Streaming筆記整理(二):案例、SSC、數據源與自定義Receiver

  • 須要注意的是,Spark Streaming能夠在失敗之後從新從Kafka中讀取並處理那些數據段。然而,因爲僅處理一次的語義,最後從新處理的結果和沒有失敗處理的結果是一致的。
  • 所以,Direct API消除了須要使用WAL和Receivers的狀況,並且確保每一個Kafka記錄僅被接收一次並被高效地接收。這就使得咱們能夠將Spark Streaming和Kafka很好地整合在一塊兒。整體來講,這些特性使得流處理管道擁有高容錯性,高效性,並且很容易地被使用。

案例

測試代碼以下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import kafka.serializer.StringDecoder import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream} import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Kafka和SparkStreaming基於Direct的模式集成 * * 在公司中使用Kafka-Direct方式 */ object _04SparkStreamingKafkaDirectOps { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_04SparkStreamingKafkaDirectOps.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(5)) // ssc.checkpoint("hdfs://ns1/checkpoint/streaming/kafka") // checkpoint文件也是能夠保存到hdfs中的,不過必要性不大了,對於direct的方式來講 val kafkaParams:Map[String, String] = Map("metadata.broker.list"-> "uplooking01:9092,uplooking02:9092,uplooking03:9092") val topics:Set[String] = Set("spark-kafka") val linesDStream:InputDStream[(String, String)] = KafkaUtils. // 參數分別爲:key類型,value類型,key的×××,value的××× createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val retDStream = linesDStream.map(t => t._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) retDStream.print() ssc.start() ssc.awaitTermination() ssc.stop() } }

生產數據:

[uplooking@uplooking02 kafka]$ kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092
hello you hello he hello me

輸出結果以下:

------------------------------------------- Time: 1525966750000 ms ------------------------------------------- (hello,3) (me,1) (you,1) (he,1)

自定義Receiver

測試代碼以下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * SparkStreaming自定義Receiver
  * 經過模擬Network來學習自定義Receiver
  *
  * 自定義的步驟:
  *     1.建立一個類繼承一個類或者實現某個接口
  *     2.複寫啓動的個別方法
  *     3.進行註冊調用
  */
object _05SparkStreamingCustomReceiverOps {
    def main(args: Array[String]): Unit = {
        if (args == null || args.length < 2) {
            System.err.println(
                """Parameter Errors! Usage: <hostname> <port> |hostname: 監聽的網絡socket的主機名或ip地址 |port: 監聽的網絡socket的端口 """.stripMargin) System.exit(-1) } Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_05SparkStreamingCustomReceiverOps.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(5)) val hostname = args(0).trim val port = args(1).trim.toInt val linesDStream:ReceiverInputDStream[String] = ssc.receiverStream[String](new MyNetWorkReceiver(hostname, port)) val retDStream:DStream[(String, Int)] = linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) retDStream.print() ssc.start() ssc.awaitTermination() ssc.stop() } } /** * 自定義receiver */ class MyNetWorkReceiver(storageLevel:StorageLevel) extends Receiver[String](storageLevel) { private var hostname:String = _ private var port:Int = _ def this(hostname:String, port:Int, storageLevel:StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2) { this(storageLevel) this.hostname = hostname this.port = port } /** * 啓動及其初始化receiver資源 */ override def onStart(): Unit = { val thread = new Thread() { override def run(): Unit = { receive() } } thread.setDaemon(true) // 設置成爲後臺線程 thread.start() } // 接收數據的核心api 讀取網絡socket中的數據 def receive(): Unit = { val socket = new Socket(hostname, port) val ins = socket.getInputStream() val br = new BufferedReader(new InputStreamReader(ins)) var line:String = null while((line = br.readLine()) != null) { store(line) } ins.close() socket.close() } override def onStop(): Unit = { } }

啓動nc,並輸入數據:

[uplooking@uplooking01 ~]$ nc -lk 4893
hello you hello he hello me

輸出結果以下:

(hello,3)
(me,1)
(you,1)
(he,1)


原文連接:http://blog.51cto.com/xpleaf/2115102
相關文章
相關標籤/搜索