Mysql 流增量寫入 Hdfs(一) --從 mysql 到 kafka

一. 概述

在大數據的靜態數據處理中,目前廣泛採用的是用 Spark + Hdfs (Hive / Hbase) 的技術架構來對數據進行處理。mysql

但有時候有其餘的需求,須要從其餘不一樣數據源不間斷得采集數據,而後存儲到 Hdfs 中進行處理。而追加(append)這種操做在 Hdfs 裏面明顯是比較麻煩的一件事。所幸有了 Storm 這麼個流數據處理這樣的東西問世,能夠幫咱們解決這些問題。git

不過光有 Storm 還不夠,咱們還須要其餘中間件來協助咱們,讓全部其餘數據源都歸於一個通道。這樣就能實現不一樣數據源以及 Hhdfs 之間的解耦。而這個中間件 Kafka 無疑是一個很好的選擇。github

這樣咱們就可讓 Mysql 的增量數據不停得拋出到 Kafka ,然後再讓 storm 不停得從 Kafka 對應的 Topic 讀取數據並寫入到 Hdfs 中。redis

二. 基本知識

2.1 Mysql binlog 介紹

binlog 即 Mysql 的二進制日誌。它能夠說是 Mysql 最重要的日誌了,它記錄了全部的DDL和DML(除了數據查詢語句)語句,以事件形式記錄,還包含語句所執行的消耗的時間,MySQL的二進制日誌是事務安全型的。sql

上面所說的提到了 DDL 和 DML ,可能有些同窗不瞭解,這裏順便說一下:數據庫

  • DDL:數據定義語言DDL用來建立數據庫中的各類對象-----表、視圖、索引、同義詞、聚簇等如:CREATE TABLE/VIEW/INDEX/SYN/CLUSTER...
  • DML:數據操縱語言DML主要有三種形式:插入(INSERT), 更新(UPDATE),以及刪除(DELETE)。

在 Mysql 中,binlog 默認是不開啓的,由於有大約 1% (官方說法)的性能損耗,若是要手動開啓,流程以下:編程

  1. vi編輯打開mysql配置文件:
vi /usr/local/mysql/etc/my.cnf

在[mysqld] 區塊設置/添加以下,json

log-bin=mysql-bin

注意必定要在 [mysqld] 下。 2. 重啓 Mysqlbootstrap

pkill mysqld
/usr/local/mysql/bin/mysqld_safe --user=mysql &

2.2 kafka

這裏只對 Kafka 作一個基本的介紹,更多的內容能夠度娘一波。安全

<img src="https://img2018.cnblogs.com/blog/1011838/201812/1011838-20181206075114483-251042842.png" width="70%">

上面的圖片是 kafka 官方的一個圖片,咱們目前只須要關注 Producers 和 Consumers 就好了。

Kafka 是一個分佈式發佈-訂閱消息系統。分佈式方面由 Zookeeper 進行協同處理。消息訂閱其實說白了吧,就是一個隊列,分爲消費者和生產者,就像上圖中的內容,有數據源充當 Producer 生產數據到 kafka 中,而有數據充當 Consumers ,消費 kafka 中的數據。

<img src="https://img2018.cnblogs.com/blog/1011838/201812/1011838-20181206075127662-202539931.png" width="70%">

上圖中的 offset 指的是數據的寫入以及消費的位置的信息,這是由 Zookeeper 管理的。也就是說,當 Consumers 重啓或是怎樣,須要從新從 kafka 讀取消息時,總不能讓它從頭開始消費數據吧,這時候就須要有個記錄能告訴你從哪裏開始從新讀取。這就是 offset 。

kafka 中還有一個相當重要的概念,那就是 topic 。不過這個其實仍是很好理解的,好比你要訂閱一些消息,你確定是不會訂閱全部消息的吧,你只須要訂閱你感興趣的主題,好比攝影,編程,搞笑這些主題。而這裏主題的概念其實和 topic 是同樣的。總之,能夠將 topic 歸結爲通道,kafka 中有不少個通道,不一樣的 Producer 向其中一個通道生產數據,也就是拋數據進去這個通道,Comsumers 不停得消費通道中的數據。

而咱們要作的就是將 Mysql binlog 產生的數據拋到 kafka 中充看成生產者,而後由 storm 充當消費者,不停得消費數據並寫入到 Hdfs 中。

至於怎麼將 binlog 的數據拋到 kafka ,別急,下面咱們就來介紹。

2.3 maxwell

maxwell 這個工具能夠很方便得監聽 Mysql 的 binlog ,**而後每當 binlog 發生變化時,就會以 json 格式拋出對應的變化數據到 Kafka 中。**好比當向 mysql 一張表中插入一條語句的時候,maxwell 就會馬上監聽到 binlog 中有對應的記錄增長,而後將一些信息包括插入的數據都轉化成 json 格式,而後拋到 kafka 指定的 topic 中。

下載地址在這裏能夠找到。

除了 Kafka 外,其實 maxwell 還支持寫入到其餘各類中間件,好比 redis。 同時 maxwell 是比較輕量級的工具,只須要在 mysql 中新建一個數據庫供它記錄一些信息,而後就能夠直接運行。

三. 使用 maxwell 監聽 binlog

接下來咱們將的是若是使用 maxwell ,讓它監聽 mysql 的 binlog 並拋到 kafka 中。maxwell 主要有兩種運行方式。一種是使用配置文件,另外一種則是在命令行中添加參數的方式運行。這裏追求方便,只使用命令行的方式進行演示。

這裏介紹一下簡單的將數據拋到 kafka 的命令行腳本吧。

bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
   --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell --port=3306

各項參數說明以下

  • user:mysql 用戶名
  • password:mysql 密碼
  • host:Mysql 地址
  • producer:指定寫入的中間件類型,好比還有 redies
  • kafka.bootstrap.servers:kafka 的地址
  • kafka_topic:指明寫入到 kafka 哪一個 topic
  • port:mysql 端口

啓動以後,maxwell 便開始工做了,固然若是你想要讓這條命令能夠在後臺運行的話,可使用 Linux 的 nohup 命令,這裏就很少贅述,有須要百度便可。

這樣配置的話一般會將整個數據庫的增刪改都給拋到 kafka ,但這樣的需求顯然不常見,更常見的應該是具體監聽對某個庫的操做,或是某個表的操做。

在升級到 1.9.2(最新版本)後,maxwell 爲咱們提供這樣一個參數,讓咱們能夠輕鬆實現上述需求:--filter

這個參數一般包含兩個配置項,exclude 和 include。意思就是讓你指定排除哪些和包含哪些。好比我只想監聽 Adatabase 庫下的 Atable 表的變化。我能夠這樣。

--filter='exclude: *.*, include: Adatabase.Atable'

這樣咱們就能夠輕鬆實現監聽 mysql binlog 的變化,並能夠定製本身的需求。

OK,這一章咱們介紹了 mysql binlog ,kafka 以及 maxwell 的一些內容,下一篇咱們將會看到 storm 如何寫入 hdfs 以及定製一些策略。see you~~


歡迎關注公衆號哈爾的數據城堡,裏面有數據,代碼,以及深度的思考。

相關文章
相關標籤/搜索