在頁面上每次點贊,把這個被點讚的文章id發送到kafka,而後經過spark streaming讀取kafka裏的數據,統計出點讚的數量,更新回mysql中 html
完整案例代碼已上傳github:github.com/neatlife/my…java
能夠在https://start.spring.io上建立項目 mysql
這裏加上web-starter, kafka-starter便可,spark和spark kafka streaming相關以來須要手動添加,並無對應的starter可用git
而後在pom.xml中引入kafka的客戶端和spark streaming的客戶端github
<!--spark相關依賴-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.2</version>
</dependency>
複製代碼
上手kafka能夠參考: juejin.im/post/5d159d…web
建立kafka的topic,好比這裏使用mylikes這個topicspring
kafka-topics --create --topic mylikes --replication-factor 1 --partitions 1 --zookeeper zoo1:2181
複製代碼
操做效果以下sql
新增一個點贊接口,核心代碼以下shell
@RequestMapping("/send-like")
public String sendLike(@RequestParam(value = "post_id", required = true) Integer postId) {
producer.send(postId);
return "test1";
}
複製代碼
kafka發送核心代碼以下數據庫
public void send(Integer postId) {
ProducerRecord producerRecord = new ProducerRecord(topic, postId.toString(), "1");
this.kafkaTemplate.send(producerRecord);
System.out.println("Sent sample postId [" + postId + "] to " + topic);
}
複製代碼
記下這裏使用的kafka的topic:mylikes,在spark裏也須要使用這個topic 這裏注意發送到kafka的key和value都是字符串,id和點贊數是int,因此在spark進行處理時須要作這個類型轉換
SparkConf conf = new SparkConf()
.setAppName("mySparkLikes")
.setMaster("local[*]")
.set("spark.default.parallelism", "15")
.set("spark.streaming.concurrentJobs", "5")
.set("spark.executor.memory", "1G")
.set("spark.cores.max", "3")
.set("spark.local.dir", "/tmp/mySparkLikes")
.set("spark.streaming.kafka.maxRatePerPartition", "5");
Set<String> topics = Collections.singleton(topic);
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "127.0.0.1:9092");
JavaStreamingContext jsc = new JavaStreamingContext(
new JavaSparkContext(conf),
Durations.seconds(3));
jsc.checkpoint("checkpoint");
複製代碼
// 獲得數據流
final JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
jsc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topics
);
System.out.println("stream started!");
stream.print();
複製代碼
stream.print()
觸發讀取數據
JavaPairDStream<Integer, Integer> countDStream = stream
.transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<Integer, Integer>>() {
@Override
public JavaPairRDD<Integer, Integer> call(JavaPairRDD<String, String> stringStringJavaPairRDD) throws Exception {
return stringStringJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, String>, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<String, String> stringStringTuple2) throws Exception {
return new Tuple2<>(new Integer(stringStringTuple2._1), new Integer(stringStringTuple2._2));
}
});
}
})
.reduceByKey(Integer::sum);
複製代碼
countDStream.foreachRDD(v -> {
v.foreach(record -> {
String sql = String.format("UPDATE `post` SET likes = likes + %s WHERE id=%d", record._2, record._1);
System.out.println(sql);
});
log.info("一批次數據流處理完: {}", v);
});
複製代碼
jsc.start();
複製代碼
添加一個接口來調用上面的代碼
@RequestMapping("/launch")
public String launch() {
sparkLikeService.launch();
return "test2";
}
複製代碼
先訪問/launch來啓動流計算引擎,而後訪問send-like接口生成點贊數據,查看控制檯生成的sql語句,操做效果以下
能夠看到已經拿到了點贊數的sql,可使用jpa把這個點贊數據存放到數據庫中了
這個spark的job能夠本地調試,可是須要知足如下幾個條件