一,基礎講解java
本文是基於kafka 0.10講的,kafkaProducer模型和0.8的客戶端模型大體是同樣的,區別是0.8版本的會爲每一個Broker(有給定topic分區leader的Broker)建立一個SyncProducer,而0.10的Producer是用一個NioSelector實現實現了多連接的維護的。也是一個後臺線程進行發送。基本步驟,也是按期獲取元數據,將消息按照key進行分區後歸類,每一類發送到正確的Broker上去。node
再寫kafka文章的緣由是0.10版本後跟spark結合有了大的變更,後面會講解多版本的sparkStreaming和StructuredStreaming 與kafka的各類結合。因此在這裏會更新兩篇kafka文章:一篇關於kafka 0.10版本的Producer,另外一篇固然是kafka 0.10版本的Consumer了。爲後面的文章打下基礎。apache
二,重要類講解安全
Cluster微信
表明一個當前kafka集羣的nodes,topics和partitions子集網絡
Selectorapp
org.apache.kafka.common.network.Selector。一個nioSelector的接口,負責非阻塞多連接網絡I/O操做。該類於NetworkSend和NetworkReceive協同工做,傳輸大小限制的網絡請求和應答。一個新的連接能夠被加入到該nioSelector,固然須要配上一個id,經過調用機器學習
connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)異步
內部維護了一個java NIOSelector函數
java.nio.channels.Selector nioSelector;
NetworkClient
一個針對異步請求/應答的網絡IO 的網絡客戶端。這是一個內部類,用來實現用戶層面的生產消費者客戶端。非線程安全的。
Sender
一個後臺線程,主要負責發送生產請求到kafka集羣。該線程會更新kafka集羣的metadata,將produce Request發送到正確的節點。
RecordAccumulator
該類扮演的是一個隊列的角色,將records追加到MemoryRecords實例中,用於發送到server端。
RecordBatch
一批准備發送的消息。該類是線程不安全的,須要加入外部同步加入須要修改的話。
MemoryRecords
用一個byteBuffer支撐的Records的實現。
RecordAccumulator維護了一個ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
RecordBatch維護了一個MemoryRecords。
三,源碼過程
1,構建必要對象的過程
用戶代碼裏會構建一個KafkaProducer對象。
producer = new KafkaProducer<>(props);
在構造函數裏活作三個重要的的事情
A),new Selector傳遞給NetworkClient
B),new NetworkClient
C),new Sender
D),new KafkaThread並將構建的send對象,當作該線程的runnable。並啓動該線程。
E),構建了分區器和一個Metatdata。
F),構建了一個RecordAccumulator。此時須要關注的兩個配置是
batch.size:批量發送的大小。
linger.ms:超時發送的時間。
合理配置兩個值,有利於咱們提高kafkaProducer的性能。
2,消息加入發送隊列的過程
1),用戶程序裏調用KafkaProducer.send發送消息
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
2),對消息按照partition策略進行分區。
//獲取分區號
int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
3),將消息追加到RecordAccumulator。
//將消息追加到RecordAccumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
具體追加的細節,先根據topic和partition信息獲取一個recordBatch,而後在獲取MemoryRecords,將消息加入其中
//根據topic和partition信息獲取該partition的隊列
Deque<RecordBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {//RecordBatch類非安全,須要加外部同步
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null)
return appendResult;
}
tryAppend內部
// 獲取最後一個RecordBatch
RecordBatch last = deque.peekLast();
if (last != null) {
// 將消息追加到該RecordBatch裏面
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
Last.TryAppend
// 首先會判斷是否有充足的空間
if (!this.records.hasRoomFor(key, value)) {
return null;
} else {
// 將消息加入memoryRecords
long checksum = this.records.append(offsetCounter++, timestamp, key, value);
this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length);
if (callback != null)
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
3,消息發送的過程
1),獲取Cluster
//獲取當前cluster信息,
Cluster cluster = metadata.fetch();
2),獲取那些有數據待發送的分區,依據是batch.size和linger.ms
//獲取當前準備好發送的有數據的分區
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
3),更新那些leader未知的分區信息
if (result.unknownLeadersExist)
this.metadata.requestUpdate();
4),移除不能發送Request的node
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
//判斷鏈接是否能發消息
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
5),轉化爲list格式,以node爲基準,清空那些給定的node數據
清空全部給定node的數據,而後將它們放到給定適合大小的list,以單個node爲基準
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
result.readyNodes,
this.maxRequestSize,
now);
6),以node爲基準,將batches轉化爲ProducerRequests
//以單個node爲基準,將batches數據轉化爲ProducerRequests
List<ClientRequest> requests = createProduceRequests(batches, now);
7),發送數據
for (ClientRequest request : requests)
client.send(request, now);
8),作真正的網絡讀寫的動做,以前會更新元數據
this.client.poll(pollTimeout, now);
四,總結
寫本文的緣由是爲StructuredStreaming的系列文章之kafkaSink作準備。
1,具體調優請參考kafka系列文章。
2,性能調優的參數重要的就兩個
batch.size:批量發送的大小。
linger.ms:超時發送的時間。
3,具體跟0.8.2.2區別,請參考:
假如你對kafka,spark,hbase源碼,spark機器學習等感興趣,請關注浪尖公衆號。一塊兒探討進步。
本文分享自微信公衆號 - 浪尖聊大數據(bigdatatip)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。