Kafka 0.9+Zookeeper3.4.6集羣搭建、配置,新Client API的使用要點,高可用性測試,以及各類坑 (轉載)

Kafka 0.9版本對java client的api作出了較大調整,本文主要總結了Kafka 0.9在集羣搭建、高可用性、新API方面的相關過程和細節,以及本人在安裝調試過程當中踩出的各類坑。html

 

關於Kafka的結構、功能、特色、適用場景等,網上處處都是,我就再也不贅述了,直接進入正文java

 

Kafka 0.9集羣安裝配置node

 

操做系統:CentOS 6.5shell

 

 

1. 安裝Java環境apache

    Zookeeper和Kafka的運行都須要Java環境,因此先安裝JRE,Kafka默認使用G1垃圾回收器,若是不更改垃圾回收器,官方推薦使用 7u51以上版本的JRE。若是你使用老版本的JRE,須要更改Kafka的啓動腳本,指定G1之外的垃圾回收器。bootstrap

    Java環境的安裝過程在此不贅述了。api

 

 

2. Zookeeper集羣搭建安全

    Kafka依賴Zookeeper管理自身集羣(Broker、Offset、Producer、Consumer等),因此先要安裝 Zookeeper。天然,爲了達到高可用的目的,Zookeeper自身也不能是單點,接下來就介紹如何搭建一個最小的Zookeeper集羣(3個 zk節點)網絡

    此處選用Zookeeper的版本是3.4.6,此爲Kafka0.9中推薦的Zookeeper版本。session

   

    首先解壓

tar -xzvf zookeeper-3.4.6.tar.gz

 

    進入zookeeper的conf目錄,將zoo_sample.cfg複製一份,命名爲zoo.cfg,此即爲Zookeeper的配置文件

 

cp zoo_sample.cfg zoo.cfg

 

    編輯zoo.cfg

# The number of milliseconds of each tick 
tickTime=2000 
# The number of ticks that the initial 
# synchronization phase can take 
initLimit=10 
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement 
syncLimit=5 
# the directory where the snapshot is stored. 
dataDir=/data/zk/zk0/data 
dataLogDir=/data/zk/zk0/logs 
# the port at which the clients will connect 
clientPort=2181 
server.0=10.0.0.100:4001:4002 
server.1=10.0.0.101:4001:4002 
server.2=10.0.0.102:4001:4002

 

 

  • dataDir和dataLogDir的路徑須要在啓動前建立好
  • clientPort爲zookeeper的服務端口
  • server.0/1/2爲zk集羣中三個node的信息,定義格式爲hostname:port1:port2,其中port1是node間通訊使用的端口,port2是node選舉使用的端口,需確保三臺主機的這兩個端口都是互通的

    在另外兩臺主機上執行一樣的操做,安裝並配置zookeeper

    分別在三臺主機的dataDir路徑下建立一個文件名爲myid的文件,文件內容爲該zk節點的編號。例如在第一臺主機上創建的myid文件內容是0,第二臺是1。

 

    接下來,啓動三臺主機上的zookeeper服務:

bin/zkServer.sh start

    3個節點都啓動完成後,可依次執行以下命令查看集羣狀態:

bin/zkServer.sh status

    命令輸出以下:

    Mode: leader 或 Mode: follower

    3個節點中,應有1個leader和兩個follower

 

 

    驗證zookeeper集羣高可用性:

    假設目前3個zk節點中,server0爲leader,server1和server2爲follower

    咱們停掉server0上的zookeeper服務:

bin/zkServer.sh stop

    再到server1和server2上查看集羣狀態,會發現此時server1(也有多是server2)爲leader,另外一個爲follower。

 

    再次啓動server0的zookeeper服務,運行zkServer.sh status檢查,發現新啓動的server0也爲follower

    至此,zookeeper集羣的安裝和高可用性驗證完成。

 

    附:Zookeeper默認會將控制檯信息輸出到啓動路徑下的zookeeper.out中,顯然在生產環境中咱們不能容許Zookeeper這樣作,經過以下方法,可讓Zookeeper輸出按尺寸切分的日誌文件:

    修改conf/log4j.properties文件,將

    zookeeper.root.logger=INFO, CONSOLE

    改成

    zookeeper.root.logger=INFO, ROLLINGFILE

    修改bin/zkEnv.sh文件,將

    ZOO_LOG4J_PROP="INFO,CONSOLE"

    改成

    ZOO_LOG4J_PROP="INFO,ROLLINGFILE"

    而後重啓zookeeper,就ok了

 

 

