數據庫實時轉移之Confluent環境搭建(二)

1.前言

上一篇主要介紹了Confluent的基本概念,若是對Confluent不瞭解的請回看上篇文章。html

2.系統架構

爲了保證系統可靠性,真實生產環境中都會以集羣的方式搭建,以免單機宕機形成的影響。本文以3臺機器,MySQL做爲源/目的數據庫來進行數據庫的轉移實驗。java

整個系統的總體結構以下圖所示,由於每一個組件都是獨立提供服務且都能以集羣的方式進行工做,所以本實驗把每一個服務都分別部署到3臺機器來模擬集羣環境。本系統主要用了Zookeeper、Kafka、Kafka-Connect、Schema-Registry 4種服務,總體架構以下: mysql

在這裏插入圖片描述
整個系統工做流程是Source Connector集羣從源MySQL DB中不斷實時讀取變更數據(增/刪/改)再通過Schema-Registry序列化後插入到Kafka消息隊列中,Sink Connector會不斷從Kafka消息隊列中獲取數據再通過反序列化插入到目的MySQL DB中。

3.Confluent 安裝

本文以CentOS操做系統爲實驗環境。sql

3.1 安裝JDK1.8

下載JDK1.8 64位,解壓到安裝目錄。數據庫

設置java環境變量,在~/.bashrc文件中增長如下信息。json

export PATH=<path-to-java>/bin:${PATH}; 
 export CLASSPATH=.:<path-to-java>/lib/dt.jar: <path-to-java>/lib/tools.jar
複製代碼

執行source~/.bashrc 使之生效。bootstrap

3.2 Confluent 安裝

下載Confluent Community版,下載連接爲https://www.confluent.io/download/,解壓到安裝目錄,添加環境變量。bash

