本文主要研究一下 Flink 的 slot.request.timeout 配置。
flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@PublicEvolving public class JobManagerOptions { //...... /** * The timeout in milliseconds for requesting a slot from Slot Pool. */ public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT = key("slot.request.timeout") .defaultValue(5L * 60L * 1000L) .withDescription("The timeout in milliseconds for requesting a slot from Slot Pool."); //...... }
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
/**
 * Immutable holder for the three timeouts used by the slot manager, together with the logic
 * that derives them from a Flink {@link Configuration}.
 */
public class SlotManagerConfiguration {

	private static final Logger LOGGER = LoggerFactory.getLogger(SlotManagerConfiguration.class);

	/** Timeout for slot requests sent to a task manager (derived from {@link AkkaOptions#ASK_TIMEOUT}). */
	private final Time taskManagerRequestTimeout;

	/** Timeout after which a pending slot allocation is discarded. */
	private final Time slotRequestTimeout;

	/** Timeout after which an unused TaskManager is released. */
	private final Time taskManagerTimeout;

	/**
	 * Creates a configuration from already-resolved timeouts.
	 *
	 * @param taskManagerRequestTimeout timeout for slot requests sent to a task manager; must not be null
	 * @param slotRequestTimeout timeout after which a pending slot allocation is discarded; must not be null
	 * @param taskManagerTimeout timeout after which an unused TaskManager is released; must not be null
	 */
	public SlotManagerConfiguration(
			Time taskManagerRequestTimeout,
			Time slotRequestTimeout,
			Time taskManagerTimeout) {
		this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
		this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
		this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
	}

	/** @return the timeout for slot requests sent to a task manager */
	public Time getTaskManagerRequestTimeout() {
		return taskManagerRequestTimeout;
	}

	/** @return the timeout after which a pending slot allocation is discarded */
	public Time getSlotRequestTimeout() {
		return slotRequestTimeout;
	}

	/** @return the timeout after which an unused TaskManager is released */
	public Time getTaskManagerTimeout() {
		return taskManagerTimeout;
	}

	/**
	 * Builds a {@code SlotManagerConfiguration} from a Flink configuration.
	 *
	 * <ul>
	 *     <li>task manager request timeout: parsed from {@link AkkaOptions#ASK_TIMEOUT};</li>
	 *     <li>slot request timeout: see {@link #getSlotRequestTimeout(Configuration)};</li>
	 *     <li>task manager timeout: {@link ResourceManagerOptions#TASK_MANAGER_TIMEOUT}, in milliseconds.</li>
	 * </ul>
	 *
	 * @param configuration the configuration to read from
	 * @return the resolved slot manager configuration
	 * @throws ConfigurationException if the ask timeout value cannot be turned into a finite
	 *         number of milliseconds
	 */
	public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
		final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT);
		final Time rpcTimeout;

		try {
			rpcTimeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
		} catch (IllegalArgumentException e) {
			// NumberFormatException (unparsable text) is a subclass of IllegalArgumentException.
			// Catching the broader type also covers values that parse but cannot be converted to
			// milliseconds (e.g. an infinite duration, or one out of range), which Scala's Duration
			// reports with IllegalArgumentException and which would otherwise escape unchecked.
			throw new ConfigurationException("Could not parse the resource manager's timeout " +
				"value " + AkkaOptions.ASK_TIMEOUT + '.', e);
		}

		final Time slotRequestTimeout = getSlotRequestTimeout(configuration);
		final Time taskManagerTimeout = Time.milliseconds(
			configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT));

		return new SlotManagerConfiguration(rpcTimeout, slotRequestTimeout, taskManagerTimeout);
	}

	/**
	 * Resolves the slot request timeout.
	 *
	 * <p>If the deprecated {@link ResourceManagerOptions#SLOT_REQUEST_TIMEOUT} key is explicitly
	 * set, it takes precedence and a deprecation warning is logged; otherwise
	 * {@link JobManagerOptions#SLOT_REQUEST_TIMEOUT} (including its default) is used.
	 *
	 * @param configuration the configuration to read from
	 * @return the slot request timeout
	 */
	private static Time getSlotRequestTimeout(final Configuration configuration) {
		final long slotRequestTimeoutMs;
		if (configuration.contains(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT)) {
			LOGGER.warn("Config key {} is deprecated; use {} instead.",
				ResourceManagerOptions.SLOT_REQUEST_TIMEOUT,
				JobManagerOptions.SLOT_REQUEST_TIMEOUT);
			slotRequestTimeoutMs = configuration.getLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT);
		} else {
			slotRequestTimeoutMs = configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT);
		}
		return Time.milliseconds(slotRequestTimeoutMs);
	}
}
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
/**
 * Keeps track of registered slots, free slots, registered task managers and pending/fulfilled
 * slot requests, and periodically fails slot requests that have waited too long.
 *
 * <p>NOTE(review): this is an excerpt; members elided with {@code //......} (e.g.
 * checkTaskManagerTimeouts, cancelPendingSlotRequest, unregisterTaskManager, checkInit,
 * checkDuplicateRequest, internalRequestSlot, close) are not shown.
 */