3. Kafka集羣搭建

    此例中,咱們會安裝配置一個有兩個Broker組成的Kafka集羣,並在其上建立一個兩個分區的Topic

    本例中使用Kafka最新版本0.9.0.1

 

    首先解壓

tar -xzvf kafka_2.11-0.9.0.1.tgz

    編輯config/server.properties文件,下面列出關鍵的參數

 

#此Broker的ID,集羣中每一個Broker的ID不可相同 
broker.id=0 
#◇◇器,端口號與port一致便可 
listeners=PLAINTEXT://:9092 
#Broker◇◇的端口 
port=9092 
#Broker的Hostname,填主機IP便可 
host.name=10.0.0.100 
#向Producer和Consumer建議鏈接的Hostname和port (此處有坑,具體見後) 
advertised.host.name=10.0.0.100 
advertised.port=9092 
#進行IO的線程數,應大於主機磁盤數 
num.io.threads=8 
#消息文件存儲的路徑 
log.dirs=/data/kafka-logs 
#消息文件清理週期,即清理x小時前的消息記錄 
log.retention.hours=168 
#每一個Topic默認的分區數,通常在建立Topic時都會指定分區數,因此這個配成1就好了 
num.partitions=1 
#Zookeeper鏈接串,此處填寫上一節中安裝的三個zk節點的ip和端口便可 
zookeeper.connect=10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181

 

 

    配置項的詳細說明請見官方文檔:http://kafka.apache.org/documentation.html#brokerconfigs

 

    此處的坑:

按 照官方文檔的說法,advertised.host.name和advertised.port這兩個參數用於定義集羣向Producer和 Consumer廣播的節點host和port,若是不定義的話,會默認使用host.name和port的定義。但在實際應用中,我發現若是不定義 advertised.host.name參數,使用Java客戶端從遠端鏈接集羣時,會發生鏈接超時,拋出異 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 

通過debug發現,鏈接到集羣是成功的,但鏈接到集羣后更新回來的集羣meta信息倒是錯誤的:  
可以看到,metadata中的Cluster信息,節點的hostname是iZ25wuzqk91Z這樣的一串數字,而不是實際的ip地址 10.0.0.100和101。iZ25wuzqk91Z實際上是遠端主機的hostname,這說明在沒有配置advertised.host.name 的狀況下,Kafka並無像官方文檔宣稱的那樣改成廣播咱們配置的host.name,而是廣播了主機配置的hostname。遠端的客戶端並無配置 hosts,因此天然是鏈接不上這個hostname的。要解決這一問題,把host.name和advertised.host.name都配置成絕對 的ip地址就能夠了。


 

    接下來,咱們在另外一臺主機也完成Kafka的安裝和配置,而後在兩臺主機上分別啓動Kafka:

bin/kafka-server-start.sh -daemon config/server.properties 

 

    此處的坑:

官方給出的後臺啓動kafka的方法是:
bin/kafka-server-start.sh config/server.properties & 

    但用這種方式啓動後,只要斷開Shell或登出,Kafka服務就會自動shutdown,不知是OS的問題仍是SSH的問題仍是Kafka本身的問題,總之我改用-daemon方式啓動Kafka纔不會在斷開shell後自動shutdown。

 

 

    接下來,咱們建立一個名爲test,擁有兩個分區,兩個副本的Topic:

bin/kafka-topics.sh --create --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --replication-factor 2 --partitions 2 --topic test

 

    建立完成後,使用以下命令查看Topic狀態:

bin/kafka-topics.sh --describe --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --topic test

 

    輸出:

Topic:test PartitionCount:2 ReplicationFactor:2 Configs: 
     Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1 
     Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1

 

    解讀:test這個topic,當前有2個分區,分別爲0和1,分區0的Leader是1(這個1是broker.id),分區0有兩個 Replica(副本),分別是1和0,這兩個副本中,Isr(In-sync)的是0和1。分區2的Leader是0,也有兩個Replica,一樣也 是兩個replica都是in-sync狀態

 

 

