Mysql增量寫入Hdfs(一) --將Mysql數據寫入Kafka Topic

一. 概述

在大數據的靜態數據處理中,目前廣泛採用的是用Spark+Hdfs(Hive/Hbase)的技術架構來對數據進行處理。html

但有時候有其餘的需求,須要從其餘不一樣數據源不間斷得采集數據,而後存儲到Hdfs中進行處理。而追加(append)這種操做在Hdfs裏面明顯是比較麻煩的一件事。所幸有了Storm這麼個流數據處理這樣的東西問世,能夠幫咱們解決這些問題。mysql

不過光有Storm還不夠,咱們還須要其餘中間件來協助咱們,讓全部其餘數據源都歸於一個通道。這樣就能實現不一樣數據源以及Hhdfs之間的解耦。而這個中間件Kafka無疑是一個很好的選擇。git

這樣咱們就可讓Mysql的增量數據不停得拋出到Kafka,然後再讓storm不停得從Kafka對應的Topic讀取數據並寫入到Hdfs中。程序員

二.binlog和maxwell介紹

2.1Mysql binlog介紹

binlog即Mysql的二進制日誌。它能夠說是Mysql最重要的日誌了,它記錄了全部的DDL和DML(除了數據查詢語句)語句,以事件形式記錄,還包含語句所執行的消耗的時間,MySQL的二進制日誌是事務安全型的。github

上面所說的提到了DDL和DML,可能有些同窗不瞭解,這裏順便說一下:redis

  • DDL:數據定義語言DDL用來建立數據庫中的各類對象-----表、視圖、索引、同義詞、聚簇等如:CREATETABLE/VIEW/INDEX/SYN/CLUSTER...
  • DML:數據操縱語言DML主要有三種形式:插入(INSERT),更新(UPDATE),以及刪除(DELETE)。

在Mysql中,binlog默認是不開啓的,由於有大約1%(官方說法)的性能損耗,若是要手動開啓,流程以下:算法

  1. vi編輯打開mysql配置文件:
vi /usr/local/mysql/etc/my.cnf

在[mysqld]區塊設置/添加以下,sql

log-bin=mysql-bin

注意必定要在[mysqld]下。數據庫

  1. 重啓Mysql
pkill mysqld
/usr/local/mysql/bin/mysqld_safe --user=mysql &

2.2kafka

這裏只對Kafka作一個基本的介紹,更多的內容能夠度娘一波。編程

上面的圖片是kafka官方的一個圖片,咱們目前只須要關注Producers和Consumers就好了。

Kafka是一個分佈式發佈-訂閱消息系統。分佈式方面由Zookeeper進行協同處理。消息訂閱其實說白了吧,就是一個隊列,分爲消費者和生產者,就像上圖中的內容,有數據源充當Producer生產數據到kafka中,而有數據充當Consumers,消費kafka中的數據。

上圖中的offset指的是數據的寫入以及消費的位置的信息,這是由Zookeeper管理的。也就是說,當Consumers重啓或是怎樣,須要從新從kafka讀取消息時,總不能讓它從頭開始消費數據吧,這時候就須要有個記錄能告訴你從哪裏開始從新讀取。這就是offset。

kafka中還有一個相當重要的概念,那就是topic。不過這個其實仍是很好理解的,好比你要訂閱一些消息,你確定是不會訂閱全部消息的吧,你只須要訂閱你感興趣的主題,好比攝影,編程,搞笑這些主題。而這裏主題的概念其實和topic是同樣的。總之,能夠將topic歸結爲通道,kafka中有不少個通道,不一樣的Producer向其中一個通道生產數據,也就是拋數據進去這個通道,Comsumers不停得消費通道中的數據。

而咱們要作的就是將Mysql binlog產生的數據拋到kafka中充看成生產者,而後由storm充當消費者,不停得消費數據並寫入到Hdfs中。

至於怎麼將binlog的數據拋到kafka,別急,下面咱們就來介紹。

2.3maxwell

maxwell這個工具能夠很方便得監聽Mysql的binlog,而後每當binlog發生變化時,就會以json格式拋出對應的變化數據到Kafka中。好比當向mysql一張表中插入一條語句的時候,maxwell就會馬上監聽到binlog中有對應的記錄增長,而後將一些信息包括插入的數據都轉化成json格式,而後拋到kafka指定的topic中。

下載地址在這裏能夠找到。

除了Kafka外,其實maxwell還支持寫入到其餘各類中間件,好比redis。
同時maxwell是比較輕量級的工具,只須要在mysql中新建一個數據庫供它記錄一些信息,而後就能夠直接運行。

三.使用maxwell監聽binlog

接下來咱們將的是若是使用maxwell,讓它監聽mysql的binlog並拋到kafka中。maxwell主要有兩種運行方式。一種是使用配置文件,另外一種則是在命令行中添加參數的方式運行。這裏追求方便,只使用命令行的方式進行演示。

這裏介紹一下簡單的將數據拋到kafka的命令行腳本吧。

bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
   --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell --port=3306

各項參數說明以下

  • user:mysql用戶名
  • password:mysql密碼
  • host:Mysql地址
  • producer:指定寫入的中間件類型,好比還有redies
  • kafka.bootstrap.servers:kafka的地址
  • kafka_topic:指明寫入到kafka哪一個topic
  • port:mysql端口

啓動以後,maxwell便開始工做了,固然若是你想要讓這條命令能夠在後臺運行的話,可使用Linux的nohup命令,這裏就很少贅述,有須要百度便可。

這樣配置的話一般會將整個數據庫的增刪改都給拋到kafka,但這樣的需求顯然不常見,更常見的應該是具體監聽對某個庫的操做,或是某個表的操做。

在升級到1.9.2(最新版本)後,maxwell爲咱們提供這樣一個參數,讓咱們能夠輕鬆實現上述需求:--filter

這個參數一般包含兩個配置項,exclude和include。意思就是讓你指定排除哪些和包含哪些。好比我只想監聽Adatabase庫下的Atable表的變化。我能夠這樣。

--filter='exclude: *.*, include: Adatabase.Atable'

這樣咱們就能夠輕鬆實現監聽mysqlbinlog的變化,並能夠定製本身的需求。

OK,這一章咱們介紹了mysql binlog,kafka以及maxwell的一些內容,下一篇咱們將會看到storm如何寫入hdfs以及定製一些策略。see you~~

推薦閱讀 :
從分治算法到 MapReduce
一個故事告訴你什麼纔是好的程序員
大數據存儲的進化史 --從 RAID 到 Hadoop Hdfs

相關文章
相關標籤/搜索