基本思路:在kafka生產者生產消息時,把相同userId的消息落在同一個分區/partitionjava
public void sendTopic1(String tpoic, String userId, String message) { Properties props = new Properties(); //集羣地址,多個服務器用","分隔 props.put("bootstrap.servers", servers); //key、value的序列化,此處以字符串爲例,使用kafka已有的序列化類 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("request.required.acks", "all"); //建立生產者 int partitionNum = 0; if (StringUtils.isBlank(userId)) { //以前介紹過 Key 是能夠傳空值的 partitionNum = new Random().nextInt(11); //隨機 } else { //取 % partitionNum = Math.abs((userId.hashCode()) % 11); } log.info("發送topic的partition索引:{}", partitionNum); Producer<String, String> producer = new KafkaProducer<>(props); ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(tpoic, partitionNum, userId, message); producer.send(producerRecord); producer.close(); }