Agent selection
agent1: exec source + memory channel + avro sink
agent2: avro source + memory channel + kafka sink
To simulate a realistic production scenario, agent1 runs on machine A and agent2 runs on machine B.
avro source: listens on an Avro port and receives Avro events sent from external clients.
avro sink: typically used for cross-node transport; it binds to the IP and port of the destination that the data is moved to.
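Putting the two agents together, the end-to-end flow is sketched below (based on the configurations that follow; note that in this walkthrough both "machines" are actually the same host, dblab-VirtualBox):

tail -F data.log -> exec-source -> memory-channel -> avro-sink
    ==(Avro RPC on port 44444)==> avro-source -> memory-channel -> kafka-sink -> Kafka topic hello_topic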
Create the agent2 configuration file:
cd /app/flume/flume/conf
vi test-avro-memory-kafka.conf
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = dblab-VirtualBox
avro-memory-kafka.sources.avro-source.port = 44444

avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = dblab-VirtualBox:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1

avro-memory-kafka.channels.memory-channel.type = memory

avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
Start agent2:
flume-ng agent --name avro-memory-kafka -c conf -f conf/test-avro-memory-kafka.conf -Dflume.root.logger=INFO,console
Be sure to wait until agent2's avro-source has started successfully and is listening on port 44444 before starting agent1; otherwise agent1's connection will be refused at startup.
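One way to confirm this (a generic Linux check, assuming netstat is installed, not anything Flume-specific) is to look for a listener on the port:

netstat -tnlp | grep 44444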
Create the agent1 configuration file:
cd /app/flume/flume/conf
vi test-exec-memory-avro.conf
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = dblab-VirtualBox
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
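A note on the exec source settings: tail -F (capital F) keeps following data.log even if the file is rotated or recreated, which plain tail -f does not, and shell = /bin/sh -c makes Flume run the command through a shell.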
Start agent1:
flume-ng agent --name exec-memory-avro -c conf -f conf/test-exec-memory-avro.conf -Dflume.root.logger=INFO,console
Next, configure Kafka.
First, start Kafka:
$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
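The broker and the topic commands below assume a ZooKeeper instance on localhost:2181. If one is not already running, the ZooKeeper bundled with Kafka can be started first:

$ zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties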
Create the hello_topic topic:
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hello_topic
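Optionally, verify that the topic was created:

$ kafka-topics.sh --list --zookeeper localhost:2181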
Start a console producer:
kafka-console-producer.sh --broker-list localhost:9092 --topic hello_topic
Start a Kafka console consumer to verify that the broker is working:
kafka-console-consumer.sh --zookeeper localhost:2181 --topic hello_topic
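Typing a line into the producer terminal and seeing it appear in the consumer confirms that Kafka itself works; in the actual pipeline it is the Flume kafka-sink, not the console producer, that publishes to hello_topic.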
Write data to the file that agent1's exec-source is tailing.
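For example (the path is the one the agent1 configuration tails):

echo "hello flume" >> /home/hadoop/data/data.log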
Check whether the Kafka consumer receives the data through Flume.
This completes the Flume and Kafka integration for real-time data collection.