本文主要研究一下storm的AssignmentDistributionService
storm-2.0.0/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
/** * A service for distributing master assignments to supervisors, this service makes the assignments notification * asynchronous. * * <p>We support multiple working threads to distribute assignment, every thread has a queue buffer. * * <p>Master will shuffle its node request to the queues, if the target queue is full, we just discard the request, * let the supervisors sync instead. * * <p>Caution: this class is not thread safe. * * <pre>{@code * Working mode * +--------+ +-----------------+ * | queue1 | ==> | Working thread1 | * +--------+ shuffle +--------+ +-----------------+ * | Master | ==> * +--------+ +--------+ +-----------------+ * | queue2 | ==> | Working thread2 | * +--------+ +-----------------+ * } * </pre> */ public class AssignmentDistributionService implements Closeable { //...... private ExecutorService service; /** * Assignments request queue. */ private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> assignmentsQueue; /** * Add an assignments for a node/supervisor for distribution. * @param node node id of supervisor. * @param host host name for the node. * @param serverPort node thrift server port. * @param assignments the {@link org.apache.storm.generated.SupervisorAssignments} */ public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments) { try { //For some reasons, we can not get supervisor port info, eg: supervisor shutdown, //Just skip for this scheduling round. 
if (serverPort == null) { LOG.warn("Discard an assignment distribution for node {} because server port info is missing.", node); return; } boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort, assignments), 5L, TimeUnit.SECONDS); if (!success) { LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full.", node); } } catch (InterruptedException e) { LOG.error("Add node assignments interrupted: {}", e.getMessage()); throw new RuntimeException(e); } } private LinkedBlockingQueue<NodeAssignments> nextQueue() { return this.assignmentsQueue.get(nextQueueId()); } }
storm-2.0.0/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
/** * Factory method for initialize a instance. * @param conf config. * @return an instance of {@link AssignmentDistributionService} */ public static AssignmentDistributionService getInstance(Map conf) { AssignmentDistributionService service = new AssignmentDistributionService(); service.prepare(conf); return service; } /** * Function for initialization. * * @param conf config */ public void prepare(Map conf) { this.conf = conf; this.random = new Random(47); this.threadsNum = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10); this.queueSize = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 100); this.assignmentsQueue = new HashMap<>(); for (int i = 0; i < threadsNum; i++) { this.assignmentsQueue.put(i, new LinkedBlockingQueue<NodeAssignments>(queueSize)); } //start the thread pool this.service = Executors.newFixedThreadPool(threadsNum); this.active = true; //start the threads for (int i = 0; i < threadsNum; i++) { this.service.submit(new DistributeTask(this, i)); } // for local cluster localSupervisors = new HashMap<>(); if (ConfigUtils.isLocalMode(conf)) { isLocalMode = true; } }
storm-2.0.0/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
/** * Task to distribute assignments. */ static class DistributeTask implements Runnable { private AssignmentDistributionService service; private Integer queueIndex; DistributeTask(AssignmentDistributionService service, Integer index) { this.service = service; this.queueIndex = index; } @Override public void run() { while (service.isActive()) { try { NodeAssignments nodeAssignments = this.service.nextAssignments(queueIndex); sendAssignmentsToNode(nodeAssignments); } catch (InterruptedException e) { if (service.isActive()) { LOG.error("Get an unexpected interrupt when distributing assignments to node, {}", e.getCause()); } else { // service is off now just interrupt it. Thread.currentThread().interrupt(); } } } } private void sendAssignmentsToNode(NodeAssignments assignments) { if (this.service.isLocalMode) { //local node Supervisor supervisor = this.service.localSupervisors.get(assignments.getNode()); if (supervisor != null) { supervisor.sendSupervisorAssignments(assignments.getAssignments()); } else { LOG.error("Can not find node {} for assignments distribution", assignments.getNode()); throw new RuntimeException("null for node " + assignments.getNode() + " supervisor instance."); } } else { // distributed mode try (SupervisorClient client = SupervisorClient.getConfiguredClient(service.getConf(), assignments.getHost(), assignments.getServerPort())) { try { client.getClient().sendSupervisorAssignments(assignments.getAssignments()); } catch (Exception e) { //just ignore the exception. LOG.error("Exception when trying to send assignments to node {}: {}", assignments.getNode(), e.getMessage()); } } catch (Throwable e) { //just ignore any error/exception. LOG.error("Exception to create supervisor client for node {}: {}", assignments.getNode(), e.getMessage()); } } } } /** * Get an assignments from the target queue with the specific index. 
* @param queueIndex index of the queue * @return an {@link NodeAssignments} * @throws InterruptedException */ public NodeAssignments nextAssignments(Integer queueIndex) throws InterruptedException { NodeAssignments target = null; while (true) { target = getQueueById(queueIndex).poll(); if (target != null) { return target; } Time.sleep(100L); } }
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
private void launchSupervisorThriftServer(Map<String, Object> conf) throws IOException { // validate port int port = getThriftServerPort(); try { ServerSocket socket = new ServerSocket(port); socket.close(); } catch (BindException e) { LOG.error("{} is not available. Check if another process is already listening on {}", port, port); throw new RuntimeException(e); } TProcessor processor = new org.apache.storm.generated.Supervisor.Processor( new org.apache.storm.generated.Supervisor.Iface() { @Override public void sendSupervisorAssignments(SupervisorAssignments assignments) throws AuthorizationException, TException { checkAuthorization("sendSupervisorAssignments"); LOG.info("Got an assignments from master, will start to sync with assignments: {}", assignments); SynchronizeAssignments syn = new SynchronizeAssignments(getSupervisor(), assignments, getReadClusterState()); getEventManger().add(syn); } //...... }); this.thriftServer = new ThriftServer(conf, processor, ThriftConnectionType.SUPERVISOR); this.thriftServer.serve(); }
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
/** * A runnable which will synchronize assignments to node local and then worker processes. */ public class SynchronizeAssignments implements Runnable { //...... @Override public void run() { // first sync assignments to local, then sync processes. if (null == assignments) { getAssignmentsFromMaster(this.supervisor.getConf(), this.supervisor.getStormClusterState(), this.supervisor.getAssignmentId()); } else { assignedAssignmentsToLocal(this.supervisor.getStormClusterState(), assignments); } this.readClusterState.run(); } private static void assignedAssignmentsToLocal(IStormClusterState clusterState, SupervisorAssignments assignments) { if (null == assignments) { //unknown error, just skip return; } Map<String, byte[]> serAssignments = new HashMap<>(); for (Map.Entry<String, Assignment> entry : assignments.get_storm_assignment().entrySet()) { serAssignments.put(entry.getKey(), Utils.serialize(entry.getValue())); } clusterState.syncRemoteAssignments(serAssignments); } }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
/**
 * Pushes assignments into {@code assignmentsBackend}. A non-null {@code remote} map is forwarded as is;
 * when {@code remote} is null the assignments are first read from {@code stateStorage}, one entry per
 * child of {@code ClusterUtils.ASSIGNMENTS_SUBTREE}.
 *
 * @param remote serialized assignments keyed by storm id, or null to load them from state storage
 */
@Override
public void syncRemoteAssignments(Map<String, byte[]> remote) {
    if (remote != null) {
        this.assignmentsBackend.syncRemoteAssignments(remote);
        return;
    }

    Map<String, byte[]> loaded = new HashMap<>();
    List<String> topologyIds = this.stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, false);
    for (String topologyId : topologyIds) {
        String path = ClusterUtils.assignmentPath(topologyId);
        loaded.put(topologyId, this.stateStorage.get_data(path, false));
    }
    this.assignmentsBackend.syncRemoteAssignments(loaded);
}
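syncRemoteAssignments将assignments同步到assignmentsBackend(即本地內存)；
remote为null时，则先从stateStorage(/assignments/{topologyId})读取assignment数据，再同步到assignmentsBackend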
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@Override public synchronized void run() { try { List<String> stormIds = stormClusterState.assignments(null); Map<String, Assignment> assignmentsSnapshot = getAssignmentsSnapshot(stormClusterState); Map<Integer, LocalAssignment> allAssignments = readAssignments(assignmentsSnapshot); if (allAssignments == null) { //Something odd happened try again later return; } Map<String, List<ProfileRequest>> topoIdToProfilerActions = getProfileActions(stormClusterState, stormIds); HashSet<Integer> assignedPorts = new HashSet<>(); LOG.debug("Synchronizing supervisor"); LOG.debug("All assignment: {}", allAssignments); LOG.debug("Topology Ids -> Profiler Actions {}", topoIdToProfilerActions); for (Integer port : allAssignments.keySet()) { if (iSuper.confirmAssigned(port)) { assignedPorts.add(port); } } HashSet<Integer> allPorts = new HashSet<>(assignedPorts); iSuper.assigned(allPorts); allPorts.addAll(slots.keySet()); Map<Integer, Set<TopoProfileAction>> filtered = new HashMap<>(); for (Entry<String, List<ProfileRequest>> entry : topoIdToProfilerActions.entrySet()) { String topoId = entry.getKey(); if (entry.getValue() != null) { for (ProfileRequest req : entry.getValue()) { NodeInfo ni = req.get_nodeInfo(); if (host.equals(ni.get_node())) { Long port = ni.get_port().iterator().next(); Set<TopoProfileAction> actions = filtered.get(port.intValue()); if (actions == null) { actions = new HashSet<>(); filtered.put(port.intValue(), actions); } actions.add(new TopoProfileAction(topoId, req)); } } } } for (Integer port : allPorts) { Slot slot = slots.get(port); if (slot == null) { slot = mkSlot(port); slots.put(port, slot); slot.start(); } slot.setNewAssignment(allAssignments.get(port)); slot.addProfilerActions(filtered.get(port)); } } catch (Exception e) { LOG.error("Failed to Sync Supervisor", e); throw new RuntimeException(e); } }