Kafka是一個使用愈來愈廣的消息系統,尤爲是在大數據開發中(實時數據處理和分析)。爲什麼集成其餘系統和解耦應用,常常使用Producer來發送消息到Broker,並使用Consumer來消費Broker中的消息。Kafka Connect是到0.9版本才提供的並極大的簡化了其餘系統與Kafka的集成。Kafka Connect運用用戶快速定義並實現各類Connector(File,Jdbc,Hdfs等),這些功能讓大批量數據導入/導出Kafka很方便。html
如圖中所示,左側的Sources負責從其餘異構系統中讀取數據並導入到Kafka中;右側的Sinks是把Kafka中的數據寫入到其餘的系統中。react
Kafka Connector不少,包括開源和商業版本的。以下列表中是經常使用的開源Connectorgit
Connectors | References |
Jdbc | Source, Sink |
Elastic Search | Sink1, Sink2, Sink3 |
Cassandra | Source1, Source 2, Sink1, Sink2 |
MongoDB | Source |
HBase | Sink |
Syslog | Source |
MQTT (Source) | Source |
Twitter (Source) | Source, Sink |
S3 | Sink1, Sink2 |
商業版的能夠經過Confluent.io得到github
本例演示如何使用Kafka Connect把Source(test.txt)轉爲流數據再寫入到Destination(test.sink.txt)中。以下圖所示:mongodb
本例使用到了兩個Connector:apache
其中的Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.propertiesjson
name=local-file-source connector.class=FileStreamSource tasks.max=1 file=test.txt topic=connect-test
其中的Sink使用到的配置文件是${KAFKA_HOME}/config/connect-file-sink.propertiesbootstrap
name=local-file-sink connector.class=FileStreamSink tasks.max=1 file=test.sink.txt topics=connect-test
Broker使用到的配置文件是${KAFKA_HOME}/config/connect-standalone.properties多線程
bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000
須要熟悉Kafka的一些命令行,參考本系列以前的文章Apache Kafka系列(二) 命令行工具(CLI)併發
3.2.1 啓動Kafka Broker
[root@localhost bin]# cd /opt/kafka_2.11-0.11.0.0/ [root@localhost kafka_2.11-0.11.0.0]# ls bin config libs LICENSE logs NOTICE site-docs [root@localhost kafka_2.11-0.11.0.0]# ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[root@localhost kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh ./config/server.properties &
3.2.2 啓動Source Connector和Sink Connector
[root@localhost kafka_2.11-0.11.0.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
3.3.3 打開console-consumer
./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test
3.3.4 寫入到test.txt文件中,並觀察3.3.3中的變化
[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt [root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt 3.3.3中打開的窗口輸出以下 {"schema":{"type":"string","optional":false},"payload":"firest line"} {"schema":{"type":"string","optional":false},"payload":"second line"}
3.3.5 查看test.sink.txt
[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt firest line second line
本例僅僅演示了Kafka自帶的File Connector,後續文章會完成JndiConnector,HdfsConnector,而且會使用CDC(Changed Data Capture)集成Kafka來完成一個ETL的例子
PS:
相比編譯過Kafka-Manager都知道各類坑,通過了3個小時的努力,我終於把Kafka-Manager編譯經過並打包了,而且新增了Kafka0.11.0版本支持。
附下載地址: 連接: https://pan.baidu.com/s/1miiMsAk 密碼: 866q