這裏先引入一個基本的案例來演示流的建立:獲取指定端口上的數據並進行詞頻統計。項目依賴和代碼實現以下:html
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>2.4.3</version>
</dependency>
複製代碼
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object NetworkWordCount {
def main(args: Array[String]) {
/*指定時間間隔爲 5s*/
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
/*建立文本輸入流,並進行詞頻統計*/
val lines = ssc.socketTextStream("hadoop001", 9999)
lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
/*啓動服務*/
ssc.start()
/*等待服務結束*/
ssc.awaitTermination()
}
}
複製代碼
使用本地模式啓動 Spark 程序,而後使用 nc -lk 9999
打開端口並輸入測試數據:java
[root@hadoop001 ~]# nc -lk 9999
hello world hello spark hive hive hadoop
storm storm flink azkaban
複製代碼
此時控制檯輸出以下,能夠看到已經接收到數據並按行進行了詞頻統計。git
下面針對示例代碼進行講解:github
Spark Streaming 編程的入口類是 StreamingContext,在建立時候須要指明 sparkConf
和 batchDuration
(批次時間),Spark 流處理本質是將流數據拆分爲一個個批次,而後進行微批處理,batchDuration
就是批次拆分的時間間隔。這個時間能夠根據業務需求和服務器性能進行指定,若是業務要求低延遲而且服務器性能也容許,則這個時間能夠指定得很短。redis
這裏須要注意的是:示例代碼使用的是本地模式,配置爲 local[2]
,這裏不能配置爲 local[1]
。這是由於對於流數據的處理,Spark 必須有一個獨立的 Executor 來接收數據,而後再由其餘的 Executors 來處理,因此爲了保證數據可以被處理,至少要有 2 個 Executors。這裏咱們的程序只有一個數據流,在並行讀取多個數據流的時候,也須要保證有足夠的 Executors 來接收和處理數據。shell
在示例代碼中使用的是 socketTextStream
來建立基於 Socket 的數據流,實際上 Spark 還支持多種數據源,分爲如下兩類:數據庫
在基本數據源中,Spark 支持監聽 HDFS 上指定目錄,當有新文件加入時,會獲取其文件內容做爲輸入流。建立方式以下:apache
// 對於文本文件,指明監聽目錄便可
streamingContext.textFileStream(dataDirectory)
// 對於其餘文件,須要指明目錄,以及鍵的類型、值的類型、和輸入格式
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
複製代碼
被監聽的目錄能夠是具體目錄,如 hdfs://host:8040/logs/
;也可使用通配符,如 hdfs://host:8040/logs/2017/*
。編程
關於高級數據源的整合單獨整理至:Spark Streaming 整合 Flume 和 Spark Streaming 整合 Kafkabash
在示例代碼中,使用 streamingContext.start()
表明啓動服務,此時還要使用 streamingContext.awaitTermination()
使服務處於等待和可用的狀態,直到發生異常或者手動使用 streamingContext.stop()
進行終止。
DStream 是 Spark Streaming 提供的基本抽象。它表示連續的數據流。在內部,DStream 由一系列連續的 RDD 表示。因此從本質上而言,應用於 DStream 的任何操做都會轉換爲底層 RDD 上的操做。例如,在示例代碼中 flatMap 算子的操做其實是做用在每一個 RDDs 上 (以下圖)。由於這個緣由,因此 DStream 可以支持 RDD 大部分的transformation算子。
除了可以支持 RDD 的算子外,DStream 還有部分獨有的transformation算子,這當中比較經常使用的是 updateStateByKey
。文章開頭的詞頻統計程序,只能統計每一次輸入文本中單詞出現的數量,想要統計全部歷史輸入中單詞出現的數量,可使用 updateStateByKey
算子。代碼以下:
object NetworkWordCountV2 {
def main(args: Array[String]) {
/* * 本地測試時最好指定 hadoop 用戶名,不然會默認使用本地電腦的用戶名, * 此時在 HDFS 上建立目錄時可能會拋出權限不足的異常 */
System.setProperty("HADOOP_USER_NAME", "root")
val sparkConf = new SparkConf().setAppName("NetworkWordCountV2").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
/*必需要設置檢查點*/
ssc.checkpoint("hdfs://hadoop001:8020/spark-streaming")
val lines = ssc.socketTextStream("hadoop001", 9999)
lines.flatMap(_.split(" ")).map(x => (x, 1))
.updateStateByKey[Int](updateFunction _) //updateStateByKey 算子
.print()
ssc.start()
ssc.awaitTermination()
}
/** * 累計求和 * * @param currentValues 當前的數據 * @param preValues 以前的數據 * @return 相加後的數據 */
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
val current = currentValues.sum
val pre = preValues.getOrElse(0)
Some(current + pre)
}
}
複製代碼
使用 updateStateByKey
算子,你必須使用 ssc.checkpoint()
設置檢查點,這樣當使用 updateStateByKey
算子時,它會去檢查點中取出上一次保存的信息,並使用自定義的 updateFunction
函數將上一次的數據和本次數據進行相加,而後返回。
在監聽端口輸入以下測試數據:
[root@hadoop001 ~]# nc -lk 9999
hello world hello spark hive hive hadoop
storm storm flink azkaban
hello world hello spark hive hive hadoop
storm storm flink azkaban
複製代碼
此時控制檯輸出以下,全部輸入都被進行了詞頻累計:
# 保存檢查點信息
19/05/27 16:21:05 INFO CheckpointWriter: Saving checkpoint for time 1558945265000 ms
to file 'hdfs://hadoop001:8020/spark-streaming/checkpoint-1558945265000'
# 刪除已經無用的檢查點信息
19/05/27 16:21:30 INFO CheckpointWriter:
Deleting hdfs://hadoop001:8020/spark-streaming/checkpoint-1558945265000
複製代碼
Spark Streaming 支持如下輸出操做:
Output Operation | Meaning |
---|---|
print() | 在運行流應用程序的 driver 節點上打印 DStream 中每一個批次的前十個元素。用於開發調試。 |
saveAsTextFiles(prefix, [suffix]) | 將 DStream 的內容保存爲文本文件。每一個批處理間隔的文件名基於前綴和後綴生成:「prefix-TIME_IN_MS [.suffix]」。 |
saveAsObjectFiles(prefix, [suffix]) | 將 DStream 的內容序列化爲 Java 對象,並保存到 SequenceFiles。每一個批處理間隔的文件名基於前綴和後綴生成:「prefix-TIME_IN_MS [.suffix]」。 |
saveAsHadoopFiles(prefix, [suffix]) | 將 DStream 的內容保存爲 Hadoop 文件。每一個批處理間隔的文件名基於前綴和後綴生成:「prefix-TIME_IN_MS [.suffix]」。 |
foreachRDD(func) | 最通用的輸出方式,它將函數 func 應用於從流生成的每一個 RDD。此函數應將每一個 RDD 中的數據推送到外部系統,例如將 RDD 保存到文件,或經過網絡將其寫入數據庫。 |
前面的四個 API 都是直接調用便可,下面主要講解通用的輸出方式 foreachRDD(func)
,經過該 API 你能夠將數據保存到任何你須要的數據源。
這裏咱們使用 Redis 做爲客戶端,對文章開頭示例程序進行改變,把每一次詞頻統計的結果寫入到 Redis,並利用 Redis 的 HINCRBY
命令來進行詞頻統計。這裏須要導入 Jedis 依賴:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
複製代碼
具體實現代碼以下:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
object NetworkWordCountToRedis {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("NetworkWordCountToRedis").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
/*建立文本輸入流,並進行詞頻統計*/
val lines = ssc.socketTextStream("hadoop001", 9999)
val pairs: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
/*保存數據到 Redis*/
pairs.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
var jedis: Jedis = null
try {
jedis = JedisPoolUtil.getConnection
partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2))
} catch {
case ex: Exception =>
ex.printStackTrace()
} finally {
if (jedis != null) jedis.close()
}
}
}
ssc.start()
ssc.awaitTermination()
}
}
複製代碼
其中 JedisPoolUtil
的代碼以下:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class JedisPoolUtil {
/* 聲明爲 volatile 防止指令重排序 */
private static volatile JedisPool jedisPool = null;
private static final String HOST = "localhost";
private static final int PORT = 6379;
/* 雙重檢查鎖實現懶漢式單例 */
public static Jedis getConnection() {
if (jedisPool == null) {
synchronized (JedisPoolUtil.class) {
if (jedisPool == null) {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(30);
config.setMaxIdle(10);
jedisPool = new JedisPool(config, HOST, PORT);
}
}
}
return jedisPool.getResource();
}
}
複製代碼
這裏將上面保存到 Redis 的代碼單獨抽取出來,並去除異常判斷的部分。精簡後的代碼以下:
pairs.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val jedis = JedisPoolUtil.getConnection
partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2))
jedis.close()
}
}
複製代碼
這裏能夠看到一共使用了三次循環,分別是循環 RDD,循環分區,循環每條記錄,上面咱們的代碼是在循環分區的時候獲取鏈接,也就是爲每個分區獲取一個鏈接。可是這裏你們可能會有疑問:爲何不在循環 RDD 的時候,爲每個 RDD 獲取一個鏈接,這樣所須要的鏈接數會更少。實際上這是不可行的,若是按照這種狀況進行改寫,以下:
pairs.foreachRDD { rdd =>
val jedis = JedisPoolUtil.getConnection
rdd.foreachPartition { partitionOfRecords =>
partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2))
}
jedis.close()
}
複製代碼
此時在執行時候就會拋出 Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis
,這是由於在實際計算時,Spark 會將對 RDD 操做分解爲多個 Task,Task 運行在具體的 Worker Node 上。在執行以前,Spark 會對任務進行閉包,以後閉包被序列化併發送給每一個 Executor,而 Jedis
顯然是不能被序列化的,因此會拋出異常。
第二個須要注意的是 ConnectionPool 最好是一個靜態,惰性初始化鏈接池 。這是由於 Spark 的轉換操做自己就是惰性的,且沒有數據流時不會觸發寫出操做,因此出於性能考慮,鏈接池應該是惰性的,所以上面 JedisPool
在初始化時採用了懶漢式單例進行惰性初始化。
在監聽端口輸入以下測試數據:
[root@hadoop001 ~]# nc -lk 9999
hello world hello spark hive hive hadoop
storm storm flink azkaban
hello world hello spark hive hive hadoop
storm storm flink azkaban
複製代碼
使用 Redis Manager 查看寫入結果 (以下圖),能夠看到與使用 updateStateByKey
算子獲得的計算結果相同。
本片文章全部源碼見本倉庫:spark-streaming-basis
Spark 官方文檔:spark.apache.org/docs/latest…
更多大數據系列文章能夠參見 GitHub 開源項目: 大數據入門指南