1. Install Flume.
2. Install Kafka.
3. Install Spark.
4. Pipeline overview:
log file -> Flume -> Kafka -> Spark Streaming
Flume input: the log file
Flume output: Kafka's input
Kafka output: Spark's input
5. Integration steps:
(1). Copy the plugin jars into Flume's lib directory (example copy commands follow the list):
a. flumeng-kafka-plugin.jar
b. metrics-annotation-2.2.0.jar
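For example, assuming the two jars were downloaded to the current directory and Flume lives at /opt/apache-flume-1.6.0 as in the config below (adjust both paths to your layout):
cp flumeng-kafka-plugin.jar /opt/apache-flume-1.6.0/lib/
cp metrics-annotation-2.2.0.jar /opt/apache-flume-1.6.0/lib/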
(2). Copy the configuration file producer.properties into Flume's conf directory.
The contents of the file are as follows:
# agent section
producer.sources=s
producer.channels=c
producer.sinks=r
# source section
producer.sources.s.type=exec
producer.sources.s.command=tail -f -n+1 /opt/apache-flume-1.6.0/data/testFlumeKafka.txt
producer.sources.s.channels=c
# Each sink's type must be defined
producer.sinks.r.type=org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=namenode:19092,datanode1:19092,datanode2:19092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
# the corresponding topic must already exist; see the creation command in step (5)
producer.sinks.r.custom.topic.name=test
# Specify the channel the sink should use
producer.sinks.r.channel=c
# Each channel's type is defined.
producer.channels.c.type=memory
producer.channels.c.capacity=1000
producer.channels.c.transactionCapacity=100
(3). Start the flume-ng agent. In practice the Kafka brokers (step (4)) should already be running, since the sink publishes to them. The command is as follows:
flume-ng agent -c /opt/apache-flume-1.6.0/conf -f /opt/apache-flume-1.6.0/conf/producer.properties -n producer
(4). Start the Kafka server on each broker node. The command is as follows:
bin/kafka-server-start.sh config/server.properties
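Note that the broker list in producer.properties and the ZooKeeper addresses below use non-default ports (19092 and 12181), so each broker's config/server.properties must match. A minimal sketch, assuming ZooKeeper also runs on the three cluster hosts:
# must be unique per broker
broker.id=0
# matches the non-default broker port used in producer.properties
port=19092
# matches the non-default ZooKeeper port used by the commands below
zookeeper.connect=namenode:12181,datanode1:12181,datanode2:12181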
(5). Start a Kafka console consumer to verify that messages flow through (this assumes the test topic has already been created). The command is as follows:
bin/kafka-console-consumer.sh --zookeeper namenode:12181,datanode1:12181,datanode2:12181 --topic test --from-beginning
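If the topic does not exist yet, create it first. A sketch using kafka-topics.sh (available in Kafka 0.8.2+; older releases ship kafka-create-topic.sh instead, and the partition/replication counts here are only illustrative):
bin/kafka-topics.sh --create --zookeeper namenode:12181,datanode1:12181,datanode2:12181 --replication-factor 1 --partitions 1 --topic test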
(6). Start Spark. The command is as follows:
sbin/start-all.sh
(7). Run the Spark Streaming demo. The arguments are <zkQuorum> <group> <topics> <numThreads>. The command is as follows:
run-example org.apache.spark.examples.streaming.JavaKafkaWordCount namenode:12181 test-consumer-group test 3 >> test.log
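For reference, the core of JavaKafkaWordCount looks roughly like the sketch below, condensed against the Spark 1.x receiver-based Kafka API (KafkaUtils.createStream). The class name, 2-second batch interval, and comments are ours, not the shipped example verbatim:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class KafkaWordCountSketch {
  public static void main(String[] args) throws Exception {
    // args: <zkQuorum> <group> <topics> <numThreads>
    // e.g. namenode:12181 test-consumer-group test 3
    SparkConf conf = new SparkConf().setAppName("KafkaWordCountSketch");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));

    // number of receiver threads for each topic
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    for (String topic : args[2].split(",")) {
      topicMap.put(topic, Integer.parseInt(args[3]));
    }

    // receiver-based stream: each message value is one log line shipped by Flume
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

    JavaPairDStream<String, Integer> wordCounts = messages
        .map(tuple -> tuple._2())                        // keep only the line text
        .flatMap(line -> Arrays.asList(line.split(" "))) // split each line into words
        .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
        .reduceByKey((a, b) -> a + b);                   // per-batch word counts

    wordCounts.print(); // this output is what >> test.log captures
    jssc.start();
    jssc.awaitTermination();
  }
}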
(8). Append content to the monitored log file, and the word-count results will appear in test.log.
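For example, appending a line to the file tailed by the Flume source:
echo "hello world hello flume" >> /opt/apache-flume-1.6.0/data/testFlumeKafka.txt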