This article takes a look at Eureka's TaskDispatcher.
```java
public class PeerEurekaNode {

    public PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                          HttpReplicationClient replicationClient, EurekaServerConfig config) {
        this(registry, targetHost, serviceUrl, replicationClient, config, BATCH_SIZE, MAX_BATCHING_DELAY_MS,
                RETRY_SLEEP_TIME_MS, SERVER_UNAVAILABLE_SLEEP_TIME_MS);
    }

    /* For testing */ PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                                     HttpReplicationClient replicationClient, EurekaServerConfig config,
                                     int batchSize, long maxBatchingDelayMs,
                                     long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
        this.registry = registry;
        this.targetHost = targetHost;
        this.replicationClient = replicationClient;

        this.serviceUrl = serviceUrl;
        this.config = config;
        this.maxProcessingDelayMs = config.getMaxTimeForReplication();

        String batcherName = getBatcherName();
        ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
        this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
                batcherName,
                config.getMaxElementsInPeerReplicationPool(),
                batchSize,
                config.getMaxThreadsForPeerReplication(),
                maxBatchingDelayMs,
                serverUnavailableSleepTimeMs,
                retrySleepTimeMs,
                taskProcessor
        );
        this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
                targetHost,
                config.getMaxElementsInStatusReplicationPool(),
                config.getMaxThreadsForStatusReplication(),
                maxBatchingDelayMs,
                serverUnavailableSleepTimeMs,
                retrySleepTimeMs,
                taskProcessor
        );
    }

    //......
}
```
```java
/**
 * Send the status information of the ASG represented by the instance.
 *
 * <p>
 * ASG (Autoscaling group) names are available for instances in AWS and the
 * ASG information is used for determining if the instance should be
 * registered as {@link InstanceStatus#DOWN} or {@link InstanceStatus#UP}.
 *
 * @param asgName
 *            the asg name if any of this instance.
 * @param newStatus
 *            the new status of the ASG.
 */
public void statusUpdate(final String asgName, final ASGStatus newStatus) {
    long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
    nonBatchingDispatcher.process(
            asgName,
            new AsgReplicationTask(targetHost, Action.StatusUpdate, asgName, newStatus) {
                public EurekaHttpResponse<?> execute() {
                    return replicationClient.statusUpdate(asgName, newStatus);
                }
            },
            expiryTime
    );
}
```
This ASG statusUpdate submits its task to nonBatchingDispatcher.
```java
public void cancel(final String appName, final String id) throws Exception {
    long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
    batchingDispatcher.process(
            taskId("cancel", appName, id),
            new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
                @Override
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.cancel(appName, id);
                }

                @Override
                public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                    super.handleFailure(statusCode, responseEntity);
                    if (statusCode == 404) {
                        logger.warn("{}: missing entry.", getTaskName());
                    }
                }
            },
            expiryTime
    );
}
```
Methods such as cancel, by contrast, submit their tasks to batchingDispatcher.
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/ReplicationTask.java
```java
/**
 * Base class for all replication tasks.
 */
abstract class ReplicationTask {

    private static final Logger logger = LoggerFactory.getLogger(ReplicationTask.class);

    protected final String peerNodeName;
    protected final Action action;

    ReplicationTask(String peerNodeName, Action action) {
        this.peerNodeName = peerNodeName;
        this.action = action;
    }

    public abstract String getTaskName();

    public Action getAction() {
        return action;
    }

    public abstract EurekaHttpResponse<?> execute() throws Throwable;

    public void handleSuccess() {
    }

    public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
        logger.warn("The replication of task {} failed with response code {}", getTaskName(), statusCode);
    }
}
```
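This is the base class of all replication tasks. Subclasses must implement getTaskName() and execute(), and may override handleSuccess() and handleFailure().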
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/InstanceReplicationTask.java
```java
/**
 * Base {@link ReplicationTask} class for instance related replication requests.
 *
 * @author Tomasz Bak
 */
public abstract class InstanceReplicationTask extends ReplicationTask {

    /**
     * For cancel request there may be no InstanceInfo object available so we need to store app/id pair
     * explicitly.
     */
    private final String appName;
    private final String id;
    private final InstanceInfo instanceInfo;
    private final InstanceStatus overriddenStatus;

    private final boolean replicateInstanceInfo;

    //......
}
```
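These are the replication tasks that concern a single instance. In PeerEurekaNode, register, heartbeat, statusUpdate, deleteStatusOverride and cancel all use InstanceReplicationTask and are submitted to batchingDispatcher. The ASG-level statusUpdate(asgName, newStatus) shown earlier is the exception: it uses AsgReplicationTask and is submitted to nonBatchingDispatcher.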
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/util/batcher/TaskDispatcher.java
```java
/**
 * Task dispatcher takes task from clients, and delegates their execution to a configurable number of workers.
 * The task can be processed one at a time or in batches. Only non-expired tasks are executed, and if a newer
 * task with the same id is scheduled for execution, the old one is deleted. Lazy dispatch of work (only on demand)
 * to workers, guarantees that data are always up to date, and no stale task processing takes place.
 * <h3>Task processor</h3>
 * A client of this component must provide an implementation of {@link TaskProcessor} interface, which will do
 * the actual work of task processing. This implementation must be thread safe, as it is called concurrently by
 * multiple threads.
 * <h3>Execution modes</h3>
 * To create non batched executor call {@link TaskDispatchers#createNonBatchingTaskDispatcher(String, int, int, long, long, TaskProcessor)}
 * method. Batched executor is created by {@link TaskDispatchers#createBatchingTaskDispatcher(String, int, int, int, long, long, TaskProcessor)}.
 *
 * @author Tomasz Bak
 */
public interface TaskDispatcher<ID, T> {

    void process(ID id, T task, long expiryTime);

    void shutdown();
}
```
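TaskDispatcher is responsible for dispatching tasks. The key points are that only non-expired tasks are executed, and that when a newer task with the same id is scheduled, the older one is dropped. There are two kinds of TaskDispatcher: nonBatchingDispatcher and batchingDispatcher.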
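To make these two guarantees concrete, here is a minimal single-threaded sketch that implements the TaskDispatcher<ID, T> interface shown above. It is not Eureka's implementation, which is asynchronous and goes through AcceptorExecutor; the class LatestWinsDispatcher and its flush(now) method are hypothetical and exist only to illustrate the "latest task per id wins, expired tasks are skipped" contract.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Same shape as Eureka's TaskDispatcher interface.
interface TaskDispatcher<ID, T> {
    void process(ID id, T task, long expiryTime);

    void shutdown();
}

// Hypothetical, single-threaded illustration of the contract (not Eureka's code):
// the latest task per id replaces older ones, and expired tasks are skipped.
class LatestWinsDispatcher<ID, T> implements TaskDispatcher<ID, T> {
    private static final class Entry<V> {
        final V task;
        final long expiryTime;

        Entry(V task, long expiryTime) {
            this.task = task;
            this.expiryTime = expiryTime;
        }
    }

    // LinkedHashMap keeps first-insertion order; put() on an existing key replaces the value
    private final Map<ID, Entry<T>> pending = new LinkedHashMap<>();
    private boolean shutdown;

    @Override
    public void process(ID id, T task, long expiryTime) {
        if (!shutdown) {
            pending.put(id, new Entry<>(task, expiryTime));
        }
    }

    // Hypothetical helper: returns the tasks that would be executed at time `now`.
    List<T> flush(long now) {
        List<T> ready = new ArrayList<>();
        for (Entry<T> e : pending.values()) {
            if (e.expiryTime > now) {
                ready.add(e.task);
            }
        }
        pending.clear();
        return ready;
    }

    @Override
    public void shutdown() {
        shutdown = true;
        pending.clear();
    }
}
```

In PeerEurekaNode the id is what makes this useful: cancel is keyed by taskId("cancel", appName, id), so two cancels for the same instance that are queued before a worker picks them up collapse into one.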
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/util/batcher/TaskDispatchers.java
```java
public class TaskDispatchers {

    public static <ID, T> TaskDispatcher<ID, T> createNonBatchingTaskDispatcher(String id,
                                                                                 int maxBufferSize,
                                                                                 int workerCount,
                                                                                 long maxBatchingDelay,
                                                                                 long congestionRetryDelayMs,
                                                                                 long networkFailureRetryMs,
                                                                                 TaskProcessor<T> taskProcessor) {
        final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                id, maxBufferSize, 1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
        );
        final TaskExecutors<ID, T> taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
        return new TaskDispatcher<ID, T>() {
            @Override
            public void process(ID id, T task, long expiryTime) {
                acceptorExecutor.process(id, task, expiryTime);
            }

            @Override
            public void shutdown() {
                acceptorExecutor.shutdown();
                taskExecutor.shutdown();
            }
        };
    }

    public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                              int maxBufferSize,
                                                                              int workloadSize,
                                                                              int workerCount,
                                                                              long maxBatchingDelay,
                                                                              long congestionRetryDelayMs,
                                                                              long networkFailureRetryMs,
                                                                              TaskProcessor<T> taskProcessor) {
        final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
        );
        final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
        return new TaskDispatcher<ID, T>() {
            @Override
            public void process(ID id, T task, long expiryTime) {
                acceptorExecutor.process(id, task, expiryTime);
            }

            @Override
            public void shutdown() {
                acceptorExecutor.shutdown();
                taskExecutor.shutdown();
            }
        };
    }
}
```
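TaskDispatchers provides two factory methods, one for nonBatchingDispatcher and one for batchingDispatcher. In the former, the AcceptorExecutor's maxBatchingSize is fixed at 1 and the TaskExecutors are created with singleItemExecutors; in the latter, maxBatchingSize comes from the workloadSize argument (PeerEurekaNode passes BATCH_SIZE, which defaults to 250) and the TaskExecutors are created with batchExecutors.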
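Both factories take a TaskProcessor<T>. Judging from the worker code further down, which calls processor.process(task) for single items and processor.process(tasks) for batches, the interface has roughly the shape below. It is reconstructed from those call sites rather than copied from the jar, and CountingProcessor is a hypothetical implementation used only to contrast the two paths.

```java
import java.util.List;

// Interface shape inferred from the worker runnables' calls:
// processor.process(task) and processor.process(tasks), both returning ProcessingResult.
interface TaskProcessor<T> {
    enum ProcessingResult { Success, Congestion, TransientError, PermanentError }

    ProcessingResult process(T task);

    ProcessingResult process(List<T> tasks);
}

// Hypothetical processor that records how it was called, to contrast the
// single-item path (maxBatchingSize = 1) with the batch path.
class CountingProcessor implements TaskProcessor<String> {
    int singleCalls;
    int batchCalls;
    int tasksSeen;

    @Override
    public ProcessingResult process(String task) {
        singleCalls++;
        tasksSeen++;
        return ProcessingResult.Success;
    }

    @Override
    public ProcessingResult process(List<String> tasks) {
        batchCalls++;
        tasksSeen += tasks.size();
        return ProcessingResult.Success;
    }
}
```

With maxBatchingSize = 1 each task reaches process(T) on its own; with a batch size of 250, up to 250 replication tasks can be delivered in a single process(List<T>) call.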
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/util/batcher/AcceptorExecutor.java
```java
private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<>();

void process(ID id, T task, long expiryTime) {
    acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
    acceptedTasks++;
}

void reprocess(List<TaskHolder<ID, T>> holders, ProcessingResult processingResult) {
    reprocessQueue.addAll(holders);
    replayedTasks += holders.size();
    trafficShaper.registerFailure(processingResult);
}

void reprocess(TaskHolder<ID, T> taskHolder, ProcessingResult processingResult) {
    reprocessQueue.add(taskHolder);
    replayedTasks++;
    trafficShaper.registerFailure(processingResult);
}
```
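process puts new tasks into acceptorQueue, while reprocess puts failed tasks into reprocessQueue and also reports the failure to trafficShaper via registerFailure.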
```java
ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
this.acceptorThread.setDaemon(true);
this.acceptorThread.start();

class AcceptorRunner implements Runnable {
    @Override
    public void run() {
        long scheduleTime = 0;
        while (!isShutdown.get()) {
            try {
                drainInputQueues();

                int totalItems = processingOrder.size();

                long now = System.currentTimeMillis();
                if (scheduleTime < now) {
                    scheduleTime = now + trafficShaper.transmissionDelay();
                }
                if (scheduleTime <= now) {
                    assignBatchWork();
                    assignSingleItemWork();
                }

                // If no worker is requesting data or there is a delay injected by the traffic shaper,
                // sleep for some time to avoid tight loop.
                if (totalItems == processingOrder.size()) {
                    Thread.sleep(10);
                }
            } catch (InterruptedException ex) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
                logger.warn("Discovery AcceptorThread error", e);
            }
        }
    }
    //......
}
```
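The acceptor thread loops continuously: it calls drainInputQueues and then, once the delay returned by trafficShaper.transmissionDelay() has passed, calls assignBatchWork and assignSingleItemWork. If nothing was handed out during an iteration, it sleeps for 10 ms to avoid a tight loop.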
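The trafficShaper used in this loop is not shown in the excerpts. The sketch below is an assumption about its role, not its actual code: after a Congestion or TransientError is registered, transmissionDelay() makes the acceptor loop hold off until congestionRetryDelayMs or networkFailureRetryMs (the factory parameters seen earlier) has elapsed. SimpleTrafficShaper and SchedulingGate are hypothetical names, and time is passed in explicitly so the gate logic copied from AcceptorRunner.run() can be exercised deterministically.

```java
// Hypothetical traffic shaper (an assumption, not Eureka's code): remembers the last
// failure time per failure kind and reports how long the acceptor should still wait.
class SimpleTrafficShaper {
    enum Failure { Congestion, TransientError }

    private final long congestionRetryDelayMs;
    private final long networkFailureRetryMs;
    private long lastCongestion = -1;
    private long lastNetworkFailure = -1;

    SimpleTrafficShaper(long congestionRetryDelayMs, long networkFailureRetryMs) {
        this.congestionRetryDelayMs = congestionRetryDelayMs;
        this.networkFailureRetryMs = networkFailureRetryMs;
    }

    void registerFailure(Failure failure, long now) {
        if (failure == Failure.Congestion) {
            lastCongestion = now;
        } else {
            lastNetworkFailure = now;
        }
    }

    // Remaining wait time in ms; 0 when no retry window is still open.
    long transmissionDelay(long now) {
        long delay = 0;
        if (lastCongestion >= 0) {
            delay = Math.max(delay, lastCongestion + congestionRetryDelayMs - now);
        }
        if (lastNetworkFailure >= 0) {
            delay = Math.max(delay, lastNetworkFailure + networkFailureRetryMs - now);
        }
        return delay;
    }
}

// The scheduleTime check from AcceptorRunner.run(), with the clock passed in.
class SchedulingGate {
    private final SimpleTrafficShaper shaper;
    private long scheduleTime = 0;

    SchedulingGate(SimpleTrafficShaper shaper) {
        this.shaper = shaper;
    }

    // Returns true when assignBatchWork/assignSingleItemWork would run at time `now`.
    boolean mayAssign(long now) {
        if (scheduleTime < now) {
            scheduleTime = now + shaper.transmissionDelay(now);
        }
        return scheduleTime <= now;
    }
}
```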
```java
private void drainInputQueues() throws InterruptedException {
    do {
        drainReprocessQueue();
        drainAcceptorQueue();

        if (!isShutdown.get()) {
            // If all queues are empty, block for a while on the acceptor queue
            if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                if (taskHolder != null) {
                    appendTaskHolder(taskHolder);
                }
            }
        }
    } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
}
```
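This calls drainReprocessQueue and drainAcceptorQueue, and keeps looping (blocking on acceptorQueue for up to 10 ms at a time when everything is empty) until both input queues are drained and pendingTasks holds at least one task.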
```java
private void drainAcceptorQueue() {
    while (!acceptorQueue.isEmpty()) {
        appendTaskHolder(acceptorQueue.poll());
    }
}

private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
    if (isFull()) {
        pendingTasks.remove(processingOrder.poll());
        queueOverflows++;
    }
    TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
    if (previousTask == null) {
        processingOrder.add(taskHolder.getId());
    } else {
        overriddenTasks++;
    }
}
```
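Tasks are moved from acceptorQueue into pendingTasks, a map keyed by task id, while processingOrder records the order of the ids. A new id is appended to processingOrder; if the id already exists, the new task replaces the old one (overriddenTasks++) and keeps its position. If the buffer is full, the oldest id in processingOrder is evicted first (queueOverflows++).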
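The bookkeeping in appendTaskHolder can be modeled with a plain HashMap and ArrayDeque. This is a simplified, single-threaded sketch: the class PendingBuffer is hypothetical, tasks are stored directly instead of inside a TaskHolder, and isFull() is assumed to mean pendingTasks.size() >= maxBufferSize.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified model of appendTaskHolder(): pendingTasks (id -> task) plus processingOrder (ids).
// Field and counter names mirror AcceptorExecutor; the class itself is hypothetical.
class PendingBuffer<ID, T> {
    final Map<ID, T> pendingTasks = new HashMap<>();
    final Deque<ID> processingOrder = new ArrayDeque<>();
    private final int maxBufferSize;
    int queueOverflows;
    int overriddenTasks;

    PendingBuffer(int maxBufferSize) {
        this.maxBufferSize = maxBufferSize;
    }

    // Assumption: "full" means the map has reached maxBufferSize entries.
    boolean isFull() {
        return pendingTasks.size() >= maxBufferSize;
    }

    void append(ID id, T task) {
        if (isFull()) {
            // Evict the oldest id to make room
            pendingTasks.remove(processingOrder.poll());
            queueOverflows++;
        }
        T previous = pendingTasks.put(id, task);
        if (previous == null) {
            processingOrder.add(id);   // new id goes to the tail
        } else {
            overriddenTasks++;         // same id: task replaced, position unchanged
        }
    }
}
```

As in the original, the full-buffer check runs before the id lookup, so when the buffer is full even a task that would merely replace an existing entry evicts the oldest one.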
```java
private void drainReprocessQueue() {
    long now = System.currentTimeMillis();
    while (!reprocessQueue.isEmpty() && !isFull()) {
        TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
        ID id = taskHolder.getId();
        if (taskHolder.getExpiryTime() <= now) {
            expiredTasks++;
        } else if (pendingTasks.containsKey(id)) {
            overriddenTasks++;
        } else {
            pendingTasks.put(id, taskHolder);
            processingOrder.addFirst(id);
        }
    }
    if (isFull()) {
        queueOverflows += reprocessQueue.size();
        reprocessQueue.clear();
    }
}
```
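Tasks are taken from reprocessQueue starting at its tail (pollLast). A task that has not expired and whose id is not already in pendingTasks is put into pendingTasks, and its id is placed at the head of processingOrder with processingOrder.addFirst(id), so retries are handed out before new tasks. Expired tasks and tasks superseded by a pending task with the same id are dropped; if the buffer fills up, whatever remains in reprocessQueue is discarded and counted in queueOverflows.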
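The effect of pollLast() combined with addFirst() is easy to miss: because the tail of the retry queue is moved first and each id is pushed onto the head, a failed batch is re-queued ahead of everything else in its original order. The sketch below models drainReprocessQueue with hypothetical simplified types (RetryDrain, and a Holder carrying only an id and an expiry time) and the same isFull() assumption as before.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of drainReprocessQueue(); names mirror AcceptorExecutor.
class RetryDrain {
    static final class Holder {
        final String id;
        final long expiryTime;

        Holder(String id, long expiryTime) {
            this.id = id;
            this.expiryTime = expiryTime;
        }
    }

    final Map<String, Holder> pendingTasks = new HashMap<>();
    final Deque<String> processingOrder = new ArrayDeque<>();
    final Deque<Holder> reprocessQueue = new ArrayDeque<>();
    private final int maxBufferSize;
    int expiredTasks;
    int overriddenTasks;
    int queueOverflows;

    RetryDrain(int maxBufferSize) {
        this.maxBufferSize = maxBufferSize;
    }

    boolean isFull() {
        return pendingTasks.size() >= maxBufferSize;
    }

    void drainReprocessQueue(long now) {
        while (!reprocessQueue.isEmpty() && !isFull()) {
            Holder holder = reprocessQueue.pollLast();      // take retries from the tail...
            if (holder.expiryTime <= now) {
                expiredTasks++;                             // too old to retry
            } else if (pendingTasks.containsKey(holder.id)) {
                overriddenTasks++;                          // a task with this id is already pending
            } else {
                pendingTasks.put(holder.id, holder);
                processingOrder.addFirst(holder.id);        // ...and push each onto the head
            }
        }
        if (isFull()) {
            // No room left: the remaining retries are dropped
            queueOverflows += reprocessQueue.size();
            reprocessQueue.clear();
        }
    }
}
```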
```java
void assignSingleItemWork() {
    if (!processingOrder.isEmpty()) {
        if (singleItemWorkRequests.tryAcquire(1)) {
            long now = System.currentTimeMillis();
            while (!processingOrder.isEmpty()) {
                ID id = processingOrder.poll();
                TaskHolder<ID, T> holder = pendingTasks.remove(id);
                if (holder.getExpiryTime() > now) {
                    singleItemWorkQueue.add(holder);
                    return;
                }
                expiredTasks++;
            }
            singleItemWorkRequests.release();
        }
    }
}

void assignBatchWork() {
    if (hasEnoughTasksForNextBatch()) {
        if (batchWorkRequests.tryAcquire(1)) {
            long now = System.currentTimeMillis();
            int len = Math.min(maxBatchingSize, processingOrder.size());
            List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
            while (holders.size() < len && !processingOrder.isEmpty()) {
                ID id = processingOrder.poll();
                TaskHolder<ID, T> holder = pendingTasks.remove(id);
                if (holder.getExpiryTime() > now) {
                    holders.add(holder);
                } else {
                    expiredTasks++;
                }
            }
            if (holders.isEmpty()) {
                batchWorkRequests.release();
            } else {
                batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                batchWorkQueue.add(holders);
            }
        }
    }
}
```
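These two methods move tasks from pendingTasks into singleItemWorkQueue or batchWorkQueue in processingOrder order (so retried tasks, which sit at the head, go first). They only do so when a worker has requested work, i.e. when a permit can be taken from singleItemWorkRequests or batchWorkRequests, and they drop expired tasks along the way. assignBatchWork additionally requires hasEnoughTasksForNextBatch() and takes at most maxBatchingSize tasks per batch.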
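assignBatchWork can be sketched as follows. The semaphore handshake is inferred from the tryAcquire/release calls above: a worker request is assumed to add a permit, and the acceptor fills a batch only if it can take one. Two further simplifications are assumptions of this sketch: hasEnoughTasksForNextBatch() is replaced by a plain non-empty check, and requestWork() stands in for the permit-releasing part of requestWorkItems(). BatchAssigner is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

// Hypothetical, simplified model of assignBatchWork(); tasks are reduced to id -> expiryTime.
class BatchAssigner {
    final Map<String, Long> pendingTasks = new HashMap<>();
    final Deque<String> processingOrder = new ArrayDeque<>();
    final Semaphore batchWorkRequests = new Semaphore(0);
    final BlockingQueue<List<String>> batchWorkQueue = new LinkedBlockingQueue<>();
    private final int maxBatchingSize;
    int expiredTasks;

    BatchAssigner(int maxBatchingSize) {
        this.maxBatchingSize = maxBatchingSize;
    }

    // Assumed worker side: signal that one batch is wanted.
    void requestWork() {
        batchWorkRequests.release();
    }

    void add(String id, long expiryTime) {
        pendingTasks.put(id, expiryTime);
        processingOrder.add(id);
    }

    void assignBatchWork(long now) {
        // Simplification: any pending task counts as "enough" for a batch
        if (processingOrder.isEmpty() || !batchWorkRequests.tryAcquire(1)) {
            return;
        }
        int len = Math.min(maxBatchingSize, processingOrder.size());
        List<String> batch = new ArrayList<>(len);
        while (batch.size() < len && !processingOrder.isEmpty()) {
            String id = processingOrder.poll();
            long expiry = pendingTasks.remove(id);
            if (expiry > now) {
                batch.add(id);
            } else {
                expiredTasks++;   // expired tasks are dropped while filling the batch
            }
        }
        if (batch.isEmpty()) {
            batchWorkRequests.release();   // nothing to hand out: give the request back
        } else {
            batchWorkQueue.add(batch);
        }
    }
}
```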
```java
abstract static class WorkerRunnable<ID, T> implements Runnable {
    final String workerName;
    final AtomicBoolean isShutdown;
    final TaskExecutorMetrics metrics;
    final TaskProcessor<T> processor;
    final AcceptorExecutor<ID, T> taskDispatcher;

    WorkerRunnable(String workerName,
                   AtomicBoolean isShutdown,
                   TaskExecutorMetrics metrics,
                   TaskProcessor<T> processor,
                   AcceptorExecutor<ID, T> taskDispatcher) {
        this.workerName = workerName;
        this.isShutdown = isShutdown;
        this.metrics = metrics;
        this.processor = processor;
        this.taskDispatcher = taskDispatcher;
    }

    String getWorkerName() {
        return workerName;
    }
}
```
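This defines the common base Runnable for the worker threads, holding the worker name, the shutdown flag, the metrics, the TaskProcessor and the AcceptorExecutor.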
```java
private final BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue = new LinkedBlockingQueue<>();

static class SingleTaskWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {

    SingleTaskWorkerRunnable(String workerName,
                             AtomicBoolean isShutdown,
                             TaskExecutorMetrics metrics,
                             TaskProcessor<T> processor,
                             AcceptorExecutor<ID, T> acceptorExecutor) {
        super(workerName, isShutdown, metrics, processor, acceptorExecutor);
    }

    @Override
    public void run() {
        try {
            while (!isShutdown.get()) {
                BlockingQueue<TaskHolder<ID, T>> workQueue = taskDispatcher.requestWorkItem();
                TaskHolder<ID, T> taskHolder;
                while ((taskHolder = workQueue.poll(1, TimeUnit.SECONDS)) == null) {
                    if (isShutdown.get()) {
                        return;
                    }
                }
                metrics.registerExpiryTime(taskHolder);
                if (taskHolder != null) {
                    ProcessingResult result = processor.process(taskHolder.getTask());
                    switch (result) {
                        case Success:
                            break;
                        case Congestion:
                        case TransientError:
                            taskDispatcher.reprocess(taskHolder, result);
                            break;
                        case PermanentError:
                            logger.warn("Discarding a task of {} due to permanent error", workerName);
                    }
                    metrics.registerTaskResult(result, 1);
                }
            }
        } catch (InterruptedException e) {
            // Ignore
        } catch (Throwable e) {
            // Safe-guard, so we never exit this loop in an uncontrolled way.
            logger.warn("Discovery WorkerThread error", e);
        }
    }
}
```
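The single-item worker obtains the queue via taskDispatcher.requestWorkItem() and polls singleItemWorkQueue directly, with a one-second timeout so that it can notice shutdown; each poll yields a single TaskHolder<ID, T>.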
```java
private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();

static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {

    BatchWorkerRunnable(String workerName,
                        AtomicBoolean isShutdown,
                        TaskExecutorMetrics metrics,
                        TaskProcessor<T> processor,
                        AcceptorExecutor<ID, T> acceptorExecutor) {
        super(workerName, isShutdown, metrics, processor, acceptorExecutor);
    }

    @Override
    public void run() {
        try {
            while (!isShutdown.get()) {
                List<TaskHolder<ID, T>> holders = getWork();
                metrics.registerExpiryTimes(holders);

                List<T> tasks = getTasksOf(holders);
                ProcessingResult result = processor.process(tasks);
                switch (result) {
                    case Success:
                        break;
                    case Congestion:
                    case TransientError:
                        taskDispatcher.reprocess(holders, result);
                        break;
                    case PermanentError:
                        logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
                }
                metrics.registerTaskResult(result, tasks.size());
            }
        } catch (InterruptedException e) {
            // Ignore
        } catch (Throwable e) {
            // Safe-guard, so we never exit this loop in an uncontrolled way.
            logger.warn("Discovery WorkerThread error", e);
        }
    }

    private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
        BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
        List<TaskHolder<ID, T>> result;
        do {
            result = workQueue.poll(1, TimeUnit.SECONDS);
        } while (!isShutdown.get() && result == null);
        return (result == null) ? new ArrayList<>() : result;
    }

    private List<T> getTasksOf(List<TaskHolder<ID, T>> holders) {
        List<T> tasks = new ArrayList<>(holders.size());
        for (TaskHolder<ID, T> holder : holders) {
            tasks.add(holder.getTask());
        }
        return tasks;
    }
}
```
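The batch worker polls batchWorkQueue (obtained via requestWorkItems()) in the same way, but unlike the single-item worker each poll yields a List<TaskHolder<ID, T>>, which is converted to a List<T> and passed to processor.process(tasks).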
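The polling pattern in getWork() is worth isolating: polling with a timeout instead of blocking indefinitely lets the worker notice isShutdown, and an empty list is returned if it shuts down while idle. The sketch below keeps that loop but takes the queue, the flag and the timeout as constructor arguments; BatchWorkFetcher is a hypothetical name and the work items are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical extraction of the getWork() loop from BatchWorkerRunnable.
class BatchWorkFetcher {
    private final BlockingQueue<List<String>> workQueue;
    private final AtomicBoolean isShutdown;
    private final long pollTimeoutMs;

    BatchWorkFetcher(BlockingQueue<List<String>> workQueue, AtomicBoolean isShutdown, long pollTimeoutMs) {
        this.workQueue = workQueue;
        this.isShutdown = isShutdown;
        this.pollTimeoutMs = pollTimeoutMs;
    }

    List<String> getWork() throws InterruptedException {
        List<String> result;
        do {
            // Bounded wait, so the shutdown flag is re-checked at least every pollTimeoutMs
            result = workQueue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
        } while (!isShutdown.get() && result == null);
        // Shut down while idle: hand back an empty batch instead of null
        return (result == null) ? new ArrayList<>() : result;
    }
}
```

In the original the timeout is one second, which bounds how long an idle worker takes to notice shutdown.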
The two worker runnables handle ProcessingResult in the same way:
```java
switch (result) {
    case Success:
        break;
    case Congestion:
    case TransientError:
        taskDispatcher.reprocess(holders, result);
        break;
    case PermanentError:
        logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
}
```
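On Congestion or TransientError the task(s) are handed back through reprocess(), i.e. put into reprocessQueue to be retried; on PermanentError a warning is logged and the task(s) are discarded. Success needs no further action.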
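The two outcomes can be modeled in a few lines. In this sketch, ResultHandler is hypothetical: the retried list stands in for taskDispatcher.reprocess(...) and the discarded list for the warning log. The enum constants are the four ProcessingResult values that appear in the switch above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the workers' result handling: retryable results go back
// to the dispatcher, permanent errors are logged and dropped.
class ResultHandler {
    enum ProcessingResult { Success, Congestion, TransientError, PermanentError }

    final List<String> retried = new ArrayList<>();   // stands in for taskDispatcher.reprocess(...)
    final List<String> discarded = new ArrayList<>(); // stands in for logger.warn(...) + drop

    void handle(List<String> holders, ProcessingResult result) {
        switch (result) {
            case Success:
                break;
            case Congestion:
            case TransientError:
                retried.addAll(holders);
                break;
            case PermanentError:
                discarded.addAll(holders);
                break;
        }
    }
}
```

Retries are not unbounded: reprocess() re-queues the same TaskHolder, so a retried task keeps its original expiryTime, and drainReprocessQueue drops it once that time has passed. A peer that keeps answering with Congestion therefore eventually causes the task to be discarded.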
Eureka implements its own TaskDispatcher, in two flavors: nonBatchingDispatcher and batchingDispatcher.
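The dispatched tasks are subclasses of ReplicationTask, such as InstanceReplicationTask, which adds the instance-related fields. ReplicationTask declares two abstract methods, public abstract String getTaskName() and public abstract EurekaHttpResponse<?> execute() throws Throwable; PeerEurekaNode supplies anonymous subclasses whose execute() issues the corresponding register, cancel, etc. request.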
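Scheduling is driven by task id and priority: a later task with the same id replaces the pending task with that id, and a task that fails with Congestion or TransientError goes into the retry queue (reprocessQueue) and, if it has not expired, is later put back into pendingTasks at the head of processingOrder, i.e. with the highest priority.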