public class SlotManager implements AutoCloseable {
	private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
	/** Scheduled executor for timeouts. */
	private final ScheduledExecutor scheduledExecutor;
	/** Timeout for slot requests to the task manager. */
	private final Time taskManagerRequestTimeout;
	/** Timeout after which an allocation is discarded. */
	private final Time slotRequestTimeout;
	/** Timeout after which an unused TaskManager is released. */
	private final Time taskManagerTimeout;
	/** Map for all registered slots. */
	private final HashMap<SlotID, TaskManagerSlot> slots;
	/** Index of all currently free slots. */
	private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
	/** All currently registered task managers. */
	private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;
	/** Map of fulfilled and active allocations for request deduplication purposes. */
	private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
	/** Map of pending/unfulfilled slot allocation requests. */
	private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;
	/**
	 * Pending task manager slots keyed by {@code TaskManagerSlotId}.
	 * NOTE(review): presumably slots of task managers that were requested but have not registered yet — confirm.
	 */
	private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;
	/** ResourceManager's id. */
	private ResourceManagerId resourceManagerId;
	/** Executor for future callbacks which have to be "synchronized". */
	private Executor mainThreadExecutor;
	/** Callbacks for resource (de-)allocations. */
	private ResourceActions resourceActions;
	/** Handle of the periodic TaskManager timeout check scheduled in start(); cancelled in suspend(). */
	private ScheduledFuture<?> taskManagerTimeoutCheck;
	/** Handle of the periodic slot request timeout check scheduled in start(); cancelled in suspend(). */
	private ScheduledFuture<?> slotRequestTimeoutCheck;
	/** True iff the component has been started. */
	private boolean started;

	/**
	 * Creates a slot manager in the not-started state. The ResourceManager id, main-thread
	 * executor and resource callbacks are only supplied later via {@link #start}.
	 *
	 * @param scheduledExecutor executor used to schedule the periodic timeout checks; must not be null
	 * @param taskManagerRequestTimeout timeout for slot requests to the task manager; must not be null
	 * @param slotRequestTimeout timeout after which a pending allocation is discarded; also used as
	 *        the period of the slot request timeout check; must not be null
	 * @param taskManagerTimeout timeout after which an unused TaskManager is released; also used as
	 *        the period of the TaskManager timeout check; must not be null
	 */
	public SlotManager(
			ScheduledExecutor scheduledExecutor,
			Time taskManagerRequestTimeout,
			Time slotRequestTimeout,
			Time taskManagerTimeout) {
		this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
		this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
		this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
		this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
		slots = new HashMap<>(16);
		freeSlots = new LinkedHashMap<>(16);
		taskManagerRegistrations = new HashMap<>(4);
		fulfilledSlotRequests = new HashMap<>(16);
		pendingSlotRequests = new HashMap<>(16);
		pendingSlots = new HashMap<>(16);
		resourceManagerId = null;
		resourceActions = null;
		mainThreadExecutor = null;
		taskManagerTimeoutCheck = null;
		slotRequestTimeoutCheck = null;
		started = false;
	}

