本文經過實操Kafka的API來理解topic、partition等相關概念,我將經過搭建一個Kafka集羣來實現它。html
Kafka集羣依賴於ZooKeeper對齊Broker進行協調管理,因此咱們也須要考慮搭建一個ZooKeeper集羣。java
主機規劃node
主機名稱 | 角色 | IP 地址 | 基礎軟件 |
---|---|---|---|
node01 | kafka 集羣節點 | 192.168.242.118 | JDK1.8 |
node02 | kafka 集羣節點、ZooKeeper 集羣節點 | 192.168.242.117 | JDK1.8 |
node03 | kafka 集羣節點、ZooKeeper 集羣節點 | 192.168.242.116 | JDK1.8 |
node04 | ZooKeeper 集羣節點 | 192.168.242.115 | JDK1.8 |
其中:git
/etc/hosts
,添加以下內容:192.168.242.118 node01
192.168.242.117 node02
192.168.242.116 node03
192.168.242.115 node04
複製代碼
附 ZooKeeper 集羣搭建過程:apache
在node02:bootstrap
tar -zxf apache-zookeeper-3.5.8-bin.tar.gzapi
mv apache-zookeeper-3.5.8-bin /usr/local/zookeeper服務器
cd /usr/local/zookeeper/confmarkdown
cp zoo_example.cfg zoo.cfg架構
vi zoo.cfg
設置 dataDir=/var/zookeeper 末尾添加: server.1=node02:2888:3888 server.2=node03:2888:3888 server.3=node04:2888:3888
mkdir -p /var/zookeeper
echo 1 > /var/zookeeper/myid
vi /etc/profile
export JAVA_HOME=/usr/local/java export ZK_HOME=/usr/local/zookeeper export PATH= JAVA_HOME/bin:$ZK_HOME/bin
source /etc/profile
scp分發zk相關配置到node0三、node04
scp -r /usr/local/zookeeper/ root@node03:/usr/local/ scp /usr/local/zookeeper/conf/zoo.cfg root@node03:/usr/local/zookeeper/conf/ scp /etc/profile root@node03:/etc
scp -r /usr/local/zookeeper/ root@node04:/usr/local/ scp /usr/local/zookeeper/conf/zoo.cfg root@node04:/usr/local/zookeeper/conf/ scp /etc/profile root@node04:/etc
- 在node03
mkdir -p /var/zookeeper echo 2 > /var/zookeeper/myid source /etc/profile
- 在node04
mkdir -p /var/zookeeper echo 3 > /var/zookeeper/myid source /etc/profile
- 啓動zk集羣,在node02/node03/node04三個節點均執行
zkServer.sh start
ZK集羣搭建完成後,可用zkServer.sh status
查看ZK集羣狀態:
node02,follower:
node03,leader:
node04,leader:
解壓完Kafka安裝文件後,修改配置文件config/server.properties
:
broker.id=0
listeners=PLAINTEXT://node01:9092
log.dirs=/var/kafka-logs
zookeeper.connect=node02:2181,node03:2181,node04:2181/kafka
複製代碼
與ZK集羣搭建同樣,使用SCP分發,注意修改 broker.id
和 listeners
。
這裏值得注意的是ZK鏈接配置項要帶上/kafka。
凡是使用 ZooKeeper 的技術,通常按照項目部門之類的加一個節點路徑,不要在 ZK 根節點建立本身的東西,防止難以維護。
配置Kafka環境變量,方便使用Kafka命令,編輯文件/etc/profile
:
export JAVA_HOME=/usr/local/java
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin
複製代碼
每臺Kafka集羣節點執行命令kafka-server-start.sh
# 前臺啓動
kafka-server-start.sh $KAFKA_HOME/config/server.properties
# 後臺啓動
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# 查看topic
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --list
複製代碼
啓動Kafka以後,再來看一下 ZK 節點:
[zk: localhost:2181(CONNECTED) 7] ls /
[kafka, zookeeper]
[zk: localhost:2181(CONNECTED) 5] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification]
複製代碼
多了個 kafka 節點,這是能夠想到爲何以前的配置文件zookeeper.connect=node02:2181,node03:2181,node04:2181/kafka
這裏最後要加個/kafka
了,就是 kafka 啓動以後生成了不少內容,若是都放到 zk 根節點將很難維護。
Kafka 生成的一些內容:
[zk: localhost:2181(CONNECTED) 8] ls /kafka/cluster
[id]
[zk: localhost:2181(CONNECTED) 9] get /kafka/cluster/id
{"version":"1","id":"7V2aCgVnQhuPdkdryBXt4w"}
[zk: localhost:2181(CONNECTED) 10] ls /kafka/con
config consumers controller controller_epoch
[zk: localhost:2181(CONNECTED) 10] ls /kafka/controller
controller controller_epoch
[zk: localhost:2181(CONNECTED) 10] ls /kafka/controller
[]
[zk: localhost:2181(CONNECTED) 11] get /kafka/controller
controller controller_epoch
[zk: localhost:2181(CONNECTED) 11] get /kafka/controller
{"version":1,"brokerid":0,"timestamp":"1608979966643"}
複製代碼
本文代碼倉庫:gitee.com/xblzer/kafk…
Topic的管理相關和Producer生產消息的API很是簡單,這裏不作特別說明了,代碼中有註釋,下面從Consumer相關的API開始展開說明。
訂閱模式,必須設置消費者組,去掉消費者組
註釋掉
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
複製代碼
執行報錯
org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
複製代碼
也就是說,訂閱模式能夠用到消費者組的管理機制,在配置消費者的時候必須提供有效的group.id
。
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//消費者組,若是不設置消費者組會報錯
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//訂閱topic名稱是「topic」開頭的topic
consumer.subscribe(Pattern.compile("^topic.*"));
複製代碼
如今咱們只開啓一個Consumer客戶端,能夠看到該消費者對產生的消息所有消費了:
再使用線程池,構造三個消費者線程,模擬不一樣的消費者客戶端(屬於同一消費組)。
也能夠在kafka服務器開幾個命令終端,命令以下
kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic topic01 --group group01 --property print.key=true --property print.value=true --property key.separator=,
這個命令的參數能夠用
kafka-console-consumer.sh --help
查看。
線程池模擬多個消費者客戶端:
/** * 多個線程,不一樣的消費者(屬於同一消費組) */
@Test
@SneakyThrows
public void testKafkaConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//消費者組,若是不設置消費者組會報錯
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
//開啓三個線程,跑三個consumer客戶端,他們屬於同一消費組「group01」
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
3,
16,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("[消費者‐%d]").build(),
new ThreadPoolExecutor.AbortPolicy()
);
for (int i = 0; i < 3; i++) {
threadPoolExecutor.execute(() -> {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//訂閱topic名稱是「topic」開頭的topic
consumer.subscribe(Pattern.compile("^topic.*"));
//訂閱topic01
// consumer.subscribe(Arrays.asList("topic01"));
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
if (!consumerRecords.isEmpty()) {
Iterator<ConsumerRecord<String, String>> consumerRecordIterator = consumerRecords.iterator();
while (consumerRecordIterator.hasNext()) {
ConsumerRecord<String, String> consumerRecord = consumerRecordIterator.next();
String key = consumerRecord.key();
String value = consumerRecord.value();
//消息所在分區
int partition = consumerRecord.partition();
//消息在所在分區的偏移量
long offset = consumerRecord.offset();
System.out.println("線程" + Thread.currentThread().getName() + "key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);
}
}
}
});
}
threadPoolExecutor.shutdown();
threadPoolExecutor.awaitTermination(1, TimeUnit.HOURS);
}
複製代碼
運行測試類,
能夠看到Consumer協調器(ConsumerCoordinator
)分配消費分區狀況:
線程名稱 | 消費者 | 分區 |
---|---|---|
消費者-0 | consumer-1 | topic01的分區0,topic02的分區0 |
消費者-1 | consumer-2 | topic01的分區1,topic02的分區1 |
消費者-2 | consumer-3 | topic01的分區2 |
從新生產消息,查看消息消費狀況:
10 條 record 被同一個消費組的三個消費者消費,這個是消費者組的特性之一,組內平分消費分區的消費進行消費,有一個負載均衡的理念在裏面。
當消息中不帶key(key=null)時,將按照輪詢的方式對partition中的消息進行消費:
客戶端宕機
再啓動一個消費者客戶端測試,
控制檯有新的日誌輸出,能夠看到ConsumerCoordinator
從新分配了消費分區:
線程名稱 | 消費者 | 分區 |
---|---|---|
消費者-0 | consumer-1 | topic01的分區1,topic02的分區1 |
消費者-1 | consumer-2 | topic01的分區2 |
消費者-2 | consumer-3 | 沒有分配消費分區 |
新開的線程 | 新開的消費者 | topic01的分區0,topic02的分區0 |
執行一下生產者,看下消費狀況:
消費者消費分配給本身的分區內的消息!
這個時候把新開的那個Consumer斷開,模擬消費者宕機,看Kafka的從新分配:
Kafka消費者組內分區消費負載均衡。
上面演示的是consumer主動訂閱,主動訂閱的狀況下,消費者協調器會協調消費者進行分區消費,有一個負載均衡的理念在裏面。
手動指定分區進行消費的話,就會失去組的特性,assign 方法:
//從開始位置消費
List<TopicPartition> partitions = Arrays.asList(new TopicPartition("topic01", 0));
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
複製代碼
在探究Kafka高性能之道一文中,我已提到了Kafka是如何決定發送消息到topic的哪一個分區的:
Kafka默認的分區策略在DefaultPartitioner
中也有定義:
/** * The default partitioning strategy: * <ul> * <li>If a partition is specified in the record, use it * <li>If no partition is specified but a key is present choose a partition based on a hash of the key * <li>If no partition or key is present choose a partition in a round-robin fashion */
public class DefaultPartitioner implements Partitioner {
//...
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
//...
}
複製代碼
這裏說明了:
上面已經使用API獲得了驗證。
通常狀況下,這種默認的分區策略就知足生產需求了,可是若是有特殊的業務需求,還能夠自定義分區策略,
public void testProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//定義Partitioner
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
//TODO
}
複製代碼
先看一下,ProducerConfig
源碼中關於分區配置的說明:
public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the <code>org.apache.kafka.clients.producer.Partitioner</code> interface.";
複製代碼
自定義的Partitioner必須實現org.apache.kafka.clients.producer.Partitioner
接口,這裏自定義一個Partitioner,分區策略也按照DefaultPartitioner
的策略來,只是其實現略有不一樣:
public class MyPartitioner implements Partitioner {
private AtomicInteger counter = new AtomicInteger(0);
/** * 返回分區號 * @param topic topic * @param key key * @param keyBytes key的字節數 * @param value value * @param valueBytes value的字節數 * @param cluster 集羣信息 * @return 分區號 */
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//先獲取集羣的分區數
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int increment = counter.getAndIncrement();
//拿這個值模上分區數
// increment & Integer.MAX_VALUE 保證是個正數
return (increment & Integer.MAX_VALUE) % numPartitions;
} else {
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
@Override
public void close() {
System.out.println("MyPartitioner#close.");
}
@Override
public void configure(Map<String, ?> map) {
System.out.println("MyPartitioner#configure.");
}
}
複製代碼
Producer這裏:
//定義Partitioner
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
//建立生產者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
複製代碼
運行生產這實例,這裏生產topic02的消息:
能夠看到分區策略走的是咱們自定義的分區策略,消費者:
前面API建立topic02的時候只設置了兩個分區,因此這裏是兩個分區的輪詢。同理能夠驗證消息帶key的分區消費策略。
前面API演示的時候,生產者和消費者有兩個重要的配置,
ProducerConfig
ConsumerConfig
這個是生產者生產消息是須要對key和value進行序列化,消費者消費消息須要對其進行反序列化,前面序列化和反序列化類是StringSerializer
和StringDeserializer
,跟一下源碼,能夠看到他們都實現了規定好的接口(Serializer<String>
和Deserializer<String>
):
生產環境中,咱們發送的消息有時是對象,此時咱們能夠自定義對象序列化類,這樣能夠完成對象消息的傳輸,自定義序列化實現Serializer
和Deserializer
接口便可。
這裏藉助於commons-lang3
包下的SerializationUtils
來進行序列化和反序列化:
//序列化
@Override
public byte[] serialize(String topic, Object data) {
// return new byte[0];
return SerializationUtils.serialize((Serializable) data);
}
複製代碼
//反序列化
@Override
public Object deserialize(String topic, byte[] data) {
System.out.println("自定義反序列化 topic:" + topic);
return SerializationUtils.deserialize(data);
}
複製代碼
生產消息,key是String類型,value是Order對象:
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class.getName());
//建立生產者
KafkaProducer<String, Order> producer = new KafkaProducer<>(props);
複製代碼
消費消息:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props);
複製代碼
啓動生產者:
自定義的序列化生效,再啓動消費者,控制檯打印:
成功將Order信息打印出來,自定義反序列化也生效了。
發送數據的時候,能夠經過攔截器拿到數據的一些消息,而後能夠任意擺佈這些數據了(對數據作一些裝飾),好比發送失敗了,咱們能夠經過攔截器把錯誤信息拿到進行分析。
只要在ProducerConfig
中配置INTERCEPTOR_CLASSES_CONFIG
這個配置項就能夠設置攔截起了,和前面的Partitioner
、Serializer
同理,看一下這個配置項的源碼描述:
/** <code>interceptor.classes</code> */
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. "
+ "Implementing the <code>org.apache.kafka.clients.producer.ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records "
+ "received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";
複製代碼
這裏說明了默認是沒有攔截器的,自定義攔截器須要實現ProducerInterceptor
接口。
public class MyProducerInterceptor implements ProducerInterceptor {
@Override
public ProducerRecord onSend(ProducerRecord record) {
return new ProducerRecord(record.topic(), record.key(), record.value() + " --- 攔截了。");
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("metadata:" + metadata + ",exception:" + exception);
}
@Override
public void close() {
System.out.println("MyProducerInterceptor#close");
}
@Override
public void configure(Map<String, ?> configs) {
System.out.println("MyProducerInterceptor#configure");
}
}
複製代碼
運行生產着消費者便可觀察到消息成功攔截。
myid
。KAFKA_HOME/config/server.properties文件
)broker.id
、listeners
、log.dirs
和zookeeper.connect
。