其餘更多java基礎文章:
java基礎學習(目錄)java
SparkStreaming是流式處理框架,是Spark API的擴展,支持可擴展、高吞吐量、容錯的實時數據流處理,實時數據的來源能夠是:Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,而且可使用高級功能的複雜算子來處理流數據。例如:map,reduce,join,window 。最終,處理後的數據能夠存放在文件系統,數據庫等,方便實時展示。node
Spark Streaming是將流式計算分解成一系列短小的批處理做業。這裏的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數據按照batch interval(如5秒)分紅一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),而後將Spark Streaming中對DStream的Transformation操做變爲針對Spark中對RDD的Transformation操做,將RDD通過操做變成中間結果保存在內存中。整個流式計算根據業務的需求能夠對中間的結果進行疊加或者存儲到外部設備數據庫
DStream(Discretized Stream)做爲Spark Streaming的基礎抽象,它表明持續性的數據流。這些數據流既能夠經過外部輸入源賴獲取,也能夠經過現有的Dstream的transformation操做來得到。在內部實現上,DStream由一組時間序列上連續的RDD來表示。每一個RDD都包含了本身特定時間間隔內的數據流。bash
SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
.set("spark.testing.memory","2147480000");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);
複製代碼
Transformation | 含義 |
---|---|
map(func) | 對DStream中的各個元素進行func函數操做,而後返回一個新的DStream |
flatMap(func) | 與map方法相似,只不過各個輸入項能夠被輸出爲零個或多個輸出項 |
filter(func) | 過濾出全部函數func返回值爲true的DStream元素並返回一個新的DStream |
repartition(numPartitions) | 增長或減小DStream中的分區數,從而改變DStream的並行度 |
union(otherStream) | 將源DStream和輸入參數爲otherDStream的元素合併,並返回一個新的DStream. |
count() | 經過對DStream中的各個RDD中的元素進行計數,而後返回只有一個元素的RDD構成的DStream |
reduce(func) | 對源DStream中的各個RDD中的元素利用func進行聚合操做,而後返回只有一個元素的RDD構成的新的DStream. |
countByValue() | 對於元素類型爲K的DStream,返回一個元素爲(K,Long)鍵值對形式的新的DStream,Long對應的值爲源DStrea |
reduceByKey(func, [numTasks]) | 利用func函數對源DStream中的key進行聚合操做,而後返回新的(K,V)對構成的DStream |
join(otherStream, [numTasks]) | 輸入爲(K,V)、(K,W)類型的DStream,返回一個新的(K,(V,W)類型的DStream |
cogroup(otherStream, [numTasks]) | 輸入爲(K,V)、(K,W)類型的DStream,返回一個新的 (K, Seq[V], Seq[W]) 元組類型的DStream |
transform(func) | 經過RDD-to-RDD函數做用於DStream中的各個RDD,能夠是任意的RDD操做,從而返回一個新的RDD |
updateStateByKey(func) | 根據於key的前置狀態和key的新值,對key進行更新,返回一個新狀態的DStream |
/**
* batch interval:5s
* sliding interval:10s
* window length: 60s
* 因此每隔 10s 會取 12 個 rdd,在計算的時候會將這 12 個 rdd 聚合起來
* 而後一塊兒執行 reduceByKeyAndWindow 操做
* reduceByKeyAndWindow 是針對窗口操做的而不是針對 DStream 操做的
*/
JavaPairDStream<String, Integer> searchWordCountsDStream =
searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer,
Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
}, Durations.seconds(60), Durations.seconds(10));
複製代碼
//必須設置 checkpoint 目錄
jssc.checkpoint("hdfs://node01:8020/spark/checkpoint");
JavaPairDStream<String, Integer> searchWordCountsDStream =
searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer,
Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
},new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 - v2;
}
}, Durations.seconds(60), Durations.seconds(10));
複製代碼
spark-submit –supervise
複製代碼
以集羣方式提交到 yarn 上時, Driver 掛掉會自動重啓,不須要任何設置
提交任務,在客戶端啓動 Driver,那麼不論是提交到 standalone 仍是 yarn, Driver 掛掉後 都沒法重啓網絡
上面的方式從新啓動的 Driver 須要從新讀取 application 的信息而後進行任務調度,實 際需求是,新啓動的 Driver 能夠直接恢復到上一個 Driver 的狀態(能夠直接讀取上一個 StreamingContext 的 DSstream 操做邏輯和 job 執行進度,因此須要把上一個 StreamingContext 的元數據保存到 HDFS 上) ,直接進行任務調度,這就須要在代碼層面進 行配置。架構
public class SparkStreamingOnHDFS {
public static void main(String[] args) {
final SparkConf conf = new SparkConf()
.setMaster("local[1]")
.setAppName("SparkStreamingOnHDFS");
//這裏能夠設置一個線程,由於不須要一個專門接收數據的線程,而是監控一個目錄
final String checkpointDirectory = "hdfs://node01:9000/spark/checkpoint";
JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
@Override
public JavaStreamingContext create() {
return createContext(checkpointDirectory,conf);
}
};
JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
jsc.start();
jsc.awaitTermination();
// jsc.close();
}
@SuppressWarnings("deprecation")
private static JavaStreamingContext createContext(String checkpointDirectory,SparkConf conf) {
System.out.println("Creating new context");
SparkConf sparkConf = conf;
//每隔 15s 查看一下監控的目錄中是否新增了文件
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(15));
ssc.checkpoint(checkpointDirectory);
/**
* 只是監控文件夾下新增的文件,減小的文件是監控不到的,
文件內容有改動也是監控不到
*/
JavaDStream<String> lines = ssc.textFileStream("hdfs://node01:8020/spark");
/**
* 接下來能夠寫業務邏輯,好比 wordcount
*/
return ssc;
}
}
複製代碼
執行一次程序後, JavaStreamingContext 會在 checkpointDirectory 中保存,當修 改了業務邏輯後,再次運行程序, JavaStreamingContext.getOrCreate(checkpointDirectory, factory); 由於 checkpointDirectory 中有這個 application 的 JavaStreamingContext,因此不會 調用 JavaStreamingContextFactory 來建立 JavaStreamingContext,而是直接 checkpointDirectory 中的 JavaStreamingContext,因此即便業務邏輯改變了,執行的效 果也是以前的業務邏輯, 若是須要執行修改過的業務邏輯,能夠修改或刪除 checkpointDirectory併發
SparkConf conf = new SparkConf()
.setAppName("SparkStreamingOnKafkaReceiver")
.setMaster("local[2]")
.set("spark.streaming.receiver.writeAheadLog.enable","true");
JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5));
//設置持久化數據的目錄
jsc.checkpoint("hdfs://node01:8020/spark/checkpoint");
Map<String, Integer> topicConsumerConcurrency = new HashMap<String,Integer>();
//topic 名 receiver task 數量
topicConsumerConcurrency.put("test_create_topic", 1);
JavaPairReceiverInputDStream<String,String> lines =
KafkaUtils.createStream(
jsc,
"node02:2181,node03:2181,node04:2181",
"MyFirstConsumerGroup",
topicConsumerConcurrency,
StorageLevel.MEMORY_AND_DISK_SER());
/*
* 第一個參數是 StreamingContext
* 第二個參數是 ZooKeeper 集羣信息(接受 Kafka 數據的時候會從 Zookeeper 中得到
Offset 等元數據信息)
* 第三個參數是 Consumer Group
* 第四個參數是消費的 Topic 以及併發讀取 Topic 中 Partition 的線程數
* 第五個參數是持久化數據的級別,能夠自定義
*/
//對 lines 進行其餘操做……
複製代碼
kafka 客戶端生產數據的代碼:app
public class SparkStreamingDataManuallyProducerForKafka extends Thread {
private String topic; //發送給 Kafka 的數據的類別
private Producer<Integer, String> producerForKafka;
public SparkStreamingDataManuallyProducerForKafka(String topic){
this.topic = topic;
Properties conf = new Properties();
conf.put("metadata.broker.list","node01:9092,node02:9092,node03:9092");
conf.put("serializer.class", StringEncoder.class.getName());
producerForKafka = new Producer<Integer, String>(new ProducerConfig(conf)) ;
}
@Override
public void run() {
while(true){
counter ++;
String userLog = createUserLog();
//生產數據這個方法能夠根據實際需求本身編寫
producerForKafka.send(new KeyedMessage<Integer, String>(topic, userLog));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new SparkStreamingDataManuallyProducerForKafka("test_create_topic").start();
//test_create_topic 是 topic 名
}
}
複製代碼
把 kafka 看成一個存儲系統,直接從 kafka 中讀數據, SparkStreaming 本身維護消費者 的消費偏移量框架
SparkConf conf = new SparkConf()
.setAppName("SparkStreamingOnKafkaDirected")
.setMaster("local[1]");
JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(10));
Map<String, String> kafkaParameters = new HashMap<String, String>();
kafkaParameters.put("metadata.broker.list","node01:9092,node02:9092,node03:9092");
HashSet<String> topics = new HashSet<String>();
topics.add("test_create_topic");
JavaPairInputDStream<String,String> lines =KafkaUtils.createDirectStream(jsc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParameters,
topics);
//對 lines 進行其餘操做……
複製代碼
在實際的應用中,Direct Approach方式很好地知足了咱們的須要,與Receiver-based方式相比,有如下幾方面的優點:異步
可是也存在一些不足,具體以下:
Receiver 方式調整 SparkStreaming 的並行度的方法:
其餘配置:
1. 並行化數據接收:處理多個topic的數據時比較有效
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
for (int i = 0; i < numStreams; i++) {
kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
複製代碼
2. spark.streaming.blockInterval:增長block數量,增長每一個batch rdd的partition數量,增長處理並行度
receiver從數據源源源不斷地獲取到數據;首先是會按照block interval,將指定時間間隔的數據,收集爲一個block;默認時間是200ms,官方推薦不要小於50ms;接着呢,會將指定batch interval時間間隔內的block,合併爲一個batch;建立爲一個rdd,而後啓動一個job,去處理這個batch rdd中的數據
batch rdd,它的partition數量是多少呢?一個batch有多少個block,就有多少個partition;就意味着並行度是多少;就意味着每一個batch rdd有多少個task會並行計算和處理。
固然是但願能夠比默認的task數量和並行度再多一些了;能夠手動調節block interval;減小block interval;每一個batch能夠包含更多的block;有更多的partition;也就有更多的task並行處理每一個batch rdd。
3. inputStream.repartition():重分區,增長每一個batch rdd的partition數量
有些時候,但願對某些dstream中的rdd進行定製化的分區 對dstream中的rdd進行重分區,去重分區成指定數量的分區,這樣也能夠提升指定dstream的rdd的計算並行度
4. 調節並行度
spark.default.parallelism
reduceByKey(numPartitions)
複製代碼
5. 使用Kryo序列化機制:
spark streaming,也是有很多序列化的場景的 提升序列化task發送到executor上執行的性能,若是task不少的時候,task序列化和反序列化的性能開銷也比較可觀 默認輸入數據的存儲級別是StorageLevel.MEMORY_AND_DISK_SER_2,receiver接收到數據,默認就會進行持久化操做;首先序列化數據,存儲到內存中;若是內存資源不夠大,那麼就寫入磁盤;並且,還會寫一份冗餘副本到其餘executor的block manager中,進行數據冗餘。
6. batch interval:每一個的處理時間必須小於batch interval
實際上你的spark streaming跑起來之後,其實都是能夠在spark ui上觀察它的運行狀況的;能夠看到batch的處理時間; 若是發現batch的處理時間大於batch interval,就必須調節batch interval 儘可能不要讓batch處理時間大於batch interval 好比你的batch每隔5秒生成一次;你的batch處理時間要達到6秒;就會出現,batch在你的內存中日積月累,一直囤積着,無法及時計算掉,釋放內存空間;並且對內存空間的佔用愈來愈大,那麼此時會致使內存空間快速消耗
若是發現batch處理時間比batch interval要大,就儘可能將batch interval調節大一些