This article takes a look at mkAssignments in Storm nimbus.
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
void doRebalance(String topoId, StormBase stormBase) throws Exception {
    RebalanceOptions rbo = stormBase.get_topology_action_options().get_rebalance_options();
    StormBase updated = new StormBase();
    updated.set_topology_action_options(null);
    updated.set_component_debug(Collections.emptyMap());

    if (rbo.is_set_num_executors()) {
        updated.set_component_executors(rbo.get_num_executors());
    }

    if (rbo.is_set_num_workers()) {
        updated.set_num_workers(rbo.get_num_workers());
    }
    stormClusterState.updateStorm(topoId, updated);

    updateBlobStore(topoId, rbo, ServerUtils.principalNameToSubject(rbo.get_principal()));
    idToExecutors.getAndUpdate(new Dissoc<>(topoId)); // remove the executors cache to let it recompute.
    mkAssignments(topoId);
}

private void mkAssignments() throws Exception {
    mkAssignments(null);
}
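doRebalance only copies onto the updated StormBase the fields that are actually set on the RebalanceOptions (per-component executor counts and the worker count), clears the pending topology action, persists the update, and then triggers a scheduling round for that topology. Below is a minimal sketch of that copy logic; RebalanceRequest, TopologyState and applyRebalance are hypothetical plain-Java stand-ins for the thrift-generated classes, used only to make the example self-contained.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for RebalanceOptions/StormBase, illustrating which
// fields doRebalance carries over (assumption: names are illustrative only).
class RebalanceRequest {
    Map<String, Integer> numExecutors; // component name -> executor count, may be null
    Integer numWorkers;                // may be null
}

class TopologyState {
    Map<String, Integer> componentExecutors = new HashMap<>();
    int numWorkers;
    Object topologyActionOptions;      // cleared once the rebalance is applied
}

public class RebalanceSketch {
    // Mirrors doRebalance: only fields present on the request overwrite the stored state.
    static void applyRebalance(RebalanceRequest rbo, TopologyState base) {
        base.topologyActionOptions = null;            // the pending action is consumed
        if (rbo.numExecutors != null) {
            base.componentExecutors = new HashMap<>(rbo.numExecutors);
        }
        if (rbo.numWorkers != null) {
            base.numWorkers = rbo.numWorkers;
        }
        // after this point Nimbus persists the update and calls mkAssignments(topoId)
    }

    public static void main(String[] args) {
        RebalanceRequest rbo = new RebalanceRequest();
        rbo.numWorkers = 4;
        rbo.numExecutors = Collections.singletonMap("word-count-bolt", 8);

        TopologyState base = new TopologyState();
        applyRebalance(rbo, base);
        System.out.println(base.numWorkers + " " + base.componentExecutors);
    }
}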
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
private void mkAssignments(String scratchTopoId) throws Exception {
    try {
        if (!isReadyForMKAssignments()) {
            return;
        }
        // get existing assignment (just the topologyToExecutorToNodePort map) -> default to {}
        // filter out ones which have a executor timeout
        // figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many executors
        // should be in each slot (e.g., 4, 4, 4, 5)
        // only keep existing slots that satisfy one of those slots. for rest, reassign them across remaining slots
        // edge case for slots with no executor timeout but with supervisor timeout... just treat these as valid slots that can be
        // reassigned to. worst comes to worse the executor will timeout and won't assign here next time around
        IStormClusterState state = stormClusterState;
        //read all the topologies
        Map<String, StormBase> bases;
        Map<String, TopologyDetails> tds = new HashMap<>();
        synchronized (submitLock) {
            // should promote: only fetch storm bases of topologies that need scheduling.
            bases = state.topologyBases();
            for (Iterator<Entry<String, StormBase>> it = bases.entrySet().iterator(); it.hasNext(); ) {
                Entry<String, StormBase> entry = it.next();
                String id = entry.getKey();
                try {
                    tds.put(id, readTopologyDetails(id, entry.getValue()));
                } catch (KeyNotFoundException e) {
                    //A race happened and it is probably not running
                    it.remove();
                }
            }
        }
        List<String> assignedTopologyIds = state.assignments(null);
        Map<String, Assignment> existingAssignments = new HashMap<>();
        for (String id : assignedTopologyIds) {
            //for the topology which wants rebalance (specified by the scratchTopoId)
            // we exclude its assignment, meaning that all the slots occupied by its assignment
            // will be treated as free slot in the scheduler code.
            if (!id.equals(scratchTopoId)) {
                Assignment currentAssignment = state.assignmentInfo(id, null);
                if (!currentAssignment.is_set_owner()) {
                    TopologyDetails td = tds.get(id);
                    if (td != null) {
                        currentAssignment.set_owner(td.getTopologySubmitter());
                        state.setAssignment(id, currentAssignment, td.getConf());
                    }
                }
                existingAssignments.put(id, currentAssignment);
            }
        }
        // make the new assignments for topologies
        lockingMkAssignments(existingAssignments, bases, scratchTopoId, assignedTopologyIds, state, tds);
    } catch (Exception e) {
        this.mkAssignmentsErrors.mark();
        throw e;
    }
}
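Note the loop over assignedTopologyIds: it skips the topology identified by scratchTopoId, so the slots held by a topology being rebalanced look free to the scheduler, while the periodic no-arg call passes a null scratchTopoId and excludes nothing. A hedged, runnable restatement of just that filtering step, with assignments modeled as plain strings instead of the thrift Assignment type:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class ScratchTopoFilterSketch {
    // Mirrors the loop in mkAssignments: keep the existing assignment of every
    // topology except scratchTopoId; a null scratchTopoId excludes nothing.
    static Map<String, String> existingAssignmentsExcluding(Map<String, String> allAssignments,
                                                            List<String> assignedTopologyIds,
                                                            String scratchTopoId) {
        Map<String, String> existing = new HashMap<>();
        for (String id : assignedTopologyIds) {
            if (!Objects.equals(id, scratchTopoId)) {
                existing.put(id, allAssignments.get(id));
            }
        }
        return existing;
    }

    public static void main(String[] args) {
        Map<String, String> all = new HashMap<>();
        all.put("topo-1", "assignment-1");
        all.put("topo-2", "assignment-2");
        List<String> ids = Arrays.asList("topo-1", "topo-2");

        // rebalancing topo-2: its slots will be treated as free by the scheduler
        System.out.println(existingAssignmentsExcluding(all, ids, "topo-2")); // {topo-1=assignment-1}
        // periodic scheduling round: nothing is excluded
        System.out.println(existingAssignmentsExcluding(all, ids, null));
    }
}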
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
private void lockingMkAssignments(Map<String, Assignment> existingAssignments, Map<String, StormBase> bases,
                                  String scratchTopoId, List<String> assignedTopologyIds, IStormClusterState state,
                                  Map<String, TopologyDetails> tds) throws Exception {
    Topologies topologies = new Topologies(tds);

    synchronized (schedLock) {
        Map<String, SchedulerAssignment> newSchedulerAssignments =
            computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);

        Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort =
            computeTopoToExecToNodePort(newSchedulerAssignments, assignedTopologyIds);
        Map<String, Map<WorkerSlot, WorkerResources>> newAssignedWorkerToResources =
            computeTopoToNodePortToResources(newSchedulerAssignments);
        int nowSecs = Time.currentTimeSecs();
        Map<String, SupervisorDetails> basicSupervisorDetailsMap = basicSupervisorDetailsMap(state);
        //construct the final Assignments by adding start-times etc into it
        Map<String, Assignment> newAssignments = new HashMap<>();
        //......

        //tasks figure out what tasks to talk to by looking at topology at runtime
        // only log/set when there's been a change to the assignment
        for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
            String topoId = entry.getKey();
            Assignment assignment = entry.getValue();
            Assignment existingAssignment = existingAssignments.get(topoId);
            TopologyDetails td = topologies.getById(topoId);
            if (assignment.equals(existingAssignment)) {
                LOG.debug("Assignment for {} hasn't changed", topoId);
            } else {
                LOG.info("Setting new assignment for topology id {}: {}", topoId, assignment);
                state.setAssignment(topoId, assignment, td.getConf());
            }
        }

        //grouping assignment by node to see the nodes diff, then notify nodes/supervisors to synchronize its owned assignment
        //because the number of existing assignments is small for every scheduling round,
        //we expect to notify supervisors at almost the same time
        Map<String, String> totalAssignmentsChangedNodes = new HashMap<>();
        for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
            String topoId = entry.getKey();
            Assignment assignment = entry.getValue();
            Assignment existingAssignment = existingAssignments.get(topoId);
            totalAssignmentsChangedNodes.putAll(assignmentChangedNodes(existingAssignment, assignment));
        }
        notifySupervisorsAssignments(newAssignments, assignmentsDistributer, totalAssignmentsChangedNodes,
                                     basicSupervisorDetailsMap);

        Map<String, Collection<WorkerSlot>> addedSlots = new HashMap<>();
        for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
            String topoId = entry.getKey();
            Assignment assignment = entry.getValue();
            Assignment existingAssignment = existingAssignments.get(topoId);
            if (existingAssignment == null) {
                existingAssignment = new Assignment();
                existingAssignment.set_executor_node_port(new HashMap<>());
                existingAssignment.set_executor_start_time_secs(new HashMap<>());
            }
            Set<WorkerSlot> newSlots = newlyAddedSlots(existingAssignment, assignment);
            addedSlots.put(topoId, newSlots);
        }
        inimbus.assignSlots(topologies, addedSlots);
    }
}
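The last block diffs each new assignment against the existing one to find the worker slots that were newly added before handing them to inimbus.assignSlots. The newlyAddedSlots helper itself is not shown above; the sketch below is a plausible, simplified version of that set difference, with slots modeled as host:port strings rather than WorkerSlot objects (an assumption for illustration, not the actual implementation).

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class NewSlotsSketch {
    // Simplified stand-in for newlyAddedSlots: slots present in the new assignment
    // but absent from the existing one. The real code works on WorkerSlot objects
    // derived from Assignment.get_executor_node_port(); strings keep this self-contained.
    static Set<String> newlyAddedSlots(Set<String> existingSlots, Set<String> newSlots) {
        Set<String> added = new HashSet<>(newSlots);
        added.removeAll(existingSlots);
        return added;
    }

    public static void main(String[] args) {
        Set<String> existing = new HashSet<>(Arrays.asList("node-1:6700", "node-1:6701"));
        Set<String> current = new HashSet<>(Arrays.asList("node-1:6701", "node-2:6700"));
        System.out.println(newlyAddedSlots(existing, current)); // [node-2:6700]
    }
}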
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
private Map<String, SchedulerAssignment> computeNewSchedulerAssignments(Map<String, Assignment> existingAssignments,
                                                                        Topologies topologies, Map<String, StormBase> bases,
                                                                        String scratchTopologyId)
    throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {

    Map<String, Set<List<Integer>>> topoToExec = computeTopologyToExecutors(bases);

    Set<String> zkHeartbeatTopologies = topologies.getTopologies().stream()
                                                  .filter(topo -> !supportRpcHeartbeat(topo))
                                                  .map(TopologyDetails::getId)
                                                  .collect(Collectors.toSet());

    updateAllHeartbeats(existingAssignments, topoToExec, zkHeartbeatTopologies);

    Map<String, Set<List<Integer>>> topoToAliveExecutors =
        computeTopologyToAliveExecutors(existingAssignments, topologies, topoToExec, scratchTopologyId);
    Map<String, Set<Long>> supervisorToDeadPorts =
        computeSupervisorToDeadPorts(existingAssignments, topoToExec, topoToAliveExecutors);
    Map<String, SchedulerAssignmentImpl> topoToSchedAssignment =
        computeTopologyToSchedulerAssignment(existingAssignments, topoToAliveExecutors);

    Set<String> missingAssignmentTopologies = new HashSet<>();
    for (TopologyDetails topo : topologies.getTopologies()) {
        String id = topo.getId();
        Set<List<Integer>> allExecs = topoToExec.get(id);
        Set<List<Integer>> aliveExecs = topoToAliveExecutors.get(id);
        int numDesiredWorkers = topo.getNumWorkers();
        int numAssignedWorkers = numUsedWorkers(topoToSchedAssignment.get(id));
        if (allExecs == null || allExecs.isEmpty() || !allExecs.equals(aliveExecs) || numDesiredWorkers > numAssignedWorkers) {
            //We have something to schedule...
            missingAssignmentTopologies.add(id);
        }
    }
    Map<String, SupervisorDetails> supervisors =
        readAllSupervisorDetails(supervisorToDeadPorts, topologies, missingAssignmentTopologies);
    Cluster cluster = new Cluster(inimbus, resourceMetrics, supervisors, topoToSchedAssignment, topologies, conf);
    cluster.setStatusMap(idToSchedStatus.get());

    schedulingStartTimeNs.set(Time.nanoTime());
    scheduler.schedule(topologies, cluster);
    //Get and set the start time before getting current time in order to avoid potential race with the longest-scheduling-time-ms gauge
    //......
    return cluster.getAssignments();
}
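A topology lands in missingAssignmentTopologies when it has no computed executors, when some of its executors are no longer alive, or when it currently holds fewer workers than requested. A small stand-alone restatement of that predicate, with plain parameters in place of the surrounding Nimbus state:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class NeedsSchedulingSketch {
    // Restates the condition used to build missingAssignmentTopologies: schedule a
    // topology if it has no computed executors, if the alive executors differ from
    // the full executor set, or if it has fewer assigned workers than desired.
    static boolean needsScheduling(Set<?> allExecs, Set<?> aliveExecs,
                                   int numDesiredWorkers, int numAssignedWorkers) {
        return allExecs == null
               || allExecs.isEmpty()
               || !allExecs.equals(aliveExecs)
               || numDesiredWorkers > numAssignedWorkers;
    }

    public static void main(String[] args) {
        Set<Integer> all = new HashSet<>(Arrays.asList(1, 2, 3));
        Set<Integer> alive = new HashSet<>(Arrays.asList(1, 2));
        System.out.println(needsScheduling(all, alive, 2, 2)); // true: one executor is no longer alive
    }
}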
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
public void schedule(Topologies topologies, Cluster cluster) {
    defaultSchedule(topologies, cluster);
}

public static void defaultSchedule(Topologies topologies, Cluster cluster) {
    for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
        List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
        Set<ExecutorDetails> allExecutors = topology.getExecutors();

        Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned =
            EvenScheduler.getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
        Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
        for (List<ExecutorDetails> list : aliveAssigned.values()) {
            aliveExecutors.addAll(list);
        }

        Set<WorkerSlot> canReassignSlots = slotsCanReassign(cluster, aliveAssigned.keySet());
        int totalSlotsToUse = Math.min(topology.getNumWorkers(), canReassignSlots.size() + availableSlots.size());

        Set<WorkerSlot> badSlots = null;
        if (totalSlotsToUse > aliveAssigned.size() || !allExecutors.equals(aliveExecutors)) {
            badSlots = badSlots(aliveAssigned, allExecutors.size(), totalSlotsToUse);
        }
        if (badSlots != null) {
            cluster.freeSlots(badSlots);
        }

        EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster);
    }
}
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) {
    for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
        String topologyId = topology.getId();
        Map<ExecutorDetails, WorkerSlot> newAssignment = scheduleTopology(topology, cluster);
        Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors = Utils.reverseMap(newAssignment);

        for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : nodePortToExecutors.entrySet()) {
            WorkerSlot nodePort = entry.getKey();
            List<ExecutorDetails> executors = entry.getValue();
            cluster.assign(nodePort, topologyId, executors);
        }
    }
}
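scheduleTopology returns an executor-to-slot map, while cluster.assign wants a slot with all of its executors, hence the Utils.reverseMap call. A quick sketch of what that inversion does; this is a generic re-implementation for illustration, not the actual Utils code:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReverseMapSketch {
    // Generic map inversion: group keys by their value, turning
    // Map<ExecutorDetails, WorkerSlot> into Map<WorkerSlot, List<ExecutorDetails>>.
    static <K, V> Map<V, List<K>> reverseMap(Map<K, V> map) {
        Map<V, List<K>> reversed = new HashMap<>();
        for (Map.Entry<K, V> entry : map.entrySet()) {
            reversed.computeIfAbsent(entry.getValue(), v -> new ArrayList<>()).add(entry.getKey());
        }
        return reversed;
    }

    public static void main(String[] args) {
        Map<String, String> executorToSlot = new HashMap<>();
        executorToSlot.put("executor[1-1]", "node-1:6700");
        executorToSlot.put("executor[2-2]", "node-1:6700");
        executorToSlot.put("executor[3-3]", "node-2:6700");
        // e.g. {node-1:6700=[executor[1-1], executor[2-2]], node-2:6700=[executor[3-3]]} (ordering may vary)
        System.out.println(reverseMap(executorToSlot));
    }
}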
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
private static Map<ExecutorDetails, WorkerSlot> scheduleTopology(TopologyDetails topology, Cluster cluster) {
    List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
    Set<ExecutorDetails> allExecutors = topology.getExecutors();
    Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned = getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
    int totalSlotsToUse = Math.min(topology.getNumWorkers(), availableSlots.size() + aliveAssigned.size());

    List<WorkerSlot> sortedList = sortSlots(availableSlots);
    if (sortedList == null) {
        LOG.error("No available slots for topology: {}", topology.getName());
        return new HashMap<ExecutorDetails, WorkerSlot>();
    }

    //allow requesting slots number bigger than available slots
    int toIndex = (totalSlotsToUse - aliveAssigned.size()) > sortedList.size()
        ? sortedList.size() : (totalSlotsToUse - aliveAssigned.size());
    List<WorkerSlot> reassignSlots = sortedList.subList(0, toIndex);

    Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
    for (List<ExecutorDetails> list : aliveAssigned.values()) {
        aliveExecutors.addAll(list);
    }
    Set<ExecutorDetails> reassignExecutors = Sets.difference(allExecutors, aliveExecutors);
    Map<ExecutorDetails, WorkerSlot> reassignment = new HashMap<ExecutorDetails, WorkerSlot>();
    if (reassignSlots.size() == 0) {
        return reassignment;
    }

    List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>(reassignExecutors);
    Collections.sort(executors, new Comparator<ExecutorDetails>() {
        @Override
        public int compare(ExecutorDetails o1, ExecutorDetails o2) {
            return o1.getStartTask() - o2.getStartTask();
        }
    });

    for (int i = 0; i < executors.size(); i++) {
        reassignment.put(executors.get(i), reassignSlots.get(i % reassignSlots.size()));
    }

    if (reassignment.size() != 0) {
        LOG.info("Available slots: {}", availableSlots.toString());
    }
    return reassignment;
}
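The core of scheduleTopology is the final loop: the executors that still need a slot are sorted by startTask and spread round-robin (i % reassignSlots.size()) across the reassignable slots. The same idea isolated as a runnable sketch, with integers standing in for ExecutorDetails and strings for WorkerSlot:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinSketch {
    // Spreads executors over slots the same way scheduleTopology does:
    // sort by start task, then assign executor i to slot (i % number-of-slots).
    static Map<Integer, String> roundRobin(List<Integer> executorStartTasks, List<String> slots) {
        Map<Integer, String> assignment = new HashMap<>();
        if (slots.isEmpty()) {
            return assignment;
        }
        List<Integer> sorted = new ArrayList<>(executorStartTasks);
        Collections.sort(sorted);
        for (int i = 0; i < sorted.size(); i++) {
            assignment.put(sorted.get(i), slots.get(i % slots.size()));
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 5 executors over 2 slots -> split 3/2: {1=slot-a, 3=slot-b, 5=slot-a, 7=slot-b, 9=slot-a}
        System.out.println(roundRobin(Arrays.asList(9, 1, 5, 3, 7), Arrays.asList("slot-a", "slot-b")));
    }
}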
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
public class ExecutorDetails {
    public final int startTask;
    public final int endTask;

    //......

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof ExecutorDetails)) {
            return false;
        }

        ExecutorDetails executor = (ExecutorDetails) other;
        return (this.startTask == executor.startTask) && (this.endTask == executor.endTask);
    }
}
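equals compares only the task range (with a matching hashCode, elided above), so an executor recomputed in the current scheduling round is recognized as the same executor that is already alive in an existing assignment; that is what makes checks like allExecutors.equals(aliveExecutors) and Sets.difference(allExecutors, aliveExecutors) meaningful. A small illustration, assuming the usual two-argument ExecutorDetails constructor and value-based hashCode:

import java.util.HashSet;
import java.util.Set;

import org.apache.storm.scheduler.ExecutorDetails;

public class ExecutorEqualitySketch {
    public static void main(String[] args) {
        // two instances covering the same task range are the "same" executor
        ExecutorDetails fromTopology = new ExecutorDetails(1, 4);
        ExecutorDetails fromExistingAssignment = new ExecutorDetails(1, 4);
        System.out.println(fromTopology.equals(fromExistingAssignment)); // true

        // which is what makes set operations over executors behave as expected
        Set<ExecutorDetails> all = new HashSet<>();
        all.add(fromTopology);
        System.out.println(all.contains(fromExistingAssignment)); // true, relies on matching hashCode
    }
}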
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
/**
 * Assign the slot to the executors for this topology.
 *
 * @throws RuntimeException if the specified slot is already occupied.
 */
public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetails> executors) {
    assertValidTopologyForModification(topologyId);
    if (isSlotOccupied(slot)) {
        throw new RuntimeException(
            "slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied.");
    }
    TopologyDetails td = topologies.getById(topologyId);
    if (td == null) {
        throw new IllegalArgumentException(
            "Trying to schedule for topo " + topologyId + " but that is not a known topology " + topologies.getAllIds());
    }
    WorkerResources resources = calculateWorkerResources(td, executors);
    SchedulerAssignmentImpl assignment = assignments.get(topologyId);
    if (assignment == null) {
        assignment = new SchedulerAssignmentImpl(topologyId);
        assignments.put(topologyId, assignment);
    } else {
        for (ExecutorDetails executor : executors) {
            if (assignment.isExecutorAssigned(executor)) {
                throw new RuntimeException(
                    "Attempting to assign executor: " + executor + " of topology: " + topologyId
                    + " to workerslot: " + slot
                    + ". The executor is already assigned to workerslot: " + assignment.getExecutorToSlot().get(executor)
                    + ". The executor must unassigned before it can be assigned to another slot!");
            }
        }
    }

    assignment.assign(slot, executors, resources);
    String nodeId = slot.getNodeId();
    double sharedOffHeapMemory = calculateSharedOffHeapMemory(nodeId, assignment);
    assignment.setTotalSharedOffHeapMemory(nodeId, sharedOffHeapMemory);
    updateCachesForWorkerSlot(slot, resources, sharedOffHeapMemory);
    totalResourcesPerNodeCache.remove(slot.getNodeId());
}
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
/**
 * Assign the slot to executors.
 */
public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors, WorkerResources slotResources) {
    assert slot != null;
    for (ExecutorDetails executor : executors) {
        this.executorToSlot.put(executor, slot);
    }
    slotToExecutors.computeIfAbsent(slot, MAKE_LIST)
                   .addAll(executors);
    if (slotResources != null) {
        resources.put(slot, slotResources);
    } else {
        resources.remove(slot);
    }
}
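After assign runs, the same information is held in two views: executorToSlot (executor -> slot) and slotToExecutors (slot -> executors). A tiny sketch of keeping both maps in sync the way the method above does, with strings standing in for the Storm types:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoViewAssignmentSketch {
    // Forward view: executor -> slot; reverse view: slot -> executors.
    private final Map<String, String> executorToSlot = new HashMap<>();
    private final Map<String, List<String>> slotToExecutors = new HashMap<>();

    // Mirrors SchedulerAssignmentImpl.assign: record every executor under the slot
    // in the forward map and append them all to the reverse map in one go.
    void assign(String slot, List<String> executors) {
        for (String executor : executors) {
            executorToSlot.put(executor, slot);
        }
        slotToExecutors.computeIfAbsent(slot, s -> new ArrayList<>()).addAll(executors);
    }

    public static void main(String[] args) {
        TwoViewAssignmentSketch sketch = new TwoViewAssignmentSketch();
        sketch.assign("node-1:6700", Arrays.asList("executor[1-2]", "executor[3-4]"));
        System.out.println(sketch.executorToSlot);   // {executor[1-2]=node-1:6700, executor[3-4]=node-1:6700}
        System.out.println(sketch.slotToExecutors);  // {node-1:6700=[executor[1-2], executor[3-4]]}
    }
}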
Summary

doRebalance applies the RebalanceOptions to an updated StormBase and then calls mkAssignments(topoId); the periodic scheduling path goes through the no-arg mkAssignments, which passes a null scratchTopoId.

mkAssignments reads all topology bases and their TopologyDetails, collects the existing assignments while excluding the assignment of the topology being rebalanced, and hands everything to lockingMkAssignments. That method calls computeNewSchedulerAssignments to produce a Map<String, SchedulerAssignment> keyed by topologyId; during that computation the actual scheduling is performed via scheduler.schedule(topologies, cluster).

With the DefaultScheduler this ends up in EvenScheduler.scheduleTopologiesEvenly: its scheduleTopology sorts the executors that still need a slot by startTask and spreads them round-robin over the reassignable slots, with ExecutorDetails compared by their startTask and endTask. For each resulting slot, cluster.assign(nodePort, topologyId, executors) allocates the slot and records it in that topology's SchedulerAssignmentImpl.