接上文:《架構設計:系統間通訊(28)——Kafka及場景應用(中1)》css
咱們在上文中已經討論了Kafka使用分區的概念存儲消息,一個topic能夠有多個分區它們分佈在整個Kafka集羣的多個Broker服務節點中,而且一條消息只會按照消息生產者的要求進入topic的某一個分區。那麼問題來了:若是某個分區中的消息在被消費端Pull以前,承載該分區的Broker服務節點就由於各類異常緣由崩潰了,那麼在這個Broker從新啓動前,消費者就沒法收到消息了。html
爲了解決這個問題,Apache Kafka在V 0.8+版本中加入了複製功能:讓topic下的每個分區存儲到多個Broker服務節點上,並由Zookeeper統一管理它們的狀態。java
請注意Kafka中Partition(分區)和replication(複製)是兩個徹底不一樣的概念,不少讀者容易將這兩個概念混淆——雖然它們都和「如何存儲消息」這件事情有關:前者是說將若干條消息按照必定的規則分別存放在不一樣的區域,一條消息只存入一個區域(且Topic下多個分區能夠存在於同一個Broker上);後者是說,爲了保證消息在被消費前不會丟失,須要將某一個區域中的消息集合複製出多個副本(同一個分區的多個副本不能存放在同一個Broker上)。web
Kafka將分區的多個副本分爲兩種角色:Leader和Follower,Leader Broker是主要服務節點,消息只會從消息生產者發送給Leader Broker,消息消費者也只會從Leader Broker中Pull消息。Follower Broker爲副本服務節點,正常狀況下不會公佈給生產者或者消費者直接進行操做。Follower Broker服務節點將會主動從Leader Broker上Pull消息。apache
在這種工做機制下,Follower和Leader的消息複製過程因爲Follower服務節點的性能、壓力、網絡等緣由,它們和Leader服務節點會有一個消息差別性。當這個差別性擴大到必定的範圍,Leader節點就會認爲這個Follower節點再也跟不上本身的節奏,致使的結果就是Leader節點會將這個Follower節點移出「待同步副本集」ISR(in-sync replicas),再也不關注這個Follower節點的同步問題。後端
只有當ISR中全部分區副本所有完成了某一條消息的同步過程,這條消息纔算真正完成了「記錄」操做。只有這樣的消息纔會發送給消息消費者。至於這個真正完成「記錄」操做的通知是否能返回給消息生產者,徹底取決於消息生產者採用的acks模式(後文會講到)。api
如今咱們能夠回過頭看看上文中4-1-3-5小節給出的「查看Topic狀態」命令以及命令結果:緩存
# 腳本命令範例
kafka-topics.sh --describe --zookeeper 192.168.61.139:2181 --topic my_topic2
# 顯示的結果
Topic:my_topic2 PartitionCount:4 ReplicationFactor:2 Configs:
Topic: my_topic2 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: my_topic2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: my_topic2 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: my_topic2 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
以上命令行用於顯示指定topic名稱的基本狀態信息。Partition表示分區號,Replicas表示全部副本的所在位置的Broker.id信息,Isr表示當前狀態正常能夠進行消息複製的副本所在位置的Broker.id信息。服務器
那麼從命令結果來看,名叫「my_topic2」的topic一共有4個數據分區,每個分區有兩個副本。其中:0號分區的Leader Broker服務節點的id爲2,0號分區的兩個副本分別在id爲2和id爲1的Broker服務節點上,且id爲2和id爲1的Broker上的副本狀態都是正常的;同理,1號分區的Leader Broker服務節點的id爲1,1號分區的兩個副本分別在id爲2和id爲1的Broker服務節點上,且id爲2和id爲1的Broker上的副本狀態都是正常的。。。網絡
請注意以前咱們給出的Kafka集羣方案的示意圖,在圖中消息生產者並無鏈接到zookeeper協調服務,而是直接和多個Kafka Server Brokers創建了鏈接。和其餘種類的消息隊列的設計不一樣,在整個Kafka方案中消息生產者(Producer)會有不少重要規則的決定權,例如:
消費生產者(Producer)能夠決定向指定的Topic的哪個分區(Partition)發送消息。而不是由Broker來決定。
消息生產者(Producer)能夠決定消息達到Kafka Broker後,Producer對消息的一致性關注到什麼樣的級別,又或者根本不關心消息在Broker上的一致性問題。
消息生產者(Producer)能夠決定是以同步方式(sync)仍是異步方式(aSync)向Broker Server List發送消息。
在異步方式下,消費生產者(Producer)還能夠決定以什麼樣的間隔(週期)向Broker Server List發送消息。
隨機選定Broker Server List中某一個服務節點,讀取當前Topic下的分區和複製表信息,並保存在本地Pool中的工做也是由消息生產者(Producer)主動完成。
另外,Kafka中的消息生產者沒有相似ActiveMQ中那樣的事務機制(可參見文章《架構設計:系統間通訊(23)——提升ActiveMQ工做性能(中)》)。這樣的設計和Kafka主要的業務場景有關——用來收集各類操做日誌。這樣的場景對消息的可靠性要求並不高:漏掉一兩條日誌並不影響後端大數據平臺對日誌數據的分析結果;並且這樣的設計大量簡化了Broker的設計結構:它不須要像ActiveMQ那樣專門爲達成傳輸但還未進行commit的消息專門建立存儲區域「transaction store」,並在進行了commit或者rollback操做後進行標記。這種處理機制是Apache Kafka高效性能的又一種保障。
Kafka中的多個消息生產者(Producer)並不須要ZooKeeper服務中的任何信息爲它們協調發送過程,由於沒有什麼可協調的。生產者惟一須要知道的Topic有多少個分區以及每一個分區,分別存在於哪些Broker上的信息都是來源於對某一個Broker的直接查詢。因此Kafka集羣中只剩下了Broker和Consumer須要進行協調(這個問題會在後文中進行詳細討論)。
這是分佈式系統建設思想中一個重要的原則——不可濫用協調裝置:完成同一件工做時,協調N個參與角色要比協調N-1個參與角色耗費更多的時間和性能;因此,只協調須要協調的角色,只通知須要通知的事件,只爲協調過程存儲必要的數據。我在後續的寫做中,會專門爲讀者詳細介紹Kafka中消息生產者的實現過程,這裏面有不少設計思想能夠在各位的實際工做中借鑑。
下面的代碼使用Kafka的Java Client API演示消息生產者的使用。這裏咱們使用的Kafka Java Client API的版本是V0.8.2.2,您能夠直接引入Maven的官方庫依賴便可:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version>
</dependency>
如下是Kafka消息生產者的代碼,以前咱們已經經過Kafka的命令腳本建立了一個擁有4個分區的Topic——my_topic2:
package kafkaTQ;
import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/** * kafka消息生產者演示, * @author yinwenjie */
public class KafkaProducer {
public static void main(String[] args) throws RuntimeException {
Properties props = new Properties();
// 指定kafka節點列表,不須要由zookeeper進行協調
// 而且鏈接的目的也不是爲了發送消息,而是爲了在這些節點列表中選取一個,來獲取topic的分區情況
props.put("metadata.broker.list", "192.168.61.138:9092");
// 使用這個屬性能夠指定「將消息送到topic的哪個partition中」,若是業務規則比較複雜的話能夠指定分區控制器
// 不過開發者最好要清楚topic有多少個分區,這樣纔好進行多線程(負載均衡)發送
//props.put("partitioner.class", "kafkaTQ.PartitionerController");
// 能夠經過這個參數控制是異步發送仍是同步發送(默認爲「同步」)
//props.put("producer.type", "async");
// 能夠經過這個屬性控制複製過程的一致性規則
//props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
// 建立消費者
Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config);
// 因爲咱們爲topic建立了四個partition,因此將數據分別發往這四個分區
for (Integer partitionIndex = 0; ; partitionIndex++) {
Date time = new Date();
// 建立和發送消息,能夠指定這條消息的key,producer根據這個key來決定這條消息發送到哪一個parition中
// 另一個能夠決定parition的方式是實現kafka.producer.Partitioner接口
String messageContext_Value = "this message from producer 由producer指的partitionIndex:[" + partitionIndex % 4 + "]" + time.getTime();
System.out.println(messageContext_Value);
byte[] messageContext = messageContext_Value.getBytes();
byte[] key = partitionIndex.toString().getBytes();
// 這是消息對象,請注意第二個參數和第三個參數,下一小節將會進行詳細介紹
KeyedMessage<byte[], byte[]> message = new KeyedMessage<byte[], byte[]>("my_topic2", key , partitionIndex % 4 , messageContext);
producer.send(message);
// 休息0.5秒鐘,循環發
synchronized (KafkaProducer.class) {
try {
KafkaProducer.class.wait(500);
} catch (InterruptedException e) {
e.printStackTrace(System.out);
}
}
}
}
}
開發人員能夠在消息生產者端指定發送的消息將要傳送到Topic下的哪個分區(partition),但前提條件是開發人員清楚這個Topic有多少個分區,不然開發人員就不知道怎麼編寫代碼了。固然開發人員也能夠徹底忽略決定分區的規則,這時將由消費者端攜帶的一個默認規則決定。
開發人員能夠有兩種方式進行分區指定:第一種方法是以上代碼片斷中演示的那樣,在建立消息對象KeyedMessage時,指定方法中partKey/key的值;另外一種方式是從新實現kafka.producer.Partitioner接口,以便覆蓋掉默認實現。
使用KeyedMessage類構造消息對象時,能夠指定4個參數,他們分別是:topic名稱、消息Key、分區Key和message消息內容。topic名稱和message消息內容很容易理解,可是怎樣理解消息Key和分區Key呢?如下是KeyedMessage類的源代碼(Scala語言):
package kafka.producer
/** * A topic, key, and value. * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored. */
case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
if(topic == null)
throw new IllegalArgumentException("Topic cannot be null.")
def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
def this(topic: String, key: K, message: V) = this(topic, key, key, message)
def partitionKey = {
if(partKey != null)
partKey
else if(hasKey)
key
else
null
}
def hasKey = key != null
}
KeyedMessage類的構造函數中有一個局部變量:partitionKey,在KeyedMessage類的首行註釋中,對該變量進行了一個說明:
If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
從源碼中能夠看出,partitionKey優先使用partKey做爲分區依據,若是partKey沒有被賦值,則使用key做爲分區依據。因此在使用KeyedMessage類的構造函數時,partKey和key您只須要指定其中的一個就徹底夠了。
您還能夠實現kafka.producer.Partitioner接口,並在建立消費者對象時進行指定,以便實現分區的指定(若是不進行指定,默認的實現類爲「kafka.producer.DefaultPartitioner」)。代碼片斷以下:
package kafkaTQ;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class PartitionerController implements Partitioner {
/** * 必需要有這個構造函數 * @param vp */
public PartitionerController(VerifiableProperties vp) {
}
/* (non-Javadoc) * @see kafka.producer.Partitioner#partition(java.lang.Object, int) */
@Override
public int partition(Object parKey, int partition) {
/* * 在這裏您能夠根據自身的業務過程從新運算一個分區,並進行返回。 */
Integer parKeyValue = (Integer)parKey;
return parKeyValue;
}
}
須要實現的partition方法中,第一個參數是您在建立消息時所傳遞的partyKey(這是的partyKey不必定傳入Integer),第二個參數是send方法根據自身內部機制決定的目標分區。
消息生產這還能夠決定是以同步方式向Broker發送消息仍是以異步方式向Broker發送消息。只須要使用生產者配置中的「producer.type」屬性進行指定。當該屬性值爲「sync」時,表示使用同步發送的方式;當該屬性值爲「async」時,表示使用異步發送方式。
在異步發送方式下,開發人員調用send方法發送消息時,這個消息並不會當即被髮送到topic指定的Leader partition所在的Broker,而是會存儲在本地的一個緩衝區域(必定注意是客戶端本地)。當緩衝區的狀態知足最長等待時間或者最大數據量條數時,消息會以一個設置值批量發送給Broker。以下圖所示:
緩存區的數據按照batch.num.messages設置的數值被一批一批的發送給目標Broker(默認爲200條),若是消息的滯留時間超過了queue.buffering.max.ms設置的值(單位毫秒,默認值爲5000)就算沒有達到batch.num.messages的數值,消息也會被髮送。
若是因爲Broker的緣由致使消息發送緩慢,這時在本地待發送消息緩存區中的消息就有可能達到
queue.buffering.max.messages設置的緩存區容許存儲的最大消息數量,一旦達到這個數量消息生產者端再次調用send方法的時候,send方法所在線程就會被阻塞,直到緩存區有足夠的空間可以放下新的數據爲止。
Kafka中的消息生產者還能夠配置發送的消息在Broker端以哪一種方式進行副本複製:強一致性複製仍是弱一致性複製,又或者不關注消息的一致性。(在分佈式系統中強一致性、弱一致性和最終一致性是一個很是關鍵的知識點,它們是CAP原則重要的實踐,我將會在「存儲」專題中進行對它們的定義和主流的實現方式進行講解)
在Kafka的實現中,強一致性複製是指當Leader Partition收到消息後,將在全部Follower partition完成這條消息的複製後才認爲消息處理成功,並向消息生產者返回ack信息;弱一致性複製是指當Leader partition收到消息後,只要Leader Broker本身完成了消息的存儲就認爲消息處理成立,並向消息生產者返回ack信息(複製過程隨後由Broker節點自行完成);
您能夠經過消息生產者配置中的「request.required.acks」屬性來設置消息的複製性要求。在官方文檔中,對於這個屬性的解釋是:
acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect . The offset given back for each record will always be set to -1.
當acks設置爲0時,生產者端不會等待Server Broker回執任何的ACK確認信息。只是將要發送的消息交給網絡層。這種狀況下,消息是否真的到達了Server Broker,實際上生產者端並不知道。因爲生產者端並不等待Server Broker回執任何的ACK確認信息,那麼消息一旦傳輸失敗(例如,等待超時的狀況)「重試」過程就無從談起了。因爲生產者端在這種狀況下發送的消息,極可能Server Broker還沒來得及處理,甚至更有可能Server Broker都沒有接收到,因此Server Broker也沒法告知生產者這條消息在分區中的偏移位置。
acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.
當acks設置爲1時,生產者發送消息將等待這個分區的Leader Server Broker 完成它本地的消息記錄操做,但不會等待這個分區下其它Follower Server Brokers的操做。在這種狀況下,雖然Leader Server Broker對消息的處理成功了,也返回了ACK信息給生產者端,可是在進行副本複製時,仍是可能失敗。
acks=all (#注:原文如此,實際上屬性值爲「-1」)This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
當acks設置爲「all」時,消息生產者發送消息時將會等待目標分區的Leader Server Broker以及全部的Follower Server Brokers所有處理完,纔會獲得ACK確認信息。這樣的處理邏輯下犧牲了一部分性能,可是消息存儲可靠性是最高的。
在2013年2月2日,Kafka的主要參與者Neha Narkhede發表了一篇講解Kafka Replication過程的技術文檔(算是官方文檔)《Kafka Replication》,在這篇文檔的Synchronous replication-write章節Neha Narkhede這樣描述了「寫」過程前的準備工做:
To publish a message to a partition, the client first finds the leader of the partition from Zookeeper and sends the message to the leader。
這句話的大意是:爲了發送消息到一個分區,客戶端首先要經過zookeeper查詢到這個分區的Leader Broker在哪一個位置,而且向這個Leader Broker發送信息。國內一些譯文由此也造成了相關的中文描述。這顯然與本文中提到的「生產者不須要鏈接zookeeper進行任何協調操做」的描述徹底矛盾。
這裏衝突的重點在於「生產者在發送消息時,是直接鏈接到了zookeeper服務查詢相關信息,仍是鏈接到某一個已知的Broker查詢現信息?」
那麼咱們只能以實驗的形式實際驗證一下消息生產者在建立、發送消息的過程當中是否須要鏈接zookeeper。實際上筆者經過閱讀0.8.2.2版本的JAVA Client For Producer API 部分的的源碼,真沒有發現Producer直接鏈接zookeeper的證據(主要的類位置包括:kafka.producer.OldProducer、kafka.producer.ProducerPool、kafka.producer.SyncProducer、org.apache.kafka.clients.producer.internals.Sender和org.apache.kafka.clients.producer.KafkaProducer)。可是這顯然不具備太大的說服力,畢竟極可能出現漏讀代碼的狀況。
驗證明驗基於以前咱們已經搭建的Apacke Kafka集羣環境,192.168.61.140服務器上運行着一個standalone模式的zookeeper服務。在實驗中,咱們使用192.168.61.140服務器上自帶的防火牆,設置只有兩個Kafka Broker服務節點(139和138)可以訪問zookeeper上的2181端口。並在這種狀況下觀察消息生產者的工做狀況(以及相同topic下的消費者是能正常收到生產者發送的消息)。以下圖所示:
[root@zk ~]# service iptables status
Table: filter
Chain INPUT (policy ACCEPT)
num target prot opt source destination
1 ACCEPT tcp -- 192.168.61.138 0.0.0.0/0 tcp dpt:2181
2 ACCEPT tcp -- 192.168.61.139 0.0.0.0/0 tcp dpt:2181
3 ACCEPT icmp -- 0.0.0.0/0 0.0.0.0/0
4 REJECT all -- 0.0.0.0/0 0.0.0.0/0 reject-with icmp-host-prohibited
Chain FORWARD (policy ACCEPT)
num target prot opt source destination
1 REJECT all -- 0.0.0.0/0 0.0.0.0/0 reject-with icmp-host-prohibited
Chain OUTPUT (policy ACCEPT)
num target prot opt source destination
在全端口開放ICMP協議,只是爲了可以使用ping命令進行檢查。
# 在140上啓動zookeeper,而且肯定它以standalone模式運行
[root@zk ~]# zkServer.sh start
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@kp2 ~]# zkServer.sh status
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: standalone
首先測試在140節點開啓防火牆的狀況下,producer所在的192.168.61.1服務節點是否能順利鏈接到2181端口(使用telnet命令):
telnet 192.168.61.140 2181
Trying 192.168.61.140...
telnet: connect to address 192.168.61.140: Connection refused
而後關閉140上的防火牆,再使用一樣的telnet命令進行測試:
telnet 192.168.61.140 2181
Trying 192.168.61.140...
Connected to 192.168.61.140.
Escape character is '^]'.
能夠看到140啓動防火牆後,192.168.61.1服務節點不能鏈接到140服務的2181端口。這說明咱們設置的實驗前提的確起到了限制192.168.61.1節點訪問140節點上zookeeper服務的做用。
接下來咱們從新開啓140上防火牆,啓動140上的zookeeper服務,啓動139和138上的Kafka Broker服務,讓整個Kafka Broker集羣工做起來。正式開始進行實驗:
// 生產者的測試代碼就採用4-4-1小節中咱們給出的代碼樣例。
// 很顯然在140開啓防火牆,producer沒法鏈接zookeeper的狀況下
// producer也可以正常工做。如下是producer程序打印的運行信息
this message from producer 由producer指的partitionIndex:[0]1462439421320
this message from producer 由producer指的partitionIndex:[1]1462439429482
this message from producer 由producer指的partitionIndex:[2]1462439437655
this message from producer 由producer指的partitionIndex:[3]1462439441987
因此爲了確認這些發送出去的消息可以被消費者接收到,在進行producer測試工做的同時,咱們在138節點上,使用kafka-console-consumer運行了一個對應的消費者以便接收數據(不能在138節點或者139節點之外運行consumer,由於沒法鏈接zookeeper服務節點的2181端口)。如下是消費者接收到的信息:
[root@activemq ~]# kafka-console-consumer.sh --zookeeper 192.168.61.140:2181 --topic my_topic2
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
this message from producer 由producer指的partitionIndex:[0]1462439421320
this message from producer 由producer指的partitionIndex:[1]1462439429482
this message from producer 由producer指的partitionIndex:[2]1462439437655
this message from producer 由producer指的partitionIndex:[3]1462439441987
從以上小節的實驗狀況,咱們看到的結果是:Producer所在的服務節點192.168.61.1,在不能訪問192.168.61.140節點上zookeeper服務的狀況下,包括Producer在內的整個Kafak集羣可以正常工做,消費者端可以正常消費數據。那麼問題來了,做爲Kafka的主要參與者Neha Narkhede在這樣的官方文檔中是不太可能出現這樣的低級錯誤的,那麼是什麼緣由呢?固然若是真要說到出錯,那麼筆者本身出錯的可能性卻是要高得多。筆者認爲形成這種衝突的緣由可能有如下幾種:
這篇文章是在2013年2月份發佈的,那時候主流的Kafka版本是V0.7.X。可是筆者在實際工做中並無使用任何V0.7.X版本,因此對V0.7.X版本中是否須要生產者鏈接zookeeper並無肯定的答案。
在Neha Narkhede的這段話中,Client並非指代的消息生產者,而是泛指的使用zookeeper服務的各類客戶端角色。若是是這樣的話,那麼Client最有可能指代的就是Server Broker。
以上實驗中,並無限制住producer直接訪問zookeeper的全部狀況。防火牆功能可能出現了問題,又或者producer自行經過一個隱藏的端口(例如:9999)訪問到了zookeeper。
筆者尚未考慮到的其它可能性。歡迎各位讀者留言討論。
======================================== (接下文)