[Spark Streaming_1] Spark Streaming Overview


 

0. Notes

  Introduction to Spark Streaming && writing Spark Streaming programs in IDEA

 


 

1. Introduction to Spark Streaming


  Spark Streaming is an extension of the Spark Core API for computing over real-time data streams, featuring scalability, high throughput, and automatic fault tolerance.

  Data can be ingested from many sources, such as Kafka, Flume, and so on.

  Complex computations are expressed with high-level operators similar to those on RDDs, such as map, reduce, join, and window.

  Finally, the processed data can be pushed out to databases, file systems, or live dashboards. Machine learning and graph processing can also be applied to the data streams.

  

   Internally, Spark Streaming receives live input data streams and divides them into batches, which the Spark engine then processes to generate the resulting stream of results.

  

   Spark Streaming provides a high-level abstraction called a discretized stream (DStream), which represents a continuous stream of data. DStreams can be created from sources such as Kafka and Flume, or derived from other DStreams through high-level operations such as map and filter, much like RDDs. Internally, a DStream is represented as a sequence of RDDs.

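  As a minimal sketch of this RDD-like behavior (the host, port, and class name below are illustrative placeholders, not from the original example):

package com.share.sparkstreaming.scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Sketch: a DStream derived via filter, with foreachRDD exposing
  * the per-batch RDDs that make up the stream internally.
  */
object DStreamDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DStreamDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Source DStream from a socket; filter yields a derived DStream
    val lines = ssc.socketTextStream("localhost", 9999)
    val errors = lines.filter(_.contains("error"))

    // Each batch interval produces one RDD; foreachRDD hands it to us
    errors.foreachRDD(rdd => println(s"error lines in this batch: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}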
 


2. Writing a Spark Streaming Program in IDEA (Scala)

  【2.1 Add dependencies】

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.share</groupId>
    <artifactId>myspark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.1.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
    
</project>
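  Of these dependencies, only spark-core_2.11 and spark-streaming_2.11 are strictly required for the examples below; the SQL, Hive, MLlib, JDBC, and fastjson entries presumably carry over from earlier articles in this series.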

 

  【2.2 Write the code】

 

package com.share.sparkstreaming.scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Scala version of the Spark Streaming word-count program.
  */
object SparkStreamingScala1 {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    conf.setAppName("Streaming")
    // At least 2 threads: one for the socket receiver, one for processing
    conf.setMaster("local[2]")

    // Create the StreamingContext with a 1-second batch interval
    val ssc = new StreamingContext(conf, Seconds(1))

    // Connect to the socket text stream
    val lines = ssc.socketTextStream("s101", 8888)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map((_, 1))
    val counts = pairs.reduceByKey(_ + _)

    // Print the result
    counts.print()

    // Start the context
    ssc.start()

    // Wait for termination
    ssc.awaitTermination()
  }
}
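  When run, the program connects to s101:8888 (start nc there first, see 2.4) and prints the word counts of each 1-second batch; note that DStream.print() shows only the first 10 elements per batch by default.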


  【2.3 Set the Log4j log output level】

  
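  A sketch, assuming the program from 2.2 (setLogLevel is the standard SparkContext API; the exact level used in the original is not shown):

    // In SparkStreamingScala1.main, right after creating the StreamingContext:
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.sparkContext.setLogLevel("WARN") // or "ERROR" for even less console output

  Alternatively, place a log4j.properties with log4j.rootCategory=WARN, console on the classpath (e.g. under src/main/resources).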

 

  【2.4 Start nc on server s101】

  nc -lk 8888
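  (-l listens on the given port; -k keeps nc listening after a client disconnects, so the Spark receiver can reconnect.)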

 

  【2.5 Run the program and verify】

  Omitted. (Run the program, type words into the nc session, and the per-batch word counts appear in the console each second.)

 


 

3. Writing a Spark Streaming Program in IDEA (Java)

 

package com.share.sparkstreaming.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;

/**
 * Java version of the Spark Streaming word-count program.
 */
public class WordCountStreamingJava1 {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf();
        conf.setAppName("Streaming");
        conf.setMaster("local[*]");

        // Create the JavaStreamingContext with a 2-second batch interval
        JavaStreamingContext sc = new JavaStreamingContext(conf, Durations.seconds(2));
        // Connect to the socket text stream
        JavaDStream<String> ds1 = sc.socketTextStream("s101", 8888);

        // Flatten each line into words
        JavaDStream<String> ds2 = ds1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        // Map each word to a (word, 1) pair
        JavaPairDStream<String, Integer> ds3 = ds2.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // Aggregate the counts per word
        JavaPairDStream<String, Integer> ds4 = ds3.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Print the result
        ds4.print();
        // Start the context
        sc.start();
        // Wait for termination
        sc.awaitTermination();
    }
}
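  Since FlatMapFunction, PairFunction, and Function2 are single-method interfaces, each anonymous class above can be replaced with a Java 8 lambda, e.g. ds3.reduceByKey((v1, v2) -> v1 + v2).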