Fix for a topic-override problem when Flume writes to Kafka

Architecture:

nginx -> flume -> kafka -> flume -> kafka (a cross-datacenter link forces an extra Flume hop between the two Kafka clusters, which is painful...)


Symptom:

In the second tier, the Kafka topic being written is the same as the topic being read; the manually configured sink topic does not take effect.
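For reference, here is a sketch of what the second-tier agent's configuration roughly looks like, reconstructed from the context dumps in the debug log below. The agent name agent2 and the channel type are my guesses; the component names, topics, and parameters come straight from the logs, and the zookeeper address is a placeholder just as it appears there:

agent2.sources  = kafka1
agent2.channels = bigdata_api_ele_me_access-channel4
agent2.sinks    = web-sink2

# channel type is not shown in the logs; memory is only a guess
agent2.channels.bigdata_api_ele_me_access-channel4.type = memory

# KafkaSource: consumes topic bigdata_api_ele_me_access from the upstream cluster
agent2.sources.kafka1.type = org.apache.flume.source.kafka.KafkaSource
agent2.sources.kafka1.zookeeperConnect = xxx
agent2.sources.kafka1.topic = bigdata_api_ele_me_access
agent2.sources.kafka1.groupId = nginx
agent2.sources.kafka1.batchSize = 2000
agent2.sources.kafka1.batchDurationMillis = 5000
agent2.sources.kafka1.channels = bigdata_api_ele_me_access-channel4

# KafkaSink: should write to topic nginx-access on the local cluster
agent2.sinks.web-sink2.type = org.apache.flume.sink.kafka.KafkaSink
agent2.sinks.web-sink2.brokerList = 192.168.101.43:9092,192.168.101.44:9092,192.168.101.45:9092
agent2.sinks.web-sink2.topic = nginx-access
agent2.sinks.web-sink2.requiredAcks = 1
agent2.sinks.web-sink2.batchSize = 2000
agent2.sinks.web-sink2.channel = bigdata_api_ele_me_access-channel4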


Turn on debug logging:

Source instantiation:

21 Apr 2015 19:24:03,146 INFO [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.create:41) - Creating instance of source kafka1, type org.apache.flume.source.kafka.KafkaSource
21 Apr 2015 19:24:03,146 DEBUG [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.getClass:61)  - Source type org.apache.flume.source.kafka.KafkaSource is a custom type
21 Apr 2015 19:24:03,152 INFO  [conf-file-poller-0] (org.apache.flume.source.kafka.KafkaSourceUtil.getKafkaProperties:37)  - context={ parameters:{topic=bigdata_api_ele_me_access, batchDurationMillis=5000, groupId=nginx, zookeeperConnect=xxx, channels=bigdata_api_ele_me_access-channel4, batchSize=2000, type=org.apache.flume.source.kafka.KafkaSource} }

Sink instantiation:

21 Apr 2015 19:24:03,185 INFO  [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:42)  - Creating instance of sink: web-sink2, type: org.apache.flume.sink.kafka.KafkaSink
21 Apr 2015 19:24:03,185 DEBUG [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.getClass:63)  - Sink type org.apache.flume.sink.kafka.KafkaSink is a custom type
21 Apr 2015 19:24:03,190 DEBUG [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.configure:220)  - Using batch size: 2000
21 Apr 2015 19:24:03,190 INFO  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.configure:229)  - Using the static topic: nginx-access this may be over-ridden by event headers
21 Apr 2015 19:24:03,191 INFO  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties:34)  - context={ parameters:{topic=nginx-access, brokerList=1xxx, requiredAcks=1, batchSize=2000, type=org.apache.flume.sink.kafka.KafkaSink, channel=bigdata_api_ele_me_access-channel4} }
21 Apr 2015 19:24:03,191 DEBUG [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.configure:236)  - Kafka producer properties: {metadata.broker.list=192.168.101.43:9092,192.168.101.44:9092,192.168.101.45:9092, request.required.acks=1, key.serializer.class=kafka.serializer.StringEncoder, serializer.class=kafka.serializer.DefaultEncoder}

You can see that when the sink and source instances are created, the topic in the configuration context is exactly what was set, but the log also contains this line:

Using the static topic: nginx-access this may be over-ridden by event headers

Analyzing the KafkaSink source code:

In the org.apache.flume.sink.kafka.KafkaSink.process method:

  public static final String KEY_HDR = "key";
  public static final String TOPIC_HDR = "topic";
  ...
        if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
          eventTopic = topic;
        } // eventTopic is read from the event header first; the configured topic is used only when no "topic" header is present
        ...
        eventKey = headers.get(KEY_HDR);
        ...
        KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
          (eventTopic, eventKey, eventBody);
        messageList.add(data);

The topic field itself is assigned in configure:

    topic = context.getString(KafkaSinkConstants.TOPIC,
      KafkaSinkConstants.DEFAULT_TOPIC); // read the topic from the Flume configuration; if none is set, fall back to the default "default-flume-topic"
    if (topic.equals(KafkaSinkConstants.DEFAULT_TOPIC)) {
      logger.warn("The Property 'topic' is not set. " +
        "Using the default topic name: " +
        KafkaSinkConstants.DEFAULT_TOPIC);
    } else {
      logger.info("Using the static topic: " + topic +
        " this may be over-ridden by event headers"); //這裏提示可能會被header覆蓋
    }

Where the header comes from:

1) Messages in Kafka have no concept of a header.

2) Flume events, on the other hand, are split into a header and a body.

In this topology, data enters Flume through the KafkaSource, which attaches header information, and then flows on to the KafkaSink.

The KafkaSource adds those headers in the org.apache.flume.source.kafka.KafkaSource.process method:

        if (iterStatus) {
          // get next message
          MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
          kafkaMessage = messageAndMetadata.message();
          kafkaKey = messageAndMetadata.key();
          // Add headers to event (topic, timestamp, and key)
          headers = new HashMap<String, String>();
          headers.put(KafkaSourceConstants.TIMESTAMP,
                  String.valueOf(System.currentTimeMillis()));
          headers.put(KafkaSourceConstants.TOPIC, topic);
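The collision becomes obvious once you put the two constants side by side. The snippet below reproduces them as I recall them from the Flume 1.5/1.6 sources (worth double-checking against your exact version): the source writes the topic it consumed from under the header key "topic", and the sink reads the destination topic from that very same key, so the upstream topic silently wins over the configured sink topic.

  // org.apache.flume.source.kafka.KafkaSourceConstants (as recalled; verify against your version)
  public static final String TOPIC = "topic";          // header key the source writes the consumed topic under
  public static final String TIMESTAMP = "timestamp";

  // org.apache.flume.sink.kafka.KafkaSink (shown above)
  public static final String TOPIC_HDR = "topic";      // header key the sink reads the destination topic from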

Since Kafka itself has no use for the header, it is enough to comment out these lines in org.apache.flume.sink.kafka.KafkaSink.process:

        /*
        if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
          eventTopic = topic;
        }
        */
        eventTopic = topic; // add this line, otherwise eventTopic stays null and the send fails with an NPE
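
Patching the sink works, but it means maintaining a custom Flume build. A less invasive alternative (untested here, but it follows directly from the logic above) is to overwrite the offending header with Flume's built-in static interceptor on the source, so the stock KafkaSink resolves the header to the topic you actually want:

# hypothetical agent/component names, matching the sketch earlier; the interceptor
# rewrites the "topic" header that KafkaSource added, before the event reaches KafkaSink
agent2.sources.kafka1.interceptors = fixtopic
agent2.sources.kafka1.interceptors.fixtopic.type = static
agent2.sources.kafka1.interceptors.fixtopic.key = topic
agent2.sources.kafka1.interceptors.fixtopic.value = nginx-access
agent2.sources.kafka1.interceptors.fixtopic.preserveExisting = false

Note that preserveExisting must be false: the static interceptor's default is to leave an existing header untouched, and here the whole point is to replace the one the source just set.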