<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.itcast</groupId> <artifactId>KafkaDemo</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>1.0.0</version> </dependency> </dependencies> </project>
#請求時候須要驗證 acks=all #請求失敗時候須要重試 retries=0 #內存緩存區大小 buffer.memory=33554432 #分區類 partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner #broker地址 bootstrap.servers=192.168.25.151:9092,192.168.25.152:9092,192.168.25.153:9092 #指定消息key序列化方式 key.serializer=org.apache.kafka.common.serialization.StringSerializer #指定消息自己的序列化方式 value.serializer=org.apache.kafka.common.serialization.StringSerializer
#每一個消費者分配獨立的組號 group.id=test #若是value合法,則自動提交偏移量 enable.auto.commit=true #設置多久一次更新被消費消息的偏移量 auto.commit.interval.ms=1000 #設置會話響應的時間,超過這個時間kafka能夠選擇放棄消費或者消費下一條消息 session.timeout.ms=30000 #指定消息key序列化方式 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer #指定消息自己的序列化方式 value.deserializer=org.apache.kafka.common.serialization.StringDeserializer #broker地址 bootstrap.servers=192.168.25.151:9092,192.168.25.152:9092,192.168.25.153:9092
1 package cn.itcast.kafka; 2 3 import java.io.IOException; 4 import java.io.InputStream; 5 import java.util.Properties; 6 import java.util.UUID; 7 8 import org.apache.kafka.clients.producer.KafkaProducer; 9 import org.apache.kafka.clients.producer.Producer; 10 import org.apache.kafka.clients.producer.ProducerRecord; 11 12 public class KafkaProducerSimple { 13 public static void main(String[] args) throws IOException { 14 Properties properties = new Properties(); 15 InputStream inStream = KafkaProducerSimple.class.getClassLoader().getResourceAsStream("producer.properties"); 16 17 properties.load(inStream); 18 19 Producer<String, String> producer = new KafkaProducer<>(properties); 20 String TOPIC = "orderMq6"; 21 for (int messageNo = 1; messageNo < 10000; messageNo++) { 22 producer.send(new ProducerRecord<String, String>(TOPIC,messageNo + "", UUID.randomUUID() + "itcast")); 23 } 24 } 25 }
1 package cn.itcast.kafka; 2 3 import java.io.InputStream; 4 import java.util.Arrays; 5 import java.util.Properties; 6 7 import org.apache.kafka.clients.consumer.Consumer; 8 import org.apache.kafka.clients.consumer.ConsumerRecord; 9 import org.apache.kafka.clients.consumer.ConsumerRecords; 10 import org.apache.kafka.clients.consumer.KafkaConsumer; 11 12 public class KafkaConsumerSimple { 13 14 public static void main(String[] args) throws Exception { 15 Properties properties = new Properties(); 16 InputStream inStream = KafkaConsumerSimple.class.getClassLoader().getResourceAsStream("consumer.properties"); 17 properties.load(inStream); 18 Consumer<String, String> consumer = new KafkaConsumer<>(properties); 19 consumer.subscribe(Arrays.asList("orderMq6")); 20 while (true) { 21 ConsumerRecords<String, String> records = consumer.poll(100); 22 if (records.count() > 0) { 23 for (ConsumerRecord<String, String> record : records) { 24 System.out.println(record.value()); 25 } 26 27 } 28 } 29 } 30 }
以上代碼若是執行超時,必須在本地host文件中配置broker的hostname和ip的映射。java