flume+kafka+spark streaming整合

1.安裝好flume
2.安裝好kafka
3.安裝好spark
4.流程說明:
  日誌文件->flume->kafka->spark streaming
  flume輸入:文件
  flume輸出:kafka的輸入
  kafka輸出:spark 輸入
5.整合步驟:
  (1).將插件jar拷貝到flume的lib目錄下
    a. flumeng-kafka-plugin.jar
    b. metrics-annotation-2.2.0.jar

  (2).將配置文件producer.properties拷貝到flume的conf目錄下
    配置文件內容以下:
      #agentsection
      producer.sources=s
      producer.channels=c
      producer.sinks=rnode

      #sourcesection
      producer.sources.s.type=exec
      producer.sources.s.command=tail -f -n+1 /opt/apache-flume-1.6.0/data/testFlumeKafka.txt
      producer.sources.s.channels=capache

      # Eachsink's type must be defined
      producer.sinks.r.type=org.apache.flume.plugins.KafkaSink
      producer.sinks.r.metadata.broker.list=namenode:19092,datanode1:19092,datanode2:19092
      producer.sinks.r.partition.key=0
      producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
      producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
      producer.sinks.r.request.required.acks=0
      producer.sinks.r.max.message.size=1000000
      producer.sinks.r.producer.type=sync
      producer.sinks.r.custom.encoding=UTF-8
      producer.sinks.r.custom.topic.name=test //需建好對應topicui

      #Specifythe channel the sink should use
      producer.sinks.r.channel=cspa

      # Eachchannel's type is defined.
      producer.channels.c.type=memory
      producer.channels.c.capacity=1000
      producer.channels.c.transactionCapacity=100

    (3).啓動flume-ng
      命令以下:flume-ng agent -c . -f /opt/apache-flume-1.6.0/conf/producer.conf -n producer

    (4).啓動kafka-server
      命令以下:bin/kafka-server-start.sh config/server.properties

    (5).啓動kafka-consumer(默認已經建立了test topic)
      命令以下:bin/kafka-console-consumer.sh --zookeeper namenode:12181,datanode1:12181,datanode2:12181 --topic test --from-beginning

    (6).啓動spark
      命令以下:sbin/start-all.sh

    (7).運行spark streaming Demo
      命令以下:run-example org.apache.spark.examples.streaming.JavaKafkaWordCount namenode:12181 test-consumer-group test 3 >> test.log

    (8).在對應的日誌文件中輸入內容,則能夠在test.log文件看到單詞計數的結果插件

相關文章
相關標籤/搜索