	/**
	 * Starts the slot manager and schedules the two periodic timeout checks.
	 *
	 * <p>Each check is scheduled on {@code scheduledExecutor} with no initial delay and a fixed
	 * delay equal to the corresponding timeout. The scheduled task itself only forwards the real
	 * work to {@code mainThreadExecutor}, the executor for callbacks that must be "synchronized".
	 *
	 * @param newResourceManagerId id of the owning ResourceManager; must not be null
	 * @param newMainThreadExecutor executor on which the timeout checks run; must not be null
	 * @param newResourceActions callbacks for resource (de-)allocations; must not be null
	 */
	public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
		LOG.info("Starting the SlotManager.");
		this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
		mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
		resourceActions = Preconditions.checkNotNull(newResourceActions);
		started = true;
		taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
			() -> mainThreadExecutor.execute(
				() -> checkTaskManagerTimeouts()),
			0L,
			taskManagerTimeout.toMilliseconds(),
			TimeUnit.MILLISECONDS);
		// NOTE: the check period equals slotRequestTimeout, so a request created just after a
		// check is only failed at the check after next, i.e. it can stay pending for up to
		// roughly 2x slotRequestTimeout (plus main-thread scheduling latency).
		slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
			() -> mainThreadExecutor.execute(
				() -> checkSlotRequestTimeouts()),
			0L,
			slotRequestTimeout.toMilliseconds(),
			TimeUnit.MILLISECONDS);
	}

	/**
	 * Suspends the component. This clears the internal state of the slot manager.
	 *
	 * <p>It cancels both periodic checks with {@code cancel(false)}, so a check that is already
	 * running is not interrupted. It then cancels and drops every pending slot request,
	 * unregisters all task managers, and resets to the not-started state.
	 *
	 * <p>NOTE(review): {@code mainThreadExecutor} is not reset and {@code pendingSlots} is not
	 * cleared here — confirm this is intended.
	 */
	public void suspend() {
		LOG.info("Suspending the SlotManager.");
		// stop the timeout checks for the TaskManagers and the SlotRequests
		if (taskManagerTimeoutCheck != null) {
			taskManagerTimeoutCheck.cancel(false);
			taskManagerTimeoutCheck = null;
		}
		if (slotRequestTimeoutCheck != null) {
			slotRequestTimeoutCheck.cancel(false);
			slotRequestTimeoutCheck = null;
		}
		for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
			cancelPendingSlotRequest(pendingSlotRequest);
		}
		pendingSlotRequests.clear();
		// iterate over a copy of the ids; presumably unregisterTaskManager removes entries from
		// taskManagerRegistrations, which would break direct iteration over its key set
		ArrayList<InstanceID> registeredTaskManagers = new ArrayList<>(taskManagerRegistrations.keySet());
		for (InstanceID registeredTaskManager : registeredTaskManagers) {
			unregisterTaskManager(registeredTaskManager);
		}
		resourceManagerId = null;
		resourceActions = null;
		started = false;
	}

	/**
	 * Registers a slot request and immediately tries to fulfill it.
	 *
	 * @param slotRequest the slot request to register
	 * @return false if the request was ignored as a duplicate (same allocation id, as decided by
	 *         checkDuplicateRequest); true if it was registered as pending
	 * @throws SlotManagerException if fulfilling the request failed with a ResourceManagerException;
	 *         the pending request is removed again before the exception is thrown
	 */
	public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
		checkInit();
		if (checkDuplicateRequest(slotRequest.getAllocationId())) {
			LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());
			return false;
		} else {
			PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
			pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
			try {
				internalRequestSlot(pendingSlotRequest);
			} catch (ResourceManagerException e) {
				// requesting the slot failed --> remove pending slot request
				pendingSlotRequests.remove(slotRequest.getAllocationId());
				throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
			}
			return true;
		}
	}

	/**
	 * Fails every pending slot request whose age (now minus its creation timestamp) has reached
	 * {@code slotRequestTimeout}. A timed-out request is removed from the pending map and
	 * cancelled if it was already assigned. The failure is then reported through
	 * {@code resourceActions.notifyAllocationFailure} with a TimeoutException.
	 */
	private void checkSlotRequestTimeouts() {
		if (!pendingSlotRequests.isEmpty()) {
			long currentTime = System.currentTimeMillis();
			// explicit iterator so entries can be removed safely while iterating
			Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();
			while (slotRequestIterator.hasNext()) {
				PendingSlotRequest slotRequest = slotRequestIterator.next().getValue();
				if (currentTime - slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) {
					slotRequestIterator.remove();
					if (slotRequest.isAssigned()) {
						cancelPendingSlotRequest(slotRequest);
					}
					resourceActions.notifyAllocationFailure(
						slotRequest.getJobId(),
						slotRequest.getAllocationId(),
						new TimeoutException("The allocation could not be fulfilled in time."));
				}
			}
		}
	}

	//......
}