前面的文章將puppeteer作爬蟲的基礎一直到部署都梳理了一遍,如今來看一下分佈式的處理
1) 爲何須要分佈式
1. 須要抓取的不一樣數據有不少,會同時開啓無頭瀏覽器去抓取,而後獲取到數據後又無厘頭的一股腦擠進數據庫
2. 沒法保證同一時刻須要的數據只有一個操做在進行
2) 分佈式選擇
由於使用的是node,因此儘量的尋找node支持的分佈式框架
ZooKeeper 和 RabbitMQ 的思想百度上有好多說明,讀者能夠自行搜索做更詳細的瞭解
node版的
zookeeper
node版的
RabbitMQ
3) 銜接以前的 puppeteer進階版_爬取書旗小說 文章內容,文章只是放了一些主要的代碼,末尾會附上項目地址,你們能夠去擼一擼
發佈者,給書旗起一個標識爲 37 (channel_id),而後是要抓取書的書籍id(channel_book_id)
// 咱們以接口的形式接收爬取的參數
// 簡易版請求(除了接收參數不作任何處理) -> 發佈者
app.get('/v1.0/grasp_book', (req, res, next) => {
// 抓取時須要的參數
if (!req.query.channel_id && !req.query.channel_book_id) {
res.send({
code: 403,
msg: 'params error'
})
return null;
}
// 發佈者
// 鏈接rabbitmq
amqp.connect('amqp://rabbitmq:12345678@127.0.0.1:5672/').then(function(conn) {
return conn.createChannel().then(function(ch) {
// 建立 hello 的消息隊列
var q = 'hello';
// 解析爲json字符串格式做爲傳遞的數據格式
var msg = JSON.stringify({
"channel_id": req.query.channel_id,
"book_id": req.query.book_id
})
// 鏈接並保持
var ok = ch.assertQueue(q, {durable: true});
return ok.then(function(_qok) {
// 發送數據到消費者
ch.sendToQueue(q, Buffer.from(msg));
console.log(" [x] Sent '%s'", msg);
return ch.close();
});
}).finally(function() { conn.close(); });
}).catch(console.warn);
res.send({
code: 200,
msg: 'success'
});
});
命令行啓動app.js,發出請求並查看結果,{} 裏面的就是咱們發送出去的數據
消費者以及ZooKeeper
消費者,接收到發佈者傳遞過來的數據創建消息隊列,而後用Zookeeper建立臨時節點以保持隊列依次執行
var zookeeper = require('node-zookeeper-client');
// 根據標識動態引入js文件
function moduleCustomize(channel_id) {
return require(`../${channel_id}.js`)
}
var client = zookeeper.createClient('127.0.0.1:2181');
async function sleep(second) {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve('sleep')
}, second)
})
}
// 鏈接_zookeeper
client.once('connected', function () {
console.log('Connected to the server.');
// 創建鏈接 rabbitmp
amqp.connect('amqp://rabbitmq:12345678@127.0.0.1:5672/').then(function(conn) {
return conn.createChannel().then(function(ch) {
// 與名爲 hello(由發佈者建立) 的消息隊列創建鏈接
var ok = ch.assertQueue('hello', {durable: true});
// 預存爲1
ok = ok.then(function() { ch.prefetch(1); });
ok = ok.then(function() {
// doWork 回調函數 -> 執行接收到數據後的操做
ch.consume('hello', doWork, {noAck: false});
});
return ok;
// rabbitmq 處理
function doWork(msg) {
// 接收到數據
var body = msg.content.toString();
console.log("[x] Received '%s'", body);
let _body = JSON.parse(body)
let channel_book_id = _body['channel_book_id'];
let channel_id = _body['channel_id'];
// zookeeper 節點
let path = `/${channel_id + "_" + channel_book_id}`
// 鏈接_zookeeper 判斷是否存在
client.exists(path, function (error, stat) {
if (error) {
console.log(error.stack);
return;
}
if (stat) {
console.log('Node exists.');
// 存在則不執行,但須要將數據傳遞下去
// ack 能夠參考 https://www.jianshu.com/p/a5f7fce67803
ch.ack(msg);
} else {
console.log('Node does not exist.');
// 操做完成後則釋放當前建立的臨時節點
client.create(path, null, zookeeper.CreateMode.EPHEMERAL, function (error) {
if (error) {
console.log('Failed to create node: %s due to: %s.', path, error);
} else {
console.log('Node: %s is successfully created.', path);
// 根據傳入標識(如書旗就是37)動態引入js文件(抓書的操做)
moduleCustomize(channel_id).init(channel_book_id)
// 傳遞數據
ch.ack(msg);
// 釋放當前建立的臨時節點
client.remove(path, -1, function (error) {
if (error) {
console.log(error.stack);
return;
}
console.log('Node is deleted.');
});
}
});
}
console.log('任務執行完畢')
});
}
});
}).catch(console.warn);
});
client.connect();
命令行啓動rab_consumer.js,{} 裏面就是咱們接收到消息隊列裏面的數據了
4) 查看mongo結果數據