export PATH=<path-to-confluent>/bin:${PATH};
export CLASSPATH=<path-to-confluent>/share/java/*:${CLASSPATH}
複製代碼

執行source~/.bashrc 使之生效。服務器

3.3 安裝mysql-connector-jdbc.jar

由於實驗數據庫爲MySQL,所以下載MySQL驅動包。架構

下載jar包,而後放到/java/kafka/目錄下面。

4.服務配置

4.1 Zookeeper 配置

編輯/etc/kafka/zookeeper.properites文件,修改如下配置信息。

完整配置信息能夠參看連接:

docs.confluent.io/current/zoo…

tickTime=2000 #時間單元,毫秒單位 dataDir=/var/lib/zookeeper/ #數據存儲路徑 clientPort=2181 #zookeeper 客戶端監聽端口 initLimit=5 #followers 初始化時間 syncLimit=2 #followers 同步時間 maxClientCnxns=0 #最大client鏈接數,值爲0的時候沒有上限 server.< myid >=< hostname >:< leaderport >:< electionport > 集羣配置 server.1=< IP1>:2888:3888 #server1 地址,修改成自身地址 server.2=< IP2>:2888:3888 #server2 地址 server.3=< IP3>:2888:3888 #server3 地址 autopurge.snapRetainCount=3 #最近的快照保存數目 autopurge.purgeInterval=24 #快照自動清除時間間隔

修改好配置文件後,須要在每臺Zookeeper Server的dataDir目錄下建立myid文件來在集羣中做爲

惟一標示。

例如:

server1:  echo 1 > myid
server2:  echo 2 > myid
server3:  echo 3 > myid
複製代碼

集羣中全部配置信息需保持一致,每次修改配置文件,都必須重啓相應機器的服務才能生效。

4.2 Kafka 配置

編輯/etc/kafka/server.properites文件,修改如下配置信息。

完整配置信息能夠參看連接:

docs.confluent.io/current/kaf…

#集羣地址,修改成本身地址 zookeeper.connect=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181 broker.id=[1,2,3] #節點標識,每臺機器依次改成1,2,3….依次遞增 log.dirs=/xxx/log/kafka #日誌目錄,修改成自身log目錄 listeners=PLAINTEXT://0.0.0.0:9092 #監聽地址 advertised.listeners=PLAINTEXT://0.0.0.0:9092 #發佈到zookeeper供客戶端鏈接的地址 num.partitions=3 #分區數設置,分區間數據無序,分區內數據有序。若須要保證有序,此處設置爲1 default.replication.factor=3 #消息備份數目默認1不作複製。此處改成2,備份一份。 port=9092 #服務端口,默認9092

集羣中除broker.id外全部配置信息需保持一致,每次修改配置文件,都必須重啓相應機器的服務才能生效。

4.3 Schema Registry 配置

編輯/etc/schema-registry/schema-registry.properites 文件,修改如下配置信息。

完整配置信息能夠參看連接

docs.confluent.io/current/sch…

listeners=http://0.0.0.0:8081 #監聽地址 host.name=172.21.101.186 #主機地址,改成機器自己地址 port #端口號,默認8081 #zookeeper 集羣列表,改成機器相應地址
kafkastore.connection.url=xxx.xxx.xxx.xxx: 2181,xxx.xxx.xxx.xxx: 2181,xxx.xxx.xxx.xxx:2181

集羣中除host.name設置爲自己ip外,全部配置信息均需保持一致,每次修改配置文件,都必須重啓相應機器的服務才能生效。

4.4 Kafka-Connect

4.4.1 Connect 配置

編輯/etc/schema-registry/connect-avro-distributed.properties.properites 文件,修改如下配置信息。

完整參數參考連接:

docs.confluent.io/current/con…

group.id=connect-cluster #Connect集羣組標示,集羣中此處全部配置必須相同。 kafka節點列表,host1:port1,host2:port2,...修改成相應機器地址 bootstrap.servers=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181 key.converter=io.confluent.connect.avro.AvroConverter #使用Avro爲key轉化類 key.converter.schema.registry.url=http:// xxx.xxx.xxx.xxx:8081 #key的schema 訪問URL value.converter= io.confluent.connect.avro.AvroConverter #使用Avro爲value 轉化類 value.Converter.schema.registry.url=http:// xxx.xxx.xxx.xxx:8081 #value的schema 訪問URL config.storage.replication.factor=3 #config信息備份因子,不超過集羣數目 offset.storage.replication.factor=3 #偏移量備份因子,不超過集羣數目 status.storage.replication.factor=3 #任務狀態備份因子,不超過集羣數目

集羣中除schema.registry.url設置爲自己ip外,集羣中全部配置信息均需保持一致,每次修改配置文件,都必須重啓相應機器的服務才能生效。

4.4.2 Connect-JDBC

Kafka Connect主要由兩部分組成,分別是Source Connector 和Sink Connector。Source Connector 負責從數據庫中把信息讀入到Kafka,Sink Connector負責把數據從Kafka 中讀到數據庫。實驗使用的是MySql數據庫,所以咱們主要介紹Confluent JDBC Connector。

完整參數參考連接:

docs.confluent.io/current/con…

4.4.2.1 JDBC Source Connector

如下是各個參數說明,能夠根據實際業務場景來配置各個參數。

connection.url #數據庫鏈接地址

connection.user #數據庫用戶名 connection.password #數據庫密碼 table.whitelist #查詢表白名單 connection.attempts #數據庫鏈接次數 numeric.precision.mapping #是否根據進度判斷數據類型 table.blacklist #表黑名單 connection.backoff.ms #數據庫嘗試鏈接間隔時間 schema.pattern #查詢表時使用的schema mode #更新表時候用的模式,主要有如下4種模式. Bulk:每次都所有查詢。 timestamp:根據時間戳字段是否變化來檢測數據是否增長或更新。 incrementing :用自增加字段來檢測數據是否有增長,不能檢測數據變化和刪除。 timestamp+incrementing :根據時間戳和自增加字段來檢查新增更新數據,根據自增加字段來標示惟一的流數據。 incrementing.column.name #用來判斷是否有新增數據的自增加字段名稱,該字段值不容許爲空值。 timestamp.column.name #用來判斷數據是否有新增和更新的時間戳字段,該字段值不容許爲空值 validate.non.null #設置是否檢測數據庫中自增加和時間戳字段不容許爲空值,若是檢查失敗connector就中止啓動。 query #設置數據查詢語句,若是設置了就不進行全表輪詢,而是隻是用此sql語句去提取數據 query.condition #設置自定義查詢條件,會拼接到where語句後面。(非自帶,修改代碼添加此參數) poll.interval.ms #數據輪詢時間間隔,對數據庫執行查詢語句的時間間隔,此參數對數據庫性能有影響 batch.max.rows # connector每次輪詢獲取數據的最大數,默認100條,和connector獲取數據的性能有關 table.poll.interval.ms #檢查表是否有增長或刪除的時間間隔。 topic.prefix #使用普通查詢時候以topic.prefix +表名 做爲topic.自定義query語句時候以topic.prefix做爲topic,此處寫目的庫的表名 table.types #設置要查詢的表類型,默認爲table,還能夠設置view、system table。 timestamp.delay.interval.ms #延遲轉移時間間隔,能夠延遲數據轉移

4.4.2.2 JDBC Sink Connector

如下是各個參數說明,根據實際業務場景來配置各個參數。

connection.url #數據庫鏈接URL connection.user #數據庫鏈接用戶名 connection.password #數據庫密碼。 insert.mode # 插入數據模式 insert,upsert,update。本項目使用upsert模式,在沒有相應主鍵數據時候直接插入,不然進行更新操做。 batch.size #每次插入數據的最大數,和數據庫性能有關 topics #訂閱的主題,也是目的數據庫的表名
table.name.format #插入表面格式化,默認爲${topic} pk.mode #主鍵模式。None:不設置主鍵,kafka:使用kafka座標做爲主鍵,record_key: 使用record 的key字段做爲主鍵,record_value:使用record 的value 字段做爲主鍵。 pk.fields #主鍵字段,以逗號分隔。項目中使用目的表的虛擬主鍵或者惟一約束字段。 fields.whitelist #插入字段白名單,若是爲空則全部字段都使用 auto.create #是否自動建立目的表。 auto.evolve #當表結構發生變化時候,是否自動修改目的表 max.retries #失敗重試次數 retry.backoff.ms #重試時間間隔

4.4.3 Connector REST API

在集羣環境下,Connector參數只能經過RESTful接口進行參數配置,經過RESTful接口能夠給任意一個服務器發送配置參數,Connector參數信息會自動轉發給集羣中的其它機器。使用RESTful接口配置參數後,Connector服務無需重啓,便可生效。

下面詳細描述Connector RESTful接口的配置參數。

4.3.3.1 RESTful Header

目前 REST接口只支持Json格式參數,所以請求頭應該以下設置。

Accept: application/json
Content-Type: application/json 
複製代碼
4.3.3.2 RESTful URL
接口 功能
GET /connectors 獲取connectors列表
POST /connectors 建立connector
GET /connectors/(string:name) 獲取指定connector 信息
PUT /connectors/(string:name)/config 建立或者修改指定connector 配置
GET /connectors/(string:name)/status 獲取指定connector 運行狀態信息
POST /connectors/(string:name)/restart 重啓指定connector
PUT /connectors/(string:name)/pause 暫停指定connector
PUT /connectors/(string:name)/resume 重啓指定connector
DELETE /connectors/(string:name)/ 刪除指定connector
GET /connectors/(string:name)/tasks 獲取指定connector的tasks列表信息
GET /connectors/(string:name)/tasks/(int:taskid)/status 獲取指定task狀態
POST /connectors/(string:name)/tasks/(int:taskid)/restart 重啓指定task

5. 服務啓動

5.1 啓動Zookeeper

zookeeper-server-start  <path-to-confluent>/etc/kafka/zookeeper.properties 
複製代碼

5.2 啓動 Kafka

kafka-server-start      <path-to-confluent>/etc/kafka/server.properties
複製代碼

5.3 啓動Schema-registry-start

schema-registry-start   <path-to-confluent>/etc/schema-registry/schema-registry.properties
複製代碼

5.4 啓動Connector

connect-distributed     <path-to-confluent>/etc/schema-registry/connect-avro-distributed.properties
複製代碼

6. 啓動Connector任務

使用Postman對Connector的REST API 接口進行訪問配置。示例以下:

6.1 Header 設置

Headers Tab頁面增長如下參數。

在這裏插入圖片描述

6.2 配置Source Connector

在這裏插入圖片描述
發送請求後便可完成任務的配置。

6.3 配置Sink Connector

在這裏插入圖片描述
配置完畢點擊Send按鈕,即完成了Source、Sink Connector任務的配置和啓動。此時若向Source 數據庫添加或更改數據,目的數據庫會實時更新過來。

7.服務中止

在集羣中的機器中依次關閉如下服務,關閉時候,須要保證關閉順序的正確性。

7.1 關閉Connect-jdbc

confluent connect stop
複製代碼

執行命令後若提示 connect is [DOWN],則表明connect服務關閉成功。

7.2 關閉Schema-registry

confluent schema-registry stop
複製代碼

執行命令後若提示 schema-registry is [DOWN],則表明schema-registry服務關閉成功。

7.3 關閉Kafka

confluent kafka stop
複製代碼

執行命令後若提示 kafka is [DOWN],則表明upkafka服務關閉成功。

7.4 關閉Zookeeper

confluent zookeeper stop
複製代碼

執行命令後若提示 zookeeper is [DOWN],則表明zookeeper服務關閉成功。

8. 小結

經過實驗全部數據都能實時進行轉移並無遺漏。

在破壞性測試中如殺掉服務進程、下線集羣中的某臺機器等異常測試中,集羣都能正常的進行轉移工做。

即便把集羣中全部機器的服務都中止,當服務重啓後,Confluent會把宕機期間的數據轉移過來。能夠看出Confluent確實如官方宣傳所言,是一個高性能,高可靠的系統。

到此,本文也基本結束,但願此教程能幫助全部須要的人。

想要了解更多,關注公衆號:七分熟pizza

在這裏插入圖片描述
相關文章
相關標籤/搜索