1. Download the plugin package
Download the Flume/Kafka plugin package from: https://github.com/beyondj2ee/flumeng-kafka-plugin
2. Copy the jar files
Copy the jar files from the plugin package into flume/lib.
(Remove duplicate jars that differ only in version; in particular, delete scala-compiler-2.9.2.jar, otherwise Flume will fail to start.)
Copy the jar files from kafka/libs into flume/lib, for example as sketched below.
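A minimal sketch of these copy steps, assuming Flume and Kafka are installed under /opt and the plugin checkout lives at /opt/flumeng-kafka-plugin (the jar location inside the plugin checkout is an assumption; adjust to your layout):
cp /opt/flumeng-kafka-plugin/package/*.jar /opt/flume/lib/   # assumed jar location inside the plugin checkout
cp /opt/kafka/libs/*.jar /opt/flume/lib/
rm /opt/flume/lib/scala-compiler-2.9.2.jar                   # avoid the version clash noted above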
3. Create the Flume configuration
vi /opt/flume/conf/hw.conf
agent.sources = s1
agent.channels = c1
agent.sinks = k1
agent.sources.s1.type = exec
agent.sources.s1.command = tail -F /opt/log/debug.log
agent.sources.s1.channels = c1
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 100
# use the Kafka sink
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address and port
agent.sinks.k1.brokerList = 127.0.0.1:9092
# Kafka topic
agent.sinks.k1.topic = testKJ1
# serialization
agent.sinks.k1.serializer.class = kafka.serializer.StringEncoder
agent.sinks.k1.channel = c1
4. Start the services and test
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic testKJ1
bin/flume-ng agent -n agent -c conf -f conf/hw.conf -Dflume.root.logger=INFO,console
echo "nihaoa dashagua">>/opt/log/debug.log
####################################################################################
Monitoring a directory
vi /opt/flume/conf/dir.conf
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = spooldir
a1.sources.s1.channels = c1
a1.sources.s1.spoolDir = /opt/log/
a1.sources.s1.fileHeader = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = Dirtopic
a1.sinks.k1.brokerList = 127.0.0.1:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
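To try this out (a sketch following the same pattern as the steps above; note the agent name here is a1, and mylog.log is a hypothetical sample file), start the agent, drop a file into the spooled directory, and consume from the Dirtopic topic. The spooling-directory source renames each file to *.COMPLETED once it has been ingested.
bin/flume-ng agent -n a1 -c conf -f conf/dir.conf -Dflume.root.logger=INFO,console
cp mylog.log /opt/log/
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic Dirtopic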
Monitoring a directory and writing to HDFS
vi /opt/flume/conf/dirhdfs.conf
a1.sources = s1
a1.channels = c1
a1.sinks = k2
a1.sources.s1.type = spooldir
a1.sources.s1.channels = c1
a1.sources.s1.spoolDir = /opt/log/
a1.sources.s1.fileHeader = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
# topic is left over from the Kafka sink example; the HDFS sink ignores it
a1.sinks.k2.topic = Dirtopic
a1.sinks.k2.type = hdfs
a1.sinks.k2.channel = c1
a1.sinks.k2.hdfs.path = /tmp/dirhdfs
a1.sinks.k2.hdfs.filePrefix = events-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = minute
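To verify (a sketch; assumes the Hadoop client is on the PATH), start the agent, drop a file into /opt/log/, then list the sink's output directory:
bin/flume-ng agent -n a1 -c conf -f conf/dirhdfs.conf -Dflume.root.logger=INFO,console
hdfs dfs -ls /tmp/dirhdfs   # should show files prefixed with events-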
Monitoring a directory and writing to HDFS (one directory per day, files split by hour)
For example, under /user/hue/logput/20170521 you would see files like events-20170521.1495378401218.log
a1.sources = s1
a1.channels = c1
a1.sinks = k2
a1.sources.s1.type = spooldir
a1.sources.s1.channels = c1
a1.sources.s1.spoolDir = /opt/log/
a1.sources.s1.fileHeader = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
# topic is left over from the Kafka sink example; the HDFS sink ignores it
a1.sinks.k2.topic = Dirtopic
a1.sinks.k2.type = hdfs
a1.sinks.k2.channel = c1
# use the local clock to resolve the %Y%m%d escapes (no timestamp header needed)
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.path = /user/hue/logput/%Y%m%d
a1.sinks.k2.hdfs.filePrefix = events-%Y%m%d
a1.sinks.k2.hdfs.fileSuffix = .log
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = minute
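To check the result (a sketch; the actual date directory depends on when the agent runs, 20170521 is taken from the example above):
bin/flume-ng agent -n a1 -c conf -f conf/dirhdfs.conf -Dflume.root.logger=INFO,console
hdfs dfs -ls /user/hue/logput/           # one subdirectory per day
hdfs dfs -ls /user/hue/logput/20170521   # files like events-20170521.1495378401218.log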