使用 kafka-clients 操作 Kafka 始終不成功,原因不清楚,下面貼出相關代碼及配置,請懂的朋友指點一下,謝謝!
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency>
JDK 版本爲 1.8,Kafka 版本爲 2.12-0.10.2.0,服務器使用 CentOS-7 構建。
TestBase.java
/**
 * Base class holding the values shared by the Kafka producer and consumer tests:
 * a logger, the broker address and the topic name.
 */
public class TestBase {

    /** Logger named after the runtime class of the concrete test instance. */
    protected Logger log = LoggerFactory.getLogger(getClass());

    /** Broker address (host:port) that the tests pass as the bootstrap server. */
    protected String kafka_server = "192.168.60.160:9092";

    /** Name of the topic the tests produce to and consume from. */
    protected String topic = "zlikun_topic";
}
ProducerTest.java
public class ProducerTest extends TestBase { protected Properties props = new Properties(); @Before public void init() { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG ,MyPartitioner.class) ; } @Test public void test() throws InterruptedException { KafkaProducer<String, String> producer = new KafkaProducer<>(props); // 發送消息 for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { System.out.printf("offset = %d ,partition = %d \n", recordMetadata.offset() ,recordMetadata.partition()); } else { log.error("send error !" ,e); } } }); } TimeUnit.SECONDS.sleep(3); producer.close(); } }
ConsumerTest.java
public class ConsumerTest extends TestBase { private Properties props = new Properties(); @Before public void init() { props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server); props.put(ConsumerConfig.GROUP_ID_CONFIG ,"zlikun") ; props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); } @Test public void test() { Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); // consumer.assign(Arrays.asList(new TopicPartition(topic, 1))); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } }
# 測試 topic 爲手動建立
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic zlikun_topic
控制檯輸出信息
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time