異常
Async function call has timed out
1.3+
AsyncFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
@PublicEvolving
public interface AsyncFunction<IN, OUT> extends Function, Serializable {
void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
resultFuture.completeExceptionally(
new TimeoutException("Async function call has timed out."));
}
}
---
1.3
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator
public void processElement(StreamRecord<IN> element) throws Exception {
final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry(element);
if (this.timeout > 0L) {
long timeoutTimestamp = this.timeout + this.getProcessingTimeService().getCurrentProcessingTime();
final ScheduledFuture<?> timerFuture = this.getProcessingTimeService().registerTimer(timeoutTimestamp, new ProcessingTimeCallback() {
public void onProcessingTime(long timestamp) throws Exception {
streamRecordBufferEntry.collect(new TimeoutException("Async function call has timed out."));
}
});
streamRecordBufferEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<Collection<OUT>>>() {
public void accept(StreamElementQueueEntry<Collection<OUT>> value) {
timerFuture.cancel(true);
}
}, this.executor);
}
this.addAsyncBufferEntry(streamRecordBufferEntry);
((AsyncFunction)this.userFunction).asyncInvoke(element.getValue(), streamRecordBufferEntry);
}