[spark]Spark Streaming教程

 

(一)官方入門示例

廢話不說,先來個示例,有個感性認識再介紹。java

這個示例來自spark自帶的example,基本步驟以下:apache

(1)使用如下命令輸入流消息:api

$ nc -lk 9999bash

(2)在一個新的終端中運行NetworkWordCount,統計上面的詞語數量並輸出:服務器

$ bin/run-example streaming.NetworkWordCount localhost 9999socket

(3)在第一步建立的輸入流程中敲入一些內容,在第二步建立的終端中會看到統計結果,如:maven

第一個終端輸入的內容:ide

hello world againoop

第二個端口的輸出this

-------------------------------------------
Time: 1436758706000 ms
-------------------------------------------
(again,1)
(hello,1)
(world,1)

簡單解釋一下,上面的示例經過手工敲入內容,並傳給spark streaming統計單詞數量,而後將結果打印出來。

附上代碼:

package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

/**
 * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
 *
 * Usage: NetworkWordCount
 *  and  describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
 */
object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount  ")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

 

 

(二)Spark Streaming kafka示例

本示例使用java+maven來構建一個wordcount

一、建立項目,在pom.xml添加以下的依賴關係

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.4.0</version>
</dependency>
 
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>

 

二、寫代碼,此部分代碼使用了官方的代碼:

package com.netease.gdc.kafkaStreaming;

import java.util.Map;
import java.util.HashMap;
import java.util.regex.Pattern;


import scala.Tuple2;
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 *
 * Usage: JavaKafkaWordCount
 * is a list of one or more zookeeper servers that make quorum
 * is the name of kafka consumer group
 * is a list of one or more kafka topics to consume from
 *is the number of threads the kafka consumer should use
 *
 * To run this example:
 *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
 *    zoo03 my-consumer-group topic1,topic2 1`
 */

public final class JavaKafkaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  private JavaKafkaWordCount() {
  }

  public static void main(String[] args) {
    if (args.length < 4) {
      System.err.println("Usage: JavaKafkaWordCount
");
      System.exit(1);
    }

    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
    // Create the context with a 1 second batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    int numThreads = Integer.parseInt(args[3]);
    Map topicMap = new HashMap();
    String[] topics = args[2].split(",");
    for (String topic: topics) {
      topicMap.put(topic, numThreads);
    }

    JavaPairReceiverInputDStream messages =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

    JavaDStream lines = messages.map(new Function() {
      @Override
      public String call(Tuple2 tuple2) {
        return tuple2._2();
      }
    });

    JavaDStream words = lines.flatMap(new FlatMapFunction() {
      @Override
      public Iterable call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });

    JavaPairDStream wordCounts = words.mapToPair(
      new PairFunction() {
        @Override
        public Tuple2 call(String s) {
          return new Tuple2(s, 1);
        }
      }).reduceByKey(new Function2() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });

    wordCounts.print();
    jssc.start();
    jssc.awaitTermination();
  }
}

 

三、上傳到服務器中而後編譯

mvn clean package

四、提交job到spark中

 

/home/hadoop/spark/bin/spark-submit --jars ../mylib/metrics-core-2.2.0.jar,../mylib/zkclient-0.3.jar,../mylib/spark-streaming-kafka_2.10-1.4.0.jar,../mylib/kafka-clients-0.8.2.1.jar,../mylib/kafka_2.10-0.8.2.1.jar  --class com.netease.gdc.kafkaStreaming.JavaKafkaWordCount --master spark://192.168.16.102:7077  target/kafkaStreaming-0.0.1-SNAPSHOT.jar 192.168.172.111:2181/kafka my-consumer-group test 3

 

固然,前提是kafka集羣已經正常運行,且存在test這個topic

 

五、驗證

打開一個console producer,輸入內容,而後觀察wordcount的結果。

結果形式以下:

(hi,1)

  

(三)基本步驟

本部分介紹建立一個spark streaming應用的基本步驟

一、構建依賴關係,以maven爲例,須要在pom.xml中添加如下內容

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.4.0</version>
</dependency>

 若是須要使用其它數據源,則還須要將相應的依賴關係放入pom.xml。

如使用kafka做爲數據源:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.4.0</version>
</dependency>
 

固然,spark的核心包也要包含:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.0</version>
</dependency>
相關文章
相關標籤/搜索