producer.initTransactions(); try { producer.beginTransaction(); producer.send(record1); producer.send(record2); producer.commitTransaction(); } catch(ProducerFencedException e) { producer.close(); } catch(KafkaException e) { producer.abortTransaction(); }
messages.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreachPartition { iter => // output to database } messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) }