4. 使用Kafka的Producer API來完成消息的推送
1) Kafka 0.9.0.1的java client依賴:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency>
2) 寫一個KafkaUtil工具類,用於構造Kafka Client
public class KafkaUtil { private static KafkaProducer<String, String> kp; public static KafkaProducer<String, String> getProducer() { if (kp == null) { Properties props = new Properties(); props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092"); props.put("acks", "1"); props.put("retries", 0); props.put("batch.size", 16384); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kp = new KafkaProducer<String, String>(props); } return kp; } }
KafkaProducer<K,V>的K表明每條消息的key類型,V表明消息類型。消息的key用於決定此條消息由哪個partition接收,因此咱們須要保證每條消息的key是不一樣的。
Producer端的經常使用配置
- bootstrap.servers:Kafka集羣鏈接串,能夠由多個host:port組成
- acks:broker消息確認的模式,有三種:
0:不進行消息接收確認,即Client端發送完成後不會等待Broker的確認
1:由Leader確認,Leader接收到消息後會當即返回確認信息
all:集羣完整確認,Leader會等待全部in-sync的follower節點都確認收到消息後,再返回確認信息
咱們能夠根據消息的重要程度,設置不一樣的確認模式。默認爲1 - retries:發送失敗時Producer端的重試次數,默認爲0
- batch.size:當同時有大量消息要向同一個分區發送時,Producer端會將消息打包後進行批量發送。若是設置爲0,則每條消息都DuLi發送。默認爲16384字節
- linger.ms:發送消息前等待的毫秒數,與batch.size配合使用。在消息負載不高的狀況下,配置linger.ms可以讓Producer在發送消息前等待必定時間,以積累更多的消息打包發送,達到節省網絡資源的目的。默認爲0
- key.serializer/value.serializer:消息key/value的序列器Class,根據key和value的類型決定
- buffer.memory:消息緩衝池大小。還沒有被髮送的消息會保存在Producer的內存中,若是消息產生的速度大於消息發送的速度,那麼緩衝池滿後發送消息的請求會被阻塞。默認33554432字節(32MB)
更多的Producer配置見官網:http://kafka.apache.org/documentation.html#producerconfigs
3) 寫一個簡單的Producer端,每隔1秒向Kafka集羣發送一條消息:
public class KafkaTest { public static void main(String[] args) throws Exception{ Producer<String, String> producer = KafkaUtil.getProducer(); int i = 0; while(true) { ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.valueOf(i), "this is message"+i); producer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) e.printStackTrace(); System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset()); } }); i++; Thread.sleep(1000); } } }
在調用KafkaProducer的send方法時,能夠註冊一個回調方法,在Producer端完成發送後會觸發回調邏輯,在回調方法的 metadata對象中,咱們可以獲取到已發送消息的offset和落在的分區等信息。注意,若是acks配置爲0,依然會觸發回調邏輯,只是拿不到 offset和消息落地的分區信息。
跑一下,輸出是這樣的:
message send to partition 1, offset: 26
message send to partition 0, offset: 29
message send to partition 1, offset: 27
message send to partition 1, offset: 28
message send to partition 0, offset: 30
message send to partition 0, offset: 31
message send to partition 1, offset: 29
message send to partition 1, offset: 30
message send to partition 1, offset: 31
message send to partition 0, offset: 32
message send to partition 0, offset: 33
message send to partition 0, offset: 34
message send to partition 1, offset: 32
乍一看彷佛offset亂掉了,但其實這是由於消息分佈在了兩個分區上,每一個分區上的offset實際上是正確遞增的。
5. 使用Kafka的Consumer API來完成消息的消費
1) 改造一下KafkaUtil類,加入Consumer client的構造。
public class KafkaUtil { private static KafkaProducer<String, String> kp; private static KafkaConsumer<String, String> kc; public static KafkaProducer<String, String> getProducer() { if (kp == null) { Properties props = new Properties(); props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092"); props.put("acks", "1"); props.put("retries", 0); props.put("batch.size", 16384); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kp = new KafkaProducer<String, String>(props); } return kp; } public static KafkaConsumer<String, String> getConsumer() { if(kc == null) { Properties props = new Properties(); props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092"); props.put("group.id", "1"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kc = new KafkaConsumer<String, String>(props); } return kc; } }
一樣,咱們介紹一下Consumer經常使用配置
- bootstrap.servers/key.deserializer/value.deserializer:和Producer端的含義同樣,再也不贅述
- fetch.min.bytes:每次最小拉取的消息大小(byte)。Consumer會等待消息積累到必定尺寸後進行批量拉取。默認爲1,表明有一條就拉一條
- max.partition.fetch.bytes:每次從單個分區中拉取的消息最大尺寸(byte),默認爲1M
- group.id:Consumer的group id,同一個group下的多個Consumer不會拉取到重複的消息,不一樣group下的Consumer則會保證拉取到每一條消息。注意,同一個group下的consumer數量不能超過度區數。
- enable.auto.commit:是否自動提交已拉取消息的offset。提交offset即視爲該消息已經成功被消費,該組下的Consumer沒法再拉取到該消息(除非手動修改offset)。默認爲true
- auto.commit.interval.ms:自動提交offset的間隔毫秒數,默認5000。
所有的Consumer配置見官方文檔:http://kafka.apache.org/documentation.html#newconsumerconfigs
2) 編寫Consumer端:
public class KafkaTest { public static void main(String[] args) throws Exception{ KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer(); consumer.subscribe(Arrays.asList("test")); while(true) { ConsumerRecords<String, String> records = consumer.poll(1000); for(ConsumerRecord<String, String> record : records) { System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value()); } } } }
運行輸出:
fetched from partition 0, offset: 29, message: this is message2
fetched from partition 0, offset: 30, message: this is message5
fetched from partition 0, offset: 31, message: this is message6
fetched from partition 0, offset: 32, message: this is message10
fetched from partition 0, offset: 33, message: this is message11
fetched from partition 0, offset: 34, message: this is message12
fetched from partition 1, offset: 26, message: this is message1
fetched from partition 1, offset: 27, message: this is message3
fetched from partition 1, offset: 28, message: this is message4
fetched from partition 1, offset: 29, message: this is message7
fetched from partition 1, offset: 30, message: this is message8
fetched from partition 1, offset: 31, message: this is message9
fetched from partition 1, offset: 32, message: this is message13
說明:
- KafkaConsumer的poll方法便是從Broker拉取消息,在poll以前首先要用subscribe方法訂閱一個Topic。
- poll方法的入參是拉取超時毫秒數,若是沒有新的消息可供拉取,consumer會等待指定的毫秒數,到達超時時間後會直接返回一個空的結果集。
- 如 果Topic有多個partition,KafkaConsumer會在多個partition間以輪詢方式實現負載均衡。若是啓動了多個 Consumer線程,Kafka也可以經過zookeeper實現多個Consumer間的調度,保證同一組下的Consumer不會重複消費消息。注 意,Consumer數量不能超過partition數,超出部分的Consumer沒法拉取到任何數據。
- 能夠看出,拉取到的消息並非徹底順序化的,kafka只能保證一個partition內的消息先進先出,因此在跨partition的狀況下,消息的順序是沒有保證的。
- 本 例中採用的是自動提交offset,Kafka client會啓動一個線程按期將offset提交至broker。假設在自動提交的間隔內發生故障(好比整個JVM進程死掉),那麼有一部分消息是會被 重複消費的。要避免這一問題,可以使用手動提交offset的方式。構造consumer時將enable.auto.commit設爲false,並在代 碼中用consumer.commitSync()來手動提交。
若是不想讓kafka控制consumer拉取數據時在partition間的負載均衡,也能夠手工控制:
public static void main(String[] args) throws Exception{ KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer(); String topic = "test"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1)); while(true) { ConsumerRecords<String, String> records = consumer.poll(100); for(ConsumerRecord<String, String> record : records) { System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value()); } consumer.commitSync(); } }
使用consumer.assign()方法爲consumer線程指定1個或多個partition。
此處的坑:
題外話:
然而KafkaConsumer並非線程安全的,多個線程操做同一個KafkaConsumer實例會出現各類問題,Kafka官方對於Consumer端的多線程處理給出的指導建議以下:
1. 每一個線程都持有一個KafkaConsumer對象
好處:
- 實現簡單
- 不須要線程間的協做,效率最高
- 最容易實現每一個Partition內消息的順序處理
弊端:
- 每一個KafkaConsumer都要與集羣保持一個TCP鏈接
- 線程數不能超過Partition數
- 每一batch拉取的數據量會變小,對吞吐量有必定影響
2. 解耦,1個Consumer線程負責拉取消息,數個Worker線程負責消費消息
好處:
- 可自由控制Worker線程的數量,不受Partition數量限制
弊端:
- 消息消費的順序沒法保證
- 難以控制手動提交offset的時機
我的認爲第二種方式更加可取,consumer數不能超過partition數這個限制是很要命的,不可能爲了提升Consumer消費消息的效率而把Topic分紅更多的partition,partition越多,集羣的高可用性就越低。
6. Kafka集羣高可用性測試
1) 查看當前Topic的狀態:
/kafka-topics.sh --describe --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --topic test
輸出:
Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
能夠看到,partition0的leader是broker1,parition1的leader是broker0
2) 啓動Producer向Kafka集羣發送消息
輸出:
message send to partition 1, offset: 33
message send to partition 0, offset: 36
message send to partition 1, offset: 34
message send to partition 1, offset: 35
message send to partition 0, offset: 37
message send to partition 0, offset: 38
message send to partition 1, offset: 36
message send to partition 1, offset: 37
3) 登陸SSH將broker0,也就是partition 1的leader kill掉
再次查看Topic狀態:
Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1
Topic: test Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1
能夠看到,當前parition0和parition1的leader都是broker1了
此時再去看Producer的輸出:
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:72)
at org.apache.kafka.common.network.Selector.poll(Selector.java:274)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
at java.lang.Thread.run(Thread.java:745)
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 7 to Cluster(nodes = [Node(1, 10.0.0.101, 9092)], partitions = [Partition(topic = test, partition = 1, leader = 1, replicas = [1,], isr = [1,], Partition(topic = test, partition = 0, leader = 1, replicas = [1,], isr = [1,]])
能看到Producer端的DEBUG日誌顯示與broker0的連接斷開了,此時Kafka馬上開始更新集羣metadata,更新後的metadata表示broker1如今是兩個partition的leader,Producer進程很快就恢復繼續運行,沒有漏發任何消息,可以看出Kafka集羣的故障切換機制仍是很厲害的
4) 咱們再把broker0啓動起來
bin/kafka-server-start.sh -daemon config/server.properties
而後再次檢查Topic狀態:
Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: test Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0
咱們看到,broker0啓動起來了,而且已是in-sync狀態(注意Isr從1變成了1,0),但此時兩個partition的leader還都是 broker1,也就是說當前broker1會承載全部的發送和拉取請求。這顯然是不行的,咱們要讓集羣恢復到負載均衡的狀態。
這時候,須要使用Kafka的選舉工具觸發一次選舉:
bin/kafka-preferred-replica-election.sh --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181
選舉完成後,再次查看Topic狀態:
Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 1,0
能夠看到,集羣從新回到了broker0掛掉以前的狀態
但此時,Producer端產生了異常:
緣由是Producer端在嘗試向broker1的parition0發送消息時,partition0的leader已經切換成了broker0,因此消息發送失敗。
此時用Consumer去消費消息,會發現消息的編號不連續了,確實漏發了一條消息。這是由於咱們在構造Producer時設定了retries=0,因此在發送失敗時Producer端不會嘗試重發。
將retries改成3後再次嘗試,會發現leader切換時再次發生了一樣的問題,但Producer的重發機制起了做用,消息重發成功,啓動Consumer端檢查也證明了全部消息都發送成功了。
至此,咱們經過測試證明了集羣出現單點故障和恢復的過程當中,Producer端可以保持正確運轉。接下來咱們看一下Consumer端的表現:
5) 同時啓動Producer進程和Consumer進程
此時Producer一邊在生產消息,Consumer一邊在消費消息
6) 把broker0幹掉,觀察Consumer端的輸出:
能看到,在broker0掛掉後,consumer也端產生了一系列INFO和WARN輸出,但同Producer端同樣,若干秒後自動恢復,消息仍然是連續的,並未出現斷點。
7) 再次把broker0啓動,並觸發從新選舉,而後觀察輸出:
fetched from partition 0, offset: 419, message: this is message49
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Offset commit for group 1 failed due to NOT_COORDINATOR_FOR_GROUP, will find new coordinator and retry
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator 2147483646 dead.
[main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto offset commit failed: This is not the correct coordinator for this group.
fetched from partition 1, offset: 392, message: this is message50
fetched from partition 0, offset: 420, message: this is message51
能看到,重選舉後Consumer端也輸出了一些日誌,意思是在提交offset時發現當前的調度器已經失效了,但很快就從新獲取了新的有效調度器,恢復 了offset的自動提交,驗證已提交offset的值也證實了offset提交併未因leader切換而發生錯誤。
如上,咱們也經過測試證明了Kafka集羣出現單點故障時,Consumer端的功能正確性。