This article gives a brief introduction to jesque's event mechanism.
jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerEvent.java

    /**
     * The possible WorkerEvents that a WorkerListener may register for.
     */
    public enum WorkerEvent {
        /**
         * The Worker just finished starting up and is about to start running.
         */
        WORKER_START,
        /**
         * The Worker is polling the queue.
         */
        WORKER_POLL,
        /**
         * The Worker is processing a Job.
         */
        JOB_PROCESS,
        /**
         * The Worker is about to execute a materialized Job.
         */
        JOB_EXECUTE,
        /**
         * The Worker successfully executed a materialized Job.
         */
        JOB_SUCCESS,
        /**
         * The Worker caught an Exception during the execution of a materialized Job.
         */
        JOB_FAILURE,
        /**
         * The Worker caught an Exception during normal operation.
         */
        WORKER_ERROR,
        /**
         * The Worker just finished running and is about to shutdown.
         */
        WORKER_STOP;
    }
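JOB_PROCESS and JOB_EXECUTE can be a little confusing. Between the two, the worker updates its status in redis and instantiates (materializes) the job, so JOB_EXECUTE effectively means "before execute".
JOB_SUCCESS and JOB_FAILURE, in turn, mean "after execute".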
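To make the ordering concrete, here is a minimal, self-contained sketch that records the job-level events in the order described above. It does not use jesque: `JobEvent` and `EventOrderDemo` are hypothetical stand-ins, and the Redis update and job materialization are reduced to a comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical stand-in for jesque's WorkerEvent, limited to the job-level events.
enum JobEvent { JOB_PROCESS, JOB_EXECUTE, JOB_SUCCESS, JOB_FAILURE }

class EventOrderDemo {
    // Simulates one pass through a process()-like method and records
    // which events would be fired, in order.
    static List<JobEvent> process(Callable<Object> job) {
        List<JobEvent> fired = new ArrayList<>();
        fired.add(JobEvent.JOB_PROCESS);       // the job was taken from the queue
        // (a real worker would update its status in Redis and materialize the job here)
        try {
            fired.add(JobEvent.JOB_EXECUTE);   // "before execute"
            job.call();
            fired.add(JobEvent.JOB_SUCCESS);   // "after execute", normal return
        } catch (Exception e) {
            fired.add(JobEvent.JOB_FAILURE);   // "after execute", exception thrown
        }
        return fired;
    }

    public static void main(String[] args) {
        // prints [JOB_PROCESS, JOB_EXECUTE, JOB_SUCCESS]
        System.out.println(process(() -> "done"));
        // prints [JOB_PROCESS, JOB_EXECUTE, JOB_FAILURE]
        System.out.println(process(() -> { throw new IllegalStateException("boom"); }));
    }
}
```

A listener that wants to prepare the job instance would register for JOB_EXECUTE, while one that collects results or errors would register for JOB_SUCCESS or JOB_FAILURE.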
jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerEventEmitter.java

    /**
     * A WorkerEventEmitter allows WorkerListeners to register for WorkerEvents.
     */
    public interface WorkerEventEmitter {

        /**
         * Register a WorkerListener for all WorkerEvents.
         * @param listener the WorkerListener to register
         */
        void addListener(WorkerListener listener);

        /**
         * Register a WorkerListener for the specified WorkerEvents.
         * @param listener the WorkerListener to register
         * @param events the WorkerEvents to be notified of
         */
        void addListener(WorkerListener listener, WorkerEvent... events);

        /**
         * Unregister a WorkerListener for all WorkerEvents.
         * @param listener the WorkerListener to unregister
         */
        void removeListener(WorkerListener listener);

        /**
         * Unregister a WorkerListener for the specified WorkerEvents.
         * @param listener the WorkerListener to unregister
         * @param events the WorkerEvents to no longer be notified of
         */
        void removeListener(WorkerListener listener, WorkerEvent... events);

        /**
         * Unregister all WorkerListeners for all WorkerEvents.
         */
        void removeAllListeners();

        /**
         * Unregister all WorkerListeners for the specified WorkerEvents.
         * @param events the WorkerEvents to no longer be notified of
         */
        void removeAllListeners(WorkerEvent... events);
    }
This defines the event emitter interface.
jesque-2.1.0-sources.jar!/net/greghaines/jesque/worker/WorkerListenerDelegate.java

    /**
     * WorkerListenerDelegate keeps track of WorkerListeners and notifies each listener when fireEvent() is invoked.
     */
    public class WorkerListenerDelegate implements WorkerEventEmitter {

        private static final Logger log = LoggerFactory.getLogger(WorkerListenerDelegate.class);

        private final Map<WorkerEvent, ConcurrentSet<WorkerListener>> eventListenerMap;

        /**
         * Constructor.
         */
        public WorkerListenerDelegate() {
            final Map<WorkerEvent, ConcurrentSet<WorkerListener>> elp =
                    new EnumMap<WorkerEvent, ConcurrentSet<WorkerListener>>(WorkerEvent.class);
            for (final WorkerEvent event : WorkerEvent.values()) {
                elp.put(event, new ConcurrentHashSet<WorkerListener>());
            }
            this.eventListenerMap = Collections.unmodifiableMap(elp);
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void addListener(final WorkerListener listener) {
            addListener(listener, WorkerEvent.values());
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void addListener(final WorkerListener listener, final WorkerEvent... events) {
            if (listener != null) {
                for (final WorkerEvent event : events) {
                    final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
                    if (listeners != null) {
                        listeners.add(listener);
                    }
                }
            }
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void removeListener(final WorkerListener listener) {
            removeListener(listener, WorkerEvent.values());
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void removeListener(final WorkerListener listener, final WorkerEvent... events) {
            if (listener != null) {
                for (final WorkerEvent event : events) {
                    final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
                    if (listeners != null) {
                        listeners.remove(listener);
                    }
                }
            }
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void removeAllListeners() {
            removeAllListeners(WorkerEvent.values());
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void removeAllListeners(final WorkerEvent... events) {
            for (final WorkerEvent event : events) {
                final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
                if (listeners != null) {
                    listeners.clear();
                }
            }
        }

        /**
         * Notify all WorkerListeners currently registered for the given WorkerEvent.
         * @param event the WorkerEvent that occurred
         * @param worker the Worker that the event occurred in
         * @param queue the queue the Worker is processing
         * @param job the Job related to the event (only supply for JOB_PROCESS, JOB_EXECUTE, JOB_SUCCESS, and
         * JOB_FAILURE events)
         * @param runner the materialized object that the Job specified (only supply for JOB_EXECUTE and
         * JOB_SUCCESS events)
         * @param result the result of the successful execution of the Job (only set for JOB_SUCCESS and if the Job was
         * a Callable that returned a value)
         * @param t the Throwable that caused the event (only supply for JOB_FAILURE and ERROR events)
         */
        public void fireEvent(final WorkerEvent event, final Worker worker, final String queue, final Job job,
                final Object runner, final Object result, final Throwable t) {
            final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
            if (listeners != null) {
                for (final WorkerListener listener : listeners) {
                    if (listener != null) {
                        try {
                            listener.onEvent(event, worker, queue, job, runner, result, t);
                        } catch (Exception e) {
                            log.error("Failure executing listener " + listener + " for event " + event
                                    + " from queue " + queue + " on worker " + worker, e);
                        }
                    }
                }
            }
        }
    }
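This is the implementation class of the event emitter. It stores listeners in an EnumMap whose keys are the WorkerEvent enum values and whose values are ConcurrentSets of listeners, so a single event can have multiple listeners.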
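The same storage layout can be sketched with JDK classes only. In the sketch below `DemoEvent`, `DemoListener`, and `ListenerRegistry` are hypothetical names, and `ConcurrentHashMap.newKeySet()` is swapped in for jesque's own `ConcurrentHashSet`. Every enum key is populated in the constructor and the outer map is then made unmodifiable, so only the per-event sets change afterwards.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical event type standing in for WorkerEvent.
enum DemoEvent { START, STOP }

// Hypothetical listener type standing in for WorkerListener.
interface DemoListener {
    void onEvent(DemoEvent event);
}

class ListenerRegistry {
    private final Map<DemoEvent, Set<DemoListener>> listeners;

    ListenerRegistry() {
        Map<DemoEvent, Set<DemoListener>> m = new EnumMap<>(DemoEvent.class);
        for (DemoEvent e : DemoEvent.values()) {
            // One concurrent set per event, created up front.
            m.put(e, ConcurrentHashMap.<DemoListener>newKeySet());
        }
        this.listeners = Collections.unmodifiableMap(m);
    }

    // Register for all events.
    void addListener(DemoListener l) {
        addListener(l, DemoEvent.values());
    }

    // Register for the given events only.
    void addListener(DemoListener l, DemoEvent... events) {
        for (DemoEvent e : events) {
            listeners.get(e).add(l);
        }
    }

    // Unregister from the given events only.
    void removeListener(DemoListener l, DemoEvent... events) {
        for (DemoEvent e : events) {
            listeners.get(e).remove(l);
        }
    }

    int count(DemoEvent e) {
        return listeners.get(e).size();
    }

    public static void main(String[] args) {
        ListenerRegistry registry = new ListenerRegistry();
        DemoListener printer = e -> System.out.println("got " + e);
        registry.addListener(printer, DemoEvent.START);
        registry.addListener(printer, DemoEvent.START);        // same instance: the set keeps one entry
        System.out.println(registry.count(DemoEvent.START));   // prints 1
        registry.addListener(printer);                          // all events
        System.out.println(registry.count(DemoEvent.STOP));    // prints 1
    }
}
```

Because the values are sets, registering the same listener instance twice for one event has no further effect, and removing it from one event leaves its other registrations untouched.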
jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerImpl.java

    protected final WorkerListenerDelegate listenerDelegate = new WorkerListenerDelegate();

    //......

    protected void process(final Job job, final String curQueue) {
        try {
            this.processingJob.set(true);
            if (threadNameChangingEnabled) {
                renameThread("Processing " + curQueue + " since " + System.currentTimeMillis());
            }
            this.listenerDelegate.fireEvent(JOB_PROCESS, this, curQueue, job, null, null, null);
            this.jedis.set(key(WORKER, this.name), statusMsg(curQueue, job));
            final Object instance = this.jobFactory.materializeJob(job);
            final Object result = execute(job, curQueue, instance);
            success(job, instance, result, curQueue);
        } catch (Throwable thrwbl) {
            failure(thrwbl, job, curQueue);
        } finally {
            removeInFlight(curQueue);
            this.jedis.del(key(WORKER, this.name));
            this.processingJob.set(false);
        }
    }
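WorkerImpl holds an implementation of WorkerEventEmitter by composition (the WorkerListenerDelegate), and triggers/notifies the corresponding listeners from the relevant methods (synchronously by default).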
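In essence this is the observer pattern: WorkerImpl is the subject being observed and the listeners are the observers. At each relevant execution point WorkerImpl fires the corresponding event and synchronously notifies the listeners, which then run their own logic.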
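What synchronous notification implies can be shown with a small sketch. It is not jesque code: `SyncNotifyDemo` and its `fire` method are hypothetical, with `Consumer<String>` standing in for a listener. Each listener runs on the thread that fires the event, so the caller waits for it, and an exception thrown by one listener is caught so that the remaining listeners are still notified, mirroring the try/catch in `fireEvent`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SyncNotifyDemo {
    // Notifies every listener in turn on the calling thread and returns a trace
    // of what happened. A failing listener does not stop the others.
    static List<String> fire(String event, List<Consumer<String>> listeners) {
        List<String> trace = new ArrayList<>();
        for (Consumer<String> listener : listeners) {
            try {
                listener.accept(event);   // blocks the caller until the listener returns
                trace.add("notified");
            } catch (Exception e) {
                trace.add("failed: " + e.getMessage());
            }
        }
        trace.add("caller resumes");
        return trace;
    }

    public static void main(String[] args) {
        final Thread caller = Thread.currentThread();
        List<Consumer<String>> listeners = new ArrayList<>();
        listeners.add(e -> { throw new IllegalStateException("boom"); });
        listeners.add(e -> System.out.println(
                "same thread as caller: " + (Thread.currentThread() == caller)));
        // prints "same thread as caller: true", then
        // [failed: boom, notified, caller resumes]
        System.out.println(fire("JOB_SUCCESS", listeners));
    }
}
```

The practical consequence is that a slow listener delays the worker itself, so long-running work in a listener is probably better handed off to another thread or executor.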