某系統使用 Kafka 存儲實時的行情數據,爲了保證數據的實時性,須要在多地機房維護多個 Kafka 集羣,並將行情數據同步到這些集羣上。java
一個經常使用的方案就是官方提供的 KafkaMirrorMaker 方案:git
該方案的優勢是能儘量保證兩個 Kafka 集羣的數據一致(爲了不網絡故障致使丟數據,要將其與 Kafka Cluster B 部署在同個機房),而且使用者無需進行開發工做,只須要進行響應的配置便可。github
行情數據具備數據量大且時效性強的特色:apache
所以 KafkaMirrorMaker 的同步方式存在如下兩個不合理的地方:api
主要的發送流程發送流程以下:安全
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null; try { // 1. 阻塞獲取集羣信息,超時後拋出異常 ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); Cluster cluster = clusterAndWaitTime.cluster; // 2. 序列化要發送的數據 byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); // 3. 決定數據所屬的分區 int partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); // 4. 將數據追加到發送緩衝,等待發送線程異步發送 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs); // 5. 喚醒異步發送線程,將緩衝中的消息發送給 brokers if (result.batchIsFull || result.newBatchCreated) { this.sender.wakeup(); } return result.future; } catch (Exception e) { // ... } }
Producer 的功能是向某個 topic 的某個分區消息,因此它首先須要確認到底要向 topic 的哪一個分區寫入消息:網絡
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { // 若是 key 爲空,使用 round-robin 策略確認目標分區(保證數據均勻) int nextValue = nextValue(topic); return Utils.toPositive(nextValue) % numPartitions; } else { // 若是 key 不爲空,使用 key 的 hash 值確認目標分區(保證數據有序) return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
爲了保證防止過量消息積壓在內存中,每一個 Producer 會設置一個內存緩衝,其大小由buffer.memory
選項控制。
若是緩衝區的數據超過該值,會致使Producer.send
方法阻塞,等待內存釋放(記錄被髮送出去或超時後被清理):app
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException { ByteBuffer buffer = null; if (headers == null) headers = Record.EMPTY_HEADERS; try { // 若是緩衝中存在未滿的 ProducerBatch,則會嘗試將記錄追加到其中 // ... // 估計記錄所須要的空間 byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); // 分配內存空間給當前記錄 // 若是內存空間不足則會阻塞等待內存空間釋放,若是超過等待時間會拋出異常 buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) { // 再次嘗試向現存的 ProducerBatch 中追加數據,若是成功則直接返回 RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) { return appendResult; } // 新建 ProducerBatch 並將當前記錄追加到其中 MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic); ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds())) ; dq.addLast(batch); buffer = null; return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true); } } finally { if (buffer != null) free.deallocate(buffer); } }
每一個 Producer 都有一個發送線程,該線程會不停地調用Sender.sendProducerData
方法將緩衝中的 RecordBatch 發送出去:框架
private long sendProducerData(long now) { Cluster cluster = metadata.fetch(); // 獲取就緒的 broker 節點信息,準備發送 RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); if (!result.unknownLeaderTopics.isEmpty()) { // 若是部分 topic 沒有 leader 節點,則觸發強制刷新 for (String topic : result.unknownLeaderTopics) this.metadata.add(topic); this.metadata.requestUpdate(); } // 根據就緒 broker 節點信息,獲取緩衝中對應的 ProducerBatch,準備發送 Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); if (guaranteeMessageOrder) { // 排除已經檢查過的分區,避免重複檢查 for (List<ProducerBatch> batchList : batches.values()) { for (ProducerBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } } // 清理已通過期的 ProducerBatch 數據,釋放被佔用的緩衝內存 List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now); if (!expiredBatches.isEmpty()) log.trace("Expired {} batches in accumulator", expiredBatches.size()); for (ProducerBatch expiredBatch : expiredBatches) { failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false); } // 若是任意 broker 節點已經就緒,則將 pollTimeout 設置爲 0 // 這是爲了不沒必要要的等待,讓內存中的數據可以儘快被髮送出去 long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (!result.readyNodes.isEmpty()) { pollTimeout = 0; } // 經過 NetworkClient -> NetworkChannel -> TransportLayer // 最終將將消息寫入 NIO 的 Channel sendProduceRequests(batches, now); return pollTimeout; }
從前面的分析咱們能夠得知如下兩點信息:異步
爲了提升轉發效率、節省帶寬,使用 Java 復刻了一版 KafkaMirrorMaker 並進行了一些優化:
若是同時使用多個 Producer,可能在轉發過程當中發生數據亂序,折中的策略是根據 key 的 hash 值來選擇 Producer,保證 key 相同的數據會使用同個 Producer 進行發送:
void send(ConsumerRecord<byte[], byte[]> message) { ProducerRecord record = new ProducerRecord<>(message.topic(), message.key(), message.value()); int hash = Math.abs(Arrays.hashCode(message.key())); producers[hash % producers.length].send(record, onSend); }
多集羣廣播雖然可以必定程度上節省流量與機器資源,可是須要處理多個集羣間發送速度不一致的問題。
極端狀況下,若是其中某個機房的專線發生故障,Producer 會阻塞等待消息超時。當過量消息積壓在 Queue 中,會致使 JMV 頻繁的 FullGC,最終影響到對另外一個機房的轉發。
爲了處理這一狀況,須要在發送隊列上加上水位線watermark
限制:
interface Watermark { default long high() { return Long.MAX_VALUE; } default long low() { return 0; } } final BlockingQueue<byte[]> messageQueue = new LinkedBlockingQueue<>(); final AtomicLong messageBytes = new AtomicLong(); private void checkWatermark() { long bytesInQueue = messageBytes.get(); if (bytesInQueue > bytesWatermark.high()) { long discardBytes = bytesInQueue - bytesWatermark.low(); WatermarkKeeper keeper = new WatermarkKeeper(Integer.MAX_VALUE, discardBytes); keeper.discardMessage(messageQueue); long remainBytes = messageBytes.addAndGet(-discard.bytes()); } }
爲了實現高效的數據丟棄,使用BlockingQueue.drainTo
減小鎖開銷:
public class WatermarkKeeper extends AbstractCollection<byte[]> { private final int maxDiscardCount; // 丟棄消息數量上限 private final long maxDiscardBytes; // 丟棄消息字節上限 private int count; // 實際丟棄的消息數 private long bytes; // 實際丟棄消息字節數 public MessageBlackHole(int maxDiscardCount, long maxDiscardBytes) { this.maxDiscardCount = maxDiscardCount; this.maxDiscardBytes = maxDiscardBytes; } public void discardMessage(BlockingQueue<byte[]> queue) { try { queue.drainTo(this); } catch (StopDiscardException ignore) {} } @Override public boolean add(byte[] record) { if (count >= maxDiscardCount || bytes >= maxDiscardBytes) { throw new StopDiscardException(); } count++; bytes += record.length; return true; } @Override public int size() { return count; } public long bytes() { return bytes; } @Override public Iterator<byte[]> iterator() { throw new UnsupportedOperationException("iterator"); } // 中止丟棄 private static class StopDiscardException extends RuntimeException { @Override public synchronized Throwable fillInStackTrace() { return this; } } }
不使用 KafkairrorMaker 的另外一個重要緣由是其 JMX 監控不友好:
一個比較好的方式是使用 SpringBoot2 的 micrometer 框架實現監控:
// 監控註冊表(底層能夠接入不一樣的監控平臺) @Autowired private MeterRegistry meterRegistry; // 接入 Kafka 的監控信息 new KafkaClientMetrics(consumer).bindTo(meterRegistry); new KafkaClientMetrics(producer).bindTo(meterRegistry); // 接入自定義監控信息 Gauge.builder("bytesInQueue", messageBytes, AtomicLong::get) .description("Estimated message bytes backlog in BlockingQueue") .register(meterRegistry);
經過這一方式可以最大程度地利用現有可視化監控工具,減小沒必要要地開發工做。