至此,Kafka 0.9集羣的搭建工做就完成了,接下來咱們將介紹新的Java API的使用,以及集羣高可用性的驗證測試。


 

4. 使用Kafka的Producer API來完成消息的推送

 

1) Kafka 0.9.0.1的java client依賴:

	<dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-clients</artifactId>
	    <version>0.9.0.1</version>
	</dependency>

 

2) 寫一個KafkaUtil工具類,用於構造Kafka Client

public class KafkaUtil {
	private static KafkaProducer<String, String> kp;

	public static KafkaProducer<String, String> getProducer() {
		if (kp == null) {
			Properties props = new Properties();
			props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092");
			props.put("acks", "1");
			props.put("retries", 0);
			props.put("batch.size", 16384);
			props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
			props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
			kp = new KafkaProducer<String, String>(props);
		}
		return kp;
	}
}

  KafkaProducer<K,V>的K表明每條消息的key類型,V表明消息類型。消息的key用於決定此條消息由哪個partition接收,因此咱們須要保證每條消息的key是不一樣的。

  Producer端的經常使用配置

  • bootstrap.servers:Kafka集羣鏈接串,能夠由多個host:port組成
  • acks:broker消息確認的模式,有三種:
    0:不進行消息接收確認,即Client端發送完成後不會等待Broker的確認
    1:由Leader確認,Leader接收到消息後會當即返回確認信息
    all:集羣完整確認,Leader會等待全部in-sync的follower節點都確認收到消息後,再返回確認信息
    咱們能夠根據消息的重要程度,設置不一樣的確認模式。默認爲1
  • retries:發送失敗時Producer端的重試次數,默認爲0
  • batch.size:當同時有大量消息要向同一個分區發送時,Producer端會將消息打包後進行批量發送。若是設置爲0,則每條消息都DuLi發送。默認爲16384字節
  • linger.ms:發送消息前等待的毫秒數,與batch.size配合使用。在消息負載不高的狀況下,配置linger.ms可以讓Producer在發送消息前等待必定時間,以積累更多的消息打包發送,達到節省網絡資源的目的。默認爲0
  • key.serializer/value.serializer:消息key/value的序列器Class,根據key和value的類型決定
  • buffer.memory:消息緩衝池大小。還沒有被髮送的消息會保存在Producer的內存中,若是消息產生的速度大於消息發送的速度,那麼緩衝池滿後發送消息的請求會被阻塞。默認33554432字節(32MB)

  更多的Producer配置見官網:http://kafka.apache.org/documentation.html#producerconfigs

 

  3) 寫一個簡單的Producer端,每隔1秒向Kafka集羣發送一條消息:

public class KafkaTest {
	public static void main(String[] args) throws Exception{
		Producer<String, String> producer = KafkaUtil.getProducer();
		int i = 0;
		while(true) {
			ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.valueOf(i), "this is message"+i);
			producer.send(record, new Callback() {
				public void onCompletion(RecordMetadata metadata, Exception e) {
					if (e != null)
						e.printStackTrace();
					System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
				}
			});
			i++;
			Thread.sleep(1000);
		}
	}
}

 

  在調用KafkaProducer的send方法時,能夠註冊一個回調方法,在Producer端完成發送後會觸發回調邏輯,在回調方法的 metadata對象中,咱們可以獲取到已發送消息的offset和落在的分區等信息。注意,若是acks配置爲0,依然會觸發回調邏輯,只是拿不到 offset和消息落地的分區信息。

    跑一下,輸出是這樣的:

message send to partition 0, offset: 28 
message send to partition 1, offset: 26 
message send to partition 0, offset: 29 
message send to partition 1, offset: 27 
message send to partition 1, offset: 28 
message send to partition 0, offset: 30 
message send to partition 0, offset: 31 
message send to partition 1, offset: 29 
message send to partition 1, offset: 30 
message send to partition 1, offset: 31 
message send to partition 0, offset: 32 
message send to partition 0, offset: 33 
message send to partition 0, offset: 34 
message send to partition 1, offset: 32

  乍一看彷佛offset亂掉了,但其實這是由於消息分佈在了兩個分區上,每一個分區上的offset實際上是正確遞增的。

 

