語言:java (spring boot),單臺kafka
場景:同一個組,兩個消費者同時消費一個topicjava
實現過程:spring
首先修改這個topic的partitions ./kafka-topics.sh --bootstrap-server localhost:9092 --alter --partitions 2 --topic topic-name 而後修改項目配置文件 spring: kafka: listener: concurrency: 2
生產者代碼bootstrap
@SpringBootTest class ProducerApplicationTests { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Test void contextLoads() { for (int i = 1; i <= 1000; i++) { kafkaTemplate.send("test_0105_jcy", String.valueOf(i)); System.out.println(i); } } }
消費者代碼(X2)spa
@Slf4j @SpringBootApplication public class Customer1Application { @KafkaListener(topics = "test_0105_jcy") public void consumerListener(String msg) throws InterruptedException { log.info(msg); Thread.sleep(1 * 1000); } public static void main(String[] args) { SpringApplication.run(Customer1Application.class, args); } }
項目結構
code
生產者發送1000條消息,兩個消費者消費狀況
server
根據默認策略將消息發送到兩個partitions中,若是有特殊要求的話,能夠經過重載的send方法指定partitions,send(String topic, Integer partition, K key, V data),也能夠經過自定義分區器來實現。kafka