Kafka Publish-Subscribe Implementation

    Dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.9.0.0</version>
</dependency>

    The new 0.9 release reworked the consumer-related APIs into a single unified consumer API, so the implementation differs from pre-0.9 versions; see http://kafka.apache.org/documentation.html#consumerapi for details.

    Producer implementation:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {

    public void send() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.52.128:9092");
        // Wait for acknowledgement from the full set of in-sync replicas, so a successful send means the message is committed
        props.put("acks", "all");
        // Number of retries on transient send failures
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Controls which partition each message goes to; defaults to org.apache.kafka.clients.producer.internals.DefaultPartitioner
//        props.put("partitioner.class", "com.test.kafka.simple.SimplePartitioner");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("mykafka", "mykafka" + Integer.toString(i), "hello kafka " + Integer.toString(i)));
        }
        producer.close();
    }

    public static void main(String[] args) {
        new SimpleProducer().send();
    }

}
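    Because send() is asynchronous, acks=all alone does not tell the caller whether a given record actually made it; passing a Callback (or blocking on the returned Future) surfaces the per-record result. Below is a minimal sketch against the same 0.9 producer API; the class name CallbackProducer and the key/value strings are made up for illustration:

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallbackProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.52.128:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>("mykafka", "key1", "hello kafka"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // The send failed after any configured retries were exhausted
                    exception.printStackTrace();
                } else {
                    System.out.printf("sent to partition %d at offset %d%n", metadata.partition(), metadata.offset());
                }
            }
        });
        producer.close();
    }
}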

    Consumer implementation:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumer {

    private static final Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);

    public void poll() {
        KafkaConsumer<String, String> consumer = null;
        try {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.52.128:9092");
            // Commit offsets periodically; alternatively, disable this and call
            // KafkaConsumer.commitSync() manually (sketched after this class)
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "5000");
            // Heartbeats keep the group session alive; the interval must be smaller than
            // session.timeout.ms, and no more than 1/3 of it is recommended
            props.put("heartbeat.interval.ms", "5000");
            props.put("session.timeout.ms", "30000");
            props.put("group.id", "test-consumer-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<String, String>(props);

            // Lists all topics visible to this consumer (shown for illustration; the result is not used below)
            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
            consumer.subscribe(Arrays.asList("mykafka"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    public static void main(String[] args) {
        new SimpleConsumer().poll();
    }

}
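    As the comment above notes, offsets can also be committed manually via KafkaConsumer.commitSync(). A minimal sketch of that variant, assuming the same broker, topic, and group as above (the class name ManualCommitConsumer is made up); committing only after the records are processed gives at-least-once semantics, since a crash before the commit replays the batch instead of losing it:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.52.128:9092");
        props.put("group.id", "test-consumer-group");
        // Disable auto-commit so offsets advance only when we say so
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {
            consumer.subscribe(Arrays.asList("mykafka"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // Process the record first ...
                    System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
                }
                // ... then commit, so nothing is marked consumed before it has been handled
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}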

    You can also plug in your own serialization implementation and customize how messages are assigned to partitions. Note that the 0.9 producer expects a partitioner implementing org.apache.kafka.clients.producer.Partitioner (the VerifiableProperties-style constructor belongs to the older Scala producer API), so the example below is written against the new interface:

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class SimplePartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        int partition = 0;
        String stringKey = (String) key;
        // Keys shaped like "name.N" are routed by their numeric suffix; everything else goes to partition 0
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }

    @Override
    public void close() {
    }
}
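    A custom serializer follows the same pattern: implement org.apache.kafka.common.serialization.Serializer and register the class in the producer properties. A minimal sketch (the class SimpleStringSerializer is made up and simply encodes strings as UTF-8; a real implementation would typically serialize a domain object):

import java.io.UnsupportedEncodingException;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

public class SimpleStringSerializer implements Serializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, String data) {
        if (data == null) {
            return null;
        }
        try {
            // Encode the value as UTF-8 bytes; any wire format could be produced here instead
            return data.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException("UTF-8 not supported", e);
        }
    }

    @Override
    public void close() {
    }
}

    It would be registered the same way as the partitioner, e.g. props.put("key.serializer", "com.test.kafka.simple.SimpleStringSerializer").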


    The above is based on the official demo. Spring also provides spring-integration-kafka for integrating with Kafka; see https://github.com/spring-projects/spring-integration-kafka for details.