5. 使用Kafka的Consumer API來完成消息的消費

 

1) 改造一下KafkaUtil類,加入Consumer client的構造。

public class KafkaUtil {
	private static KafkaProducer<String, String> kp;
	private static KafkaConsumer<String, String> kc;

	public static KafkaProducer<String, String> getProducer() {
		if (kp == null) {
			Properties props = new Properties();
			props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092");
			props.put("acks", "1");
			props.put("retries", 0);
			props.put("batch.size", 16384);
			props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
			props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
			kp = new KafkaProducer<String, String>(props);
		}
		return kp;
	}
	
	public static KafkaConsumer<String, String> getConsumer() {
		if(kc == null) {
			Properties props = new Properties();
			props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092");
			props.put("group.id", "1");
			props.put("enable.auto.commit", "true");
			props.put("auto.commit.interval.ms", "1000");
			props.put("session.timeout.ms", "30000");
			props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
			props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
			kc = new KafkaConsumer<String, String>(props);
		}
		return kc;
	}
}

  一樣,咱們介紹一下Consumer經常使用配置

  • bootstrap.servers/key.deserializer/value.deserializer:和Producer端的含義同樣,再也不贅述
  • fetch.min.bytes:每次最小拉取的消息大小(byte)。Consumer會等待消息積累到必定尺寸後進行批量拉取。默認爲1,表明有一條就拉一條
  • max.partition.fetch.bytes:每次從單個分區中拉取的消息最大尺寸(byte),默認爲1M
  • group.id:Consumer的group id,同一個group下的多個Consumer不會拉取到重複的消息,不一樣group下的Consumer則會保證拉取到每一條消息。注意,同一個group下的consumer數量不能超過度區數。
  • enable.auto.commit:是否自動提交已拉取消息的offset。提交offset即視爲該消息已經成功被消費,該組下的Consumer沒法再拉取到該消息(除非手動修改offset)。默認爲true
  • auto.commit.interval.ms:自動提交offset的間隔毫秒數,默認5000。

  所有的Consumer配置見官方文檔:http://kafka.apache.org/documentation.html#newconsumerconfigs

 

2) 編寫Consumer端:

public class KafkaTest {
	public static void main(String[] args) throws Exception{
		KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
		consumer.subscribe(Arrays.asList("test"));
		while(true) {
			ConsumerRecords<String, String> records = consumer.poll(1000);
			for(ConsumerRecord<String, String> record : records) {
				System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
			}
		}
	}
}

 

  運行輸出:

fetched from partition 0, offset: 28, message: this is message0 
fetched from partition 0, offset: 29, message: this is message2 
fetched from partition 0, offset: 30, message: this is message5 
fetched from partition 0, offset: 31, message: this is message6 
fetched from partition 0, offset: 32, message: this is message10 
fetched from partition 0, offset: 33, message: this is message11 
fetched from partition 0, offset: 34, message: this is message12 
fetched from partition 1, offset: 26, message: this is message1 
fetched from partition 1, offset: 27, message: this is message3 
fetched from partition 1, offset: 28, message: this is message4 
fetched from partition 1, offset: 29, message: this is message7 
fetched from partition 1, offset: 30, message: this is message8 
fetched from partition 1, offset: 31, message: this is message9 
fetched from partition 1, offset: 32, message: this is message13

 

