Get started with the unbounded stream processing system Spark Streaming from Spring Boot in three minutes, and implement streaming like counting

Every time a post is liked on the page, the liked post's id is sent to Kafka; Spark Streaming then reads the data from Kafka, counts the likes, and writes the totals back to MySQL.

The complete example code is on GitHub: github.com/neatlife/my…

Getting the example project

You can create the project at https://start.spring.io

Add the web starter and the Kafka starter there; the Spark and Spark-Streaming-Kafka dependencies have to be added by hand, since no corresponding starters exist.

Then add the Kafka client and the Spark Streaming dependencies to pom.xml. Note that the `_2.10` suffix in the artifact ids is the Scala version; all the Spark artifacts must share it.

<!-- Spark and Kafka related dependencies -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.2</version>
</dependency>

Sending the liked postId to Kafka

For a Kafka primer, see: juejin.im/post/5d159d…

Create the Kafka topic; here we use the topic mylikes:

kafka-topics --create --topic mylikes --replication-factor 1 --partitions 1 --zookeeper zoo1:2181

操做效果以下sql

Add a like endpoint; the core code is as follows:

@RequestMapping("/send-like")
public String sendLike(@RequestParam(value = "post_id", required = true) Integer postId) {
    producer.send(postId);
    return "test1";
}

The core Kafka producer code:

public void send(Integer postId) {
    // key = postId as a string, value = "1" for a single like
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, postId.toString(), "1");
    this.kafkaTemplate.send(producerRecord);
    System.out.println("Sent sample postId [" + postId + "] to " + topic);
}
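The code above assumes a configured KafkaTemplate<String, String>. With Spring Boot's kafka starter, the producer can be configured entirely from application.properties; a minimal sketch (the broker address is an assumption matching the Spark side, and the serializers match the string key/value used above):

```properties
# assumed local broker, same address Spark reads from
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```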

Note the Kafka topic used here, mylikes: Spark must read from the same topic. Also note that the key and value sent to Kafka are both strings, while the post id and the like count are ints, so Spark has to perform this type conversion during processing.
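The conversion and per-key aggregation that Spark will perform can be illustrated in plain Java first (a sketch with hard-coded sample records, independent of Spark):

```java
import java.util.HashMap;
import java.util.Map;

public class LikeAggregationSketch {
    public static void main(String[] args) {
        // sample Kafka records: key = postId as a string, value = "1" per like
        String[][] records = {{"42", "1"}, {"7", "1"}, {"42", "1"}};

        // mirrors the later mapToPair(parse) + reduceByKey(Integer::sum)
        Map<Integer, Integer> counts = new HashMap<>();
        for (String[] r : records) {
            int postId = Integer.parseInt(r[0]); // key: string -> int
            int likes = Integer.parseInt(r[1]);  // value: string -> int
            counts.merge(postId, likes, Integer::sum);
        }

        System.out.println(counts.get(42)); // post 42 was liked twice
        System.out.println(counts.get(7));  // post 7 was liked once
    }
}
```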

Reading the data from Kafka and computing the like counts in Spark

Create the Spark client that reads from Kafka:

SparkConf conf = new SparkConf()
        .setAppName("mySparkLikes")
        .setMaster("local[*]")
        .set("spark.default.parallelism", "15")
        .set("spark.streaming.concurrentJobs", "5")
        .set("spark.executor.memory", "1G")
        .set("spark.cores.max", "3")
        .set("spark.local.dir", "/tmp/mySparkLikes")
        .set("spark.streaming.kafka.maxRatePerPartition", "5");

Set<String> topics = Collections.singleton(topic);

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "127.0.0.1:9092");

JavaStreamingContext jsc = new JavaStreamingContext(
        new JavaSparkContext(conf),
        Durations.seconds(3));
jsc.checkpoint("checkpoint");
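For the 0.8-style direct stream, kafkaParams takes the old consumer property names. A small sketch of the params map with one commonly added option (`auto.offset.reset=smallest` makes a job with no checkpoint start from the earliest retained offsets; the broker address is the same local assumption as above):

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaParamsSketch {
    public static void main(String[] args) {
        Map<String, String> kafkaParams = new HashMap<>();
        // broker list, 0.8-era property name used by the direct stream
        kafkaParams.put("metadata.broker.list", "127.0.0.1:9092");
        // without a checkpoint, consume from the earliest offsets instead of the latest
        kafkaParams.put("auto.offset.reset", "smallest");
        System.out.println(kafkaParams.size());
    }
}
```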

Create the Kafka data stream:

// obtain the data stream
final JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
        jsc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topics
);
System.out.println("stream started!");
stream.print();

The stream.print() call triggers the actual reading of the data.

Convert the string postId and like count read from Kafka into integers:

JavaPairDStream<Integer, Integer> countDStream = stream
        .mapToPair(record -> new Tuple2<>(
                Integer.parseInt(record._1),    // key: postId, string -> int
                Integer.parseInt(record._2)))   // value: like count, string -> int
        .reduceByKey(Integer::sum);

Generate the SQL statements for the like counts:

countDStream.foreachRDD(v -> {
    v.foreach(record -> {
        String sql = String.format("UPDATE `post` SET likes = likes + %s WHERE id=%d", record._2, record._1);
        System.out.println(sql);
    });
    log.info("Finished processing one batch: {}", v);
});
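The String.format call above can be checked in isolation; a minimal sketch with a hypothetical batch record (post 42 received 3 likes in the batch):

```java
public class LikeSqlSketch {
    public static void main(String[] args) {
        int postId = 42; // record._1: the post id
        int likes = 3;   // record._2: likes summed for this batch
        // same format string as in the foreachRDD body
        String sql = String.format("UPDATE `post` SET likes = likes + %s WHERE id=%d", likes, postId);
        System.out.println(sql);
    }
}
```

Since both values come out of Kafka as integers here, formatting them into the statement is safe; with untrusted strings a PreparedStatement would be preferable.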

Start the stream computation. Note that jsc.start() returns immediately; a standalone driver would usually follow it with jsc.awaitTermination(), but here the Spring Boot web container keeps the JVM alive:

jsc.start();

Add an endpoint that invokes the code above:

@RequestMapping("/launch")
public String launch() {
    sparkLikeService.launch();
    return "test2";
}

First hit /launch to start the streaming engine, then hit the send-like endpoint to generate like data and watch the SQL statements printed to the console. The result looks like this:

As you can see, we now have the SQL for the like counts; JPA can be used to persist them to the database.

Local debugging

這個spark的job能夠本地調試,可是須要知足如下幾個條件

  1. Spark runs locally
  2. the master address the job connects to is local[*]

References

  1. www.iteblog.com/archives/13…
  2. github.com/eBay/Spark/…
  3. blog.csdn.net/guotong1988…
  4. www.4spaces.org/spark-map-f…
  5. blog.csdn.net/wuxintdrh/a…