This post walks through several of the DAOs in jesque:
FailureDAO
KeysDAO
QueueInfoDAO
WorkerInfoDAO
jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/FailureDAO.java

    /**
     * FailureDAO provides access to job failures.
     *
     * @author Greg Haines
     */
    public interface FailureDAO {

        /**
         * @return total number of failures
         */
        long getCount();

        /**
         * @param offset offset into the failures
         * @param count number of failures to return
         * @return a sub-list of the failures
         */
        List<JobFailure> getFailures(long offset, long count);

        /**
         * Clear the list of failures.
         */
        void clear();

        /**
         * Re-queue a job for execution.
         * @param index the index into the failure list
         * @return the date the job was re-queued
         */
        Date requeue(long index);

        /**
         * Remove a failure from the list.
         * @param index the index of the failure to remove
         */
        void remove(long index);
    }
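This DAO mainly operates on namespace:failed, which is a Redis list.
getCount
Uses LLEN to get the length of the list.
clear
Uses DEL to delete the namespace:failed list.
getFailures
Uses LRANGE to read a slice of the list.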
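Since LRANGE takes an inclusive start and stop index rather than an offset and a count, the paging arguments have to be translated. Below is a minimal sketch of that translation, using a plain Java list as a stand-in for the Redis list. The class and helper names are hypothetical and are not taken from jesque, and the `offset + count - 1` conversion is an assumption about how such a call would be made.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for paging a Redis list; not jesque code. */
final class FailurePageSketch {

    /** Mimics LRANGE for non-negative indexes: start and stop are both inclusive. */
    static List<String> lrange(List<String> list, long start, long stop) {
        if (start >= list.size() || start > stop) {
            return new ArrayList<>();
        }
        // Like Redis, clamp an out-of-range stop to the last element.
        int to = (int) Math.min(stop, list.size() - 1L);
        return new ArrayList<>(list.subList((int) start, to + 1));
    }

    /** Translates (offset, count) paging into the inclusive range LRANGE expects. */
    static List<String> getFailures(List<String> failed, long offset, long count) {
        return lrange(failed, offset, offset + count - 1);
    }
}
```

A count of zero yields an empty page here, because the computed stop index falls before the start index.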
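requeue
Fetches the failed job by index, resets its retry time, and pushes it back onto the queue.
remove
Deletes by index using LREM. Redis has no command that deletes a list element by position, so the implementation first uses LSET to overwrite the element at that index with a random value, then uses LREM to remove that random value.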
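The LSET-then-LREM trick can be illustrated on an ordinary Java list. This is only a sketch of the idea: the class and method names are hypothetical, the list stands in for namespace:failed, and a UUID is assumed as the random sentinel.

```java
import java.util.List;
import java.util.UUID;

/** Hypothetical in-memory model of "remove by index" via LSET + LREM; not jesque code. */
final class RemoveByIndexSketch {

    /** Mimics LSET: overwrite the element at the given index. */
    static void lset(List<String> list, int index, String value) {
        list.set(index, value);
    }

    /** Mimics LREM with count = 1: delete the first element equal to value. */
    static long lremFirst(List<String> list, String value) {
        return list.remove(value) ? 1 : 0;
    }

    /** Removes exactly the element at index, even if its payload has duplicates. */
    static void removeAt(List<String> list, int index) {
        // A random sentinel is very unlikely to collide with a real entry.
        String sentinel = UUID.randomUUID().toString();
        lset(list, index, sentinel);
        lremFirst(list, sentinel);
    }
}
```

The sentinel is what makes this safe when the same payload appears more than once: removing by the original value could delete a different entry than the one at the requested index. Against a real Redis server the two commands are separate steps, so they are not atomic unless wrapped in a transaction or script.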
jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/KeysDAO.java

    /**
     * KeysDAO provides access to available keys.
     *
     * @author Greg Haines
     */
    public interface KeysDAO {

        /**
         * Get basic key info.
         * @param key the key name
         * @return the key information or null if the key did not exist
         */
        KeyInfo getKeyInfo(String key);

        /**
         * Get basic key info plus a sub-list of the array value for the key, if applicable.
         * @param key the key name
         * @param offset the offset into the array
         * @param count the number of values to return
         * @return the key information or null if the key did not exist
         */
        KeyInfo getKeyInfo(String key, int offset, int count);

        /**
         * Get basic info on all keys.
         * @return a list of key informations
         */
        List<KeyInfo> getKeyInfos();

        /**
         * @return information about the backing Redis database
         */
        Map<String, String> getRedisInfo();
    }
getKeyInfo
Uses TYPE to determine the key's type.
getKeyInfos
Uses KEYS * to list the keys.
getRedisInfo
Uses INFO.
jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/QueueInfoDAO.java

    /**
     * QueueInfoDAO provides access to the queues in use by Jesque.
     *
     * @author Greg Haines
     */
    public interface QueueInfoDAO {

        /**
         * @return the list of queue names
         */
        List<String> getQueueNames();

        /**
         * @return total number of jobs pending in all queues
         */
        long getPendingCount();

        /**
         * @return total number of jobs processed
         */
        long getProcessedCount();

        /**
         * @return the list of queue informations
         */
        List<QueueInfo> getQueueInfos();

        /**
         * @param name the queue name
         * @param jobOffset the offset into the queue
         * @param jobCount the number of jobs to return
         * @return the queue information or null if the queue does not exist
         */
        QueueInfo getQueueInfo(String name, long jobOffset, long jobCount);

        /**
         * Delete the given queue.
         * @param name the name of the queue
         */
        void removeQueue(String name);
    }
getQueueNames
Uses SMEMBERS on namespace:queues.
getPendingCount
Computes the size of each queue and sums them; the command used depends on the queue type.

    private long size(final Jedis jedis, final String queueName) {
        final String key = key(QUEUE, queueName);
        final long size;
        if (JedisUtils.isDelayedQueue(jedis, key)) {
            // If delayed queue, use ZCARD
            size = jedis.zcard(key);
        } else {
            // Else, use LLEN
            size = jedis.llen(key);
        }
        return size;
    }
Delayed queues are sorted sets, so their size comes from ZCARD.
Non-delayed queues are lists, so their size comes from LLEN.
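To show how the per-queue size feeds into the pending count, here is a small self-contained sketch. The `Store` interface and `InMemoryStore` class are hypothetical stand-ins for the few Redis commands involved. The `namespace:queue:name` key layout and the use of the TYPE reply ("zset") to detect a delayed queue are assumptions for illustration, not confirmed details of jesque.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of summing queue sizes by type; not jesque code. */
final class PendingCountSketch {

    /** Stand-in for the Redis commands used here. */
    interface Store {
        String type(String key);  // like TYPE: "list", "zset", "none"
        long llen(String key);    // like LLEN
        long zcard(String key);   // like ZCARD
    }

    /** Tiny in-memory implementation so the sketch can run without Redis. */
    static final class InMemoryStore implements Store {
        final Map<String, List<String>> lists = new HashMap<>();
        final Map<String, TreeMap<String, Double>> zsets = new HashMap<>();

        @Override public String type(String key) {
            if (zsets.containsKey(key)) return "zset";
            if (lists.containsKey(key)) return "list";
            return "none";
        }

        @Override public long llen(String key) {
            List<String> list = lists.get(key);
            return list == null ? 0 : list.size();
        }

        @Override public long zcard(String key) {
            TreeMap<String, Double> zset = zsets.get(key);
            return zset == null ? 0 : zset.size();
        }
    }

    /** Delayed queue (sorted set) -> ZCARD, otherwise -> LLEN. */
    static long size(Store store, String key) {
        return "zset".equals(store.type(key)) ? store.zcard(key) : store.llen(key);
    }

    /** Sums the sizes of all named queues under the assumed key layout. */
    static long pendingCount(Store store, String namespace, Collection<String> queueNames) {
        long total = 0;
        for (String name : queueNames) {
            total += size(store, namespace + ":queue:" + name);
        }
        return total;
    }
}
```

A queue name with no backing key contributes zero, which matches how LLEN behaves on a missing key.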
getProcessedCount
Reads the stat string value directly.
getQueueInfos
Also computes the size of each queue along the way.
removeQueue

    public void removeQueue(final String name) {
        PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, Void>() {
            /**
             * {@inheritDoc}
             */
            @Override
            public Void doWork(final Jedis jedis) throws Exception {
                jedis.srem(key(QUEUES), name);
                jedis.del(key(QUEUE, name));
                return null;
            }
        });
    }
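This touches two objects: the queues set (the queue name is removed from it) and the queue key itself (which is deleted).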
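The two-step removal can be modelled in memory as follows. This is a hedged sketch: the class is hypothetical, the map stands in for the Redis keyspace, and the literal key parts "queues" and "queue" joined with ":" are assumptions based on the namespace:queues naming used earlier in this post.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of removeQueue; not jesque code. */
final class RemoveQueueSketch {

    private final String namespace;

    /** Stand-in for the Redis keyspace: key -> set or list contents. */
    final Map<String, Collection<String>> keyspace = new HashMap<>();

    RemoveQueueSketch(String namespace) {
        this.namespace = namespace;
    }

    /** Builds a namespaced key such as "resque:queue:mail" (assumed layout). */
    String key(String... parts) {
        return namespace + ":" + String.join(":", parts);
    }

    void removeQueue(String name) {
        // Step 1, like SREM: drop the name from the set of known queues.
        Collection<String> queues = keyspace.get(key("queues"));
        if (queues != null) {
            queues.remove(name);
        }
        // Step 2, like DEL: delete the queue's own key, whatever its type.
        keyspace.remove(key("queue", name));
    }
}
```

Both steps are needed: deleting only the queue key would leave a stale name in the queues set, and removing only the name would leave the pending jobs behind.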
jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/WorkerInfoDAO.java

    /**
     * WorkerInfoDAO provides access to information about workers.
     *
     * @author Greg Haines
     */
    public interface WorkerInfoDAO {

        /**
         * @return total number of workers known
         */
        long getWorkerCount();

        /**
         * @return number of active workers
         */
        long getActiveWorkerCount();

        /**
         * @return number of paused workers
         */
        long getPausedWorkerCount();

        /**
         * @return information about all active workers
         */
        List<WorkerInfo> getActiveWorkers();

        /**
         * @return information about all paused workers
         */
        List<WorkerInfo> getPausedWorkers();

        /**
         * @return information about all workers
         */
        List<WorkerInfo> getAllWorkers();

        /**
         * @param workerName the name of the worker
         * @return information about the given worker or null if that worker does not exist
         */
        WorkerInfo getWorker(String workerName);

        /**
         * @return a map of worker informations by hostname
         */
        Map<String, List<WorkerInfo>> getWorkerHostMap();

        /**
         * Removes the metadata about a worker.
         *
         * @param workerName
         *            The worker name to remove
         */
        void removeWorker(String workerName);
    }
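getAllWorkers
Uses SMEMBERS on namespace:workers.
getActiveWorkers
Uses SMEMBERS on namespace:workers, then keeps the workers whose state is working.
getPausedWorkers
Uses SMEMBERS on namespace:workers, then keeps the workers whose state is paused.
getWorkerCount
Calls SCARD directly on namespace:workers.
getActiveWorkerCount
Uses SMEMBERS on namespace:workers, then counts the workers whose state is working.
getPausedWorkerCount
Uses SMEMBERS on namespace:workers, then counts the workers whose state is paused.
getWorkerHostMap
Uses SMEMBERS on namespace:workers, then groups the workers into a map by host.
Reading the full member list is essentially the general-purpose approach; most of the other counts are derived from it.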
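The "load every worker, then filter or group" pattern described above can be sketched with plain collections. Everything here is illustrative: the `Worker` class, its fields, and the `State` values are hypothetical stand-ins rather than jesque's own types, and the input list plays the role of the workers loaded after SMEMBERS.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of deriving worker views from one full listing; not jesque code. */
final class WorkerViewsSketch {

    enum State { IDLE, WORKING, PAUSED }

    /** Minimal stand-in for a worker record. */
    static final class Worker {
        final String name;
        final String host;
        final State state;

        Worker(String name, String host, State state) {
            this.name = name;
            this.host = host;
            this.state = state;
        }
    }

    /** Active or paused workers: keep only those in the wanted state. */
    static List<Worker> withState(List<Worker> all, State wanted) {
        List<Worker> matches = new ArrayList<>();
        for (Worker worker : all) {
            if (worker.state == wanted) {
                matches.add(worker);
            }
        }
        return matches;
    }

    /** Host map: group the same full listing by host name. */
    static Map<String, List<Worker>> byHost(List<Worker> all) {
        Map<String, List<Worker>> hosts = new TreeMap<>();
        for (Worker worker : all) {
            hosts.computeIfAbsent(worker.host, h -> new ArrayList<>()).add(worker);
        }
        return hosts;
    }
}
```

Under this model the active and paused counts are just the sizes of the filtered lists, which is why only the total count can use the cheaper SCARD.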
removeWorker

    public void removeWorker(final String workerName) {
        PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, Void>() {
            /**
             * {@inheritDoc}
             */
            @Override
            public Void doWork(final Jedis jedis) throws Exception {
                jedis.srem(key(WORKERS), workerName);
                jedis.del(key(WORKER, workerName),
                          key(WORKER, workerName, STARTED),
                          key(STAT, FAILED, workerName),
                          key(STAT, PROCESSED, workerName));
                return null;
            }
        });
    }
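This removes the worker from the workers set and deletes its related keys: the worker key, its started key, and its failed and processed stat keys.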