上一篇主要介紹了Confluent的基本概念,若是對Confluent不瞭解的請回看上篇文章。html
爲了保證系統可靠性,真實生產環境中都會以集羣的方式搭建,以免單機宕機形成的影響。本文以3臺機器,MySQL做爲源/目的數據庫來進行數據庫的轉移實驗。java
整個系統的總體結構以下圖所示,由於每一個組件都是獨立提供服務且都能以集羣的方式進行工做,所以本實驗把每一個服務都分別部署到3臺機器來模擬集羣環境。本系統主要用了Zookeeper、Kafka、Kafka-Connect、Schema-Registry 4種服務,總體架構以下: mysql
整個系統工做流程是Source Connector集羣從源MySQL DB中不斷實時讀取變更數據(增/刪/改)再通過Schema-Registry序列化後插入到Kafka消息隊列中,Sink Connector會不斷從Kafka消息隊列中獲取數據再通過反序列化插入到目的MySQL DB中。本文以CentOS操做系統爲實驗環境。sql
下載JDK1.8 64位,解壓到安裝目錄。數據庫
設置java環境變量,在~/.bashrc文件中增長如下信息。json
export PATH=<path-to-java>/bin:${PATH};
export CLASSPATH=.:<path-to-java>/lib/dt.jar: <path-to-java>/lib/tools.jar
複製代碼
執行source~/.bashrc 使之生效。bootstrap
下載Confluent Community版,下載連接爲https://www.confluent.io/download/,解壓到安裝目錄,添加環境變量。bash
export PATH=<path-to-confluent>/bin:${PATH};
export CLASSPATH=<path-to-confluent>/share/java/*:${CLASSPATH}
複製代碼
執行source~/.bashrc 使之生效。服務器
由於實驗數據庫爲MySQL,所以下載MySQL驅動包。架構
下載jar包,而後放到/java/kafka/目錄下面。
編輯/etc/kafka/zookeeper.properites文件,修改如下配置信息。
完整配置信息能夠參看連接:
docs.confluent.io/current/zoo…
tickTime=2000 #時間單元,毫秒單位 dataDir=/var/lib/zookeeper/ #數據存儲路徑 clientPort=2181 #zookeeper 客戶端監聽端口 initLimit=5 #followers 初始化時間 syncLimit=2 #followers 同步時間 maxClientCnxns=0 #最大client鏈接數,值爲0的時候沒有上限 server.< myid >=< hostname >:< leaderport >:< electionport > 集羣配置 server.1=< IP1>:2888:3888 #server1 地址,修改成自身地址 server.2=< IP2>:2888:3888 #server2 地址 server.3=< IP3>:2888:3888 #server3 地址 autopurge.snapRetainCount=3 #最近的快照保存數目 autopurge.purgeInterval=24 #快照自動清除時間間隔
修改好配置文件後,須要在每臺Zookeeper Server的dataDir目錄下建立myid文件來在集羣中做爲
惟一標示。
例如:
server1: echo 1 > myid
server2: echo 2 > myid
server3: echo 3 > myid
複製代碼
集羣中全部配置信息需保持一致,每次修改配置文件,都必須重啓相應機器的服務才能生效。
編輯/etc/kafka/server.properites文件,修改如下配置信息。
完整配置信息能夠參看連接:
docs.confluent.io/current/kaf…
#集羣地址,修改成本身地址 zookeeper.connect=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181 broker.id=[1,2,3] #節點標識,每臺機器依次改成1,2,3….依次遞增 log.dirs=/xxx/log/kafka #日誌目錄,修改成自身log目錄 listeners=PLAINTEXT://0.0.0.0:9092 #監聽地址 advertised.listeners=PLAINTEXT://0.0.0.0:9092 #發佈到zookeeper供客戶端鏈接的地址 num.partitions=3 #分區數設置,分區間數據無序,分區內數據有序。若須要保證有序,此處設置爲1 default.replication.factor=3 #消息備份數目默認1不作複製。此處改成2,備份一份。 port=9092 #服務端口,默認9092
集羣中除broker.id外全部配置信息需保持一致,每次修改配置文件,都必須重啓相應機器的服務才能生效。
編輯/etc/schema-registry/schema-registry.properites 文件,修改如下配置信息。
完整配置信息能夠參看連接
docs.confluent.io/current/sch…
listeners=http://0.0.0.0:8081 #監聽地址 host.name=172.21.101.186 #主機地址,改成機器自己地址 port #端口號,默認8081 #zookeeper 集羣列表,改成機器相應地址
kafkastore.connection.url=xxx.xxx.xxx.xxx: 2181,xxx.xxx.xxx.xxx: 2181,xxx.xxx.xxx.xxx:2181
集羣中除host.name設置爲自己ip外,全部配置信息均需保持一致,每次修改配置文件,都必須重啓相應機器的服務才能生效。
編輯/etc/schema-registry/connect-avro-distributed.properties.properites 文件,修改如下配置信息。
完整參數參考連接:
docs.confluent.io/current/con…
group.id=connect-cluster #Connect集羣組標示,集羣中此處全部配置必須相同。 kafka節點列表,host1:port1,host2:port2,...修改成相應機器地址 bootstrap.servers=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181 key.converter=io.confluent.connect.avro.AvroConverter #使用Avro爲key轉化類 key.converter.schema.registry.url=http:// xxx.xxx.xxx.xxx:8081 #key的schema 訪問URL value.converter= io.confluent.connect.avro.AvroConverter #使用Avro爲value 轉化類 value.Converter.schema.registry.url=http:// xxx.xxx.xxx.xxx:8081 #value的schema 訪問URL config.storage.replication.factor=3 #config信息備份因子,不超過集羣數目 offset.storage.replication.factor=3 #偏移量備份因子,不超過集羣數目 status.storage.replication.factor=3 #任務狀態備份因子,不超過集羣數目
集羣中除schema.registry.url設置爲自己ip外,集羣中全部配置信息均需保持一致,每次修改配置文件,都必須重啓相應機器的服務才能生效。
Kafka Connect主要由兩部分組成,分別是Source Connector 和Sink Connector。Source Connector 負責從數據庫中把信息讀入到Kafka,Sink Connector負責把數據從Kafka 中讀到數據庫。實驗使用的是MySql數據庫,所以咱們主要介紹Confluent JDBC Connector。
完整參數參考連接:
docs.confluent.io/current/con…
如下是各個參數說明,能夠根據實際業務場景來配置各個參數。
connection.url #數據庫鏈接地址
connection.user #數據庫用戶名 connection.password #數據庫密碼 table.whitelist #查詢表白名單 connection.attempts #數據庫鏈接次數 numeric.precision.mapping #是否根據進度判斷數據類型 table.blacklist #表黑名單 connection.backoff.ms #數據庫嘗試鏈接間隔時間 schema.pattern #查詢表時使用的schema mode #更新表時候用的模式,主要有如下4種模式. Bulk:每次都所有查詢。 timestamp:根據時間戳字段是否變化來檢測數據是否增長或更新。 incrementing :用自增加字段來檢測數據是否有增長,不能檢測數據變化和刪除。 timestamp+incrementing :根據時間戳和自增加字段來檢查新增更新數據,根據自增加字段來標示惟一的流數據。 incrementing.column.name #用來判斷是否有新增數據的自增加字段名稱,該字段值不容許爲空值。 timestamp.column.name #用來判斷數據是否有新增和更新的時間戳字段,該字段值不容許爲空值 validate.non.null #設置是否檢測數據庫中自增加和時間戳字段不容許爲空值,若是檢查失敗connector就中止啓動。 query #設置數據查詢語句,若是設置了就不進行全表輪詢,而是隻是用此sql語句去提取數據 query.condition #設置自定義查詢條件,會拼接到where語句後面。(非自帶,修改代碼添加此參數) poll.interval.ms #數據輪詢時間間隔,對數據庫執行查詢語句的時間間隔,此參數對數據庫性能有影響 batch.max.rows # connector每次輪詢獲取數據的最大數,默認100條,和connector獲取數據的性能有關 table.poll.interval.ms #檢查表是否有增長或刪除的時間間隔。 topic.prefix #使用普通查詢時候以topic.prefix +表名 做爲topic.自定義query語句時候以topic.prefix做爲topic,此處寫目的庫的表名 table.types #設置要查詢的表類型,默認爲table,還能夠設置view、system table。 timestamp.delay.interval.ms #延遲轉移時間間隔,能夠延遲數據轉移
如下是各個參數說明,根據實際業務場景來配置各個參數。
connection.url #數據庫鏈接URL connection.user #數據庫鏈接用戶名 connection.password #數據庫密碼。 insert.mode # 插入數據模式 insert,upsert,update。本項目使用upsert模式,在沒有相應主鍵數據時候直接插入,不然進行更新操做。 batch.size #每次插入數據的最大數,和數據庫性能有關 topics #訂閱的主題,也是目的數據庫的表名
table.name.format #插入表面格式化,默認爲${topic} pk.mode #主鍵模式。None:不設置主鍵,kafka:使用kafka座標做爲主鍵,record_key: 使用record 的key字段做爲主鍵,record_value:使用record 的value 字段做爲主鍵。 pk.fields #主鍵字段,以逗號分隔。項目中使用目的表的虛擬主鍵或者惟一約束字段。 fields.whitelist #插入字段白名單,若是爲空則全部字段都使用 auto.create #是否自動建立目的表。 auto.evolve #當表結構發生變化時候,是否自動修改目的表 max.retries #失敗重試次數 retry.backoff.ms #重試時間間隔
在集羣環境下,Connector參數只能經過RESTful接口進行參數配置,經過RESTful接口能夠給任意一個服務器發送配置參數,Connector參數信息會自動轉發給集羣中的其它機器。使用RESTful接口配置參數後,Connector服務無需重啓,便可生效。
下面詳細描述Connector RESTful接口的配置參數。
目前 REST接口只支持Json格式參數,所以請求頭應該以下設置。
Accept: application/json
Content-Type: application/json
複製代碼
接口 | 功能 |
---|---|
GET /connectors | 獲取connectors列表 |
POST /connectors | 建立connector |
GET /connectors/(string:name) | 獲取指定connector 信息 |
PUT /connectors/(string:name)/config | 建立或者修改指定connector 配置 |
GET /connectors/(string:name)/status | 獲取指定connector 運行狀態信息 |
POST /connectors/(string:name)/restart | 重啓指定connector |
PUT /connectors/(string:name)/pause | 暫停指定connector |
PUT /connectors/(string:name)/resume | 重啓指定connector |
DELETE /connectors/(string:name)/ | 刪除指定connector |
GET /connectors/(string:name)/tasks | 獲取指定connector的tasks列表信息 |
GET /connectors/(string:name)/tasks/(int:taskid)/status | 獲取指定task狀態 |
POST /connectors/(string:name)/tasks/(int:taskid)/restart | 重啓指定task |
zookeeper-server-start <path-to-confluent>/etc/kafka/zookeeper.properties
複製代碼
kafka-server-start <path-to-confluent>/etc/kafka/server.properties
複製代碼
schema-registry-start <path-to-confluent>/etc/schema-registry/schema-registry.properties
複製代碼
connect-distributed <path-to-confluent>/etc/schema-registry/connect-avro-distributed.properties
複製代碼
使用Postman對Connector的REST API 接口進行訪問配置。示例以下:
Headers Tab頁面增長如下參數。
在集羣中的機器中依次關閉如下服務,關閉時候,須要保證關閉順序的正確性。
confluent connect stop
複製代碼
執行命令後若提示 connect is [DOWN],則表明connect服務關閉成功。
confluent schema-registry stop
複製代碼
執行命令後若提示 schema-registry is [DOWN],則表明schema-registry服務關閉成功。
confluent kafka stop
複製代碼
執行命令後若提示 kafka is [DOWN],則表明upkafka服務關閉成功。
confluent zookeeper stop
複製代碼
執行命令後若提示 zookeeper is [DOWN],則表明zookeeper服務關閉成功。
經過實驗全部數據都能實時進行轉移並無遺漏。
在破壞性測試中如殺掉服務進程、下線集羣中的某臺機器等異常測試中,集羣都能正常的進行轉移工做。
即便把集羣中全部機器的服務都中止,當服務重啓後,Confluent會把宕機期間的數據轉移過來。能夠看出Confluent確實如官方宣傳所言,是一個高性能,高可靠的系統。
到此,本文也基本結束,但願此教程能幫助全部須要的人。
想要了解更多,關注公衆號:七分熟pizza