Kafka version 0.10.1.1
After sending a message, the producer reports the following error:
The producer has a error:Expiring 1 record(s) for testtopic-0 due to 30039 ms has passed since last append
Adjusting timeout.ms, batch.size, and similar settings had no effect; the error persisted.
After deleting the topic and its data and recreating the topic from the command line, the problem went away.
Cause: when a producer sends to a topic that does not yet exist, Kafka auto-creates it, but this auto-creation path can lead to the error above. So always create the topic from the command line first, and only then start using it.
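As a quick sanity check before producing, you can ask the producer for the topic's partition metadata. This is a minimal sketch (not from the original article), reusing the broker address and topic name from the examples below; note that if the broker still has auto.create.topics.enable=true, requesting metadata for a missing topic may itself trigger auto-creation.

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

public class TopicCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.49.173:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            // Blocks up to max.block.ms waiting for metadata; throws if it cannot be fetched in time
            List<PartitionInfo> partitions = producer.partitionsFor("producer_test"); // assumed topic name
            System.out.println("producer_test has " + partitions.size() + " partitions");
        } finally {
            producer.close();
        }
    }
}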
Reference article:
The sample code below is written against kafka-clients v0.10.0.0 (http://m.blog.csdn.net/article/details?id=51577117).
Step 1: use ./kafka-topics.sh to create the topic and its partitions
./kafka-topics.sh --create --zookeeper "172.16.49.173:2181" --topic "producer_test" --partitions 10 --replication-factor 3
Step 2: implement the org.apache.kafka.clients.producer.Partitioner interface to provide custom message partitioning
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyPartition implements Partitioner {
    private static Logger LOG = LoggerFactory.getLogger(MyPartition.class);

    public MyPartition() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Look up how many partitions the topic currently has
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int partitionNum = 0;
        try {
            // If the key is a numeric string, use it directly as the partition seed
            partitionNum = Integer.parseInt((String) key);
        } catch (Exception e) {
            // Otherwise fall back to the key's hash code
            partitionNum = key.hashCode();
        }
        LOG.info("the message sendTo topic:" + topic + " and the partitionNum:" + partitionNum);
        return Math.abs(partitionNum % numPartitions);
    }

    @Override
    public void close() {
    }
}
Step 3: write the producer
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionTest {
    private static Logger LOG = LoggerFactory.getLogger(PartitionTest.class);

    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker list is comma-separated
        props.put("bootstrap.servers", "172.16.49.173:9092,172.16.49.173:9093");
        props.put("retries", 0);
        // props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        // props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Use the custom partitioner from step 2
        props.put("partitioner.class", "com.goodix.kafka.MyPartition");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("producer_test", "2223132132", "test23_60");
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null)
                    LOG.error("the producer has a error:" + e.getMessage());
                else {
                    LOG.info("The offset of the record we just sent is: " + metadata.offset());
                    LOG.info("The partition of the record we just sent is: " + metadata.partition());
                }
            }
        });
        try {
            Thread.sleep(1000);
            producer.close();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
    }
}
Note: create the topic and its partitions with the command-line tool first. Otherwise, if the custom partitioner maps messages to partitions beyond the first one, sending to Kafka fails with the error:
expired due to timeout while requesting metadata from brokers
Step 1: write the class that actually processes the messages
import java.io.UnsupportedEncodingException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;

public class Consumerwork implements Runnable {
    private static Logger LOG = LoggerFactory.getLogger(Consumerwork.class);
    @SuppressWarnings("rawtypes")
    private KafkaStream m_stream;
    private int m_threadNumber;

    @SuppressWarnings("rawtypes")
    public Consumerwork(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            try {
                MessageAndMetadata<byte[], byte[]> thisMetadata = it.next();
                String jsonStr = new String(thisMetadata.message(), "utf-8");
                LOG.info("Thread " + m_threadNumber + ": " + jsonStr);
                LOG.info("partion" + thisMetadata.partition() + ",offset:" + thisMetadata.offset());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
    }
}
Step 2: write the main class that starts the consumer
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerGroup {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;
    private static Logger LOG = LoggerFactory.getLogger(ConsumerGroup.class);

    public ConsumerGroup(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public static void main(String[] args) {
        Scanner sc = new Scanner(System.in);
        System.out.println("Enter the zookeeper cluster address (e.g. zk1:2181,zk2:2181,zk3:2181):");
        String zooKeeper = sc.nextLine();
        System.out.println("Enter the consumer group name:");
        String groupId = sc.nextLine();
        System.out.println("Enter the topic to consume:");
        String topic = sc.nextLine();
        System.out.println("Enter the number of consumer threads:");
        int threads = sc.nextInt();
        LOG.info("Starting consumer kafka messages with zk:" + zooKeeper + " and the topic is " + topic);
        ConsumerGroup example = new ConsumerGroup(zooKeeper, groupId, topic);
        example.run(threads);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ie) {
        }
        // example.shutdown();
    }

    private void shutdown() {
        if (consumer != null)
            consumer.shutdown();
        if (executor != null)
            executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            LOG.info("Interrupted during shutdown, exiting uncleanly");
        }
    }

    private void run(int a_numThreads) {
        // Tell Kafka how many threads (streams) we want for this topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        int threadNumber = 0;
        LOG.info("the streams size is " + streams.size());
        for (final KafkaStream stream : streams) {
            executor.submit(new com.goodix.kafka.oldconsumer.Consumerwork(stream, threadNumber));
            // consumer.commitOffsets();
            threadNumber++;
        }
    }

    private ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "60000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        // props.put("rebalance.max.retries", "5");
        // props.put("rebalance.backoff.ms", "15000");
        return new ConsumerConfig(props);
    }
}
1. topicCountMap.put(topic, new Integer(a_numThreads)) tells Kafka how many threads will process messages for this topic.
(1). The thread count must be less than or equal to the topic's partition count. You can check the partitions with:
./kafka-topics.sh --describe --zookeeper "172.16.49.173:2181" --topic "producer_test"
(2). Kafka assigns partitions to threads according to partition.assignment.strategy. Since nothing is configured here, the default range strategy is used (partitions are divided into contiguous blocks): for example, with 10 partitions and 3 threads, thread 1 consumes partitions 0,1,2,3, thread 2 consumes 4,5,6, and thread 3 consumes 7,8,9. The other option is roundrobin; the official documentation lists two preconditions for using it, so it is usually left alone (see the configuration sketch after these notes).
(3). From testing, consumerMap.get(topic).size() appears to return the number of partitions of the topic that currently contain data.
(4). A stream represents messages from one or more partitions on one or more brokers. Each stream is processed by a single thread, so the client can request as many streams as it needs. In short, one stream may aggregate messages from partitions on several brokers, but each partition only ever feeds a single stream.
2. Executors.newFixedThreadPool(a_numThreads) creates a fixed-size thread pool: a new thread is created for each submitted task until the pool reaches its maximum size, after which the size stays constant; if a thread dies because of an exception, the pool replaces it with a new one.
3. props.put("auto.offset.reset", "smallest") makes the consumer start from the smallest offset not yet consumed by this group. If it is not set, the default is largest, in which case the consumer will not see messages the producer published before the consumer started.
4. To use the old consumer API you need to depend on both kafka_2.11 and kafka-clients.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>
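For reference, this is a minimal sketch (my addition, not from the original article) of how the old consumer's assignment strategy from note (2) could be set explicitly; the ZooKeeper address and group name are placeholders.

import java.util.Properties;

import kafka.consumer.ConsumerConfig;

public class AssignmentStrategyDemo {
    public static ConsumerConfig buildConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "172.16.49.173:2181"); // placeholder ZooKeeper address
        props.put("group.id", "assignment_demo");             // placeholder group name
        // "range" is the default; "roundrobin" has the preconditions mentioned in note (2)
        props.put("partition.assignment.strategy", "roundrobin");
        return new ConsumerConfig(props);
    }
}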
This is a much lower-level and more complex API.
Because this API leaves many things for you to manage yourself, it is fairly involved. The official documentation lists the scenarios where it is appropriate, which can also be read as the things the High Level Consumer API cannot do:
1. Read a message multiple times.
2. Consume only some of a topic's partitions within a single process.
3. Use transactions to ensure each message is processed once and only once.
In exchange, you must:
1. Track offset values in your own program.
2. Find the lead broker for the given topic partition.
3. Handle broker changes yourself.
First, you must know which partition of which topic you want to read. Then, find the broker leading that partition, and from it the brokers holding replicas of that partition. Next, build the fetch request and fetch the data yourself. Finally, detect and handle changes of the broker leader.
package com.goodix.kafka.oldconsumer;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleExample {
    private static Logger LOG = LoggerFactory.getLogger(SimpleExample.class);

    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        Scanner sc = new Scanner(System.in);
        System.out.println("Enter the broker ip address (e.g. 172.16.49.173)");
        String brokerIp = sc.nextLine();
        List<String> seeds = new ArrayList<String>();
        seeds.add(brokerIp);
        System.out.println("Enter the broker port (e.g. 9092)");
        int port = Integer.parseInt(sc.nextLine());
        System.out.println("Enter the topic to subscribe to (e.g. test)");
        String topic = sc.nextLine();
        System.out.println("Enter the partition to read (e.g. 0)");
        int partition = Integer.parseInt(sc.nextLine());
        System.out.println("Enter the maximum number of messages to read (e.g. 10000)");
        long maxReads = Long.parseLong(sc.nextLine());
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            LOG.error("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the metadata about the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            LOG.error("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            LOG.error("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .addFetch(a_topic, a_partition, readOffset, 100000)
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                LOG.error("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    LOG.error("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                LOG.info("the messag's offset is :" + String.valueOf(messageAndOffset.offset()) + " and the value is :" + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            LOG.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    /**
     * Find a new leader broker.
     * Walk every known replica broker, fetch the topic's metadata, then walk each partition's metadata;
     * once the partition we want is found, use PartitionMetadata.leader().host() as the new leader.
     *
     * @param a_oldLeader
     * @param a_topic
     * @param a_partition
     * @param a_port
     * @return
     * @throws Exception
     */
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give ZooKeeper a second to recover
                // second time, assume the broker did recover before failover, or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        LOG.error("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    /**
     * Find the leader for the given topic partition by querying the seed brokers for topic metadata.
     *
     * @param a_seedBrokers
     * @param a_port
     * @param a_topic
     * @param a_partition
     * @return
     */
    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) { // walk every broker
            SimpleConsumer consumer = null;
            try {
                // create a SimpleConsumer just for the metadata lookup
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                // send the TopicMetadata request
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                // read the topic's metadata
                List<TopicMetadata> metaData = resp.topicsMetadata();
                // walk each partition's metadata
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        // is this the partition we are looking for?
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part; // found it, stop searching
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                LOG.info("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.BrokerEndPoint replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
A quick-start example with the new consumer API:

Properties props = new Properties();
// Broker (kafka) address list; it does not need to contain every broker in the cluster, one or a few is enough
props.put("bootstrap.servers", "172.16.49.173:9092");
// The consumer group name; this must be set
props.put("group.id", a_groupId);
// Commit offsets automatically; the frequency is controlled by auto.commit.interval.ms
props.put("enable.auto.commit", "true");
// How often offsets are committed
props.put("auto.commit.interval.ms", "1000");
// Start from the earliest offset for this group.id. If not set, the default is latest,
// i.e. the offset of the newest message in the topic; with latest, the consumer only
// sees messages produced after it started
props.put("auto.offset.reset", "earliest");
// Heartbeat / session timeout
props.put("session.timeout.ms", "30000");
// Deserializer classes for keys and values
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe to the topic
consumer.subscribe(Arrays.asList("topic_test"));
while (true) {
    // Poll with a 100 ms timeout
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Things to note:
group.id: must be set.
auto.offset.reset: set it to earliest if you want messages the producer published before the consumer started; if you only need messages produced after the consumer started, leave it unset.
enable.auto.commit (default true): set it to false if you commit offsets manually, and call consumer.commitSync() at the appropriate point; otherwise (with auto-commit disabled and no manual commit) the consumer will start from the beginning every time it restarts, given auto.offset.reset=earliest.
Often we only want to consider a message consumed after we have received it and run some processing logic; this can be achieved by controlling the offset commit ourselves.
Example 1: commit offsets in batches
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Commit offsets manually, in batches.
 * @author lxh
 */
public class ManualOffsetConsumer {
    private static Logger LOG = LoggerFactory.getLogger(ManualOffsetConsumer.class);

    public ManualOffsetConsumer() {
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker (kafka) address
        props.put("bootstrap.servers", "172.16.49.173:9092");
        // Consumer group name
        props.put("group.id", "manual_g1");
        // Disable auto commit; offsets are committed explicitly below
        props.put("enable.auto.commit", "false");
        // Start from the earliest offset for this group.id; the default (latest) would only
        // deliver messages produced after the consumer started
        props.put("auto.offset.reset", "earliest");
        // props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("producer_test"));
        final int minBatchSize = 5; // commit after this many records have been buffered
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                LOG.info("consumer message values is " + record.value() + " and the offset is " + record.offset());
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                LOG.info("now commit offset");
                consumer.commitSync();
                buffer.clear();
            }
        }
    }
}
Example 2: manually commit the offset after finishing a partition
package com.goodix.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Commit the offset manually after consuming a partition's batch.
 * @author lxh
 */
public class ManualCommitPartion {
    private static Logger LOG = LoggerFactory.getLogger(ManualCommitPartion.class);

    public ManualCommitPartion() {
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker (kafka) address
        props.put("bootstrap.servers", "172.16.49.173:9092");
        // Consumer group name
        props.put("group.id", "manual_g2");
        // Disable auto commit; offsets are committed per partition below
        props.put("enable.auto.commit", "false");
        // Start from the earliest offset for this group.id; the default (latest) would only
        // deliver messages produced after the consumer started
        props.put("auto.offset.reset", "earliest");
        // props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("producer_test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    LOG.info("now consumer the message it's offset is :" + record.offset() + " and the value is :" + record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                LOG.info("now commit the partition[ " + partition.partition() + "] offset");
                // The committed offset is the position of the next message to read, hence lastOffset + 1
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
}
Example 3: consume messages from specified partitions

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consume messages from explicitly assigned partitions.
 * @author lxh
 */
public class ManualPartion {
    private static Logger LOG = LoggerFactory.getLogger(ManualPartion.class);

    public ManualPartion() {
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker (kafka) address
        props.put("bootstrap.servers", "172.16.49.173:9092");
        // Consumer group name
        props.put("group.id", "manual_g4");
        // Commit offsets automatically; the frequency is controlled by auto.commit.interval.ms
        props.put("enable.auto.commit", "true");
        // How often offsets are committed
        props.put("auto.commit.interval.ms", "1000");
        // Start from the earliest offset for this group.id; the default (latest) would only
        // deliver messages produced after the consumer started
        props.put("auto.offset.reset", "earliest");
        // props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Assign partitions 0 and 1 of producer_test directly instead of subscribing
        TopicPartition partition0 = new TopicPartition("producer_test", 0);
        TopicPartition partition1 = new TopicPartition("producer_test", 1);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.assign(Arrays.asList(partition0, partition1));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s \r\n", record.offset(), record.key(), record.value());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
Using the new consumer API only requires the kafka-clients dependency.
The new consumer API is easier to understand and easier to use.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>
1. If there are more consumers than partitions, the extra consumers are wasted: Kafka does not allow concurrent consumption within a single partition, so the consumer count should not exceed the partition count.
2. If there are fewer consumers than partitions, one consumer maps to several partitions. The main point is to balance consumer count and partition count, otherwise data is pulled unevenly across partitions. Ideally the partition count is an integer multiple of the consumer count, which makes the partition count important; 24, for example, makes it easy to choose a consumer count.
3. When a consumer reads from multiple partitions, there is no ordering guarantee across partitions; Kafka only guarantees order within a single partition, so the cross-partition order depends on the order in which you read.
4. Adding or removing consumers, brokers, or partitions triggers a rebalance, after which the partitions assigned to a consumer may change (see the sketch below).
5. The high-level interface blocks when no data is available.
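To observe point 4, the new consumer lets you register a ConsumerRebalanceListener when subscribing. The sketch below is my addition (broker address, group, and topic names are placeholders taken from the earlier examples); it just logs which partitions are revoked and assigned during a rebalance.

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceLoggingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.49.173:9092"); // placeholder broker address
        props.put("group.id", "manual_g5");                   // placeholder group name
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("producer_test"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Called before a rebalance takes partitions away; a good place to commit offsets
                System.out.println("revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Called after the rebalance with the partitions this consumer now owns
                System.out.println("assigned: " + partitions);
            }
        });
        while (true) {
            consumer.poll(1000); // rebalances are handled inside poll()
        }
    }
}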
References:
Notes on using Java thread pools
Java concurrent programming: using thread pools
How to determine the number of Kafka partitions, keys, and consumer threads
The Kafka Consumer interface
How to send a 15 MB message to Kafka, from a Stack Overflow answer:
Minor changes required for Kafka 0.10 and the new consumer compared to laughing_man's answer:
Broker: message.max.bytes and replica.fetch.max.bytes (message.max.bytes has to be smaller than replica.fetch.max.bytes).
Producer: increase max.request.size to send the larger message.
Consumer: increase max.partition.fetch.bytes to receive larger messages.
The idea is to have equal size of message being sent from Kafka Producer to Kafka Broker and then received by Kafka Consumer, i.e.
Kafka producer --> Kafka Broker --> Kafka Consumer
Suppose the requirement is to send a 15 MB message; then the Producer, the Broker, and the Consumer, all three, need to be in sync:
Kafka Producer sends 15 MB --> Kafka Broker allows/stores 15 MB --> Kafka Consumer receives 15 MB
The settings therefore should be:
A.) On Broker: message.max.bytes=15728640, replica.fetch.max.bytes=15728640
B.) On Consumer: fetch.message.max.bytes=15728640
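Translating the Kafka 0.10 answer into new-client configuration, a sketch might look like the following (my addition; the 15728640 value is the 15 MB figure from the answer, the broker address and group name are placeholders, and the broker itself must still have message.max.bytes and replica.fetch.max.bytes raised accordingly).

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

public class LargeMessageConfig {
    public static KafkaProducer<String, String> largeMessageProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.49.173:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Allow requests up to ~15 MB; should not exceed the broker's message.max.bytes
        props.put("max.request.size", "15728640");
        return new KafkaProducer<String, String>(props);
    }

    public static KafkaConsumer<String, String> largeMessageConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.49.173:9092"); // placeholder broker address
        props.put("group.id", "large_message_g1");            // placeholder group name
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Each partition fetch must be able to hold the largest expected message
        props.put("max.partition.fetch.bytes", "15728640");
        return new KafkaConsumer<String, String>(props);
    }
}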