在Apache Kafka簡介的前半部分,您使用Kafka開發了幾個小規模的生產者/消費者應用程序。從這些練習中,您應該熟悉Apache Kafka消息傳遞系統的基礎知識。在下半部分,您將學習如何使用分區來分佈負載並橫向擴展應用程序,天天處理多達數百萬條消息。您還將瞭解Kafka如何使用消息偏移來跟蹤和管理複雜的消息處理,以及如何在消費者失敗時保護您的Apache Kafka消息傳遞系統免於失敗。咱們將從第1部分開發用於發佈 - 訂閱和點對點用例的示例應用程序。html
Kafka中的topic能夠細分爲分區。例如,在建立名爲Demo的topic時,您能夠將其配置爲具備三個分區。服務器將建立三個日誌文件,每一個文件分區一個。當生產者向topic發佈消息時,它將爲該消息分配分區ID。而後,服務器將消息僅附加到該分區的日誌文件中。java
若是您隨後啓動了兩個消費者,則服務器可能會將分區1和2分配給第一個消費者,將分區3分配給第二個消費者。每一個消費者只能從其分配的分區中讀取。您能夠在圖1中看到爲三個分區配置的Demo的topic。git
爲了擴展這個場景,想象一下有兩個代理的Kafka集羣,它位於兩臺機器中。分區演示tpoic時,您將其配置爲具備兩個分區和兩個副本。對於此類配置,Kafka服務器會將兩個分區分配給羣集中的兩個broker。每一個broker都是其中一個分區的領導者。github
當生產者發佈消息時,它將轉到分區領導者。領導者將獲取消息並將其附加到本地計算機上的日誌文件中。第二個broker會被動地將該提交日誌複製到本身的機器上。若是分區負責人發生故障,第二個broker將成爲新的領導者並開始提供客戶端請求。以一樣的方式,當消費者向分區發送請求時,該請求將首先發送給分區領導者,分區領導者將返回所請求的消息。數據庫
考慮分區基於Kafka的消息傳遞系統的好處:apache
生產者負責決定消息將進入的分區。生產者有兩種控制這種分配的選擇:編程
org.apache.kafka.clients.producer.Partitioner
接口的類。此自定義Partitioner
將實現業務邏輯以肯定發送消息的位置。org.apache.kafka.clients.producer.internals.DefaultPartitioner
。對於大多數狀況,默認分區程序足夠好,提供三個選項:ProducerRecord
時,使用重載的構造函數new ProducerRecord(topicName, partitionId,messageKey,message)
指定分區ID。ProducerRecord
時,經過調用指定messageKey
的構造方法 new ProducerRecord(topicName,messageKey,message)
。DefaultPartitioner
將使用messageKey
的散列來確保相同messageKey
的全部消息都轉到同一個生產者。這是最簡單也是最經常使用的方法。new ProducerRecord(topicName, message)
以建立您的ProducerRecord
。在這種狀況下,分區程序將以循環方式向全部分區發送消息,從而確保平衡的服務器負載。對於第1部分中的簡單生產者/消費者示例,咱們使用了 DefaultPartitioner
。如今咱們將嘗試建立自定義分區程序。對於此示例,咱們假設咱們有一個零售網站,消費者可使用該網站在世界任何地方訂購產品。根據使用狀況,咱們知道大多數消費者都在美國或印度。咱們但願對咱們的應用程序進行分區,以便未來自美國或印度的訂單發送給各自的消費者,而來自其餘任何地方的訂單將轉發給第三個消費者。api
首先,咱們將建立一個實現org.apache.kafka.clients.producer.Partitioner
接口的CountryPartitioner
。咱們必須實現如下方法:bash
PartitionerMapcountryNameconfigProperties.put("partitions.0","USA")
Producer
爲每一個消息調用partition()方法。在這種狀況下,咱們將使用它來讀取消息並從消息中解析國家/地區的名稱。若是國家的名稱在countryToPartitionMap
,它將返回存儲在Map
的partitionId
若是沒有,它將散列國家的值並使用它來計算它應該去哪一個分區。請注意,當Kafka調用configure()
時,Kafka生成器會將咱們爲生成器配置的全部屬性傳遞給Partitioner
類。咱們必須只讀取那些以partitions.
開頭的屬性,解析它們以獲取partitionId
並存儲ID到 countryToPartitionMap
。服務器
如下是咱們的Partitioner
界面自定義實現。
public class CountryPartitioner implements Partitioner {
private static Map<String,Integer> countryToPartitionMap;
public void configure(Map<String, ?> configs) {
System.out.println("Inside CountryPartitioner.configure " + configs);
countryToPartitionMap = new HashMap<String, Integer>();
for(Map.Entry<String,?> entry: configs.entrySet()){
if(entry.getKey().startsWith("partitions.")){
String keyName = entry.getKey();
String value = (String)entry.getValue();
System.out.println( keyName.substring(11));
int paritionId = Integer.parseInt(keyName.substring(11));
countryToPartitionMap.put(value,paritionId);
}
}
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
Cluster cluster) {
List partitions = cluster.availablePartitionsForTopic(topic);
String valueStr = (String)value;
String countryName = ((String) value).split(":")[0];
if(countryToPartitionMap.containsKey(countryName)){
//If the country is mapped to particular partition return it
return countryToPartitionMap.get(countryName);
}else {
//If no country is mapped to particular partition distribute between remaining partitions
int noOfPartitions = cluster.topics().size();
return value.hashCode()%noOfPartitions + countryToPartitionMap.size() ;
}
}
public void close() {}
}
複製代碼
Producer
清單2(下面)中的類與第1部分中的簡單生成器很是類似,其中兩個更改以粗體標記:
ProducerConfig.PARTITIONER_CLASS_CONFIG
的鍵設置config屬性,該值匹配咱們CountryPartitioner
類的徹底限定名。咱們還設置countryName
爲partitionId
,從而映射了咱們想要傳遞給CountryPartitioner
的屬性。org.apache.kafka.clients.producer.Callback
接口的類的實例做爲producer.send()
方法的第二個參數傳遞。一旦成功發佈消息(附加了RecordMetadata
對象),Kafka客戶端將調用onCompletion()
其方法。咱們將可以使用此對象來找出發送消息的分區,以及分配給已發佈消息的偏移量。public class Producer {
private static Scanner in;
public static void main(String[] argv)throws Exception {
if (argv.length != 1) {
System.err.println("Please specify 1 parameters ");
System.exit(-1);
}
String topicName = argv[0];
in = new Scanner(System.in);
System.out.println("Enter message(type exit to quit)");
//Configure the Producer
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CountryPartitioner.class.getCanonicalName());
configProperties.put("partition.1","USA");
configProperties.put("partition.2","India");
org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
String line = in.nextLine();
while(!line.equals("exit")) {
ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, null, line);
producer.send(rec, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("Message sent to topic ->" + metadata.topic()+ " ,parition->" + metadata.partition() +" stored at offset->" + metadata.offset());
;
}
});
line = in.nextLine();
}
in.close();
producer.close();
}
}
複製代碼
Kafka服務器保證僅將分區分配給一個消費者,從而保證消息的消耗順序。您能夠手動分配分區或自動分配分區。
若是您的業務邏輯須要更多控制,那麼您將須要手動分配分區。在這種狀況下,您將使用KafkaConsumer.assign(<listOfPartitions>)
將每一個消費者感興趣的分區列表傳遞給Kakfa服務器。
自動分配分區是默認和最多見的選擇。在這種狀況下,Kafka服務器將爲每一個使用者分配一個分區,並將從新分配分區以擴展新的使用者。
假設您正在建立一個包含三個分區的新topic。當您爲新topic啓動第一個消費者時,Kafka會將全部三個分區分配給同一個消費者。若是您隨後啓動第二個消費者,Kafka將從新分配全部分區,將一個分區分配給第一個下發者,將剩餘的兩個分區分配給第二個消費者。若是添加第三個消費者,Kafka將再次從新分配分區,以便爲每一個消費者分配一個分區。最後,若是您啓動第四個和第五個消費者,那麼三個消費者將擁有一個分配的分區,但其餘消費者將不會收到任何消息。若是最初的三個分區之一出現故障,Kafka將使用相同的分區邏輯將該消費者的分區從新分配給其餘消費者。
咱們將爲示例應用程序使用自動分配。咱們的大部分消費者代碼都與第1部分中的簡單消費者代碼相同。惟一的區別是咱們將一個實例ConsumerRebalanceListener
做爲第二個參數傳遞給咱們的KafkaConsumer.subscribe()
方法。Kafka每次爲此消費者分配或撤銷分區時都會調用此類的方法。咱們將覆蓋ConsumerRebalanceListener
的onPartitionsRevoked()
和onPartitionsAssigned()
方法,並打印今後訂閱者分配或撤消的分區列表。
private static class ConsumerThread extends Thread {
private String topicName;
private String groupId;
private KafkaConsumer<String, String> kafkaConsumer;
public ConsumerThread(String topicName, String groupId) {
this.topicName = topicName;
this.groupId = groupId;
}
public void run() {
Properties configProperties = new Properties();
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//Figure out where to start processing messages from
kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.printf("%s topic-partitions are revoked from this consumer\n", Arrays.toString(partitions.toArray()));
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.printf("%s topic-partitions are assigned to this consumer\n", Arrays.toString(partitions.toArray()));
}
});
//Start processing messages
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.value());
}
} catch (WakeupException ex) {
System.out.println("Exception caught " + ex.getMessage());
} finally {
kafkaConsumer.close();
System.out.println("After closing KafkaConsumer");
}
}
public KafkaConsumer<String, String> getKafkaConsumer() {
return this.kafkaConsumer;
}
}
複製代碼
咱們已準備好運行並測試生產者/消費者應用程序的當前迭代。如前所述,您可使用清單1到清單3中的代碼,或者在GitHub上下載完整的源代碼。
mvn compile assembly:single
。part-demo
: <KAFKA_HOME>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic part-demo
java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Producer part-demo
java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Consumer part-demo group1
USA: First order India: First order USA: Second order France: First order
圖2顯示了分區主題中的生產者/消費者輸出。
可以將單個主題劃分爲多個部分是Kafka可擴展性的關鍵。經過分區,您能夠水平擴展消息傳遞基礎結構,同時還能夠維護每一個分區內的順序 接下來,咱們將瞭解Kafka如何使用消息偏移來跟蹤和管理複雜的消息傳遞方案。
我在第1部分中提到,每當生產者發佈消息時,Kafka服務器就會爲該消息分配一個偏移量。消費者可以經過設置或重置消息偏移來控制它想要消費的消息。在開發消費者時,您有兩種管理偏移的選項:自動和手動。
當您在Kafka客戶端中啓動使用者時,它將讀取您的ConsumerConfig.AUTO_OFFSET_RESET_CONFIG(auto.offset.reset)
配置值。若是該配置設置爲
消費者跟蹤它處理的最後一條消息的偏移量,所以它將始終請求偏移量高於最後一個偏移量的消息。當消費者正常運行時,此設置有效,但若是消費者崩潰,或者您想中止維護,會發生什麼?在這種狀況下,您但願使用者記住上次處理的消息的偏移量,以便它能夠從第一個未處理的消息開始。
爲了確保消息持久性,Kafka使用兩種類型的偏移:
若是消費者因爲某種緣由而關閉或被關閉,它能夠向Kafka服務器查詢
__consumer_offsets
的topic中。此數據將複製到多個broker,以便broker不會丟失偏移量。
您能夠選擇提交偏移數據的頻率。若是您常常提交,則會受到性能損失。另外一方面,若是消費者確實失敗了,那麼從新處理和消費的消息就會減小。您的另外一個選擇是減小提交(以得到更好的性能),但在發生故障時從新處理更多消息。在任何一種狀況下,消費者都有兩種提交偏移的選項:
auto.commit
爲true並使用以毫秒爲單位的值設置auto.commit.interval.ms
屬性。啓用此功能後,Kafka使用者將提交poll()
調用而收到的最後一條消息的偏移量。該poll()
調用在auto.commit.interval.ms
後發出。KafkaConsumer
的commitSync()
或commitAsync()
方法。當您發出調用時,使用者將獲取在poll()
期間收到的最後一條消息的偏移量並將其提交給Kafka服務器。讓咱們考慮三種使用狀況,您不但願使用Kafka的默認偏移管理基礎架構。相反,您將手動肯定要從哪一個消息開始。
kafkaConsumer.seekToBeginning(topicPartition)
方法從頭開始讀取。請記住,默認狀況下,Kafka將刪除超過七天的消息,所以您須要爲此用例配置更高的log.retention.hours
值。kafkaConsumer.seekToEnd(topicPartition)
來配置偏移量以忽略停機期間的消息。相反,消費者將開始處理重啓之時發生的消息kafkaConsumer.seek(topicPartition, startingOffset)
手動將偏移量設置爲生成的第一個良好消息。咱們迄今爲止開發的消費者代碼每5秒自動提交一次記錄。如今讓咱們更新消費者以獲取手動設置偏移消耗的第三個參數。
若是使用等於0的最後一個參數的值,則使用者將假定您要從頭開始,所以它將爲每一個分區調用一個kafkaConsumer.seekToBeginning()
方法。若是傳遞值-1,則會假定您要忽略現有消息,而且僅消費在從新啓動使用者後發佈的消息。在這種狀況下,它將爲每一個分區調用kafkaConsumer.seekToEnd()
。最後,若是指定除0或-1之外的任何值,則會假定您已指定了消費者要從中開始的偏移量; 例如,若是您將第三個值傳遞爲5,那麼在從新啓動時,使用者將使用偏移量大於5的消息。爲此,它將調用kafkaConsumer.seek(<topicname>, <startingoffset>)
。
private static class ConsumerThread extends Thread{
private String topicName;
private String groupId;
private long startingOffset;
private KafkaConsumer<String,String> kafkaConsumer;
public ConsumerThread(String topicName, String groupId, long startingOffset){
this.topicName = topicName;
this.groupId = groupId;
this.startingOffset=startingOffset;
}
public void run() {
Properties configProperties = new Properties();
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "offset123");
configProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
//Figure out where to start processing messages from
kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.printf("%s topic-partitions are revoked from this consumer\n", Arrays.toString(partitions.toArray()));
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.printf("%s topic-partitions are assigned to this consumer\n", Arrays.toString(partitions.toArray()));
Iterator<TopicPartition> topicPartitionIterator = partitions.iterator();
while(topicPartitionIterator.hasNext()){
TopicPartition topicPartition = topicPartitionIterator.next();
System.out.println("Current offset is " + kafkaConsumer.position(topicPartition) + " committed offset is ->" + kafkaConsumer.committed(topicPartition) );
if(startingOffset ==0){
System.out.println("Setting offset to beginning");
kafkaConsumer.seekToBeginning(topicPartition);
}else if(startingOffset == -1){
System.out.println("Setting it to the end ");
kafkaConsumer.seekToEnd(topicPartition);
}else {
System.out.println("Resetting offset to " + startingOffset);
kafkaConsumer.seek(topicPartition, startingOffset);
}
}
}
});
//Start processing messages
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}catch(WakeupException ex){
System.out.println("Exception caught " + ex.getMessage());
}finally{
kafkaConsumer.close();
System.out.println("After closing KafkaConsumer");
}
}
public KafkaConsumer<String,String> getKafkaConsumer(){
return this.kafkaConsumer;
}
}
複製代碼
代碼準備就緒後,您能夠經過執行如下命令對其進行測試:
java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.offset.Consumer part-demo group1 0
複製代碼
Kafka客戶端應該打印偏移量爲0的全部消息,或者您能夠更改最後一個參數的值以在消息隊列中跳轉。
傳統的消息傳遞用例能夠分爲兩種主要類型:點對點和發佈 - 訂閱。在
ConsumerConfig.GROUP_ID_CONFIG
屬性。
若是您對多個消費者使用相同的GROUP_ID_CONFIG
消息,Kafka將假設它們都是單個組的一部分,而且它將僅向一個消費者傳遞消息。若是你在不一樣的group.id
中啓動兩個消費者,Kafka將假設它們不相關,所以每一個消費者將得到它本身的消息副本。
回想一下清單3中的分區使用者將groupId
其做爲第二個參數。如今咱們將使用該groupId
參數爲消費者實現隊列和主題用例。
group-test
兩個分區命名的主題: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic group-test
group-test
剛剛建立的主題的生產者: java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Producer group-test
group-test
主題的消息的消費者。使用group1
你的價值group id
。這將爲您提供三個消費者group1
: java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer group-test group1
group id
to 的值group2
。這將爲您提供三個消費者group1
和一個消費者group2
: java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer group-test group2
group2
消費者窗口中出現一次,並在三個group1
消費者窗口中出現一次,如圖3所示。大數據消息系統的早期用例須要批處理,例如運行夜間ETL過程或按期將數據從RDBMS移動到NoSQL數據存儲區。在過去幾年中,對實時處理的需求增長,特別是對於欺詐檢測和應急響應系統。Apache Kafka是爲這些類型的實時場景而構建的。
Apache Kafka是一個很好的開源產品,但確實有一些限制; 例如,您沒法在主題到達目標以前從主題內部查詢數據,也不能跨多個地理位置分散的羣集複製數據。您能夠將MapR Streams(商業產品)與Kafka API結合使用,以實現這些和其餘更復雜的發佈 - 訂閱方案。