kafka 消費者負載均衡實現

語言:java (spring boot),單臺kafka
場景:同一個組,兩個消費者同時消費一個topicjava

實現過程:spring

首先修改這個topic的partitions

./kafka-topics.sh --bootstrap-server localhost:9092 --alter --partitions 2 --topic topic-name


而後修改項目配置文件

spring:
  kafka:
    listener:
      concurrency: 2

生產者代碼bootstrap

@SpringBootTest
class ProducerApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void contextLoads()  {
        for (int i = 1; i <= 1000; i++) {
            kafkaTemplate.send("test_0105_jcy", String.valueOf(i));
            System.out.println(i);
        }
    }

}

消費者代碼(X2)spa

@Slf4j
@SpringBootApplication
public class Customer1Application {

    @KafkaListener(topics = "test_0105_jcy")
    public void consumerListener(String msg) throws InterruptedException {
        log.info(msg);
        Thread.sleep(1 * 1000);
    }


    public static void main(String[] args) {
        SpringApplication.run(Customer1Application.class, args);
    }

}

項目結構
image.pngcode

生產者發送1000條消息,兩個消費者消費狀況
image.png
image.pngserver

根據默認策略將消息發送到兩個partitions中,若是有特殊要求的話,能夠經過重載的send方法指定partitions,send(String topic, Integer partition, K key, V data),也能夠經過自定義分區器來實現。kafka

相關文章
相關標籤/搜索