Understanding subscribe vs. assign for Kafka consumers

Conclusions first

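All consumers that (1) have the same group.id and (2) commit their offsets to Kafka for storage (whether manually or automatically) share, and jointly maintain, the offsets that Kafka stores for that group.id.

This has the following consequences:

1. For that group.id, the broker stores the offset most recently committed by any of its consumers.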

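2. When a new consumer starts, the process is:

2-1. It asks the broker whether offsets are stored for that `group.id`. If so, it starts consuming from that `offset`; if not, the starting position is determined by the `auto.offset.reset` setting.

2-2. The consumer consumes from the offset obtained in 2-1 and keeps committing offsets to the broker, updating the offset stored for that `group.id`. During normal consumption, however, it no longer fetches the latest committed offset from Kafka; it only keeps sending offset updates to Kafka.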

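3. Mixing the assign and subscribe methods is strongly discouraged.

4. seek lets you set an offset explicitly, at <TopicPartition, offset> granularity. A seek on its own is not committed to the broker. It only takes effect for the group after the consumer commits. With enable.auto.commit=true, that commit happens inside poll(), once auto.commit.interval.ms has elapsed.

Therefore:

Strongly recommended: use only the subscribe API.

Advanced users: use the assign API with care.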
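Points 1 and 2 can be illustrated with a toy in-memory model. This is a sketch under our own assumptions, not Kafka's implementation. `ToyOffsetStore` and `ToyStartup` are hypothetical names. The model assumes that offsets are kept per `group.id` and partition and that every commit overwrites the previous value. A starting consumer falls back to `auto.offset.reset` (`earliest`/`latest`) only when nothing has been committed.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical names, not Kafka's code) of the committed offsets
// that the broker keeps per group.id and topic-partition.
class ToyOffsetStore {
    private final Map<String, Long> committed = new HashMap<>();

    private static String key(String groupId, String topicPartition) {
        return groupId + "/" + topicPartition;
    }

    // Point 1: each commit overwrites the previous value, so the store holds
    // the most recent commit from any consumer of the group (not the largest).
    void commit(String groupId, String topicPartition, long offset) {
        committed.put(key(groupId, topicPartition), offset);
    }

    // Returns null when the group has never committed for this partition.
    Long committed(String groupId, String topicPartition) {
        return committed.get(key(groupId, topicPartition));
    }
}

class ToyStartup {
    // Point 2-1: use the group's committed offset if there is one;
    // otherwise fall back to auto.offset.reset.
    static long startingOffset(ToyOffsetStore store, String groupId, String topicPartition,
                               String autoOffsetReset, long logStartOffset, long logEndOffset) {
        Long committed = store.committed(groupId, topicPartition);
        if (committed != null) {
            return committed;
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;
            case "latest":
                return logEndOffset;
            default:
                // The real client throws NoOffsetForPartitionException for "none".
                throw new IllegalStateException("no committed offset, auto.offset.reset=" + autoOffsetReset);
        }
    }
}
```

Suppose consumer A commits 40 and consumer B later commits 33. `startingOffset` then returns 33 for the next consumer: the store keeps the latest commit, not the furthest one.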

Tests

assign

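Two consumers with identical configuration, both using assign, to check whether their seeks affect each other.

First, the two consumers do affect each other: they share one set of committed offsets. In the output below, B starts at offset 32, the last offset A had reached.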

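public void bothAssign() {
        // props, topic and logger are fields of the test class (not shown)
        KafkaConsumer<String, String> consumer_a = new KafkaConsumer<>(props);
        KafkaConsumer<String, String> consumer_b = new KafkaConsumer<>(props);
        ConsumerRecords<String, String> msgs;

        TopicPartition tp = new TopicPartition(topic, 0);
        // Normal consumption works: assigning the same partition to both consumers causes no conflict
        consumer_a.assign(Arrays.asList(tp));
        consumer_b.assign(Arrays.asList(tp));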
        int a=0, b=0;

        while (a++ < 10) {
            msgs = consumer_a.poll(100L);
            for (ConsumerRecord<String, String> record : msgs) {
                logger.info(" A : offset {}, value {}.", record.offset(), record.value());
            }
        }

        while (b++ < 10) {
            msgs = consumer_b.poll(100L);
            for (ConsumerRecord<String, String> record : msgs) {
                logger.info(" B : offset {}, value {}.", record.offset(), record.value());
            }
        }
    }

# output
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,306(290) --> [main] org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:109): Kafka version : 1.0.1  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,307(291) --> [main] org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:110): Kafka commitId : c0518aa65f25317e  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,622(606) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341): [Consumer clientId=consumer-1, groupId=seekTest24] Discovered group coordinator g1-bdp-cdhtest-01.dns.guazi.com:9092 (id: 2147483233 rack: null)  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,764(748) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:44):  A : offset 24, value 24.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,766(750) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:44):  A : offset 25, value 25.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,767(751) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:44):  A : offset 26, value 26.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,768(752) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:44):  A : offset 27, value 27.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,769(753) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:44):  A : offset 28, value 28.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,770(754) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:44):  A : offset 29, value 29.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,771(755) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:44):  A : offset 30, value 30.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,772(756) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:44):  A : offset 31, value 31.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,774(758) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:44):  A : offset 32, value 32.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,872(856) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341): [Consumer clientId=consumer-2, groupId=seekTest24] Discovered group coordinator g1-bdp-cdhtest-01.dns.guazi.com:9092 (id: 2147483233 rack: null)  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,914(898) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:51):  B : offset 32, value 32.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,914(898) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:51):  B : offset 33, value 33.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,915(899) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:51):  B : offset 34, value 34.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,915(899) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:51):  B : offset 35, value 35.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,915(899) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:51):  B : offset 36, value 36.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,915(899) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:51):  B : offset 37, value 37.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,916(900) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:51):  B : offset 38, value 38.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,917(901) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:51):  B : offset 39, value 39.  
2018-12-20 13:43:22 [INFO ] 2018-12-20 13:43:22,917(901) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:51):  B : offset 40, value 40.  

Second, the seeks of the two consumers do not interfere with each other; consumption starts from the offset of the last seek.
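Point 4 and this observation can be sketched with a toy model in which each consumer keeps its position in memory. Only an explicit commit publishes that position to the group. `ToySeek`, `GroupStore` and `Consumer` are hypothetical names, and `consume(n)` is a stand-in for a `poll()` that returns n records. This is a simplification under our own assumptions, not the client's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical names, not the Kafka client): each consumer keeps its
// position in memory; only commit() writes it to the group's shared store.
class ToySeek {
    // Committed offsets of one group.id, keyed by topic-partition.
    static final class GroupStore {
        final Map<String, Long> committed = new HashMap<>();
    }

    static final class Consumer {
        private final GroupStore store;
        private final String topicPartition;
        private long position;

        Consumer(GroupStore store, String topicPartition, long resetOffset) {
            this.store = store;
            this.topicPartition = topicPartition;
            // Point 2: the committed offset is read once, at startup.
            Long c = store.committed.get(topicPartition);
            this.position = (c != null) ? c : resetOffset;
        }

        // seek() only moves this consumer's local position; nothing is committed.
        void seek(long offset) {
            position = offset;
        }

        // Stand-in for poll() returning n records.
        void consume(int n) {
            position += n;
        }

        // With enable.auto.commit=true the real client commits inside poll()
        // once auto.commit.interval.ms has elapsed; here it is explicit.
        void commit() {
            store.committed.put(topicPartition, position);
        }

        long position() {
            return position;
        }
    }
}
```

After `a.seek(33)` and `b.seek(66)`, the two positions differ and the shared store is still empty. Once `a` consumes 5 records and calls `commit()`, a consumer started afterwards begins at 38. `b` is already running, so it keeps its own position, as described in point 2-2.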

# code
public void bothAssign() {
        KafkaConsumer<String, String> consumer_a = new KafkaConsumer<>(props);
        KafkaConsumer<String, String> consumer_b = new KafkaConsumer<>(props);
        ConsumerRecords<String, String> msgs;

        TopicPartition tp = new TopicPartition(topic, 0);
        consumer_a.assign(Arrays.asList(tp));
        consumer_b.assign(Arrays.asList(tp));
        int a=0, b=0;

        while (a++ < 10) {
            msgs = consumer_a.poll(100L);
            for (ConsumerRecord<String, String> record : msgs) {
                logger.info(" A : offset {}, value {}.", record.offset(), record.value());
            }
        }

        while (b++ < 10) {
            msgs = consumer_b.poll(100L);
            for (ConsumerRecord<String, String> record : msgs) {
                logger.info(" B : offset {}, value {}.", record.offset(), record.value());
            }
        }

        logger.info("===================================");

        consumer_a.seek(tp, 33);
        consumer_b.seek(tp, 66);
        while (a++ < 20) {
            msgs = consumer_a.poll(100L);
            for (ConsumerRecord<String, String> record : msgs) {
                logger.info(" A : offset {}, value {}.", record.offset(), record.value());
            }
        }

        while (b++ < 20) {
            msgs = consumer_b.poll(100L);
            for (ConsumerRecord<String, String> record : msgs) {
                logger.info(" B : offset {}, value {}.", record.offset(), record.value());
            }
        }
    }
# output 
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,016(243) --> [main] org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:109): Kafka version : 1.0.1  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,016(243) --> [main] org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:110): Kafka commitId : c0518aa65f25317e  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,300(527) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341): [Consumer clientId=consumer-1, groupId=seekTest24] Discovered group coordinator g1-bdp-cdhtest-01.dns.guazi.com:9092 (id: 2147483233 rack: null)  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,408(635) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:43):  A : offset 32, value 32.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,409(636) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:43):  A : offset 33, value 33.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,411(638) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:43):  A : offset 34, value 34.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,412(639) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:43):  A : offset 35, value 35.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,413(640) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:43):  A : offset 36, value 36.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,414(641) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:43):  A : offset 37, value 37.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,414(641) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:43):  A : offset 38, value 38.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,415(642) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:43):  A : offset 39, value 39.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,415(642) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:43):  A : offset 40, value 40.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,494(721) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341): [Consumer clientId=consumer-2, groupId=seekTest24] Discovered group coordinator g1-bdp-cdhtest-01.dns.guazi.com:9092 (id: 2147483233 rack: null)  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,573(800) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:50):  B : offset 39, value 39.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,574(801) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:50):  B : offset 40, value 40.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,575(802) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:50):  B : offset 41, value 41.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,575(802) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:50):  B : offset 42, value 42.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,576(803) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:50):  B : offset 43, value 43.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,576(803) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:50):  B : offset 44, value 44.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,578(805) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:50):  B : offset 45, value 45.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,579(806) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:50):  B : offset 46, value 46.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,580(807) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:50):  B : offset 47, value 47.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,580(807) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:54): ===================================  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,598(825) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:61):  A : offset 66, value 66.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,599(826) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:61):  A : offset 67, value 67.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,600(827) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:61):  A : offset 68, value 68.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,602(829) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:61):  A : offset 69, value 69.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,604(831) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:61):  A : offset 70, value 70.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,605(832) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:61):  A : offset 71, value 71.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,606(833) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:61):  A : offset 72, value 72.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,606(833) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:61):  A : offset 73, value 73.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,608(835) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:61):  A : offset 74, value 74.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,714(941) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:68):  B : offset 70, value 70.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,715(942) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:68):  B : offset 71, value 71.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,716(943) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:68):  B : offset 72, value 72.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,721(948) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:68):  B : offset 73, value 73.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,722(949) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:68):  B : offset 74, value 74.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,722(949) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:68):  B : offset 75, value 75.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,723(950) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:68):  B : offset 76, value 76.  
2018-12-20 13:45:24 [INFO ] 2018-12-20 13:45:24,723(950) --> [main] com.songys.kafka.kafkaSeekTest.bothAssign(kafkaSeekTest.java:68):  B : offset 77, value 77.  

Note: if the two consumers have the same client.id, an error is reported.
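One way to avoid that clash is to give each consumer its own `client.id` while keeping the shared `group.id`. Below is a minimal sketch using plain `java.util.Properties`. The helper name `ConsumerConfigs`, the bootstrap address and the client ids are hypothetical; `seekTest24` is the group id from the logs above. The keys are standard Kafka consumer configuration names.

```java
import java.util.Properties;

// Sketch: consumer configs that share a group.id but use distinct client.id values.
// ConsumerConfigs is a hypothetical helper; pass the resulting Properties to new KafkaConsumer<>(props).
class ConsumerConfigs {
    static Properties forClient(String bootstrapServers, String groupId, String clientId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);   // shared: both consumers maintain the same committed offsets
        props.put("client.id", clientId); // distinct per consumer, to avoid the error noted above
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```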

subscribe

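Two consumers with identical configuration, both using the subscribe method.

First, the two consumers share one set of offsets and affect each other, and they cannot both seek on the same topic-partition at the same time. With subscribe, the group coordinator assigns each partition to only one member of the group, and seek only works on partitions currently assigned to the calling consumer.

# Calling seek after subscribe but before any poll throws an exception: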
Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition seek_test_sys-0
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
	at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1339)
	at com.songys.kafka.kafkaSeekTest.bothSubscribe(kafkaSeekTest.java:90)
	at com.songys.kafka.kafkaSeekTest.main(kafkaSeekTest.java:112)
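The exception above is consistent with how subscribe works. `subscribe()` only records the topics, and partitions are assigned later, when `poll()` joins the group, so right after `subscribe()` there is nothing to seek on. The toy model below reproduces that ordering. `ToySubscription` is hypothetical and is not the client's `SubscriptionState`, and assignment is simplified to "partition 0 of every subscribed topic". With the real client, a common approach is to seek inside `ConsumerRebalanceListener.onPartitionsAssigned(...)`, registered through `subscribe(topics, listener)`. Another is to seek only after a `poll()` has produced a non-empty `assignment()`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model (hypothetical, not the client's SubscriptionState) of the ordering
// subscribe() -> poll() assigns partitions -> seek() becomes legal.
class ToySubscription {
    private final Set<String> topics = new HashSet<>();
    // Only partitions currently assigned to this consumer have a position.
    private final Map<String, Long> positions = new HashMap<>();

    // Like subscribe(): remembers topics, assigns nothing yet.
    void subscribe(String topic) {
        topics.add(topic);
    }

    // Stand-in for the group join inside poll(). Simplification: this member
    // receives partition 0 of every subscribed topic, starting at offset 0.
    void poll() {
        for (String topic : topics) {
            positions.putIfAbsent(topic + "-0", 0L);
        }
    }

    void seek(String topicPartition, long offset) {
        if (!positions.containsKey(topicPartition)) {
            // Same wording as the exception in the stack trace above.
            throw new IllegalStateException("No current assignment for partition " + topicPartition);
        }
        positions.put(topicPartition, offset);
    }

    long position(String topicPartition) {
        return positions.get(topicPartition);
    }
}
```

Calling `seek("seek_test_sys-0", 10)` right after `subscribe("seek_test_sys")` throws. The same call succeeds after one `poll()`.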

assign and subscribe

First, assign affects subscribe and vice versa, so do not mix them.

public void assignAndSubscribe() {
        KafkaConsumer<String, String> consumer_e = new KafkaConsumer<>(props);
        KafkaConsumer<String, String> consumer_f = new KafkaConsumer<>(props);
        ConsumerRecords<String, String> msgs;
        TopicPartition tp = new TopicPartition(topic, 0);

        consumer_e.assign(Arrays.asList(tp));
        consumer_f.subscribe(Arrays.asList(topic));
        int e=0, f=0;

        while (e++ < 10) {
            msgs = consumer_e.poll(100L);
            for (ConsumerRecord<String, String> record : msgs) {
                logger.info(" E : offset {}, value {}.", record.offset(), record.value());
            }
        }

        while (f++ < 10) {
            msgs = consumer_f.poll(100L);
            for (ConsumerRecord<String, String> record : msgs) {
                logger.info(" F : offset {}, value {}.", record.offset(), record.value());
            }
        }

    }

# output
2018-12-20 12:10:21 [INFO ] 2018-12-20 12:10:21,717(358) --> [main] org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:109): Kafka version : 1.0.1  
2018-12-20 12:10:21 [INFO ] 2018-12-20 12:10:21,717(358) --> [main] org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:110): Kafka commitId : c0518aa65f25317e  
2018-12-20 12:10:22 [INFO ] 2018-12-20 12:10:22,244(885) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341): [Consumer clientId=consumer-1, groupId=seekTest19] Discovered group coordinator g1-bdp-cdhtest-01.dns.guazi.com:9092 (id: 2147483233 rack: null)  
2018-12-20 12:10:22 [INFO ] 2018-12-20 12:10:22,422(1063) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 14, value 14.  
2018-12-20 12:10:22 [INFO ] 2018-12-20 12:10:22,423(1064) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 15, value 15.  
2018-12-20 12:10:22 [INFO ] 2018-12-20 12:10:22,424(1065) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 16, value 16.  
2018-12-20 12:10:22 [INFO ] 2018-12-20 12:10:22,426(1067) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 17, value 17.  
2018-12-20 12:10:22 [INFO ] 2018-12-20 12:10:22,426(1067) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 18, value 18.  
2018-12-20 12:10:22 [INFO ] 2018-12-20 12:10:22,427(1068) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 19, value 19.  
2018-12-20 12:10:22 [INFO ] 2018-12-20 12:10:22,428(1069) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 20, value 20.  
2018-12-20 12:10:22 [INFO ] 2018-12-20 12:10:22,429(1070) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 21, value 21.  
2018-12-20 12:10:22 [INFO ] 2018-12-20 12:10:22,430(1071) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 22, value 22.  
2018-12-20 12:10:22 [INFO ] 2018-12-20 12:10:22,564(1205) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341): [Consumer clientId=consumer-2, groupId=seekTest19] Discovered group coordinator g1-bdp-cdhtest-01.dns.guazi.com:9092 (id: 2147483233 rack: null)  
2018-12-20 12:10:22 [INFO ] 2018-12-20 12:10:22,571(1212) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341): [Consumer clientId=consumer-2, groupId=seekTest19] Revoking previously assigned partitions []  
2018-12-20 12:10:22 [INFO ] 2018-12-20 12:10:22,572(1213) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:336): [Consumer clientId=consumer-2, groupId=seekTest19] (Re-)joining group  
2018-12-20 12:10:25 [INFO ] 2018-12-20 12:10:25,644(4285) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341): [Consumer clientId=consumer-2, groupId=seekTest19] Successfully joined group with generation 3  
2018-12-20 12:10:25 [INFO ] 2018-12-20 12:10:25,645(4286) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341): [Consumer clientId=consumer-2, groupId=seekTest19] Setting newly assigned partitions [seek_test_sys-0]  
2018-12-20 12:10:25 [INFO ] 2018-12-20 12:10:25,767(4408) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:150):  F : offset 22, value 22.  
2018-12-20 12:10:25 [INFO ] 2018-12-20 12:10:25,768(4409) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:150):  F : offset 23, value 23.  
2018-12-20 12:10:25 [INFO ] 2018-12-20 12:10:25,769(4410) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:150):  F : offset 24, value 24.  
2018-12-20 12:10:25 [INFO ] 2018-12-20 12:10:25,770(4411) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:150):  F : offset 25, value 25.  
2018-12-20 12:10:25 [INFO ] 2018-12-20 12:10:25,770(4411) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:150):  F : offset 26, value 26.  
2018-12-20 12:10:25 [INFO ] 2018-12-20 12:10:25,771(4412) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:150):  F : offset 27, value 27.  
2018-12-20 12:10:25 [INFO ] 2018-12-20 12:10:25,772(4413) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:150):  F : offset 28, value 28.  
2018-12-20 12:10:25 [INFO ] 2018-12-20 12:10:25,772(4413) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:150):  F : offset 29, value 29.  

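consumer-subscribe consumes normally and then seeks. It then consumes from the seek position (it has to consume once so that the offset gets updated). consumer-assign continues from the last committed offset.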

After consumer-subscribe seeks, continuing to consume with consumer-subscribe starts from the seek position, as expected.

But if consumer-assign is used to consume directly after consumer-subscribe's seek, it does not start from the seek position, because the seek has not been committed.

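Conversely, consumer-assign consumes normally and then seeks. It consumes from the seek position (again, it has to consume once so that the offset gets updated), while consumer-subscribe starts from the latest committed offset.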

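However, problems appear in the following sequence: consumer-assign consumes for a while, consumer-subscribe then continues after it, and then consumer-assign seeks. consumer-assign itself does consume from the seek position, but its commits fail. Possibly the example is poorly written. In the log below, once consumer-subscribe (consumer-2) has joined the group with generation 1, every auto-commit of offset 66 from consumer-assign (consumer-1) is rejected with "The coordinator is not aware of this member". Ultimately, both consumers maintain the offsets of a single consumer group, so they are bound to affect each other.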
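The commit failures can be reasoned about with a simplified model of the group coordinator's commit check. The rule below is our assumption, a simplification of the broker's behavior, and `ToyCoordinator` is a hypothetical name. A consumer using `assign()` commits with an empty member id and generation -1. The coordinator accepts such commits only while the group has no active members. Once a `subscribe()` consumer has joined, they are rejected with `UNKNOWN_MEMBER_ID`.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified toy model (an assumption, not the broker's code) of how the group
// coordinator decides whether to accept an offset commit.
class ToyCoordinator {
    enum Result { OK, UNKNOWN_MEMBER_ID, ILLEGAL_GENERATION }

    private final Set<String> members = new HashSet<>();
    private int generation = 0;

    // A subscribe() consumer joins the group; each rebalance bumps the generation.
    int join(String memberId) {
        members.add(memberId);
        return ++generation;
    }

    // An assign() consumer commits with an empty member id and generation -1.
    Result commit(String memberId, int generationId) {
        if (generationId < 0 && members.isEmpty()) {
            return Result.OK; // group used purely as offset storage
        }
        if (!members.contains(memberId)) {
            return Result.UNKNOWN_MEMBER_ID; // "The coordinator is not aware of this member."
        }
        return generationId == generation ? Result.OK : Result.ILLEGAL_GENERATION;
    }
}
```

Under this rule, the assign consumer's commits succeed while the group is still empty. They are rejected after the subscribe consumer joins, which matches the repeated ERROR/WARN lines for `consumer-1` at offset 66.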

# code
public void assignAndSubscribe() {
        KafkaConsumer<String, String> consumer_e = new KafkaConsumer<>(props);
        KafkaConsumer<String, String> consumer_f = new KafkaConsumer<>(props);
        ConsumerRecords<String, String> msgs;
        TopicPartition tp = new TopicPartition(topic, 0);

        consumer_e.assign(Arrays.asList(tp));
        consumer_f.subscribe(Arrays.asList(topic));
        int e=0, f=0;

        while (e++ < 10) {
            msgs = consumer_e.poll(100L);
            for (ConsumerRecord<String, String> record : msgs) {
                logger.info(" E : offset {}, value {}.", record.offset(), record.value());
            }
        }

        while (f++ < 10) {
            msgs = consumer_f.poll(100L);
            for (ConsumerRecord<String, String> record : msgs) {
                logger.info(" F : offset {}, value {}.", record.offset(), record.value());
            }
        }

        consumer_e.seek(tp, 66);

        logger.info("seek");

        while (e++ < 20) {
            msgs = consumer_e.poll(100L);
            for (ConsumerRecord<String, String> record : msgs) {
                logger.info(" E : offset {}, value {}.", record.offset(), record.value());
            }
        }

        while (f++ < 20) {
            msgs = consumer_f.poll(100L);
            for (ConsumerRecord<String, String> record : msgs) {
                logger.info(" F : offset {}, value {}.", record.offset(), record.value());
            }
        }
    }
# output 
2018-12-20 13:16:13 [INFO ] 2018-12-20 13:16:13,066(322) --> [main] org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:109): Kafka version : 1.0.1  
2018-12-20 13:16:13 [INFO ] 2018-12-20 13:16:13,066(322) --> [main] org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:110): Kafka commitId : c0518aa65f25317e  
2018-12-20 13:16:13 [INFO ] 2018-12-20 13:16:13,504(760) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341): [Consumer clientId=consumer-1, groupId=seekTest24] Discovered group coordinator g1-bdp-cdhtest-01.dns.guazi.com:9092 (id: 2147483233 rack: null)  
2018-12-20 13:16:13 [INFO ] 2018-12-20 13:16:13,625(881) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 0, value 0.  
2018-12-20 13:16:13 [INFO ] 2018-12-20 13:16:13,630(886) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 1, value 1.  
2018-12-20 13:16:13 [INFO ] 2018-12-20 13:16:13,644(900) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 2, value 2.  
2018-12-20 13:16:13 [INFO ] 2018-12-20 13:16:13,662(918) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 3, value 3.  
2018-12-20 13:16:13 [INFO ] 2018-12-20 13:16:13,667(923) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 4, value 4.  
2018-12-20 13:16:13 [INFO ] 2018-12-20 13:16:13,672(928) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 5, value 5.  
2018-12-20 13:16:13 [INFO ] 2018-12-20 13:16:13,674(930) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 6, value 6.  
2018-12-20 13:16:13 [INFO ] 2018-12-20 13:16:13,679(935) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 7, value 7.  
2018-12-20 13:16:13 [INFO ] 2018-12-20 13:16:13,681(937) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:142):  E : offset 8, value 8.  
2018-12-20 13:16:13 [INFO ] 2018-12-20 13:16:13,709(965) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341): [Consumer clientId=consumer-2, groupId=seekTest24] Discovered group coordinator g1-bdp-cdhtest-01.dns.guazi.com:9092 (id: 2147483233 rack: null)  
2018-12-20 13:16:13 [INFO ] 2018-12-20 13:16:13,712(968) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341): [Consumer clientId=consumer-2, groupId=seekTest24] Revoking previously assigned partitions []  
2018-12-20 13:16:13 [INFO ] 2018-12-20 13:16:13,713(969) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:336): [Consumer clientId=consumer-2, groupId=seekTest24] (Re-)joining group  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,851(4107) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341): [Consumer clientId=consumer-2, groupId=seekTest24] Successfully joined group with generation 1  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,852(4108) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341): [Consumer clientId=consumer-2, groupId=seekTest24] Setting newly assigned partitions [seek_test_sys-0]  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,881(4137) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:149):  F : offset 8, value 8.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,884(4140) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:149):  F : offset 9, value 9.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,885(4141) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:149):  F : offset 10, value 10.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,886(4142) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:149):  F : offset 11, value 11.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,886(4142) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:149):  F : offset 12, value 12.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,886(4142) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:149):  F : offset 13, value 13.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,887(4143) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:149):  F : offset 14, value 14.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,887(4143) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:149):  F : offset 15, value 15.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,887(4143) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:149):  F : offset 16, value 16.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,888(4144) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:155): seek  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,894(4150) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 66: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [WARN ] 2018-12-20 13:16:16,895(4151) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:246): [Consumer clientId=consumer-1, groupId=seekTest24] Asynchronous auto-commit of offsets {seek_test_sys-0=OffsetAndMetadata{offset=66, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,896(4152) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 66: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,897(4153) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 66: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [WARN ] 2018-12-20 13:16:16,897(4153) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:246): [Consumer clientId=consumer-1, groupId=seekTest24] Asynchronous auto-commit of offsets {seek_test_sys-0=OffsetAndMetadata{offset=66, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.  
2018-12-20 13:16:16 [WARN ] 2018-12-20 13:16:16,897(4153) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:246): [Consumer clientId=consumer-1, groupId=seekTest24] Asynchronous auto-commit of offsets {seek_test_sys-0=OffsetAndMetadata{offset=66, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,898(4154) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 66: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,898(4154) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 66: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [WARN ] 2018-12-20 13:16:16,898(4154) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:246): [Consumer clientId=consumer-1, groupId=seekTest24] Asynchronous auto-commit of offsets {seek_test_sys-0=OffsetAndMetadata{offset=66, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.  
2018-12-20 13:16:16 [WARN ] 2018-12-20 13:16:16,899(4155) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:246): [Consumer clientId=consumer-1, groupId=seekTest24] Asynchronous auto-commit of offsets {seek_test_sys-0=OffsetAndMetadata{offset=66, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,899(4155) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 66: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [WARN ] 2018-12-20 13:16:16,899(4155) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:246): [Consumer clientId=consumer-1, groupId=seekTest24] Asynchronous auto-commit of offsets {seek_test_sys-0=OffsetAndMetadata{offset=66, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,901(4157) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 66: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [WARN ] 2018-12-20 13:16:16,902(4158) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:246): [Consumer clientId=consumer-1, groupId=seekTest24] Asynchronous auto-commit of offsets {seek_test_sys-0=OffsetAndMetadata{offset=66, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,904(4160) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 66: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,905(4161) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 66: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [WARN ] 2018-12-20 13:16:16,906(4162) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:246): [Consumer clientId=consumer-1, groupId=seekTest24] Asynchronous auto-commit of offsets {seek_test_sys-0=OffsetAndMetadata{offset=66, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.  
2018-12-20 13:16:16 [WARN ] 2018-12-20 13:16:16,906(4162) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:246): [Consumer clientId=consumer-1, groupId=seekTest24] Asynchronous auto-commit of offsets {seek_test_sys-0=OffsetAndMetadata{offset=66, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,906(4162) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 66: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,907(4163) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 66: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [WARN ] 2018-12-20 13:16:16,907(4163) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:246): [Consumer clientId=consumer-1, groupId=seekTest24] Asynchronous auto-commit of offsets {seek_test_sys-0=OffsetAndMetadata{offset=66, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.  
2018-12-20 13:16:16 [WARN ] 2018-12-20 13:16:16,907(4163) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:246): [Consumer clientId=consumer-1, groupId=seekTest24] Asynchronous auto-commit of offsets {seek_test_sys-0=OffsetAndMetadata{offset=66, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,908(4164) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 66: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,908(4164) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 66: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,909(4165) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:160):  E : offset 66, value 66.  
2018-12-20 13:16:16 [WARN ] 2018-12-20 13:16:16,909(4165) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:246): [Consumer clientId=consumer-1, groupId=seekTest24] Asynchronous auto-commit of offsets {seek_test_sys-0=OffsetAndMetadata{offset=66, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.  
2018-12-20 13:16:16 [WARN ] 2018-12-20 13:16:16,909(4165) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:246): [Consumer clientId=consumer-1, groupId=seekTest24] Asynchronous auto-commit of offsets {seek_test_sys-0=OffsetAndMetadata{offset=66, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,910(4166) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 66: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,910(4166) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:160):  E : offset 67, value 67.  
2018-12-20 13:16:16 [WARN ] 2018-12-20 13:16:16,910(4166) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:246): [Consumer clientId=consumer-1, groupId=seekTest24] Asynchronous auto-commit of offsets {seek_test_sys-0=OffsetAndMetadata{offset=66, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,911(4167) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:160):  E : offset 68, value 68.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,911(4167) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:160):  E : offset 69, value 69.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,912(4168) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:160):  E : offset 70, value 70.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,912(4168) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 66: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,913(4169) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:160):  E : offset 71, value 71.  
2018-12-20 13:16:16 [WARN ] 2018-12-20 13:16:16,913(4169) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:246): [Consumer clientId=consumer-1, groupId=seekTest24] Asynchronous auto-commit of offsets {seek_test_sys-0=OffsetAndMetadata{offset=66, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,914(4170) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:160):  E : offset 72, value 72.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,915(4171) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 67: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,915(4171) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:160):  E : offset 73, value 73.  
2018-12-20 13:16:16 [WARN ] 2018-12-20 13:16:16,915(4171) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:246): [Consumer clientId=consumer-1, groupId=seekTest24] Asynchronous auto-commit of offsets {seek_test_sys-0=OffsetAndMetadata{offset=67, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,915(4171) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 68: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [ERROR] 2018-12-20 13:16:16,916(4172) --> [main] org.apache.kafka.common.utils.LogContext$KafkaLogger.error(LogContext.java:301): [Consumer clientId=consumer-1, groupId=seekTest24] Offset commit failed on partition seek_test_sys-0 at offset 69: The coordinator is not aware of this member.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,916(4172) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:160):  E : offset 74, value 74.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,917(4173) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:167):  F : offset 17, value 17.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,918(4174) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:167):  F : offset 18, value 18.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,918(4174) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:167):  F : offset 19, value 19.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,919(4175) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:167):  F : offset 20, value 20.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,919(4175) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:167):  F : offset 21, value 21.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,919(4175) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:167):  F : offset 22, value 22.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,920(4176) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:167):  F : offset 23, value 23.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,920(4176) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:167):  F : offset 24, value 24.  
2018-12-20 13:16:16 [INFO ] 2018-12-20 13:16:16,921(4177) --> [main] com.songys.kafka.kafkaSeekTest.assignAndSubscribe(kafkaSeekTest.java:167):  F : offset 25, value 25.  
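The log above is the tail of the assignAndSubscribe test, in which consumers of the same group (seekTest24) mix assign() and subscribe(). Every auto-commit from consumer-1 fails with "The coordinator is not aware of this member", which is the text of Kafka's UNKNOWN_MEMBER_ID error. The failing commit offsets (66, 67, 68, 69) follow consumer E's progress, and E keeps printing offsets 66 to 74, so the rejected commits do not stop fetching. The toy model below sketches one plausible explanation. It assumes, as a simplification of the broker's group coordinator that ignores generations entirely, that a commit carrying no member id (what an assign() consumer sends) is accepted only while the group has no active subscribed members. `ToyCoordinator`, `member-F` and the offset 60 are hypothetical and are not part of the Kafka client API or the original test.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of offset commits for ONE consumer group. Hypothetical, not Kafka code.
public class CommitCheckModel {

    static class ToyCoordinator {
        // Members that joined the group through subscribe().
        final Set<String> activeMembers = new HashSet<>();
        // Stored offsets: partition -> last accepted committed offset.
        final Map<String, Long> committed = new HashMap<>();

        void join(String memberId) {
            activeMembers.add(memberId);
        }

        // memberId == null stands for a standalone assign() consumer.
        // Simplifying assumption: such a commit is accepted only while
        // no member is active; generations are ignored entirely.
        String commit(String memberId, String partition, long offset) {
            boolean standaloneOk = memberId == null && activeMembers.isEmpty();
            boolean memberOk = memberId != null && activeMembers.contains(memberId);
            if (standaloneOk || memberOk) {
                committed.put(partition, offset);
                return "OK";
            }
            return "UNKNOWN_MEMBER_ID"; // rejected: nothing is stored
        }
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();
        String tp = "seek_test_sys-0";

        // No subscriber yet: the assign() consumer's commit is stored.
        System.out.println(coordinator.commit(null, tp, 60L));   // OK
        // A subscribe() consumer joins the same group.id.
        coordinator.join("member-F");
        // From now on the assign() consumer's commits are rejected,
        // although in the real test it kept fetching records.
        System.out.println(coordinator.commit(null, tp, 66L));   // UNKNOWN_MEMBER_ID
        System.out.println(coordinator.committed.get(tp));       // 60
    }
}
```

In this model the stored offset stays frozen at the last value accepted before the subscriber joined, which is one more reason not to mix assign and subscribe under one group.id.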

Overall verification test

Three threads, each with its own consumer, all using the same configuration:

Thread-1 / 2: each consumer calls `assign` on the same topic-partition and consumes from it.

Thread-3: this consumer consumes nothing; it repeatedly fetches the offset last committed to Kafka for this `group.id`.

Results:

The consumers in Thread-1 and Thread-2 each consumed a full copy of the data, with no problems.

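However, part of what the Thread-3 consumer fetched looks like this (it appears as `[Thread-2]` in the log, presumably because Java numbers default thread names from `Thread-0`):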

2018-12-25 17:25:57 [INFO ] 2018-12-25 17:25:57,967(505) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 0
2018-12-25 17:25:57 [INFO ] 2018-12-25 17:25:57,999(537) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 0
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,046(584) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 0
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,098(636) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 0
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,230(768) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 0
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,247(785) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 0
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,356(894) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 0
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,458(996) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 0
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,478(1016) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 212
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,532(1070) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 0
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,546(1084) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 181
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,569(1107) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 213
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,583(1121) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 252
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,614(1152) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 261
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,629(1167) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 268
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,638(1176) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 275
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,655(1193) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 283
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,667(1205) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 324
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,677(1215) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 332
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,713(1251) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 356
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,817(1355) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 499
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,854(1392) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 693
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,962(1500) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 850
2018-12-25 17:25:58 [INFO ] 2018-12-25 17:25:58,991(1529) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 1242
2018-12-25 17:25:59 [INFO ] 2018-12-25 17:25:59,006(1544) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 949
2018-12-25 17:25:59 [INFO ] 2018-12-25 17:25:59,020(1558) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 971
2018-12-25 17:25:59 [INFO ] 2018-12-25 17:25:59,030(1568) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 1006
2018-12-25 17:25:59 [INFO ] 2018-12-25 17:25:59,038(1576) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 1076
2018-12-25 17:25:59 [INFO ] 2018-12-25 17:25:59,076(1614) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 1085
2018-12-25 17:25:59 [INFO ] 2018-12-25 17:25:59,090(1628) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 1282
2018-12-25 17:25:59 [INFO ] 2018-12-25 17:25:59,110(1648) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 1104
2018-12-25 17:25:59 [INFO ] 2018-12-25 17:25:59,134(1672) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 1299
2018-12-25 17:25:59 [INFO ] 2018-12-25 17:25:59,270(1808) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 1303
2018-12-25 17:25:59 [INFO ] 2018-12-25 17:25:59,463(2001) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 1515
2018-12-25 17:25:59 [INFO ] 2018-12-25 17:25:59,600(2138) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 1805
2018-12-25 17:25:59 [INFO ] 2018-12-25 17:25:59,692(2230) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 2300
2018-12-25 17:25:59 [INFO ] 2018-12-25 17:25:59,803(2341) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 2569
2018-12-25 17:25:59 [INFO ] 2018-12-25 17:25:59,902(2440) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 2923
2018-12-25 17:26:00 [INFO ] 2018-12-25 17:26:00,074(2612) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 2613
2018-12-25 17:26:00 [INFO ] 2018-12-25 17:26:00,097(2635) --> [Thread-2] com.ymxz.kafka.KafkaOffsetAndMeta.run(KafkaOffsetAndMeta.java:35): the offset last committed : offset 2861


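As the output shows, the offsets the two consumers commit to the broker overwrite each other (the stored value even moves backwards, for example from 1242 to 949), yet neither consumer's own consumption is affected. The only problem is that there is no way to tell which consumer committed the offset currently stored on the Kafka broker.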
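A toy model makes the back-and-forth values easier to see. It assumes, as the conclusion above states, that the broker keeps a single committed offset per group.id and partition, overwrites it on every commit, and does not record which consumer sent it. `SharedOffsetModel`, `demo-group` and `demo-topic-0` are hypothetical names, and for simplicity the "checker" reads the stored value right after every commit instead of on a timer as Thread-3 did.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of broker-side offset storage. Hypothetical, not Kafka code.
public class SharedOffsetModel {
    // Key: groupId + "/" + partition. Last write wins; the committer is not recorded.
    static final Map<String, Long> COMMITTED = new HashMap<>();

    static void commit(String groupId, String partition, long offset) {
        COMMITTED.put(groupId + "/" + partition, offset);
    }

    static Long committed(String groupId, String partition) {
        return COMMITTED.get(groupId + "/" + partition);
    }

    public static void main(String[] args) {
        String group = "demo-group";   // hypothetical group.id
        String tp = "demo-topic-0";    // hypothetical partition
        long posA = 0, posB = 0;       // each consumer tracks its own fetch position
        List<Long> seenByChecker = new ArrayList<>();

        // A reads 3 records per round, B reads 1; both commit after every round
        // and the checker reads the stored value right after each commit.
        for (int round = 0; round < 4; round++) {
            posA += 3;
            commit(group, tp, posA);
            seenByChecker.add(committed(group, tp));
            posB += 1;
            commit(group, tp, posB);
            seenByChecker.add(committed(group, tp));
        }
        System.out.println(seenByChecker);               // [3, 1, 6, 2, 9, 3, 12, 4]
        System.out.println("A=" + posA + ", B=" + posB); // A=12, B=4
    }
}
```

Because each consumer fetches from its own position, both still read the full data, matching the Thread-1/2 result; only the stored value, which a newly started consumer in this group would resume from, becomes unreliable.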
