public class MaxwellKafkaProducer extends AbstractProducer { private final ArrayBlockingQueue<RowMap> queue; private final MaxwellKafkaProducerWorker worker; public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) { super(context); this.queue = new ArrayBlockingQueue<>(100); this.worker = new MaxwellKafkaProducerWorker(context, kafkaProperties, kafkaTopic, this.queue); Thread thread = new Thread(this.worker, "maxwell-kafka-worker"); thread.setDaemon(true); thread.start(); } @Override public void push(RowMap r) throws Exception { this.queue.put(r); } @Override public StoppableTask getStoppableTask() { return this.worker; } @Override public KafkaProducerDiagnostic getDiagnostic() { return new KafkaProducerDiagnostic(worker, context.getConfig(), context.getPositionStoreThread()); } }
class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask { static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class); private final Producer<String, String> kafka; private final String topic; private final String ddlTopic; private final MaxwellKafkaPartitioner partitioner; private final MaxwellKafkaPartitioner ddlPartitioner; private final KeyFormat keyFormat; private final boolean interpolateTopic; private final ArrayBlockingQueue<RowMap> queue; private Thread thread; private StoppableTaskState taskState; private String deadLetterTopic; private final ConcurrentLinkedQueue<Pair<ProducerRecord<String,String>, KafkaCallback>> deadLetterQueue; public static MaxwellKafkaPartitioner makeDDLPartitioner(String partitionHashFunc, String partitionKey) { if ( partitionKey.equals("table") ) { return new MaxwellKafkaPartitioner(partitionHashFunc, "table", null, "database"); } else { return new MaxwellKafkaPartitioner(partitionHashFunc, "database", null, null); } } public MaxwellKafkaProducerWorker(MaxwellContext context, String kafkaTopic, ArrayBlockingQueue<RowMap> queue, Producer<String,String> producer) { super(context); if ( kafkaTopic == null ) { this.topic = "maxwell"; } else { this.topic = kafkaTopic; } this.interpolateTopic = this.topic.contains("%{"); this.kafka = producer; String hash = context.getConfig().kafkaPartitionHash; String partitionKey = context.getConfig().producerPartitionKey; String partitionColumns = context.getConfig().producerPartitionColumns; String partitionFallback = context.getConfig().producerPartitionFallback; this.partitioner = new MaxwellKafkaPartitioner(hash, partitionKey, partitionColumns, partitionFallback); this.ddlPartitioner = makeDDLPartitioner(hash, partitionKey); this.ddlTopic = context.getConfig().ddlKafkaTopic; this.deadLetterTopic = context.getConfig().deadLetterTopic; this.deadLetterQueue = new ConcurrentLinkedQueue<>(); if ( context.getConfig().kafkaKeyFormat.equals("hash") ) keyFormat = KeyFormat.HASH; else keyFormat = KeyFormat.ARRAY; this.queue = queue; this.taskState = new StoppableTaskState("MaxwellKafkaProducerWorker"); } public MaxwellKafkaProducerWorker(MaxwellContext context, Properties kafkaProperties, String kafkaTopic, ArrayBlockingQueue<RowMap> queue) { this(context, kafkaTopic, queue, new KafkaProducer<String,String>(kafkaProperties, new StringSerializer(), new StringSerializer())); } @Override public void run() { this.thread = Thread.currentThread(); while ( true ) { try { drainDeadLetterQueue(); RowMap row = queue.take(); if (!taskState.isRunning()) { taskState.stopped(); return; } this.push(row); } catch ( Exception e ) { taskState.stopped(); context.terminate(e); return; } } } void drainDeadLetterQueue() { Pair<ProducerRecord<String, String>, KafkaCallback> pair; while ((pair = deadLetterQueue.poll()) != null) { sendAsync(pair.getLeft(), pair.getRight()); } } //...... }
public abstract class AbstractAsyncProducer extends AbstractProducer { public class CallbackCompleter { private InflightMessageList inflightMessages; private final MaxwellContext context; private final MaxwellConfig config; private final Position position; private final boolean isTXCommit; private final long messageID; public CallbackCompleter(InflightMessageList inflightMessages, Position position, boolean isTXCommit, MaxwellContext context, long messageID) { this.inflightMessages = inflightMessages; this.context = context; this.config = context.getConfig(); this.position = position; this.isTXCommit = isTXCommit; this.messageID = messageID; } public void markCompleted() { inflightMessages.freeSlot(messageID); if(isTXCommit) { InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position); if (message != null) { context.setPosition(message.position); long currentTime = System.currentTimeMillis(); long age = currentTime - message.sendTimeMS; messagePublishTimer.update(age, TimeUnit.MILLISECONDS); messageLatencyTimer.update(Math.max(0L, currentTime - message.eventTimeMS - 500L), TimeUnit.MILLISECONDS); if (age > config.metricsAgeSlo) { messageLatencySloViolationCount.inc(); } } } } } private InflightMessageList inflightMessages; public AbstractAsyncProducer(MaxwellContext context) { super(context); this.inflightMessages = new InflightMessageList(context); Metrics metrics = context.getMetrics(); String gaugeName = metrics.metricName("inflightmessages", "count"); metrics.register(gaugeName, (Gauge<Long>) () -> (long) inflightMessages.size()); } public abstract void sendAsync(RowMap r, CallbackCompleter cc) throws Exception; @Override public final void push(RowMap r) throws Exception { Position position = r.getNextPosition(); // Rows that do not get sent to a target will be automatically marked as complete. // We will attempt to commit a checkpoint up to the current row. if(!r.shouldOutput(outputConfig)) { if ( position != null ) { inflightMessages.addMessage(position, r.getTimestampMillis(), 0L); InflightMessageList.InflightMessage completed = inflightMessages.completeMessage(position); if (completed != null) { context.setPosition(completed.position); } } return; } // back-pressure from slow producers long messageID = inflightMessages.waitForSlot(); if(r.isTXCommit()) { inflightMessages.addMessage(position, r.getTimestampMillis(), messageID); } CallbackCompleter cc = new CallbackCompleter(inflightMessages, position, r.isTXCommit(), context, messageID); sendAsync(r, cc); } }