Flink+kafka實現Wordcount實時計算

Flink介紹:

Flink 是一個針對流數據和批數據的分佈式處理引擎。它主要是由 Java 代碼實現。目前主要仍是依靠開源社區的貢獻而發展。對 Flink 而言,其所要處理的主要場景就是流數據,批數據只是流數據的一個極限特例而已。再換句話說,Flink 會把全部任務當成流來處理,這也是其最大的特色。Flink 能夠支持本地的快速迭代,以及一些環形的迭代任務。javascript

Flink的特性:

Flink是個分佈式流處理開源框架:
1>. 即便數據源是無序的或者晚到達的數據,也能保持結果準確性
2>. 有狀態而且容錯,能夠無縫的從失敗中恢復,並能夠保持exactly-once
3>. 大規模分佈式
4>. 實時計算場景的普遍應用(阿里雙十一實時交易額使用的Blink就是根據Flink改造而來)java

Flink能夠確保僅一次語義狀態計算;Flink有狀態意味着,程序能夠保持已經處理過的數據;
Flink支持流處理和窗口事件時間語義,Flink支持靈活的基於時間窗口,計數,或會話數據驅動的窗戶;
Flink容錯是輕量級和在同一時間容許系統維持高吞吐率和提供僅一次的一致性保證,Flink從失敗中恢復,零數據丟失;
Flink可以高吞吐量和低延遲;
Flink保存點提供版本控制機制,從而可以更新應用程序或再加工歷史數據沒有丟失並在最小的停機時間。apache

2. Kafka

Kafka介紹

Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。 這種動做(網頁瀏覽,搜索和其餘用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。 對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是經過Hadoop的並行加載機制來統一線上和離線的消息處理,也是爲了經過集羣來提供實時的消息。bootstrap

Kafka特性

Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,有以下特性:
1>. 經過磁盤數據結構提供消息的持久化,這種結構對於即便數以TB的消息存儲也可以保持長時間的穩定性能。
2>. 高吞吐量即便是很是普通的硬件Kafka也能夠支持每秒數百萬的消息。
3>. 支持經過Kafka服務器和消費機集羣來分區消息。
4>. 支持Hadoop並行數據加載。api

Kafka的安裝配置及基礎使用

由於此篇博客是本地Flink消費Kafka的數據實現WordCount,因此Kafka不須要作過多配置,從Apache官網下載安裝包直接解壓便可使用
這裏咱們建立一個名爲test的topic
在producer輸入數據流:服務器

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

在consumer監控從producer輸入的數據流:網絡

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

1>. 建立maven project

<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>1.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.0.0</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.8_2.11</artifactId> <version>1.0.0</version> </dependency> </dependencies>
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

4>. 設置監控數據流時間間隔(官方叫狀態與檢查點)

env.enableCheckpointing(1000);

5>. 配置kafka和zookeeper的ip和端口

Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "192.168.1.20:9092"); properties.setProperty("zookeeper.connect", "192.168.1.20:2181"); properties.setProperty("group.id", "test");
FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),properties);

7>. 將Kafka的數據轉成flink的DataStream類型

DataStream<String> stream = env.addSource(myConsumer);

8>. 實施計算模型並輸出結果

DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1); counts.print();

計算模型具體邏輯代碼數據結構

public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { String[] tokens = value.toLowerCase().split("\\W+"); for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } }

4. 驗證

1>. Kafka producer輸入

2>. Flink客戶端馬上得出結果

完整代碼

package com.scn; import java.util.Properties; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.Collector; public class FilnkCostKafka { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "192.168.1.20:9092"); properties.setProperty("zookeeper.connect", "192.168.1.20:2181"); properties.setProperty("group.id", "test"); FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(), properties); DataStream<String> stream = env.addSource(myConsumer); DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1); counts.print(); env.execute("WordCount from Kafka data"); } public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { String[] tokens = value.toLowerCase().split("\\W+"); for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } } }
相關文章
相關標籤/搜索