Connect to Kafka from Java using the Java client classes KafkaProducer and KafkaConsumer.
Note: since version 0.10, connecting to Kafka only requires the broker addresses; ZooKeeper connection information is no longer needed.
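As a minimal sketch of that point (the class name MinimalProducer is just for illustration; the broker address and topic are the ones used later in this post), a producer can be created with nothing but the broker list and serializers, and no zookeeper.connect setting:

package com.wenbronk.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MinimalProducer {
    public static void main(String[] args) {
        // only the broker list and serializers are required; no ZooKeeper settings
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.183.93.127:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer, which also flushes pending sends
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("http_zhixin", "hello kafka"));
        }
    }
}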
1. Kafka configuration
{ "producer": { "bootstrap.servers": "10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093", "key.serializer": "org.apache.kafka.common.serialization.StringSerializer", "value.serializer": "org.apache.kafka.common.serialization.StringSerializer", "max.request.size": "10485760", "batch.size": "163840", "buffer.memory": "536870912", "max.block.ms": "500", "retries": "3", "acks": "1", }, "cosumer": { "bootstrap.servers": "10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093", "group.id": "test222", "session.timeout.ms": "30000", "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer" } }
2. KafkaUtils, used to read the Kafka configuration
package com.wenbronk.kafka;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import java.util.Properties;

/**
 * Reads the Kafka configuration file and converts one of its sections into Properties.
 */
public class KafkaUtils {

    @Test
    public void test() throws FileNotFoundException {
        getConfig("producer");
    }

    public static JsonObject getConfig(String name) throws FileNotFoundException {
        JsonParser parser = new JsonParser();
        JsonElement parse = parser.parse(new FileReader("src/main/resources/kafka"));
        JsonObject jsonObject = parse.getAsJsonObject().getAsJsonObject(name);
        System.out.println(jsonObject);
        return jsonObject;
    }

    public static Properties getProperties(String sourceName) throws FileNotFoundException {
        JsonObject config = KafkaUtils.getConfig(sourceName);
        Properties properties = new Properties();
        for (Map.Entry<String, JsonElement> entry : config.entrySet()) {
            properties.put(entry.getKey(), entry.getValue().getAsString());
        }
        return properties;
    }
}
3. Kafka producer
package com.wenbronk.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Message producer
 */
public class KafkaProducerMain {

    @Test
    public void send() throws Exception {
        HashMap<String, String> map = new HashMap<>();
        map.put("http_zhixin", "send message to kafka from producer");
        for (int i = 0; i < 3; i++) {
            sendMessage(map);
        }
    }

    /**
     * Send one message per (topic, message) entry
     */
    public void sendMessage(Map<String, String> topicMsg) throws FileNotFoundException {
        Properties properties = KafkaUtils.getProperties("producer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (Map.Entry<String, String> entry : topicMsg.entrySet()) {
            String topic = entry.getKey();
            String message = entry.getValue();
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
            // asynchronous send; the callback could also be an external class:
            // producer.send(record, new CallBackFuntion(topic, message));
            producer.send(record, (recordMetadata, e) -> {
                if (e != null) {
                    System.err.println(topic + ": " + message + " -- send failed");
                } else {
                    System.err.println(topic + ": " + message + " -- send succeeded");
                }
            });
        }
        producer.flush();
        producer.close();
    }
}
The callback can be written as an anonymous inner class or lambda (as above), or as an external class instantiated with new:
package com.wenbronk.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Callback implementation
 */
public class CallBackFuntion implements Callback {

    private String topic;
    private String message;

    public CallBackFuntion(String topic, String message) {
        this.topic = topic;
        this.message = message;
    }

    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            System.out.println(topic + ": " + message + " -- send failed");
        } else {
            System.out.println(topic + ": " + message + " -- send succeeded");
        }
    }
}
4. Kafka consumer
package com.wenbronk.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.util.*;

public class KafkaConsumerMain {

    /**
     * Auto-commit offsets
     */
    public void commitAuto(List<String> topics) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("consumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.err.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }

    /**
     * Manually commit offsets
     */
    public void commitControl(List<String> topics) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("consumer");
        props.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);
        final int minBatchSize = 2;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                insertIntoDb(buffer);
                // blocking synchronous commit
                consumer.commitSync();
                buffer.clear();
            }
        }
    }

    /**
     * Commit offsets per partition
     */
    public void setOffSet(List<String> topics) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("consumer");
        props.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            // after processing each partition's records, commit that partition's offset
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(record.offset() + ": " + record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }

    /**
     * Manually seek to a specific offset
     */
    public void setSeek(List<String> topics) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("consumer");
        props.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);
        // subscribe() assigns partitions lazily: poll once so the assignment exists before seeking
        consumer.poll(0);
        consumer.seek(new TopicPartition("http_zhixin", 0), 797670770);
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.err.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
            consumer.commitSync();
        }
    }

    @Test
    public void test() throws FileNotFoundException {
        ArrayList<String> topics = new ArrayList<>();
        topics.add("http_zhixin");
        // commitAuto(topics);
        // commitControl(topics);
        // setOffSet(topics);
        setSeek(topics);
    }

    /**
     * Stand-in for real processing, e.g. writing the batch to a database
     */
    private void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
        buffer.stream().map(x -> x.value()).forEach(System.err::println);
    }
}
Consumers in the same Kafka consumer group do not read the same messages repeatedly: each partition is consumed by only one member of the group. Version 0.11 added transaction support.
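A minimal sketch of the 0.11+ transactional producer API (the class name and the transactional.id value are assumptions for illustration; the broker address and topic come from the configuration above):

package com.wenbronk.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.183.93.127:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // transactional.id enables the 0.11+ transactional (and idempotent) producer; assumed value
        props.put("transactional.id", "demo-tx-1");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("http_zhixin", "message inside a transaction"));
            // all records in the transaction become visible atomically on commit
            producer.commitTransaction();
        } catch (Exception e) {
            // roll back the whole transaction on failure
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}

On the consuming side, setting isolation.level to read_committed makes the consumer skip records from aborted transactions.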