第一步:使用 ./kafka-topics.sh 命令建立 topic 及 partitions 分區數
./kafka-topics.sh --create --zookeeper "172.16.49.173:2181" --topic "producer_test" --partitions 10 --replication-factor 3
第二步:實現org.apache.kafka.clients.producer.Partitioner
分區接口,以實現自定義的消息分區
import java.util.List; import java.util.Map; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MyPartition implements Partitioner { private static Logger LOG = LoggerFactory.getLogger(MyPartition.class); public MyPartition() { // TODO Auto-generated constructor stub } @Override public void configure(Map<String, ?> configs) { // TODO Auto-generated method stub } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // TODO Auto-generated method stub List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); int partitionNum = 0; try { partitionNum = Integer.parseInt((String) key); } catch (Exception e) { partitionNum = key.hashCode() ; } LOG.info("the message sendTo topic:"+ topic+" and the partitionNum:"+ partitionNum); return Math.abs(partitionNum % numPartitions); } @Override public void close() { // TODO Auto-generated method stub } }
第三步:編寫 producer
import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PartitionTest { private static Logger LOG = LoggerFactory.getLogger(PartitionTest.class); public static void main(String[] args) { // TODO Auto-generated method stub Properties props = new Properties(); props.put("bootstrap.servers", "172.16.49.173:9092;172.16.49.173:9093"); props.put("retries", 0); // props.put("batch.size", 16384); props.put("linger.ms", 1); // props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("partitioner.class", "com.goodix.kafka.MyPartition"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); ProducerRecord<String, String> record = new ProducerRecord<String, String>("producer_test", "2223132132", "test23_60"); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { // TODO Auto-generated method stub if (e != null) LOG.error("the producer has a error:" + e.getMessage()); else { LOG.info("The offset of the record we just sent is: " + metadata.offset()); LOG.info("The partition of the record we just sent is: " + metadata.partition()); } } }); try { Thread.sleep(1000); producer.close(); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } }
備註: 要先用命令建立topic及partitions 分區數;不然在自定義的分區中若是有大於1的狀況下,發送數據消息到kafka時會報
expired due to timeout while requesting metadata from brokers
錯誤