基於Flink SQL構建流式應用

基於 Kafka, MySQL, Elasticsearch, Kibana,使用 Flink SQL 構建電商用戶行爲的實時分析應用。Flink SQL 可輕鬆鏈接各類外部系統,原生支持事件時間和亂序數據處理、維表關聯,有豐富的內置函數等。html

1)購買騰訊雲服務器,安裝Java 13.0.2,安裝Dockerjava

//ubuntu 18.04 安裝 docker
https://www.cnblogs.com/ws17345067708/p/10455460.html

2)下載 docker-compose.yml 文件。Docker Compose 包含的容器有:node

 1 version: '2.1'
 2 services:
 3   datagen:
 4     image: jark/datagen:0.1
 5     command: "java -classpath /opt/datagen/flink-sql-demo.jar myflink.SourceGenerator --input /opt/datagen/user_behavior.log --output kafka kafka:9094 --speedup 1000"
 6     depends_on:
 7       - kafka
 8     environment:
 9       ZOOKEEPER_CONNECT: zookeeper
10       KAFKA_BOOTSTRAP: kafka
11   mysql:
12     image: jark/mysql-example:0.1
13     ports:
14       - "3306:3306"
15     environment:
16       - MYSQL_ROOT_PASSWORD=123456
17   zookeeper:
18     image: wurstmeister/zookeeper:3.4.6
19     ports:
20       - "2181:2181"
21   kafka:
22     image: wurstmeister/kafka:2.12-2.2.1
23     ports:
24       - "9092:9092"
25       - "9094:9094"
26     depends_on:
27       - zookeeper
28     environment:
29       - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9094,OUTSIDE://localhost:9092
30       - KAFKA_LISTENERS=INSIDE://:9094,OUTSIDE://:9092
31       - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
32       - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
33       - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
34       - KAFKA_CREATE_TOPICS="user_behavior:1:1"
35     volumes:
36       - /var/run/docker.sock:/var/run/docker.sock
37   elasticsearch:
38     image: docker.elastic.co/elasticsearch/elasticsearch:7.6.0
39     environment:
40       - cluster.name=docker-cluster
41       - bootstrap.memory_lock=true
42       - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
43       - discovery.type=single-node
44     ports:
45       - "9200:9200"
46       - "9300:9300"
47     ulimits:
48       memlock:
49         soft: -1
50         hard: -1
51       nofile:
52         soft: 65536
53         hard: 65536
54   kibana:
55     image: docker.elastic.co/kibana/kibana:7.6.0
56     ports:
57       - "5601:5601"
View Code
  • DataGen: 數據生成器。容器啓動後會自動開始生成用戶行爲數據,併發送到 Kafka 集羣中。
  • MySQL: 集成了 MySQL 5.7 ,以及預先建立好了類目表(category),預先填入了子類目與頂級類目的映射關係,後續做爲維表使用。
  • Kafka: 主要用做數據源。DataGen 組件會自動將數據灌入這個容器中。
  • Zookeeper: Kafka 容器依賴。
  • Elasticsearch: 主要存儲 Flink SQL 產出的數據。
  • Kibana: 可視化 Elasticsearch 中的數據。

3)啓動docker容器 docker-compose up -dpython

查看容器是否正常啓動 docker psmysql

中止全部容器 docker-compose downsql

注意:"docker-compose up -d"會報錯。python2和python3的差別大,使用pip由於多版本python會報錯docker

# pip
2 Traceback (most recent call last):
3   File "/usr/bin/pip", line 5, in <module>
4     from pkg_resources import load_entry_point
5 ImportError: No module named pkg_resources

解決辦法:apache

sudo apt-get clean
sudo apt-get update
sudo apt-get install --reinstall python-minimal python-lockfile

 

4) 下載 Flink 1.10.0 安裝包並解壓,下載 json,kafka,Elasticsearch,jdbc,MySQL 包依賴放在 Flink 的 lib 目錄。注意 Flink 的 Scala 版本與 Elasticsearch 鏈接器的 Scala 版本保持一致,可所有爲2.11版本。 Elasticsearch 版本6的 connector 可用來鏈接 Elasticsearch 7.6.0。json

wget -bc https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar
wget -bc https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar
wget -bc https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.10.0/flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
wget -bc https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar
wget -bc https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar

 

5) 將 conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改爲 10,會同時運行多個任務。bootstrap

6)啓動 Flink 集羣 ./bin/start-cluster.sh

經過 http://IP:8081 能夠訪問到 Flink Web UI。

7)啓動 SQL CLI:bin/sql-client.sh embedded

8)使用 DDL 建立 Kafka 表,做爲原始數據表

9)使用 DDL 建立 Elasticsearch 表統計每小時成交量、一天每10分鐘累計獨立用戶數、類目排行榜。

10)使用 Kibana 實時顯示可視化結果。

 

ref:

http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/

相關文章
相關標籤/搜索