Apache Kafka系列(五) Kafka Connect及FileConnector示例

一. Kafka Connect簡介

  Kafka是一個使用愈來愈廣的消息系統,尤爲是在大數據開發中(實時數據處理和分析)。爲什麼集成其餘系統和解耦應用,常常使用Producer來發送消息到Broker,並使用Consumer來消費Broker中的消息。Kafka Connect是到0.9版本才提供的並極大的簡化了其餘系統與Kafka的集成。Kafka Connect運用用戶快速定義並實現各類Connector(File,Jdbc,Hdfs等),這些功能讓大批量數據導入/導出Kafka很方便。html

             

如圖中所示,左側的Sources負責從其餘異構系統中讀取數據並導入到Kafka中;右側的Sinks是把Kafka中的數據寫入到其餘的系統中。react

二. 各類Kafka Connector

  Kafka Connector不少,包括開源和商業版本的。以下列表中是經常使用的開源Connectorgit

Connectors References
Jdbc Source, Sink
Elastic Search Sink1, Sink2, Sink3
Cassandra Source1, Source 2, Sink1, Sink2 
MongoDB Source
HBase Sink
Syslog Source
MQTT (Source) Source
Twitter (Source) Source, Sink
S3 Sink1, Sink2

  商業版的能夠經過Confluent.io得到github

三. 示例

3.1 FileConnector Demo

 本例演示如何使用Kafka Connect把Source(test.txt)轉爲流數據再寫入到Destination(test.sink.txt)中。以下圖所示:mongodb

          

      本例使用到了兩個Connector:apache

  • FileStreamSource:從test.txt中讀取併發布到Broker中
  • FileStreamSink:從Broker中讀取數據並寫入到test.sink.txt文件中

  其中的Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.propertiesjson

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

  其中的Sink使用到的配置文件是${KAFKA_HOME}/config/connect-file-sink.propertiesbootstrap

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

  Broker使用到的配置文件是${KAFKA_HOME}/config/connect-standalone.properties多線程

bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

 

3.2 運行Demo

  須要熟悉Kafka的一些命令行,參考本系列以前的文章Apache Kafka系列(二) 命令行工具(CLI)併發

 3.2.1 啓動Kafka Broker

[root@localhost bin]# cd /opt/kafka_2.11-0.11.0.0/
[root@localhost kafka_2.11-0.11.0.0]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
[root@localhost kafka_2.11-0.11.0.0]# ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[root@localhost kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh ./config/server.properties &

3.2.2 啓動Source Connector和Sink Connector

[root@localhost kafka_2.11-0.11.0.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties 

3.3.3 打開console-consumer

./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test

3.3.4 寫入到test.txt文件中,並觀察3.3.3中的變化

[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt
[root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt
3.3.3中打開的窗口輸出以下
{"schema":{"type":"string","optional":false},"payload":"firest line"}
{"schema":{"type":"string","optional":false},"payload":"second line"}

3.3.5 查看test.sink.txt

[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt 
firest line
second line

 

四. 結論

本例僅僅演示了Kafka自帶的File Connector,後續文章會完成JndiConnector,HdfsConnector,而且會使用CDC(Changed Data Capture)集成Kafka來完成一個ETL的例子

 PS:

相比編譯過Kafka-Manager都知道各類坑,通過了3個小時的努力,我終於把Kafka-Manager編譯經過並打包了,而且新增了Kafka0.11.0版本支持。

附下載地址: 連接: https://pan.baidu.com/s/1miiMsAk 密碼: 866q

相關文章
相關標籤/搜索