// See: https://ci.apache.org/projects/flink/flink-docs-release-1.6/internals/stream_checkpointing.html
@Override
publicfinalvoidnotifyCheckpointComplete(longcheckpointId)throwsException{
if(!running){
LOG.debug("notifyCheckpointComplete()calledonclosedsource");
return;
}react
finalAbstractFetcher<?,?>fetcher=this.kafkaFetcher;
if(fetcher==null){
LOG.debug("notifyCheckpointComplete()calledonuninitializedsource");
return;
}apache
if(offsetCommitMode==OffsetCommitMode.ON_CHECKPOINTS){
//onlyonecommitoperationmustbeinprogress
if(LOG.isDebugEnabled()){
LOG.debug("CommittingoffsetstoKafka/ZooKeeperforcheckpoint"+checkpointId);
}app
try{
finalintposInMap=pendingOffsetsToCommit.indexOf(checkpointId);
if(posInMap==-1){
LOG.warn("Receivedconfirmationforunknowncheckpointid{}",checkpointId);
return;
}ide
@SuppressWarnings("unchecked")
Map<KafkaTopicPartition,Long>offsets=
(Map<KafkaTopicPartition,Long>)pendingOffsetsToCommit.remove(posInMap);post
//removeoldercheckpointsinmap
for(inti=0;i<posInMap;i++){
pendingOffsetsToCommit.remove(0);
}fetch
if(offsets==null||offsets.size()==0){
LOG.debug("Checkpointstatewasempty.");
return;
}this
fetcher.commitInternalOffsetsToKafka(offsets,offsetCommitCallback);
}catch(Exceptione){
if(running){
throwe;
}
//elseignoreexceptionifwearenolongerrunning
}
}
}debug
/**
*Theoffsetcommitmoderepresentsthebehaviourofhowoffsetsareexternallycommitted
*backtoKafkabrokers/Zookeeper.
*
*<p>Theexactvalueofthisisdeterminedatruntimeintheconsumersubtasks.
*/
@Internal
publicenumOffsetCommitMode{code
/**Completelydisableoffsetcommitting.*/
DISABLED,
/**CommitoffsetsbacktoKafkaonlywhencheckpointsarecompleted.*/
ON_CHECKPOINTS,
/**CommitoffsetsperiodicallybacktoKafka,usingtheautocommitfunctionalityofinternalKafkaclients.*/
KAFKA_PERIODIC;
}
/**
*CommitsthegivenpartitionoffsetstotheKafkabrokers(ortoZooKeeperfor
*olderKafkaversions).Thismethodisonlyevercalledwhentheoffsetcommitmodeof
*theconsumeris{@linkOffsetCommitMode#ON_CHECKPOINTS}.
*
*<p>Thegivenoffsetsaretheinternalcheckpointedoffsets,representing
*thelastprocessedrecordofeachpartition.Version-specificimplementationsofthismethod
*needtoholdthecontractthatthegivenoffsetsmustbeincrementedby1before
*committingthem,sothatcommittedoffsetstoKafkarepresent"thenextrecordtoprocess".
*
*@paramoffsetsTheoffsetstocommittoKafka(implementationsmustincrementoffsetsby1beforecommitting).
*@paramcommitCallbackThecallbackthattheusershouldtriggerwhenacommitrequestcompletesorfails.
*@throwsExceptionThismethodforwardsexceptions.
*/
publicfinalvoidcommitInternalOffsetsToKafka(
Map<KafkaTopicPartition,Long>offsets,
@NonnullKafkaCommitCallbackcommitCallback)throwsException{
//Ignoresentinels.Theymightappearhereifsnapshothasstartedbeforeactualoffsetsvalues
//replacedsentinels
doCommitInternalOffsetsToKafka(filterOutSentinels(offsets),commitCallback);
}
/**
 * Invoking this method makes all buffered records immediately available to send (even if
 * <code>linger.ms</code> is greater than 0) and blocks on the completion of the requests
 * associated with these records. The post-condition of <code>flush()</code> is that any
 * previously sent record will have completed (e.g. <code>Future.isDone() == true</code>).
 * A request is considered completed when it is successfully acknowledged
 * according to the <code>acks</code> configuration you have specified or else it results in an error.
 * <p>
 * Other threads can continue sending records while one thread is blocked waiting for a flush
 * call to complete, however no guarantee is made about the completion of records sent after the
 * flush call begins.
 * <p>
 * This method can be useful when consuming from some input system and producing into Kafka.
 * The <code>flush()</code> call gives a convenient way to ensure all previously sent messages
 * have actually completed.
 * <p>
 * This example shows how to consume from one Kafka topic and produce to another Kafka topic:
 * <pre>
 * {@code
 * for(ConsumerRecord<String, String> record: consumer.poll(100))
 *     producer.send(new ProducerRecord("my-topic", record.key(), record.value()));
 * producer.flush();
 * consumer.commitSync();
 * }
 * </pre>
 * <p>
 * Note that the above example may drop records if the produce request fails. If we want to
 * ensure that this does not occur we need to set <code>retries=&lt;large_number&gt;</code> in
 * our config.
 * </p>
 * <p>
 * Applications don't need to call this method for transactional producers, since the
 * {@link #commitTransaction()} will flush all buffered records before performing the commit.
 * This ensures that all the {@link #send(ProducerRecord)} calls made since the previous
 * {@link #beginTransaction()} are completed before the commit.
 * </p>
 *
 * @throws InterruptException If the thread is interrupted while blocked
 */
@Override
public void flush() {
    log.trace("Flushing accumulated records in producer.");
    this.accumulator.beginFlush();
    this.sender.wakeup();
    try {
        this.accumulator.awaitFlushCompletion();
    } catch (InterruptedException e) {
        // Surface the checked interruption as the unchecked InterruptException declared above,
        // keeping the original exception as the cause.
        // NOTE(review): presumably InterruptException restores the thread's interrupt flag
        // itself; confirm before adding an explicit Thread.currentThread().interrupt().
        throw new InterruptException("Flush interrupted.", e);
    }
}