本文主要講一下jesque的WorkerImpl與WorkerPool。java
Resque是一個使用redis來建立後臺任務的ruby組件。而jesque是其java版本。一般用來作延時隊列。git
List<String> queues = Arrays.asList(delayedQueue); final Worker worker = new WorkerImpl(jesqueConfig,queues, new MapBasedJobFactory(map(entry("DemoJob", DemoJob.class)))); final Thread workerThread = new Thread(worker); workerThread.start();
這是worker實例
jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerImpl.javagithub
/** * Starts this worker. Registers the worker in Redis and begins polling the queues for jobs.<br> * Stop this worker by calling end() on any thread. */ @Override public void run() { if (this.state.compareAndSet(NEW, RUNNING)) { try { renameThread("RUNNING"); this.threadRef.set(Thread.currentThread()); this.jedis.sadd(key(WORKERS), this.name); this.jedis.set(key(WORKER, this.name, STARTED), new SimpleDateFormat(DATE_FORMAT).format(new Date())); this.listenerDelegate.fireEvent(WORKER_START, this, null, null, null, null, null); this.popScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_LUA))); this.lpoplpushScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA))); this.multiPriorityQueuesScriptHash .set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_FROM_MULTIPLE_PRIO_QUEUES))); poll(); } catch (Exception ex) { LOG.error("Uncaught exception in worker run-loop!", ex); this.listenerDelegate.fireEvent(WORKER_ERROR, this, null, null, null, null, ex); } finally { renameThread("STOPPING"); this.listenerDelegate.fireEvent(WORKER_STOP, this, null, null, null, null, null); this.jedis.srem(key(WORKERS), this.name); this.jedis.del(key(WORKER, this.name), key(WORKER, this.name, STARTED), key(STAT, FAILED, this.name), key(STAT, PROCESSED, this.name)); this.jedis.quit(); this.threadRef.set(null); } } else if (RUNNING.equals(this.state.get())) { throw new IllegalStateException("This WorkerImpl is already running"); } else { throw new IllegalStateException("This WorkerImpl is shutdown"); } }
實現了runnable方法,裏頭poll方法無限循環redis
/** * Polls the queues for jobs and executes them. */ protected void poll() { int missCount = 0; String curQueue = null; while (RUNNING.equals(this.state.get())) { try { if (threadNameChangingEnabled) { renameThread("Waiting for " + JesqueUtils.join(",", this.queueNames)); } curQueue = getNextQueue(); if (curQueue != null) { checkPaused(); // Might have been waiting in poll()/checkPaused() for a while if (RUNNING.equals(this.state.get())) { this.listenerDelegate.fireEvent(WORKER_POLL, this, curQueue, null, null, null, null); final String payload = pop(curQueue); if (payload != null) { process(ObjectMapperFactory.get().readValue(payload, Job.class), curQueue); missCount = 0; } else { missCount++; if (shouldSleep(missCount) && RUNNING.equals(this.state.get())) { // Keeps worker from busy-spinning on empty queues missCount = 0; Thread.sleep(EMPTY_QUEUE_SLEEP_TIME); } } } } } catch (InterruptedException ie) { if (!isShutdown()) { recoverFromException(curQueue, ie); } } catch (JsonParseException | JsonMappingException e) { // If the job JSON is not deserializable, we never want to submit it again... removeInFlight(curQueue); recoverFromException(curQueue, e); } catch (Exception e) { recoverFromException(curQueue, e); } } }
不斷地pop和processruby
/** * Materializes and executes the given job. * * @param job the Job to process * @param curQueue the queue the payload came from */ protected void process(final Job job, final String curQueue) { try { this.processingJob.set(true); if (threadNameChangingEnabled) { renameThread("Processing " + curQueue + " since " + System.currentTimeMillis()); } this.listenerDelegate.fireEvent(JOB_PROCESS, this, curQueue, job, null, null, null); this.jedis.set(key(WORKER, this.name), statusMsg(curQueue, job)); final Object instance = this.jobFactory.materializeJob(job); final Object result = execute(job, curQueue, instance); success(job, instance, result, curQueue); } catch (Throwable thrwbl) { failure(thrwbl, job, curQueue); } finally { removeInFlight(curQueue); this.jedis.del(key(WORKER, this.name)); this.processingJob.set(false); } }
而process這個方法,就是實例化目標job,而後execute併發
/** * Executes the given job. * * @param job the job to execute * @param curQueue the queue the job came from * @param instance the materialized job * @throws Exception if the instance is a {@link Callable} and throws an exception * @return result of the execution */ protected Object execute(final Job job, final String curQueue, final Object instance) throws Exception { if (instance instanceof WorkerAware) { ((WorkerAware) instance).setWorker(this); } this.listenerDelegate.fireEvent(JOB_EXECUTE, this, curQueue, job, instance, null, null); final Object result; if (instance instanceof Callable) { result = ((Callable<?>) instance).call(); // The job is executing! } else if (instance instanceof Runnable) { ((Runnable) instance).run(); // The job is executing! result = null; } else { // Should never happen since we're testing the class earlier throw new ClassCastException( "Instance must be a Runnable or a Callable: " + instance.getClass().getName() + " - " + instance); } return result; }
而execute就是調用call或者run方法。
從這裏能夠看出是單線程阻塞的,若是一個job比較耗時,是會影響其餘job的觸發和執行。app
jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerPool.javaide
/** * Create a WorkerPool with the given number of Workers and the given <code>ThreadFactory</code>. * @param workerFactory a Callable that returns an implementation of Worker * @param numWorkers the number of Workers to create * @param threadFactory the factory to create pre-configured Threads */ public WorkerPool(final Callable<? extends Worker> workerFactory, final int numWorkers, final ThreadFactory threadFactory) { this.workers = new ArrayList<>(numWorkers); this.threads = new ArrayList<>(numWorkers); this.eventEmitter = new WorkerPoolEventEmitter(this.workers); for (int i = 0; i < numWorkers; i++) { try { final Worker worker = workerFactory.call(); this.workers.add(worker); this.threads.add(threadFactory.newThread(worker)); } catch (RuntimeException re) { throw re; } catch (Exception e) { throw new RuntimeException(e); } } } /** * {@inheritDoc} */ @Override public void run() { for (final Thread thread : this.threads) { thread.start(); } Thread.yield(); } /** * {@inheritDoc} */ @Override public void end(final boolean now) { for (final Worker worker : this.workers) { worker.end(now); } }
workerpool維護了一組worker實例,起線程池的做用,儘量提升job的併發度。oop