Original article: www.jianshu.com/p/1ed50e6d4…
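Bull is a Redis-based job queue library for Node.js. It supports delayed jobs, prioritized jobs, repeatable jobs, atomic operations, and more.
This article walks through Bull's source code starting from basic usage. Repeatable jobs, separate processes, and similar features are not covered here.
Bull: Premium Queue package for handling jobs and messages in NodeJS.
Related information: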
develop
Using Bull takes three steps: create a queue, bind a job handler, and add jobs.
For example:
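const Bull = require('bull');

// 1. Create the queue
const myFirstQueue = new Bull('my-first-queue');

// 2. Bind the job handler (the payload is available as job.data;
//    a second parameter would be treated as a done callback)
myFirstQueue.process(async job => {
  return doSomething(job.data);
});

// 3. Add a job (run this inside an async function, since it uses await)
const job = await myFirstQueue.add({
  foo: 'bar'
});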
Creating a queue means calling require first and then new, so we start by finding the entry point that require resolves to. Open package.json:
{
  "name": "bull",
  "version": "3.7.0",
  "description": "Job manager",
  "main": "./index.js",
  ...
}
The entry point is index.js. Open it:
module.exports = require('./lib/queue');
module.exports.Job = require('./lib/job');
This leads to the file that contains the target function, ./lib/queue:
module.exports = Queue;
The export is Queue, so next we look at the Queue function:
const Queue = function Queue(name, url, opts) {
  ...
  // Default settings
  this.settings = _.defaults(opts.settings, {
    lockDuration: 30000,
    stalledInterval: 30000,
    maxStalledCount: 1,
    guardInterval: 5000,
    retryProcessDelay: 5000,
    drainDelay: 5, // brpoplpush timeout (in seconds) when the queue is empty
    backoffStrategies: {}
  });
  ...
  // Bind these methods to avoid constant rebinding and/or creating closures
  // in processJobs etc.
  this.moveUnlockedJobsToWait = this.moveUnlockedJobsToWait.bind(this);
  this.processJob = this.processJob.bind(this);
  this.getJobFromId = Job.fromId.bind(null, this);
  ...
};
The constructor mainly initializes parameters and binds methods.
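To make the effect of the _.defaults call concrete, here is a minimal sketch without lodash. applyDefaults is a hypothetical helper, not part of Bull. It fills in only the keys the caller left undefined, which is why caller-supplied settings take precedence over the defaults listed in the constructor.

```javascript
// Default values copied from the Queue constructor excerpt.
const DEFAULT_SETTINGS = {
  lockDuration: 30000,
  stalledInterval: 30000,
  maxStalledCount: 1,
  guardInterval: 5000,
  retryProcessDelay: 5000,
  drainDelay: 5,
  backoffStrategies: {}
};

// Hypothetical stand-in for _.defaults: keeps every value the caller set
// and fills in only the keys that are still undefined.
// Unlike _.defaults, it returns a copy instead of mutating its first argument.
function applyDefaults(settings, defaults) {
  const result = Object.assign({}, settings);
  for (const key of Object.keys(defaults)) {
    if (result[key] === undefined) {
      result[key] = defaults[key];
    }
  }
  return result;
}

const settings = applyDefaults({ lockDuration: 60000 }, DEFAULT_SETTINGS);
console.log(settings.lockDuration);    // 60000 (the caller's value wins)
console.log(settings.stalledInterval); // 30000 (filled in from the defaults)
```

In a real application the overrides would be passed in the settings field of the queue options rather than merged by hand.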
Binding the job handler starts from myFirstQueue.process, so look at the process function first:
Queue.prototype.process = function (name, concurrency, handler) {
  ...
  this.setHandler(name, handler); // 1. Bind the handler
  return this._initProcess().then(() => {
    return this.start(concurrency); // 2. Start the queue
  });
};
This function does two things: it binds the handler, then starts the queue.
First, binding the handler:
Queue.prototype.setHandler = function (name, handler) {
  ...
  if (this.handlers[name]) {
    throw new Error('Cannot define the same handler twice ' + name);
  }
  ...
  if (typeof handler === 'string') {
    ...
  } else {
    handler = handler.bind(this);
    // Store the handler under its name
    if (handler.length > 1) {
      this.handlers[name] = promisify(handler);
    } else {
      this.handlers[name] = function () {
        ...
      }
    }
  }
};
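The handler.length check decides whether the handler is treated as callback style (job, done) or promise style (job). The following is a minimal sketch of that idea, not Bull's actual code: wrapHandler is a hypothetical helper, and Node's util.promisify stands in for whatever promisify Bull imports.

```javascript
const { promisify } = require('util');

// Hypothetical helper mirroring the arity check in setHandler.
function wrapHandler(handler) {
  if (handler.length > 1) {
    // Two declared parameters: assume (job, done) callback style.
    return promisify(handler);
  }
  // One parameter: normalise a sync return or throw into a promise.
  return function (job) {
    try {
      return Promise.resolve(handler(job));
    } catch (err) {
      return Promise.reject(err);
    }
  };
}

const callbackStyle = wrapHandler((job, done) => done(null, job.x + 1));
const promiseStyle = wrapHandler(job => job.x * 2);

callbackStyle({ x: 1 }).then(result => console.log(result)); // 2
promiseStyle({ x: 2 }).then(result => console.log(result));  // 4

// An async handler declared with two parameters has length 2,
// so it would be treated as callback style.
console.log((async (job, data) => {}).length); // 2
```

One consequence of this design is that a promise-returning handler should declare only the job parameter and read its payload from job.data.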
Next, starting the queue:
Queue.prototype.start = function (concurrency) {
  return this.run(concurrency).catch(err => {
    this.emit('error', err, 'error running queue');
    throw err;
  });
};
Look at the run function:
Queue.prototype.run = function (concurrency) {
  const promises = [];
  return this.isReady()
    .then(() => {
      return this.moveUnlockedJobsToWait(); // Move unlocked jobs back to the wait queue
    })
    .then(() => {
      return utils.isRedisReady(this.bclient);
    })
    .then(() => {
      while (concurrency--) {
        promises.push(
          new Promise(resolve => {
            this.processJobs(concurrency, resolve); // Process jobs
          })
        );
      }
      this.startMoveUnlockedJobsToWait(); // Periodically check for unlocked jobs
      return Promise.all(promises);
    });
};
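Unlocked job (stalled job): a job needs a lock to run. Normally a job acquires the lock when it becomes active (the lock expires after lockDuration and is renewed every lockRenewTime) and releases it on completion. If an active job has no lock, the process was blocked or crashed and the lock expired.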
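The lock lifecycle can be modelled in a few lines. This is a minimal in-memory sketch with explicit timestamps. JobLock and isStalled are hypothetical names, and Bull keeps the real lock in Redis rather than in process memory.

```javascript
// Hypothetical in-memory model of a job lock.
class JobLock {
  constructor(lockDuration) {
    this.lockDuration = lockDuration; // ms until the lock expires
    this.expiresAt = 0;               // 0 means "not held"
  }

  // Called when the job becomes active, and again on each renewal.
  extend(now) {
    this.expiresAt = now + this.lockDuration;
  }

  // Called when the job completes.
  release() {
    this.expiresAt = 0;
  }

  isHeld(now) {
    return now < this.expiresAt;
  }
}

// An active job whose lock is no longer held counts as stalled.
function isStalled(job, now) {
  return job.state === 'active' && !job.lock.isHeld(now);
}

const job = { state: 'active', lock: new JobLock(30000) };
job.lock.extend(0);                 // acquired at t = 0
console.log(isStalled(job, 15000)); // false: the lock is still valid
job.lock.extend(15000);             // renewed, now valid until t = 45000
console.log(isStalled(job, 40000)); // false
console.log(isStalled(job, 50000)); // true: no renewal arrived in time
```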
Now look at processJobs:
Queue.prototype.processJobs = function (index, resolve, job) {
  const processJobs = this.processJobs.bind(this, index, resolve);
  process.nextTick(() => {
    this._processJobOnNextTick(processJobs, index, resolve, job);
  });
};
Next, _processJobOnNextTick:
// Key code
const gettingNextJob = job ? Promise.resolve(job) : this.getNextJob();
return (this.processing[index] = gettingNextJob
  .then(this.processJob)
  .then(processJobs, err => {
    ...
  }));
The code above can be described as follows:
1. Call getNextJob to fetch a job.
2. Call processJob to process it.
3. Call processJobs again.
First, getNextJob:
if (this.drained) {
  //
  // Waiting for new jobs to arrive
  //
  console.log('bclient start get new job');
  return this.bclient
    .brpoplpush(this.keys.wait, this.keys.active, this.settings.drainDelay)
    .then(
      jobId => {
        if (jobId) {
          return this.moveToActive(jobId);
        }
      },
      err => {
        ...
      }
    );
} else {
  return this.moveToActive();
}
This uses Redis's blocking push/pop mechanism (BRPOPLPUSH) to fetch the next job ID, with drainDelay as the timeout.
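To show what the pop-and-push step does to the two lists, here is a minimal in-memory model. The rpoplpush function below is a hypothetical stand-in that neither blocks nor talks to Redis, and the sample data assumes new job IDs are pushed at the head of the wait list.

```javascript
// Hypothetical in-memory model of Redis RPOPLPUSH (no blocking, no timeout).
// Arrays are written head-first, so index 0 is the head of the list.
function rpoplpush(source, destination) {
  if (source.length === 0) {
    return null; // the blocking variant would wait here until the timeout expires
  }
  const value = source.pop();  // take from the tail of the source list
  destination.unshift(value);  // push onto the head of the destination list
  return value;
}

const wait = ['job-3', 'job-2', 'job-1']; // assumed: newest ID at the head
const active = [];

console.log(rpoplpush(wait, active)); // job-1
console.log(wait.length);             // 2
console.log(active[0]);               // job-1
console.log(rpoplpush([], active));   // null
```

Because the pop and the push happen as one operation, a job ID is never absent from both lists at once, which is what makes this pattern suitable for handing work to a worker.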
Next, processJob:
Queue.prototype.processJob = function (job) {
  ...
  const handleCompleted = result => {
    return job.moveToCompleted(result).then(jobData => {
      ...
      return jobData ? this.nextJobFromJobData(jobData[0], jobData[1]) : null;
    });
  };
  // Extend the lock's duration
  lockExtender();
  const handler = this.handlers[job.name] || this.handlers['*'];
  if (!handler) {
    ...
  } else {
    let jobPromise = handler(job);
    ...
    return jobPromise
      .then(handleCompleted)
      .catch(handleFailed)
      .finally(() => {
        stopTimer();
      });
  }
};
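When a job is processed successfully, handleCompleted is called, which in turn calls the job's moveToCompleted. After a few intermediate calls, this ends up running the Lua script moveToFinished: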
...
-- Try to get next job to avoid an extra roundtrip if the queue is not closing,
-- and not rate limited.
...
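This script moves the job to the completed or failed set and then fetches the next job.
Once processJob finishes, processJobs runs again. This loop is the core of Bull (the original article illustrates it with a figure).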
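The fetch, process, repeat cycle can be sketched with an in-memory worker. MiniWorker is a hypothetical class whose method names mirror the functions discussed in this article. Unlike Bull, it stops when the list is empty instead of waiting for new jobs, and it uses no Redis and no locks.

```javascript
// Hypothetical in-memory worker illustrating the processing loop.
class MiniWorker {
  constructor(jobs, handler) {
    this.waiting = jobs.slice(); // jobs still to be processed
    this.handler = handler;
    this.completed = [];
    this.failed = [];
  }

  // Stand-in for getNextJob: resolves with the next job or undefined.
  getNextJob() {
    return Promise.resolve(this.waiting.shift());
  }

  // Stand-in for processJob: run the handler and record the outcome.
  processJob(job) {
    return Promise.resolve()
      .then(() => this.handler(job))
      .then(
        result => { this.completed.push({ id: job.id, result }); },
        err => { this.failed.push({ id: job.id, reason: err.message }); }
      );
  }

  // Stand-in for processJobs: fetch, process, then schedule itself again.
  processJobs(resolve) {
    process.nextTick(() => {
      this.getNextJob().then(job => {
        if (!job) {
          return resolve(); // the sketch stops here; a real worker keeps waiting
        }
        return this.processJob(job).then(() => this.processJobs(resolve));
      });
    });
  }

  run() {
    return new Promise(resolve => this.processJobs(resolve));
  }
}

const worker = new MiniWorker(
  [{ id: 1, data: 2 }, { id: 2, data: -1 }],
  job => {
    if (job.data < 0) throw new Error('negative input');
    return job.data * 2;
  }
);

const finished = worker.run().then(() => {
  console.log(worker.completed.length, worker.failed.length); // 1 1
  return worker;
});
```

Scheduling each iteration with process.nextTick keeps the call stack flat, so the loop can run indefinitely without growing the stack.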
For adding a job, go straight to the add function:
Queue.prototype.add = function (name, data, opts) {
  ...
  if (opts.repeat) {
    ...
  } else {
    return Job.create(this, name, data, opts);
  }
};
This calls the create function in Job:
Job.create = function (queue, name, data, opts) {
  const job = new Job(queue, name, data, opts); // 1. Create the job
  return queue
    .isReady()
    .then(() => {
      return addJob(queue, job); // 2. Add the job to the queue
    })
    ...
};
Following addJob further, it eventually runs the Lua script addJob, which stores the job in Redis according to the job's options.
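The usage example calls add with only a data object, while the signature shown here is add(name, data, opts). The elided part of add presumably normalises the arguments. The sketch below shows one way this could work. Both normalizeAddArgs and the '__default__' name are assumptions made for illustration and are not taken from the excerpt.

```javascript
// Assumed default name for jobs added without an explicit name.
const DEFAULT_JOB_NAME = '__default__';

// Hypothetical helper: accepts add(data, opts) or add(name, data, opts).
function normalizeAddArgs(name, data, opts) {
  if (typeof name !== 'string') {
    // Called as add(data, opts): shift the arguments one position to the right.
    opts = data;
    data = name;
    name = DEFAULT_JOB_NAME;
  }
  // Copy the options so later changes do not affect the caller's object.
  return { name, data, opts: Object.assign({}, opts) };
}

const unnamed = normalizeAddArgs({ foo: 'bar' });
console.log(unnamed.name);     // __default__
console.log(unnamed.data.foo); // bar

const named = normalizeAddArgs('email', { to: 'a@example.com' }, { delay: 1000 });
console.log(named.name);       // email
console.log(named.opts.delay); // 1000
```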
job stalled more than allowable limit
The run function calls this.startMoveUnlockedJobsToWait(). Here is that function:
Queue.prototype.startMoveUnlockedJobsToWait = function () {
  clearInterval(this.moveUnlockedJobsToWaitInterval);
  if (this.settings.stalledInterval > 0 && !this.closing) {
    this.moveUnlockedJobsToWaitInterval = setInterval(
      this.moveUnlockedJobsToWait,
      this.settings.stalledInterval
    );
  }
};
This function runs moveUnlockedJobsToWait periodically, once every stalledInterval milliseconds:
Queue.prototype.moveUnlockedJobsToWait = function () {
  ...
  return scripts
    .moveUnlockedJobsToWait(this)
    .then(([failed, stalled]) => {
      const handleFailedJobs = failed.map(jobId => {
        return this.getJobFromId(jobId).then(job => {
          this.emit(
            'failed',
            job,
            new Error('job stalled more than allowable limit'),
            'active'
          );
          return null;
        });
      });
      ...
    })
    ...
  ;
};
Through the moveUnlockedJobsToWait function in scripts, this ultimately runs the Lua script moveUnlockedJobsToWait:
...
local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])
...
if(stalledCount > MAX_STALLED_JOB_COUNT) then
  rcall("ZADD", KEYS[4], ARGV[3], jobId)
  rcall("HSET", jobKey, "failedReason", "job stalled more than allowable limit")
  table.insert(failed, jobId)
else
  -- Move the job back to the wait queue, to immediately be picked up by a waiting worker.
  rcall("RPUSH", dst, jobId)
  rcall('PUBLISH', KEYS[1] .. '@', jobId)
  table.insert(stalled, jobId)
end
...
return {failed, stalled}
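- MAX_STALLED_JOB_COUNT: defaults to 1 (the maxStalledCount setting)
The script collects the stalled jobs and returns them. Jobs that have stalled more than MAX_STALLED_JOB_COUNT times are marked as failed instead of being moved back to the wait queue, which produces the error in the title above.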
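The branch in the Lua excerpt can be mirrored in plain JavaScript. This is a minimal in-memory sketch. checkStalledJobs is a hypothetical function, and it assumes that a job's stalled counter is incremented each time the check finds the job active without a lock (that part of the script is elided in the excerpt).

```javascript
// Hypothetical in-memory version of the stalled-job branch.
function checkStalledJobs(stalledJobs, maxStalledCount) {
  const failed = [];
  const stalled = [];
  for (const job of stalledJobs) {
    job.stalledCount += 1; // assumption: incremented on every detected stall
    if (job.stalledCount > maxStalledCount) {
      job.failedReason = 'job stalled more than allowable limit';
      failed.push(job.id);
    } else {
      stalled.push(job.id); // goes back to the wait list for another attempt
    }
  }
  return [failed, stalled];
}

const job = { id: '42', stalledCount: 0 };

// First stall with the default limit of 1: the job is retried.
let [failed, stalled] = checkStalledJobs([job], 1);
console.log(failed.length, stalled.length); // 0 1

// Second stall: the limit is exceeded and the job fails.
[failed, stalled] = checkStalledJobs([job], 1);
console.log(failed.length, stalled.length); // 1 0
console.log(job.failedReason); // job stalled more than allowable limit
```

Under that assumption, the default limit of 1 lets a job stall once and be retried, and the error appears on the second stall.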