基於binlog的離線分析平臺的一些初步實踐mysql
參考文檔: git
http://seanlook.com/2018/01/13/maxwell-binlog/web
https://yq.aliyun.com/articles/338423正則表達式
直接上圖sql
方案1:mongodb
方案2:數據庫
方案3apache
方案1的比較簡單,基本上也是知足使用,也是不錯的選擇。可是功能上比較單一。bootstrap
方案2比較複雜,引入了更多的組件,將數據存到MongoDB裏面。這種引入了kafka的比較適合有多個異構數據庫或者DW數倉抽數的場景。
方案3也比較複雜,和方案2相似,區別就是將數據存到ES裏面,而且graylog自帶了一個web查詢的界面。
這裏咱們實驗採用的是方案2,先把binlog採集到kafka,而後就能夠任意自由消費binlog,更加靈活些。
實驗涉及到的軟件:
OS 版本:CentOS7.5
maxwell 版本:1.22.4
nifi 版本:1.9.2
kafka-eagle 版本:1.3.9
maxwell部署節點: 192.168.20.10
zk+kafka部署節點: 192.168.2.4
kafka-eagle部署的節點: 192.168.2.4
nifi部署的節點: 192.168.2.4
模擬的業務MySQL數據庫:192.168.2.4:3306
kafka 和 zk的部署,不是這裏的重點。我這裏的zk和kafka都是部署在 192.168.2.4上面的,這裏的具體操做我直接跳過。
我實驗中, zk和kafka都是單機部署的,生產環境下必定要使用集羣模式。
1、最好將主機名和ip關係,寫到各主機的 /etc/hosts 中,否則可能遇到解析失敗的狀況
2、須要注意的是,我這裏的zk是高版本的,默認會監聽 8080端口,建議改爲其餘的,把8080端口留給其它服務使用。
[root@Test-dba01 /usr/local/zookeeper-3.5.5-bin ] # cat conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=./data/
clientPort=2181
admin.serverPort=12345
啓動後,能夠看到監聽的端口起來了
[root@Test-dba01 /usr/local/kafka ] # ss -lnt| egrep 2181
LISTEN 0 50 :::2181 :::*
[root@Test-dba01 /usr/local/kafka ] # ss -lnt| egrep 12345
LISTEN 0 50 :::12345 :::*
kafka-eagle 是國內的一個大佬開發出來的, 我這裏用到它主要是喜歡它附帶的ksql功能,支持直接查詢kafka的topic裏面的數據。
此外,這個工具還有不少好用的功能,這裏我就不介紹了。
貼下個人配置
cd /root/kafka-eagle-bin-1.3.9/kafka-eagle-web-1.3.9
egrep -v '^$|^#' /root/kafka-eagle-bin-1.3.9/kafka-eagle-web-1.3.9/conf/system-config.properties
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.2.4:2181
kafka.zk.limit.size=25
kafka.eagle.webui.port=8048
cluster1.kafka.eagle.offset.storage=kafka
cluster2.kafka.eagle.offset.storage=zk
kafka.eagle.metrics.charts=false
kafka.eagle.sql.fix.error=false
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.mail.enable=false
kafka.eagle.mail.sa=alert_sa@163.com
kafka.eagle.mail.username=alert_sa@163.com
kafka.eagle.mail.password=mqslimczkdqabbbh222222
kafka.eagle.mail.server.host=smtp.163.com
kafka.eagle.mail.server.port=25
kafka.eagle.topic.token=keadmin
cluster1.kafka.eagle.sasl.enable=false
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=PLAIN
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="kafka-eagle";
cluster2.kafka.eagle.sasl.enable=false
cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster2.kafka.eagle.sasl.mechanism=PLAIN
cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="kafka-eagle";
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:./db/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org
主要就是修改了下zk的地址和sqlite數據庫的路徑,其它保持默認
啓動進程:
export KE_HOME=/root/kafka-eagle-bin-1.3.9/kafka-eagle-web-1.3.9
export PATH=$PATH:$KE_HOME/bin
./bin/ke.sh start
登陸web頁面
http://192.168.2.4:8048/ke/
用戶名 admin
密碼 123456
具體功能,你們自由探索,整個工具仍是很強大的。
maxwell 使用的是 1.22.4 版本
0、在 192.168.2.4的mysql開通帳號,便於maxwell鏈接上去拉取binlog
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
1、在192.168.20.10上部署 maxwell
cd /usr/local/
curl -sLo - https://github.com/zendesk/maxwell/releases/download/v1.22.4/maxwell-1.22.4.tar.gz | tar zxvf -
cd maxwell-1.22.4/
2、輸出到kafka的方式
2.1 拷貝 kafka-clients-2.3.0.jar 到 maxwell的lib/kafka-clients/目錄下
2.2 修改配置文件
cp config.properties.example config.properties 而後修改下, 修改後的內容以下:
log_level=info
producer=kafka
# maxwell的元數據存放的MySQL的鏈接信息
host=localhost
user=maxwell
password=maxwell
producer=kafka
host=127.0.0.1
port=3306
user=maxwell
password=XXXXXX
schema_database=maxwell
gtid_mode=true
ssl=DISABLED
replication_ssl=DISABLED
schema_ssl=DISABLED
# 上游MySQL的鏈接信息
replication_host=192.168.2.4
replication_user=maxwell
replication_password=XXXXXX
replication_port=3306
# 定義須要輸出哪些數據
output_binlog_position=true
output_gtid_position=true
output_nulls=true
output_server_id=true
output_ddl=true
output_commit_info=true
kafka.bootstrap.servers=192.168.2.4:9092 # 生產環境上,這裏須要填多個kafka的鏈接方式
kafka_topic=maxwell
ddl_kafka_topic=maxwell_ddl
kafka.compression.type=snappy
kafka.retries=5
kafka.acks=1
producer_partition_by=database
# 下面是複製的過濾規則,不符合下面條件的binlog不會被保留下來【支持正則表達式】
# filter= exclude: test.*, include: db.*, include: coupons.*, include: testdb.user
# 暴露metrics地址用於監控
metrics_type=http
metrics_prefix=MaxwellMetrics
metrics_jvm=true
http_port=8081
2.3 前臺啓動
啓動前,先去建立2個topic:
bin/kafka-topics.sh --zookeeper 192.168.2.4:2181 --create --topic maxwell --partitions 20 --replication-factor 2
bin/kafka-topics.sh --zookeeper 192.168.2.4:2181 --create --topic maxwell_ddl --partitions 6 --replication-factor 2
測試期間,咱們先前臺啓動maxwell進程
bin/maxwell --config config.properties --producer=kafka --kafka_version=2.3.0
另外建議:在 192.168.2.4 上咱們啓動2個前臺consumer進程,用於觀察數據進入kafka的狀況:
cd /opt/kafka1/bin/
./kafka-console-consumer.sh --bootstrap-server 192.168.2.4:9092 --topic maxwell
./kafka-console-consumer.sh --bootstrap-server 192.168.2.4:9092 --topic maxwell_ddl
maxwell topic裏面的數據;相似這樣:
{"database":"test","table":"resourcesinfo","type":"delete","ts":1571644826,"xid":5872872,"xoffset":78,"position":"mysql-bin.0003306,"data":{"id":94,"name":"222","hostname":"33","spec":"","belong":"","createtime":"0000-00-00 00:00:00.000000"}}
maxwell_ddl topic裏面的數據;相似這樣:
{"type":"table-create","database":"leaf","table":"d2sf","def":{"database":"leaf","charset":"utf8mb4","table":"d2sf","columns":[{"type":"varchar","name":"biz_tag","charset":"utf8mb4"},{"type":"bigint","name":"max_id","signed":true},{"type":"int","name":"step","signed":true},{"type":"varchar","name":"description","charset":"utf8mb4"},{"type":"timestamp","name":"update_time","column-length":0}],"primary-key":["biz_tag"]},"ts":1571642076000,"sql":"create table d2sf like leaf_alloc","position":"mysql-bin.000003:172413504","gtid":"fd2adbd9-e263-11e8-847a-141877487b3d:1386014"}
不是這裏的重點步驟。
我這裏是在 192.168.2.4上,部署的單機多實例的mongodb複製集。
192.168.2.4:27017 standby
192.168.2.4:27017 primary
192.168.2.4:27019 ARBITER
沒有設置密碼登陸。
而後,建立個測試用的數據庫和表
production:PRIMARY> use testdb
production:PRIMARY> db.createCollection("maxwell")
NIFI是一個ETL工具,比較簡單。
cd /root/
tar xf nifi-1.9.2.tar.gz -C ./
cd /root/nifi-1.9.2
咱們這裏也不優化相關參數了,先嚐試跑起來看看效果
./bin/nifi.sh start
稍等3分鐘,查看下狀態
./bin/nifi.sh status
Java home: /usr/local/jdk
NiFi home: /root/nifi-1.9.2
Bootstrap Config File: /root/nifi-1.9.2/conf/bootstrap.conf
2019-10-21 17:46:48,372 INFO [main] org.apache.nifi.bootstrap.Command Apache NiFi is currently running, listening to Bootstrap on port 43024, PID=130790
訪問web界面
http://192.168.2.4:8080/nifi/
拖動 "process group" 這個按鈕,到網頁中間,建立一個名爲test的 "process group"
而後雙擊 test這個方框,在這個頁面上,建立一個2個processpor,並用線條鏈接起來
高能預警: 下面的配置操做,有點難度,我貼的圖也不太好敘述,不必定能幫到您,若是有問題須要本身再摸索下!
而後,咱們再 192.168.2.4 上,隨便的crud些數據, 看看 NIFI 界面上是否有數值的變化。
若是,這裏沒問題後。咱們到mongodb數據庫裏面看看數據是否進去了。
到mongodb裏面,查看是否有數據進來
use maxwell
db.maxwell.findOne()
有數據後,咱們就能夠繼續基於mongodb的各類操做了
db.maxwell.createIndex({ts:1},{background:true})
db.maxwell.createIndex({table:1},{background:true})
db.maxwell.createIndex({database:1},{background:true})
db.maxwell.createIndex({database:1,table:1},{background:true})
db.maxwell.find({"table":"tbsdb"}).pretty()
db.maxwell.find({"table":"leaf_alloc"}).pretty()
db.maxwell.find({"database":"leaf"}).pretty()
db.maxwell.find({"database":"test"}).pretty() 日誌相似這樣:
統計某個時間範圍內的操做:
db.maxwell.count({'ts':{$lt:1571673600,$gt:1571587200},"database":"test","type":"delete"})
db.maxwell.count({'ts':{$lt:1571673600,$gt:1571587200},"database":"test","type":"update"})
db.maxwell.count({'ts':{$lt:1571673600,$gt:1571587200},"database":"test","type":"insert"})