說明:

  • KafkaConsumer的poll方法便是從Broker拉取消息,在poll以前首先要用subscribe方法訂閱一個Topic。
  • poll方法的入參是拉取超時毫秒數,若是沒有新的消息可供拉取,consumer會等待指定的毫秒數,到達超時時間後會直接返回一個空的結果集。
  • 如 果Topic有多個partition,KafkaConsumer會在多個partition間以輪詢方式實現負載均衡。若是啓動了多個 Consumer線程,Kafka也可以經過zookeeper實現多個Consumer間的調度,保證同一組下的Consumer不會重複消費消息。注 意,Consumer數量不能超過partition數,超出部分的Consumer沒法拉取到任何數據。
  • 能夠看出,拉取到的消息並非徹底順序化的,kafka只能保證一個partition內的消息先進先出,因此在跨partition的狀況下,消息的順序是沒有保證的。
  • 本 例中採用的是自動提交offset,Kafka client會啓動一個線程按期將offset提交至broker。假設在自動提交的間隔內發生故障(好比整個JVM進程死掉),那麼有一部分消息是會被 重複消費的。要避免這一問題,可以使用手動提交offset的方式。構造consumer時將enable.auto.commit設爲false,並在代 碼中用consumer.commitSync()來手動提交。

若是不想讓kafka控制consumer拉取數據時在partition間的負載均衡,也能夠手工控制:

	public static void main(String[] args) throws Exception{
		KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
	    String topic = "test";
	    TopicPartition partition0 = new TopicPartition(topic, 0);
	    TopicPartition partition1 = new TopicPartition(topic, 1);
	    consumer.assign(Arrays.asList(partition0, partition1));
		while(true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			for(ConsumerRecord<String, String> record : records) {
				System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
			}
			consumer.commitSync();
		}
	}

 使用consumer.assign()方法爲consumer線程指定1個或多個partition。

 

  此處的坑:

在測試中我發現,若是用手工指定partition的方法拉取消息,不知爲什麼kafka的自動提交offset機制會失效,必須使用手動方式才能正確提交已消費的消息offset。

 

  題外話:

在 真正的應用環境中,Consumer端將消息拉取下來後要作的確定不止是輸出出來這麼簡單,在消費消息時頗有可能須要花掉更多的時間。1個 Consumer線程消費消息的速度頗有多是趕不上Producer產生消息的速度,因此咱們不得不考慮Consumer端採用多線程模型來消費消息。 
然而KafkaConsumer並非線程安全的,多個線程操做同一個KafkaConsumer實例會出現各類問題,Kafka官方對於Consumer端的多線程處理給出的指導建議以下: 

1. 每一個線程都持有一個KafkaConsumer對象 
好處: 
  • 實現簡單
  • 不須要線程間的協做,效率最高
  • 最容易實現每一個Partition內消息的順序處理

弊端:

  • 每一個KafkaConsumer都要與集羣保持一個TCP鏈接
  • 線程數不能超過Partition數
  • 每一batch拉取的數據量會變小,對吞吐量有必定影響

2. 解耦,1個Consumer線程負責拉取消息,數個Worker線程負責消費消息
好處:

  • 可自由控制Worker線程的數量,不受Partition數量限制

弊端:

  • 消息消費的順序沒法保證
  • 難以控制手動提交offset的時機

我的認爲第二種方式更加可取,consumer數不能超過partition數這個限制是很要命的,不可能爲了提升Consumer消費消息的效率而把Topic分紅更多的partition,partition越多,集羣的高可用性就越低。

 

 

6. Kafka集羣高可用性測試

 

1) 查看當前Topic的狀態:

/kafka-topics.sh --describe --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --topic test

  輸出:

Topic:test PartitionCount:2 ReplicationFactor:2 Configs: 
   Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1 
   Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1

  能夠看到,partition0的leader是broker1,parition1的leader是broker0

 

2) 啓動Producer向Kafka集羣發送消息

  輸出:

message send to partition 0, offset: 35 
message send to partition 1, offset: 33 
message send to partition 0, offset: 36 
message send to partition 1, offset: 34 
message send to partition 1, offset: 35 
message send to partition 0, offset: 37 
message send to partition 0, offset: 38 
message send to partition 1, offset: 36 
message send to partition 1, offset: 37

 

3) 登陸SSH將broker0,也就是partition 1的leader kill掉

 

  再次查看Topic狀態:

Topic:test PartitionCount:2 ReplicationFactor:2 Configs: 
  Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1 
  Topic: test Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1

  能夠看到,當前parition0和parition1的leader都是broker1了

 

  此時再去看Producer的輸出:

