Java操做Kafka執行不成功

使用kafka-clients操做kafka始終不成功,緣由不清楚,下面貼出相關代碼及配置,請懂得指點一下,謝謝!html

環境及依賴

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>

JDK版本爲1.八、Kafka版本爲2.12-0.10.2.0,服務器使用CentOS-7構建。java

測試代碼

  • TestBase.javaapache

public class TestBase {

    protected Logger log = LoggerFactory.getLogger(this.getClass());

    protected String kafka_server = "192.168.60.160:9092" ;

    protected String topic = "zlikun_topic";

}
  • ProducerTest.java服務器

public class ProducerTest extends TestBase {

    protected Properties props = new Properties();

    @Before
    public void init() {

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG ,MyPartitioner.class) ;
    }

    @Test
    public void test() throws InterruptedException {

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 發送消息
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.printf("offset = %d ,partition = %d \n", recordMetadata.offset() ,recordMetadata.partition());
                    } else {
                        log.error("send error !" ,e);
                    }
                }
            });
        }

        TimeUnit.SECONDS.sleep(3);
        producer.close();

    }

}
  • ConsumerTest.javaide

public class ConsumerTest extends TestBase {

    private Properties props = new Properties();

    @Before
    public void init() {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
        props.put(ConsumerConfig.GROUP_ID_CONFIG ,"zlikun") ;
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    }

    @Test
    public void test() {

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
//        consumer.assign(Arrays.asList(new TopicPartition(topic, 1)));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }

    }

}

問題

# 測試topic爲手動建立
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic zlikun_topic

控制檯輸出信息post

[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time

Java操做Kafka執行不成功 >> java

這個答案描述的挺清楚的:
http://www.goodpm.net/postreply/java/1010000008863969/Java操做Kafka執行不成功.html
相關文章
相關標籤/搜索