Kafka系列三 Java API操作

使用Java API操作Kafka

1.pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.itcast</groupId>
    <artifactId>KafkaDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>
</project>

 2.producer和consumer配置文件

  2.1producer.properties

#請求時候須要驗證
acks=all
#請求失敗時候須要重試
retries=0
#內存緩存區大小
buffer.memory=33554432
#分區類
partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
#broker地址
bootstrap.servers=192.168.25.151:9092,192.168.25.152:9092,192.168.25.153:9092
#指定消息key序列化方式
key.serializer=org.apache.kafka.common.serialization.StringSerializer
#指定消息自己的序列化方式
value.serializer=org.apache.kafka.common.serialization.StringSerializer

  2.2consumer.properties

#每一個消費者分配獨立的組號
group.id=test
#設爲true時,按固定時間間隔自動提交偏移量
enable.auto.commit=true
#設置多久一次更新被消費消息的偏移量
auto.commit.interval.ms=1000
#設置會話響應的時間,超過這個時間kafka能夠選擇放棄消費或者消費下一條消息
session.timeout.ms=30000
#指定消息key的反序列化方式
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#指定消息自己的反序列化方式
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#broker地址
bootstrap.servers=192.168.25.151:9092,192.168.25.152:9092,192.168.25.153:9092

3.生產者和消費者代碼

  3.1 KafkaProducerSimple.java

 1 package cn.itcast.kafka;
 2 
 3 import java.io.IOException;
 4 import java.io.InputStream;
 5 import java.util.Properties;
 6 import java.util.UUID;
 7 
 8 import org.apache.kafka.clients.producer.KafkaProducer;
 9 import org.apache.kafka.clients.producer.Producer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 
12 public class KafkaProducerSimple {
13     public static void main(String[] args) throws IOException {
14         Properties properties = new Properties();
15         InputStream inStream = KafkaProducerSimple.class.getClassLoader().getResourceAsStream("producer.properties");
16 
17         properties.load(inStream);
18 
19         Producer<String, String> producer = new KafkaProducer<>(properties);
20         String TOPIC = "orderMq6";
21         for (int messageNo = 1; messageNo < 10000; messageNo++) {
22             producer.send(new ProducerRecord<String, String>(TOPIC,messageNo + "", UUID.randomUUID() + "itcast"));
23         }
24     }
25 }

  3.2 KafkaConsumerSimple.java

 1 package cn.itcast.kafka;
 2 
 3 import java.io.InputStream;
 4 import java.util.Arrays;
 5 import java.util.Properties;
 6 
 7 import org.apache.kafka.clients.consumer.Consumer;
 8 import org.apache.kafka.clients.consumer.ConsumerRecord;
 9 import org.apache.kafka.clients.consumer.ConsumerRecords;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 
12 public class KafkaConsumerSimple {
13 
14     public static void main(String[] args) throws Exception {
15         Properties properties = new Properties();
16         InputStream inStream = KafkaConsumerSimple.class.getClassLoader().getResourceAsStream("consumer.properties");
17         properties.load(inStream);
18         Consumer<String, String> consumer = new KafkaConsumer<>(properties);
19         consumer.subscribe(Arrays.asList("orderMq6"));
20         while (true) {
21             ConsumerRecords<String, String> records = consumer.poll(100);
22             if (records.count() > 0) {
23                 for (ConsumerRecord<String, String> record : records) {
24                     System.out.println(record.value());
25                 }
26 
27             }
28         }
29     }
30 }

 

  以上代碼若是執行超時,必須在本地hosts文件中配置broker的hostname和ip的映射。

相關文章
相關標籤/搜索