Storm KafkaSpout pitfall notes: the offset problem!

There are plenty of examples online for integrating Kafka and Storm, so look those up yourself.

 

Problem description:

  Kafka had been set up long before, and a newly built Storm cluster needed to consume one of its topics. Because the topic already contained a lot of messages, Storm started consuming from the very beginning of the topic.

 

Solution:

  Below is an excerpt from the official documentation:

How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures

As shown in the above KafkaConfig properties, you can control from where in the Kafka topic the spout begins to read by setting KafkaConfig.startOffsetTime as follows:

  1. kafka.api.OffsetRequest.EarliestTime(): read from the beginning of the topic (i.e. from the oldest messages onwards)
  2. kafka.api.OffsetRequest.LatestTime(): read from the end of the topic (i.e. any new messages that are being written to the topic)
  3. A Unix timestamp aka seconds since the epoch (e.g. via System.currentTimeMillis()): see How do I accurately get offsets of messages for a certain timestamp using OffsetRequest? in the Kafka FAQ

As the topology runs the Kafka spout keeps track of the offsets it has read and emitted by storing state information under the ZooKeeper path SpoutConfig.zkRoot + "/" + SpoutConfig.id. In the case of failures it recovers from the last written offset in ZooKeeper.

Important: When re-deploying a topology make sure that the settings for SpoutConfig.zkRoot and SpoutConfig.id were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.

This means that when a topology has run once the setting KafkaConfig.startOffsetTime will not have an effect for subsequent runs of the topology because now the topology will rely on the consumer state information (offsets) in ZooKeeper to determine from where it should begin (more precisely: resume) reading. If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter KafkaConfig.ignoreZkOffsets to true. If true, the spout will always begin reading from the offset defined by KafkaConfig.startOffsetTime as described above.

 

  The gist of this passage: the startOffsetTime field of the SpoutConfig object controls where consumption starts. Its default value is kafka.api.OffsetRequest.EarliestTime(), i.e. consumption starts from the oldest messages; if you want to start from the newest messages, you have to set it to kafka.api.OffsetRequest.LatestTime() manually. Note also that this field only takes effect the first time the topology consumes; afterwards, consumption resumes from the offset recorded in ZooKeeper (the consumer state appears to be stored under the path given by the SpoutConfig object's zkRoot field; not verified).
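  As a concrete illustration, here is a minimal spout setup. This is a hedged sketch, not the article's own code: the ZooKeeper address, topic name, zkRoot, and id are placeholders, and the package names assume the storm-kafka 1.x module (older releases use storm.kafka.* and backtype.storm.* instead):

  import org.apache.storm.kafka.BrokerHosts;
  import org.apache.storm.kafka.KafkaSpout;
  import org.apache.storm.kafka.SpoutConfig;
  import org.apache.storm.kafka.StringScheme;
  import org.apache.storm.kafka.ZkHosts;
  import org.apache.storm.spout.SchemeAsMultiScheme;

  // ZooKeeper ensemble that Kafka's brokers register in (placeholder address)
  BrokerHosts hosts = new ZkHosts("zk-host:2181");
  // zkRoot ("/kafka-spout") and id ("my-consumer") together determine the ZooKeeper
  // path under which the spout stores the offsets it has consumed
  SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/kafka-spout", "my-consumer");
  spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // deserialize byte[] payloads as Strings
  // Only effective on the first run: start from the newest messages instead of the default EarliestTime()
  spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
  KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);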

  If you want the current topology to pick up consumption where the previous topology left off, do not change the SpoutConfig object's id. Conversely, if the first run already started consuming from the oldest messages, then keeping the same id means the spout will work its way from the oldest messages all the way to the newest; if at that point you want to skip the intermediate messages and jump straight to the newest ones, simply changing the SpoutConfig object's id is enough (see the sketch below).
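  Continuing the sketch above, two ways to make a redeployed topology jump straight to the newest messages ("my-consumer-v2" is a made-up fresh id):

  // Option 1: use a fresh id, so the spout finds no stored offsets under the new
  // ZooKeeper path and falls back to startOffsetTime
  SpoutConfig freshConfig = new SpoutConfig(hosts, "my-topic", "/kafka-spout", "my-consumer-v2");
  freshConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

  // Option 2: keep the same id but tell the spout to ignore the offsets stored in ZooKeeper;
  // the flag is named ignoreZkOffsets in newer storm-kafka releases and forceFromStart in older ones
  spoutConfig.ignoreZkOffsets = true;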

 

  Below are the meanings of some of the SpoutConfig object's fields; they are actually inherited from KafkaConfig, see the source code:

  public int fetchSizeBytes = 1024 * 1024;     // total size of messages requested in each FetchRequest sent to Kafka
  public int socketTimeoutMs = 10000;          // socket timeout for connections to the Kafka broker
  public int fetchMaxWait = 10000;             // how long the consumer waits when the broker has no new messages
  public int bufferSizeBytes = 1024 * 1024;    // read-buffer size of the SocketChannel used by SimpleConsumer
  public MultiScheme scheme = new RawMultiScheme(); // how to deserialize the byte[] fetched from Kafka
  public boolean forceFromStart = false;       // whether to force reading from the smallest offset in Kafka
  public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // from which offset time to start reading; defaults to the oldest offset
  public long maxOffsetBehind = Long.MAX_VALUE; // how far the KafkaSpout's progress may lag behind the target; if it lags too far, the spout skips the intermediate messages
  public boolean useStartOffsetTimeIfOffsetOutOfRange = true; // if the requested offset no longer exists in Kafka, whether to fall back to startOffsetTime
  public int metricsTimeBucketSizeInSecs = 60; // how often metrics are aggregated and reported
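
  To see where these settings end up, here is a minimal topology wiring sketch; PrintBolt is a hypothetical bolt that just logs each tuple, and spoutConfig is the one built above:

  import org.apache.storm.Config;
  import org.apache.storm.LocalCluster;
  import org.apache.storm.topology.TopologyBuilder;

  TopologyBuilder builder = new TopologyBuilder();
  // one spout executor reading the topic with the settings above
  builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
  // PrintBolt is a stand-in for whatever processing the topology actually does
  builder.setBolt("print-bolt", new PrintBolt(), 1).shuffleGrouping("kafka-spout");
  new LocalCluster().submitTopology("kafka-demo", new Config(), builder.createTopology());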