老劉是一名即將找工做的研二學生,寫博客一方面是總結大數據開發的知識點,一方面是但願可以幫助夥伴讓自學今後不求人。因爲老劉是自學大數據開發,博客中確定會存在一些不足,還但願你們可以批評指正,讓咱們一塊兒進步!java
大數據領域數據源有業務庫的數據,也有移動端埋點數據、服務器端產生的日誌數據。咱們在對數據進行採集時根據下游對數據的要求不一樣,咱們可使用不一樣的採集工具來進行。今天老劉給你們講的是同步mysql增量數據的工具Canal,本篇文章的大綱以下:mysql
老劉爭取用這一篇文章讓你們直接上手 Canal 這個工具,再也不花別的時間來學習。web
因爲 Canal 是用來同步 mysql 中增量數據的,因此老劉先講 mysql 的主備複製原理,以後再講 Canal 的核心知識點。sql
根據這張圖,老劉把 mysql 的主備複製原理分解爲以下流程:數據庫
那麼 mysql 主備複製實現原理就講完了,你們看完這個流程,能不能猜到 Canal 的工做原理?服務器
Canal 的工做原理就是它模擬 MySQL slave 的交互協議,把本身假裝爲 MySQL slave,向 MySQL master 發動 dump 協議。MySQL master 收到 dump 請求後,就會開始推送 binlog 給 Canal。最後 Canal 就會解析 binlog 對象。架構
Canal,美[kəˈnæl],是這樣讀的,意思是水道/管道/渠道,主要用途就是用來同步 MySQL 中的增量數據(能夠理解爲實時數據),是阿里巴巴旗下的一款純 Java 開發的開源項目。併發
server 表明一個 canal 運行實例,對應於一個 JVM。 instance 對應於一個數據隊列,1 個 canal server 對應 1..n 個 instance instance 下的子模塊:框架
到如今 Canal 的基本概念就講完了,那接下來就要講 Canal 如何同步 mysql 的增量數據。編輯器
咱們用 Canal 同步 mysql 增量數據的前提是 mysql 的 binlog 是開啓的,阿里雲的 mysql 數據庫是默認開啓 binlog 的,可是若是咱們是本身安裝的 mysql 須要手動開啓 binlog 日誌功能。
先找到 mysql 的配置文件:
etc/my.cnf
server-id=1
log-bin=mysql-bin
binlog-format=ROW
這裏有一個知識點是關於 binlog 的格式,老劉給你們講講。
binlog 的格式有三種:STATEMENT、ROW、MIXED
ROW 模式(通常就用它)
日誌會記錄每一行數據被修改的形式,不會記錄執行 SQL 語句的上下文相關信息,只記錄要修改的數據,哪條數據被修改了,修改爲了什麼樣子,只有 value,不會有 SQL 多表關聯的狀況。
優勢:它僅僅只須要記錄哪條數據被修改了,修改爲什麼樣子了,因此它的日誌內容會很是清楚地記錄下每一行數據修改的細節,很是容易理解。
缺點:ROW 模式下,特別是數據添加的狀況下,全部執行的語句都會記錄到日誌中,都將以每行記錄的修改來記錄,這樣會產生大量的日誌內容。
STATEMENT 模式
每條會修改數據的 SQL 語句都會被記錄下來。
缺點:因爲它是記錄的執行語句,因此,爲了讓這些語句在 slave 端也能正確執行,那他還必須記錄每條語句在執行過程當中的一些相關信息,也就是上下文信息,以保證全部語句在 slave 端被執行的時候可以獲得和在 master 端執行時候相同的結果。
但目前例如 step()函數在有些版本中就不能被正確複製,在存儲過程當中使用了 last-insert-id()函數,可能會使 slave 和 master 上獲得不一致的 id,就是會出現數據不一致的狀況,ROW 模式下就沒有。
MIXED 模式
以上兩種模式都使用。
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,須要修改爲本身的數據庫信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,須要修改爲本身的數據庫信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
其中,canal.instance.connectionCharset 表明數據庫的編碼方式對應到 java 中的編碼類型,好比 UTF-8,GBK,ISO-8859-1。
sh bin/startup.sh
關閉使用 bin/stop.sh
觀察日誌
通常使用 cat 查看 canal/canal.log、example/example.log
啓動客戶端
在 IDEA 中業務代碼,mysql 中若是有增量數據就拉取過來,在 IDEA 控制檯打印出來
在 pom.xml 文件中添加:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.12</version>
</dependency>
添加客戶端代碼:
public class Demo {
public static void main(String[] args) {
//建立鏈接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop03", 11111),
"example", "", "");
connector.connect();
//訂閱
connector.subscribe();
connector.rollback();
int batchSize = 1000;
int emptyCount = 0;
int totalEmptyCount = 100;
while (totalEmptyCount > emptyCount) {
Message msg = connector.getWithoutAck(batchSize);
long id = msg.getId();
List<CanalEntry.Entry> entries = msg.getEntries();
if(id == -1 || entries.size() == 0){
emptyCount++;
System.out.println("emptyCount : " + emptyCount);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
emptyCount = 0;
printEntry(entries);
}
connector.ack(id);
}
}
// batch -> entries -> rowchange - rowdata -> cols
private static void printEntry(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries){
if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){
continue;
}
CanalEntry.RowChange rowChange = null;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
CanalEntry.EventType eventType = rowChange.getEventType();
System.out.println(entry.getHeader().getLogfileName()+" __ " +
entry.getHeader().getSchemaName() + " __ " + eventType);
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for(CanalEntry.RowData rowData : rowDatasList){
for(CanalEntry.Column column: rowData.getAfterColumnsList()){
System.out.println(column.getName() + " - " +
column.getValue() + " - " +
column.getUpdated());
}
}
}
}
}
在大數據領域不少框架都會有 HA 機制,Canal 的 HA 分爲兩部分,Canal server 和 Canal client 分別有對應的 HA 實現:
整個 HA 機制的控制主要是依賴了 ZooKeeper 的幾個特性,ZooKeeper 這裏就不講了。
Canal Server:
Canal HA 的配置,並把數據實時同步到 kafka 中。
canal.zkServers = hadoop02:2181,hadoop03:2181,hadoop04:2181
canal.serverMode = kafka
canal.mq.servers = hadoop02:9092,hadoop03:9092,hadoop04:9092
canal.instance.mysql.slaveId = 790 /兩臺canal server的slaveID惟一
canal.mq.topic = canal_log //指定將數據發送到kafka的topic
講完了 Canal 工具,如今給你們簡單總結下目前常見的數據採集工具,不會涉及架構知識,只是簡單總結,讓你們有個印象。
常見的數據採集工具備:DataX、Flume、Canal、Sqoop、LogStash 等。
DataX 是阿里巴巴開源的一個異構數據源離線同步工具,異構數據源離線同步指的是將源端數據同步到目的端,可是端與端的數據源類型種類繁多,在沒有 DataX 以前,端與端的鏈路將組成一個複雜的網狀結構,很是零散沒法把同步核心邏輯抽象出來。
爲了解決異構數據源同步問題,DataX 將複雜的網狀的同步鏈路變成了星型數據鏈路,DataX 做爲中間傳輸載體負責鏈接各類數據源。
因此,當須要接入一個新的數據源的時候,只須要將此數據源對接到 DataX,就能夠跟已有的數據源作到無縫數據同步。
DataX自己做爲離線數據同步框架,採用Framework+plugin架構構建。將數據源讀取和寫入抽象成爲Reader/Writer插件,歸入到整個同步框架中。
DataX的核心架構以下圖:
核心模塊介紹:
Flume主要應用的場景是同步日誌數據,主要包含三個組件:Source、Channel、Sink。
Flume最大的優勢就是官網提供了豐富的Source、Channel、Sink,根據不一樣的業務需求,咱們能夠在官網查找相關配置。另外,Flume還提供了自定義這些組件的接口。
Logstash就是一根具有實時數據傳輸能力的管道,負責將數據信息從管道的輸入端傳輸到管道的輸出端;與此同時這根管道還可讓你根據本身的需求在中間加上過濾網,Logstash提供了不少功能強大的過濾網來知足各類應用場景。
Logstash是由JRuby編寫,使用基於消息的簡單架構,在JVM上運行。在管道內的數據流稱之爲event,它分爲inputs階段、filters階段、outputs階段。
Sqoop是Hadoop和關係型數據庫之間傳送數據的一種工具,它是用來從關係型數據庫如MySQL到Hadoop的HDFS從Hadoop文件系統導出數據到關係型數據庫。Sqoop底層用的仍是MapReducer,用的時候必定要注意數據傾斜。
老劉本篇文章主要講述了Canal工具的核心知識點及其數據採集工具的對比,其中數據採集工具只是大體講了講概念和應用,目的也是讓你們有個印象。老劉敢作保證看完這篇文章基本等於入門,剩下的就是練習了。
好啦,同步mysql增量數據的工具Canal的內容就講完了,儘管當前水平可能不及各位大佬,但老劉會努力變得更加優秀,讓各位小夥伴自學今後不求人!
若是有相關問題,聯繫公衆號:努力的老劉。文章都看到這了,點贊關注支持一波!