一,架構介紹html
生產中因爲歷史緣由web後端,mysql集羣,kafka集羣(或者其它消息隊列)會存在一下三種結構。mysql
1,數據先入mysql集羣,再入kafkaweb
數據入mysql集羣是不可更改的,如何再高效的將數據寫入kafka呢?面試
A),在表中存在自增ID的字段,而後根據ID,按期掃描表,而後將數據入kafka。sql
B),有時間字段的,能夠按照時間字段按期掃描入kafka集羣。數據庫
C),直接解析binlog日誌,而後解析後的數據寫入kafka。json
2,web後端同時將數據寫入kafka和mysql集羣後端
3,web後端將數據先入kafka,再入mysql集羣架構
這個方式,有不少優勢,好比能夠用kafka解耦,而後將數據按照離線存儲和計算,實時計算兩個模塊構建很好的大數據架構。抗高峯,便於擴展等等。socket
二,實現步驟
1,mysql安裝準備
安裝mysql估計看這篇文章的人都沒什麼問題,因此本文不具體講解了。
A),假如你單機測試請配置好server_id
B),開啓binlog,只需配置log-bin
[root@localhost ~]# cat /etc/my.cnf
[mysqld]
server_id=1
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
log-bin=/var/lib/mysql/mysql-binlog
[mysqld_safe]
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
?
建立測試庫和表
create database school character set utf8 collate utf8_general_ci;
?
create table student(
name varchar(20) not null comment '姓名',
sid int(10) not null primary key comment '學員',
majora varchar(50) not null default '' comment '專業',
tel varchar(11) not null unique key comment '手機號',
birthday date not null comment '出生日期'
);
2,binlog日誌解析
兩種方式:
一是掃面binlog文件(有須要的話請聯繫浪尖)
二是經過複製同步的方式
暫實現了第二種方式,樣例代碼以下:
MysqlBinlogParse mysqlBinlogParse=new MysqlBinlogParse(args[0],Integer.valueOf(args[1]),args[2],args[3]){
@Override
public void processDelete(String queryType, String database, String sql) {
try {
String jsonString=SqlParse.parseDeleteSql(sql);
JSONObject jsonObject=JSONObject.fromObject(jsonString);
jsonObject.accumulate("database", database);
jsonObject.accumulate("queryType", queryType);
System.out.println(sql);
System.out.println(" ");
System.out.println(" ");
System.out.println(jsonObject.toString());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void processInsert(String queryType, String database, String sql) {
try {
String jsonString=SqlParse.parseInsertSql(sql);
JSONObject jsonObject=JSONObject.fromObject(jsonString);
jsonObject.accumulate("database", database);
jsonObject.accumulate("queryType", queryType);
System.out.println(sql);
System.out.println(" ");
System.out.println(" ");
System.out.println(jsonObject.toString());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void processUpdate(String queryType, String database, String sql) {
String jsonString;
try {
jsonString=SqlParse.parseUpdateSql(sql);
JSONObject jsonObject=JSONObject.fromObject(jsonString);
jsonObject.accumulate("database", database);
jsonObject.accumulate("queryType", queryType);
System.out.println(sql);
System.out.println(" ");
System.out.println(" ");
System.out.println(jsonObject.toString());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
mysqlBinlogParse.setServerId(3);
mysqlBinlogParse.start();
?
?
3,sql語法解析
從原始的mysql 的binlog event中,咱們能解析到的信息,主要的也就是mysql的database,query類型(INSERT,DELETE,UPDATE),具體執行的sql。我這裏封裝了三個重要的方法。只暴露了這三個接口,那麼咱們要明白的事情是,美味的英文咱們入kafka,而後流式處理的時候但願的到的是跟插入mysql後同樣格式的數據。這個時候咱們就要本身作sql的解析,將query的sql解析成字段形式的數據,供流式處理。解析的格式以下:
A),INSERT
B),DELETE
C),UPDATE
最終浪尖是將解析後的數據封裝成了json,而後咱們本身寫kafka producer將消息發送到kafka,後端就能夠處理了。
三,總結
最後,浪尖仍是建議web後端數據最好先入消息隊列,如kafka,而後分離線和實時將數據進行解耦分流,用於實時處理和離線處理。
?
消息隊列的訂閱者能夠根據須要隨時擴展,能夠很好的擴展數據的使用者。
?
消息隊列的橫向擴展,增長吞吐量,作起來仍是很簡單的。這個用傳統數據庫,分庫分表仍是很麻煩的。
?
因爲消息隊列的存在,也能夠幫助咱們抗高峯,避免高峯時期後端處理壓力過大致使整個業務處理宕機。
?
具體源碼球友能夠在知識星球獲取。
歡迎你們進入知識星球,學習更多更深刻的大數據知識,面試經驗,獲取更多更詳細的資料。