Introduction to Spark Streaming && Writing a Spark Streaming Program in IDEA
Spark Streaming is an extension of the Spark Core API for real-time stream processing, featuring scalability, high throughput, and automatic fault tolerance.
Data can be ingested from many sources, such as Kafka and Flume.
Complex computations are expressed with RDD-like high-level operators such as map, reduce, join, and window, as sketched below.
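To make the window operator concrete, here is a minimal sketch of a windowed word count. The object name is mine, the host s101 and port 8888 follow the examples later in this article, and the batch, window, and slide durations are purely illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WindowDemo").setMaster("local[2]")
    // 10-second batch interval; window and slide durations must be multiples of it
    val ssc = new StreamingContext(conf, Seconds(10))
    val words = ssc.socketTextStream("s101", 8888).flatMap(_.split(" "))
    // Count words seen in the last 30 seconds, recomputed every 10 seconds
    val counts = words.map((_, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}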
Finally, the processed data can be pushed to databases, file systems, or dashboards. Spark's machine learning and graph processing algorithms can also be applied to data streams.
Internally, Spark Streaming receives live input data streams and divides them into batches, which are then processed by the Spark engine to generate the final stream of results, also in batches.
Spark Streaming provides a high-level abstraction called a discretized stream (DStream), which represents a continuous stream of data. DStreams can be created from sources such as Kafka and Flume, or derived from other DStreams through high-level operations such as map and filter, mirroring the behavior of RDDs. Internally, a DStream is represented as a continuous sequence of RDDs.
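That "sequence of RDDs" view is directly observable through foreachRDD, which hands you the ordinary RDD behind each batch. A minimal sketch (the object name is mine; host and port follow the examples below):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamAsRdds {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DStreamDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val lines = ssc.socketTextStream("s101", 8888)
    // Each batch interval surfaces as one plain RDD[String]
    lines.foreachRDD { (rdd, time) =>
      println(s"Batch at $time holds ${rdd.count()} lines")
    }
    ssc.start()
    ssc.awaitTermination()
  }
}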
【2.1 Maven Dependencies (pom.xml)】

Only spark-core and spark-streaming are strictly required for the examples below; the remaining dependencies are carried along for other Spark work.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.share</groupId>
    <artifactId>myspark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.1.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
</project>
【2.2 Writing the Code】
package com.share.sparkstreaming.scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Scala version of the Spark Streaming word count program.
 */
object SparkStreamingScala1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("Streaming")
    // At least 2 threads: one for the socket receiver, one for processing
    conf.setMaster("local[2]")
    // Create the Spark Streaming context with a 1-second batch interval
    val sc = new StreamingContext(conf, Seconds(1))
    // Attach to the socket text stream
    val lines = sc.socketTextStream("s101", 8888)
    val words = lines.flatMap(_.split(" "))
    val pair = words.map((_, 1))
    val rdd = pair.reduceByKey(_ + _)
    // Print the results
    rdd.print()
    // Start the context
    sc.start()
    // Wait for termination
    sc.awaitTermination()
  }
}
【2.3 Lowering the Log4j Log Level】
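The original snippet for this step is not reproduced here. The usual route is to put a log4j.properties on the classpath (e.g. src/main/resources) with the root category raised from INFO to ERROR. A programmatic shortcut, sketched below under the same assumptions as the word count program (object name is mine), achieves the same effect:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object QuietStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Streaming").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Raise the log level so batch output isn't buried in INFO chatter;
    // accepted values include ALL, DEBUG, INFO, WARN, ERROR, OFF
    ssc.sparkContext.setLogLevel("ERROR")
    val lines = ssc.socketTextStream("s101", 8888)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}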
【2.4 Starting nc on Server s101】
nc -lk 8888

Here -l puts nc in listen mode on port 8888, and -k keeps it listening after a client disconnects, so the streaming job can reconnect.
【2.5 Running the Program and Verifying】
Omitted.
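As a rough illustration (the timestamp is made up, and exact output depends on what you type), entering hello world hello into the nc session should make the IDEA console print per-batch counts like:

-------------------------------------------
Time: 1528000000000 ms
-------------------------------------------
(hello,2)
(world,1)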
【2.6 Java Version of the Word Count Program】

package com.share.sparkstreaming.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * Java version of the Spark Streaming word count program.
 */
public class WordCountStreamingJava1 {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf();
        conf.setAppName("Streaming");
        conf.setMaster("local[*]");
        // Create the Spark Streaming context with a 2-second batch interval
        JavaStreamingContext sc = new JavaStreamingContext(conf, Durations.seconds(2));
        // Attach to the socket text stream
        JavaDStream<String> ds1 = sc.socketTextStream("s101", 8888);
        // Flatten each line into words
        JavaDStream<String> ds2 = ds1.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        // Map each word to a (word, 1) pair
        JavaPairDStream<String, Integer> ds3 = ds2.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // Aggregate counts per word
        JavaPairDStream<String, Integer> ds4 = ds3.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // Print the results
        ds4.print();
        // Start the context
        sc.start();
        // Wait for termination
        sc.awaitTermination();
    }
}