不少狀況大數據集羣須要獲取業務數據,用於分析。一般有兩種方式:java
第一種能夠是在業務中編寫代碼,將以爲須要發送的數據發送到消息隊列,最終落地到大數據集羣。mysql
第二種則是經過數據同步的方式,將關係型數據同步到大數據集羣,能夠是存儲在 hdfs 上,使用 hive 進行分析,或者是直接存儲到 hbase 中。sql
其中數據同步又能夠大體分爲兩種:增量同步、CRUD 同步。數據庫
增量同步是隻將關係型數據庫中新增的數據進行同步,對於修改、刪除操做不進行同步,這種同步方式適用於那些一旦生成就不會變更的數據。 CRUD 同步則是數據的增、刪、改都須要進行同步,保證兩個庫中的數據一致性。瀏覽器
本文不講 binlog + Canal + 消息隊列 + JAR 實現數據實時同步的方案,也不講使用 Sqoop 進行離線同步。而是講解如何使用 Streamsets 零代碼完成整個實時同步流程。關於 Streamsets 具體是什麼,以及能作哪些其餘的事情,你們能夠前往 Streamsets 官網進行了解。從筆者瞭解的信息,在數據同步方面 Streamsets 十分好用。bash
要實現 mysql 數據的實時同步,首先咱們須要打開其 binlog 模式,具體怎麼操做網上有不少教程,這裏就不進行闡述了。oop
那麼,如今就直接進入正題吧。大數據
Streamsets 能夠直接從官網下載: archives.streamsets.comui
這裏安裝的是 Core Tarball 格式,固然你也能夠直接選擇下載 Full Tarball、Cloudera Parcel 或者其餘格式。下載 Core Tarball 的好處是體積小,後期須要什麼庫的時候能夠自行在 Streamsets Web 頁進行下載。相對於 Core Tarball,Full Tarball 默認幫你下載了不少庫,可是文件體積相對較大(>4G),而且可能不少庫咱們暫時使用不到。spa
或者你能夠直接使用這個連接進行下載:archives.streamsets.com/datacollect…
Streamsets Core Tarball 下載好後,直接解壓就可使用,很是方便。
tar xvzf streamsets-datacollector-core-3.7.1.tgz
cd streamsets-datacollector-3.7.1/bin/
./streamsets dc
複製代碼
Java 1.8 detected; adding $SDC_JAVA8_OPTS of "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Djdk.nio.maxCachedBufferSize=262144" to $SDC_JAVA_OPTS
Configuration of maximum open file limit is too low: 1024 (expected at least 32768). Please consult https://goo.gl/6dmjXd
複製代碼
若是在運行的時候遇到上面的報錯,修改操做系統的 open files 限制數量便可。
#vi /etc/security/limits.conf
複製代碼
添加兩行內容:
運行 'ulimit -n' 既能夠看到 open files 設置值已生效。
Streamsets 擁有一個 Web 頁,默認端口是 18630。瀏覽器中輸入 ip:18630 便可進入 streamsets 的頁面,默認用戶名、密碼都是 admin。
由於須要將 mysql 的數據實時同步到 hbase 中,可是下載的 Core Tarball 中沒有 MySQL Binary Log 以及 hbase 兩個 stage library,因此在 create new pipeline 以前須要先安裝它們。
安裝 MySQL Binary Log 庫
安裝 Hbase 庫,這裏注意一下,hbase 庫位於 CDH 中,因此選擇一個 CDH 版本進行安裝
安裝好後在 Installed Stage Libraries 中就能看到已經安裝了 MySQL Binary Log 和 Hbase
建立一個 MySQL Binary Log
設置 mysql 的鏈接參數(Hostname, Port 以及 Server ID),這裏的 Server ID 與 mysql 配置文件(通常是 /etc/my.cnf)中的 server-id 保持一致
設置 mysql 的用戶名、密碼
其餘設置:咱們在 Include Tables 欄設置了兩張表(表與表之間用逗號隔開),意思是監控這兩張表的數據變化,其餘表不關心。
建立一個 Stream Selector,並將剛剛建立的 MySQL Binary Log 指向這個 Stream Selector。 設置過濾條件, 好比說 ${record:value("/Table")=='cartype'} 就是過濾 cartype 表。
能夠看到 Stream Selector 有兩個出口(1 和 2),後面咱們將會看到: 1 輸出到 Hbase, 2 數據到 Trash
分別建立 Hbase 和 Trash,鏈接到 Stream Selector 上
配置 Hbase
Trash 無需進行配置
點擊右上角的「眼鏡」,驗證整個流程是否有問題。
這裏報錯:"java.lang.RuntimeException:Unable to get driver instance for jdbcUrl"。這個報錯的緣由是缺乏 mysql 鏈接的 jar 包。解決起來也很簡單,下載一個 jar 包而後放到 streamsets 指定的目錄下。我這邊的完整目錄是:/opt/streamsets/streamsets-datacollector-3.7.1/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib/mysql-connector-java-5.1.26-bin.jar, mysql-connector-java-5.1.26-bin.jar 就是我下載的 jar 包。
還有一點就是事先要將 Hbase 中相對應的表建立好,否則驗證會提示錯誤。
接着在頁面上重啓 streamsets 便可。
從新驗證,發現成功了。
點擊右上角播放標籤,啓動流程,這樣整個流程就已經完成了(數據已經在進行實時同步),查看各個 Stage 既能夠看到有多少數據流入,多少數據流出。也能夠直接進入 hbase 數據庫中查看是否有數據生成。
以上就是如何使用 Streamsets 實時同步 mysql 數據到 hbase 中的整個操做流程。你們確定發現了,整個流程沒有編寫任何的代碼,相對於 binlog + Canal + 消息隊列 + JAR 的方案是否是高效一些呢。固然任何方案都會有優缺點,Streamsets 這種方案的更多實際體驗還須要更多的觀察。