flink的Async I/O 異常處理

異常

Async function call has timed out

1.3+
AsyncFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
@PublicEvolving
public interface AsyncFunction<IN, OUT> extends Function, Serializable {

    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        resultFuture.completeExceptionally(
            new TimeoutException("Async function call has timed out."));
    }

}

---

1.3
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator
public void processElement(StreamRecord<IN> element) throws Exception {
    final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry(element);
    if (this.timeout > 0L) {
        long timeoutTimestamp = this.timeout + this.getProcessingTimeService().getCurrentProcessingTime();
        final ScheduledFuture<?> timerFuture = this.getProcessingTimeService().registerTimer(timeoutTimestamp, new ProcessingTimeCallback() {
            public void onProcessingTime(long timestamp) throws Exception {
                streamRecordBufferEntry.collect(new TimeoutException("Async function call has timed out."));
            }
        });
        streamRecordBufferEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<Collection<OUT>>>() {
            public void accept(StreamElementQueueEntry<Collection<OUT>> value) {
                timerFuture.cancel(true);
            }
        }, this.executor);
    }

    this.addAsyncBufferEntry(streamRecordBufferEntry);
    ((AsyncFunction)this.userFunction).asyncInvoke(element.getValue(), streamRecordBufferEntry);
}
相關文章
相關標籤/搜索