Problems and Solutions When Integrating Kafka with Storm

  When configuring Kafka and Storm together, a few problems come up repeatedly, mainly the following:

  1. Uploading the topology jar to the Storm cluster causes jar conflicts, typically log4j or slf4j error messages.

  2. A local Java Kafka producer or consumer cannot produce or consume data.

  3. What exactly are the declared fields of KafkaSpout?

  Below we walk through these three problems in detail using kafka_2.11-0.10.1.0 together with apache-storm-1.1.0.

  1. Uploading the topology jar to the Storm cluster causes jar conflicts, typically log4j or slf4j error messages.

SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError.
SLF4J: See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
5370 [Thread-14-newKafka] ERROR backtype.storm.util - Async loop died!
java.lang.NoClassDefFoundError: Could not initialize class org.apache.log4j.Log4jLoggerFactory
     at org.apache.log4j.Logger.getLogger(Logger.java:39) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
     at kafka.utils.Logging$class.logger(Logging.scala:24) ~[kafka_2.10-0.8.2.1.jar:na]
     at kafka.consumer.SimpleConsumer.logger$lzycompute(SimpleConsumer.scala:30) ~[kafka_2.10-0.8.2.1.jar:na]
     at kafka.consumer.SimpleConsumer.logger(SimpleConsumer.scala:30) ~[kafka_2.10-0.8.2.1.jar:na]
     at kafka.utils.Logging$class.info(Logging.scala:67) ~[kafka_2.10-0.8.2.1.jar:na]
     at kafka.consumer.SimpleConsumer.info(SimpleConsumer.scala:30) ~[kafka_2.10-0.8.2.1.jar:na]
     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:74) ~[kafka_2.10-0.8.2.1.jar:na]
     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[kafka_2.10-0.8.2.1.jar:na]
     at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) ~[kafka_2.10-0.8.2.1.jar:na]
     at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2.1.jar:na]
     at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.3.jar:0.9.3]
     at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.3.jar:0.9.3]
     at storm.kafka.PartitionManager.<init>(PartitionManager.java:83) ~[storm-kafka-0.9.3.jar:0.9.3]
     at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.9.3.jar:0.9.3]
     at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.3.jar:0.9.3]
     at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.3.jar:0.9.3]
     at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[storm-core-0.9.3.jar:0.9.3]
     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]

  Cause: the KafkaSpout code (storm.kafka.KafkaSpout) uses the slf4j packages, while Kafka itself (kafka.consumer.SimpleConsumer) uses the Apache log4j packages.

  Solution: exclude the offending packages in the dependency declaration.

 

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.10.1.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

  A similar conflict can also occur between the ZooKeeper and kafka-clients jars:

7630 [Thread-16-spout-executor[3 3]] INFO  o.a.s.k.PartitionManager - Read partition information from: /test-topic/04680174-656f-41ad-ad6f-2976d28b2d24/partition_0  --> null
7663 [Thread-16-spout-executor[3 3]] INFO  k.c.SimpleConsumer - Reconnect due to error:
java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V
         at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:41) ~[kafka_2.11-0.10.0.1.jar:?]
         at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:44) ~[kafka_2.11-0.10.0.1.jar:?]
         at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) ~[kafka_2.11-0.10.0.1.jar:?]
         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85) [kafka_2.11-0.10.0.1.jar:?]
         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [kafka_2.11-0.10.0.1.jar:?]
         at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) [kafka_2.11-0.10.0.1.jar:?]
         at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) [kafka_2.11-0.10.0.1.jar:?]
         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) [storm-kafka-1.0.2.jar:1.0.2]
         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) [storm-kafka-1.0.2.jar:1.0.2]
         at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:103) [storm-kafka-1.0.2.jar:1.0.2]
         at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) [storm-kafka-1.0.2.jar:1.0.2]
         at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) [storm-kafka-1.0.2.jar:1.0.2]
         at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129) [storm-kafka-1.0.2.jar:1.0.2]
         at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
         at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
         at java.lang.Thread.run(Unknown Source) [?:1.8.0_111]
7672 [Thread-16-spout-executor[3 3]] ERROR o.a.s.util - Async loop died!
java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V
         (same stack trace as above)
7673 [Thread-16-spout-executor[3 3]] ERROR o.a.s.d.executor -
java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V
         (same stack trace as above)
7694 [Thread-16-spout-executor[3 3]] ERROR o.a.s.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
         at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.2.jar:1.0.2]
         at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
         at org.apache.storm.daemon.worker$fn__8659$fn__8660.invoke(worker.clj:761) [storm-core-1.0.2.jar:1.0.2]
         at org.apache.storm.daemon.executor$mk_executor_data$fn__7875$fn__7876.invoke(executor.clj:274) [storm-core-1.0.2.jar:1.0.2]
         at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:494) [storm-core-1.0.2.jar:1.0.2]
         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
         at java.lang.Thread.run(Unknown Source) [?:1.8.0_111]

  Cause: org.apache.kafka.common.network.NetworkSend belongs to the Kafka client library (kafka-clients). Because the pom.xml did not explicitly declare kafka-clients, a mismatched version was resolved transitively, and the constructor that the broker library expected was missing, hence the error.

  Solution: add an explicit kafka-clients dependency, following the same approach as above; you can use Eclipse to track down the conflicting jars.
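As a sketch, the explicit kafka-clients declaration might look like this (the version shown is an assumption; it should match the broker and Scala client version you actually use, e.g. 0.10.1.0 for the setup in this article):

```xml
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.1.0</version>
</dependency>
```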

2. A local Java Kafka producer or consumer cannot produce or consume data

  This issue deserves special emphasis, because it was genuinely infuriating when I first hit it: producing and consuming worked perfectly well inside the virtual machine, yet the local Java API could never reach the broker. Only later did I stumble on the advice to modify the hosts file.

  The local Java API simply will not connect unless the hosts file on the developer machine contains the broker's IP-to-hostname mapping.

  

  In addition, you need to add 172.16.11.224 kafka01 to the virtual machine's hosts file.

  Then change the broker's server.properties so that advertised.listeners=PLAINTEXT://kafka01:9092
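Concretely, the two changes look like this (172.16.11.224 and kafka01 are the example IP and hostname from this article; substitute your own values):

```
# /etc/hosts, on both the developer machine and the VM
172.16.11.224 kafka01

# config/server.properties, on the Kafka broker
advertised.listeners=PLAINTEXT://kafka01:9092
```

With this in place, the broker advertises a hostname that both the VM and the developer machine can resolve to the same address, which is why the local Java client can finally connect.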

 

3. What exactly are the declared fields of KafkaSpout?

  I first ran into this in a kafka+storm heat-map project: the instructor, after reading the KafkaSpout source, found that when it emits to the next bolt, the field name is bytes.

  

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.spout;

import java.nio.ByteBuffer;
import java.util.List;

import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

import static org.apache.storm.utils.Utils.tuple;
import static java.util.Arrays.asList;

public class RawMultiScheme implements MultiScheme {
  @Override
  public Iterable<List<Object>> deserialize(ByteBuffer ser) {
    return asList(tuple(Utils.toByteArray(ser)));
  }

  @Override
  public Fields getOutputFields() {
    return new Fields("bytes");
  }
}

 

  Moreover, the grouping method used is shuffleGrouping. That is awkward: what if I want to group by fields starting from the spout? Or what if downstream bolts should receive a custom format instead of raw bytes?

  At that point you would have to modify the KafkaSpout source and the related PartitionManager code.
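In many cases, though, you do not need to patch the source at all: SpoutConfig.scheme is pluggable, so you can supply your own MultiScheme. Below is a sketch of a custom scheme that decodes the payload as a UTF-8 string and declares it under a custom field name (the class name StringLineScheme and the field name "line" are illustrative choices, not part of the storm-kafka API):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

import org.apache.storm.spout.MultiScheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// A drop-in replacement for RawMultiScheme: decodes the payload as a
// UTF-8 string and declares it under the field name "line", so that
// downstream bolts can use fieldsGrouping on it.
public class StringLineScheme implements MultiScheme {

    @Override
    public Iterable<List<Object>> deserialize(ByteBuffer ser) {
        // Copy the payload out of the buffer without assuming a backing array
        byte[] bytes = new byte[ser.remaining()];
        ser.get(bytes);
        String line = new String(bytes, StandardCharsets.UTF_8);
        return Collections.<List<Object>>singletonList(new Values(line));
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("line");
    }
}
```

Wire it in with spoutConfig.scheme = new StringLineScheme(); the next bolt can then be attached with fieldsGrouping and new Fields("line") instead of shuffleGrouping.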

   

  Let me also add one more point here: KafkaSpout has quite a few settings that need to be configured.

  The consumption start point is set via the startOffsetTime field of the SpoutConfig object. The default value is kafka.api.OffsetRequest.EarliestTime(), i.e. start consuming from the earliest messages; to start from the latest messages instead, set it manually to kafka.api.OffsetRequest.LatestTime(). Another caveat: this field only takes effect the first time messages are consumed; after that, consumption resumes from the offset recorded in ZooKeeper (the record is stored under the path given by the SpoutConfig object's zkRoot field; not verified).

  If you want the current topology to continue from where the previous topology left off, do not change the SpoutConfig object's id. In other words, if you have already consumed from the earliest messages once, then keeping the same id means the spout will consume all the way from the earliest messages to the latest ones; if at that point you want to skip the backlog and jump straight to the latest messages, just change the SpoutConfig object's id.

  Below are the meanings of some SpoutConfig fields; they are actually inherited from KafkaConfig, as you can see in the source:

 

public int fetchSizeBytes = 1024 * 1024;   // total message size requested in each FetchRequest sent to Kafka
public int socketTimeoutMs = 10000;        // socket timeout for connections to Kafka brokers
public int fetchMaxWait = 10000;           // how long the consumer waits when the broker has no new messages
public int bufferSizeBytes = 1024 * 1024;  // read-buffer size of the SocketChannel used by SimpleConsumer
public MultiScheme scheme = new RawMultiScheme(); // how to deserialize the byte[] fetched from Kafka
public boolean forceFromStart = false;     // whether to force reading from the smallest offset in Kafka
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // offset time to start reading from; defaults to the oldest offset
public long maxOffsetBehind = Long.MAX_VALUE; // how far the spout's progress may lag behind the target; if it falls further behind, the spout drops the intermediate messages
public boolean useStartOffsetTimeIfOffsetOutOfRange = true; // if the requested offset no longer exists in Kafka, whether to fall back to startOffsetTime
public int metricsTimeBucketSizeInSecs = 60; // how often metrics are reported
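Putting the pieces together, a typical SpoutConfig setup might look like the sketch below (the ZooKeeper address, topic name, zkRoot, and spout id are placeholder values; StringScheme is the scheme shipped with storm-kafka, which emits tuples under the field name "str"):

```java
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;

public class KafkaSpoutSetup {
    public static KafkaSpout buildSpout() {
        // ZooKeeper ensemble used by Kafka (placeholder address)
        BrokerHosts hosts = new ZkHosts("kafka01:2181");
        // zkRoot ("/test-topic") and id ("my-spout-id") together determine
        // where the consumed offsets are recorded in ZooKeeper; change the
        // id to start over instead of resuming from the recorded offset
        SpoutConfig spoutConfig =
                new SpoutConfig(hosts, "test-topic", "/test-topic", "my-spout-id");
        // Emit tuples with field name "str" instead of the default "bytes"
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // Start from the newest messages (only honored on the very first run)
        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
        return new KafkaSpout(spoutConfig);
    }
}
```

Note that constructing the spout does not contact ZooKeeper or Kafka; connections are only opened once the topology is running, which is why the conflicting-jar and hosts-file problems above surface at runtime rather than at build time.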