Kafka is an excellent messaging system; for an overview of its overall design, see my earlier post 後端好書閱讀與推薦. Today let's dig into its implementation details (I forked a copy of the code), starting with the Producer side.
To use Kafka you first instantiate a KafkaProducer. This requires mandatory Properties such as the broker addresses and serializers, plus optional Properties such as acks (0, 1, -1), compression, retries, and batch.size. This simple interface controls most of the Producer's behavior; once instantiated, you call the send method to send messages.
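As a refresher on that interface, here is a minimal, self-contained sketch of creating a producer and sending a record synchronously; the broker address, topic name, key and value are placeholders:

```java
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // mandatory: broker addresses and serializers
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // optional: tune durability, compression, retries and batching
        props.put("acks", "1");
        props.put("compression.type", "gzip");
        props.put("retries", 3);
        props.put("batch.size", 16384);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            RecordMetadata meta = future.get(); // block for the result (synchronous style)
            System.out.printf("sent to %s-%d@%d%n", meta.topic(), meta.partition(), meta.offset());
        }
    }
}
```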
The core implementation is this method:
```java
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record); // ①
    return doSend(interceptedRecord, callback); // ②
}
```
Depending on how you call it, you get three messaging patterns: fire-and-forget (ignore the returned result), synchronous send (block on the returned Future, with the callback set to null), and asynchronous send (supply a callback).
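Reusing the producer from the sketch above, the three patterns look roughly like this (topic, keys and values are placeholders):

```java
// fire-and-forget: ignore the returned Future, failures go unnoticed
producer.send(new ProducerRecord<>("demo-topic", "k1", "v1"));

// synchronous: block on the Future; get() throws if the send ultimately fails
RecordMetadata meta = producer.send(new ProducerRecord<>("demo-topic", "k2", "v2")).get();

// asynchronous: register a callback that is invoked when the broker responds
producer.send(new ProducerRecord<>("demo-topic", "k3", "v3"), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();                    // the send failed after retries
    } else {
        System.out.println("offset = " + metadata.offset());
    }
});
```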
Let's look at the fields of the message class ProducerRecord:
```java
private final String topic;       // topic
private final Integer partition;  // partition
private final Headers headers;    // headers
private final K key;              // key
private final V value;            // value
private final Long timestamp;     // timestamp
```
It has several constructors to suit different kinds of messages, for example with or without a partition, with or without a key, and so on.
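For example, a few of the overloads (topic name, keys and values are placeholders):

```java
// value only: key is null, the partitioner chooses the partition
ProducerRecord<String, String> r1 = new ProducerRecord<>("demo-topic", "v");

// key + value: by default the partition comes from a hash of the key
ProducerRecord<String, String> r2 = new ProducerRecord<>("demo-topic", "k", "v");

// explicit partition plus key and value: the partitioner is bypassed
ProducerRecord<String, String> r3 = new ProducerRecord<>("demo-topic", 0, "k", "v");

// explicit partition, timestamp, key and value
ProducerRecord<String, String> r4 = new ProducerRecord<>("demo-topic", 0, System.currentTimeMillis(), "k", "v");
```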
In ①, the ProducerInterceptors (zero to arbitrarily many of them, forming an interceptor chain) process the ProducerRecord, for example stamping it with a timestamp or performing auditing and statistics:
```java
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> interceptRecord = record;
    for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
        try {
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception e) {
            // do not rethrow; log and continue with the next interceptor
            if (record != null)
                log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
            else
                log.warn("Error executing interceptor onSend callback", e);
        }
    }
    return interceptRecord;
}
```
If the user has configured interceptors, the record is processed and the resulting ProducerRecord is returned; otherwise the record is returned unchanged.
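For illustration, a minimal sketch of a user-defined interceptor; the class name and counting logic are made up (and kept deliberately simple, not thread-safe), and it would be registered through the interceptor.classes producer property:

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CountingInterceptor implements ProducerInterceptor<String, String> {
    private long sent = 0;
    private long acked = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        sent++;          // called on the user thread before serialization and partitioning
        return record;   // a modified record could be returned here instead
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) acked++; // called when the broker acks or the send fails
    }

    @Override
    public void close() {
        System.out.printf("sent=%d acked=%d%n", sent, acked);
    }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```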
Then in ②, doSend actually sends the message, asynchronously (the source is long, so only the key parts are kept):
```java
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // serialize the key and value
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
        }
        // compute the partition and build the topic-partition
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        // callback and transaction handling omitted
        Header[] headers = record.headers().toArray();
        // append the message to the RecordAccumulator
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs);
        // if the batch is full or a new batch was created, wake up the IO thread to send it,
        // i.e. the sender's wakeup method
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
    } catch (Exception e) {
        // let the interceptors see the error, then rethrow
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
```
Here is the method that computes the partition:
```java
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    // if the record specifies a partition, use it; otherwise let the partitioner compute one
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
```
The default partitioner, DefaultPartitioner, works as follows: if a partition is specified, use it directly; otherwise compute a partition from the key; and if there is no key either, assign partitions in round-robin fashion.
```java
/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) { // no key
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); // available partitions
            if (availablePartitions.size() > 0) { // partitions available: counter modulo their number
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else { // no available partitions
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else { // key present: hash the key and take it modulo the partition count
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement(); // get then increment; combined with the modulo this is round robin
    }
}
```
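If this strategy doesn't fit, a custom Partitioner can be plugged in through the partitioner.class property. A minimal sketch, with a made-up rule that pins keyless records to partition 0 and hashes keyed records like the default:

```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class PinKeylessPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null)
            return 0; // all keyless records go to partition 0
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % partitions.size();
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```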
That covers the logical handling of sending a message; next let's look at the physical side of sending.
Sender is a Runnable that runs inside the IO thread ioThread, which continuously reads messages from the RecordAccumulator's queues and sends the data to the brokers through a Selector. Its wakeup method is really the wakeup method of the KafkaClient interface, implemented by the NetworkClient class using NIO, i.e. ultimately java.nio.channels.Selector.wakeup().
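The effect of Selector.wakeup() is easy to demonstrate outside of Kafka. A minimal, self-contained sketch in which one thread blocks in select() and another unblocks it, mirroring how the user thread wakes the Sender's IO thread:

```java
import java.nio.channels.Selector;

public class WakeupDemo {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        Thread ioThread = new Thread(() -> {
            try {
                long start = System.currentTimeMillis();
                selector.select(); // blocks until wakeup() is called
                System.out.println("unblocked after " + (System.currentTimeMillis() - start) + " ms");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        ioThread.start();
        Thread.sleep(500);    // let the IO thread block first
        selector.wakeup();    // returns immediately; select() wakes up in the other thread
        ioThread.join();
        selector.close();
    }
}
```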
The main logic in Sender's run method is an endless loop of preparing messages and then waiting for I/O:
```java
long pollTimeout = sendProducerData(now); // ③
client.poll(pollTimeout, now);            // ④
```
③ sets up the send, stores it on the channel, and then registers interest in the corresponding selection key; this is implemented by KafkaChannel:
```java
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

// the corresponding method in one implementation of transportLayer
public void addInterestOps(int ops) {
    key.interestOps(key.interestOps() | ops);
}
```
④ is mainly the Selector's poll, whose internal select is what wakeup unblocks:
```java
public void poll(long timeout) throws IOException {
    /* check ready keys */
    long startSelect = time.nanoseconds();
    int numReadyKeys = select(timeout); // wakeup() makes this stop blocking
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
        Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

        // Poll from channels that have buffered data (but nothing more from the underlying socket)
        if (dataInBuffers) {
            keysWithBufferedRead.removeAll(readyKeys); // so no channel gets polled twice
            Set<SelectionKey> toPoll = keysWithBufferedRead;
            keysWithBufferedRead = new HashSet<>(); // poll() calls will repopulate if needed
            pollSelectionKeys(toPoll, false, endSelect);
        }

        // Poll from channels where the underlying socket has more data
        pollSelectionKeys(readyKeys, false, endSelect);
        // Clear all selected keys so that they are included in the ready count for the next select
        readyKeys.clear();

        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        immediatelyConnectedKeys.clear();
    } else {
        madeReadProgressLastPoll = true; // no work is also "progress"
    }

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
}
```
Inside it, the pollSelectionKeys method calls the following methods to complete the send:
```java
public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}
```
A Send represents one outgoing data packet and is usually implemented by ByteBufferSend or MultiRecordsSend. Its writeTo calls the transportLayer's write method, usually implemented by PlaintextTransportLayer or SslTransportLayer depending on whether SSL is used:
```java
public long writeTo(GatheringByteChannel channel) throws IOException {
    long written = channel.write(buffers);
    if (written < 0)
        throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
    remaining -= written;
    pending = TransportLayers.hasPendingWrites(channel);
    return written;
}

public int write(ByteBuffer src) throws IOException {
    return socketChannel.write(src);
}
```
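The gathering write that writeTo performs can be illustrated with plain NIO. A minimal sketch that writes several ByteBuffers (think "header + records") through one GatheringByteChannel call; a FileChannel stands in for the socket, and the file name is made up:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class GatheringWriteDemo {
    public static void main(String[] args) throws IOException {
        ByteBuffer[] buffers = {
                ByteBuffer.wrap("header|".getBytes(StandardCharsets.UTF_8)),
                ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8))
        };
        try (GatheringByteChannel channel = FileChannel.open(
                Paths.get("send-demo.bin"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            long remaining = buffers[0].remaining() + buffers[1].remaining();
            while (remaining > 0) {               // loop because write() may be partial
                long written = channel.write(buffers);
                if (written < 0) throw new IOException("negative write");
                remaining -= written;
            }
        }
    }
}
```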
At this point we have walked through the Producer's two main flows: the business-level logic and the non-business network handling. The remaining features are provided through configuration.
For example, ordering is guaranteed via max.in.flight.requests.per.connection: InFlightRequests performs the check below (invoked from NetworkClient's canSendRequest), and setting this parameter to 1 guarantees that the next packet cannot be sent while the current one is still unacknowledged, which yields ordering:
```java
public boolean canSendMore(String node) {
    Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
            (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
```
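From the user's perspective this is just configuration; a sketch continuing the Properties setup from the first example (the value is illustrative):

```java
// at most one unacknowledged request per connection: the next packet cannot be sent
// until the current one is acknowledged, which preserves per-partition ordering
props.put("max.in.flight.requests.per.connection", 1);
```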
Another example is reliability, achieved by setting acks: in Sender's sendProduceRequest, the clientRequest is given a callback:
```java
RequestCompletionHandler callback = new RequestCompletionHandler() {
    public void onComplete(ClientResponse response) {
        handleProduceResponse(response, recordsByPartition, time.milliseconds()); // calls completeBatch
    }
};

/**
 * Complete or retry the given batch of records; if the acks are wrong, the batch is retried.
 *
 * @param batch The record batch
 * @param response The produce response
 * @param correlationId The correlation id for the request
 * @param now The current POSIX timestamp in milliseconds
 */
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                           long now, long throttleUntilTimeMs) {
}

public class ProduceResponse extends AbstractResponse {
    /**
     * Possible error code:
     * INVALID_REQUIRED_ACKS (21)
     */
}
```
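On the user side, reliability again boils down to configuration plus handling the callback; a sketch continuing the earlier examples (values are illustrative):

```java
// wait for the full set of in-sync replicas to acknowledge the write
props.put("acks", "all");
props.put("retries", 3); // retriable errors in the produce response trigger another attempt

producer.send(new ProducerRecord<>("demo-topic", "k", "v"), (metadata, exception) -> {
    if (exception != null) {
        // the send still failed after the Sender's internal retries were exhausted
        exception.printStackTrace();
    }
});
```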
The Kafka source is wrapped layer upon layer and quite intricate; if you spot any mistakes, please don't hesitate to point them out.