[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - Connection with /10.0.0.100 disconnected 
java.net.ConnectException: Connection refused: no further information 
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) 
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54) 
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:72) 
    at org.apache.kafka.common.network.Selector.poll(Selector.java:274) 
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) 
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) 
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128) 
    at java.lang.Thread.run(Thread.java:745)
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 7 to Cluster(nodes = [Node(1, 10.0.0.101, 9092)], partitions = [Partition(topic = test, partition = 1, leader = 1, replicas = [1,], isr = [1,], Partition(topic = test, partition = 0, leader = 1, replicas = [1,], isr = [1,]])

 

  能看到Producer端的DEBUG日誌顯示與broker0的連接斷開了,此時Kafka馬上開始更新集羣metadata,更新後的metadata表示broker1如今是兩個partition的leader,Producer進程很快就恢復繼續運行,沒有漏發任何消息,可以看出Kafka集羣的故障切換機制仍是很厲害的

 

4) 咱們再把broker0啓動起來

bin/kafka-server-start.sh -daemon config/server.properties 

  而後再次檢查Topic狀態:

Topic:test PartitionCount:2 ReplicationFactor:2 Configs: 
   Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0 
   Topic: test Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0

  咱們看到,broker0啓動起來了,而且已是in-sync狀態(注意Isr從1變成了1,0),但此時兩個partition的leader還都是 broker1,也就是說當前broker1會承載全部的發送和拉取請求。這顯然是不行的,咱們要讓集羣恢復到負載均衡的狀態。

  這時候,須要使用Kafka的選舉工具觸發一次選舉:

bin/kafka-preferred-replica-election.sh --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181

  選舉完成後,再次查看Topic狀態:

Topic:test PartitionCount:2 ReplicationFactor:2 Configs: 
   Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0 
   Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 1,0

  能夠看到,集羣從新回到了broker0掛掉以前的狀態

  但此時,Producer端產生了異常:

org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

  緣由是Producer端在嘗試向broker1的parition0發送消息時,partition0的leader已經切換成了broker0,因此消息發送失敗。

  此時用Consumer去消費消息,會發現消息的編號不連續了,確實漏發了一條消息。這是由於咱們在構造Producer時設定了retries=0,因此在發送失敗時Producer端不會嘗試重發。

  將retries改成3後再次嘗試,會發現leader切換時再次發生了一樣的問題,但Producer的重發機制起了做用,消息重發成功,啓動Consumer端檢查也證明了全部消息都發送成功了。

 

每 次集羣單點發生故障恢復後,都須要進行從新選舉才能完全恢復集羣的leader分配,若是嫌每次這樣作很麻煩,能夠在broker的配置文件(即 server.properties)中配置auto.leader.rebalance.enable=true,這樣broker在啓動後就會自動進 行從新選舉

 

 

至此,咱們經過測試證明了集羣出現單點故障和恢復的過程當中,Producer端可以保持正確運轉。接下來咱們看一下Consumer端的表現:

 

5) 同時啓動Producer進程和Consumer進程

  此時Producer一邊在生產消息,Consumer一邊在消費消息

 

6) 把broker0幹掉,觀察Consumer端的輸出:

能看到,在broker0掛掉後,consumer也端產生了一系列INFO和WARN輸出,但同Producer端同樣,若干秒後自動恢復,消息仍然是連續的,並未出現斷點。

 

7) 再次把broker0啓動,並觸發從新選舉,而後觀察輸出:

fetched from partition 0, offset: 418, message: this is message48 
fetched from partition 0, offset: 419, message: this is message49 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Offset commit for group 1 failed due to NOT_COORDINATOR_FOR_GROUP, will find new coordinator and retry 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator 2147483646 dead. 
[main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto offset commit failed: This is not the correct coordinator for this group. 
fetched from partition 1, offset: 392, message: this is message50 
fetched from partition 0, offset: 420, message: this is message51

  能看到,重選舉後Consumer端也輸出了一些日誌,意思是在提交offset時發現當前的調度器已經失效了,但很快就從新獲取了新的有效調度器,恢復 了offset的自動提交,驗證已提交offset的值也證實了offset提交併未因leader切換而發生錯誤。

 

  如上,咱們也經過測試證明了Kafka集羣出現單點故障時,Consumer端的功能正確性。

相關文章
相關標籤/搜索