puppeteer_node爬蟲分佈式進階

前面的文章將puppeteer作爬蟲的基礎一直到部署都梳理了一遍,如今來看一下分佈式的處理

1) 爲何須要分佈式

1. 須要抓取的不一樣數據有不少,會同時開啓無頭瀏覽器去抓取,而後獲取到數據後又無厘頭的一股腦擠進數據庫
   2. 沒法保證同一時刻須要的數據只有一個操做在進行

2) 分佈式選擇

由於使用的是node,因此儘量的尋找node支持的分佈式框架
ZooKeeper 和 RabbitMQ 的思想百度上有好多說明,讀者能夠自行搜索做更詳細的瞭解
node版的 zookeeper
node版的 RabbitMQ

3) 銜接以前的 puppeteer進階版_爬取書旗小說 文章內容,文章只是放了一些主要的代碼,末尾會附上項目地址,你們能夠去擼一擼

發佈者,給書旗起一個標識爲 37 (channel_id),而後是要抓取書的書籍id(channel_book_id)
// 咱們以接口的形式接收爬取的參數    
// 簡易版請求(除了接收參數不作任何處理) -> 發佈者
app.get('/v1.0/grasp_book', (req, res, next) => {
  // 抓取時須要的參數
  if (!req.query.channel_id && !req.query.channel_book_id) {
    res.send({
      code: 403,
      msg: 'params error'
    })
    return null;
  }
  // 發佈者
  // 鏈接rabbitmq
  amqp.connect('amqp://rabbitmq:12345678@127.0.0.1:5672/').then(function(conn) {
    return conn.createChannel().then(function(ch) {
      // 建立 hello 的消息隊列
      var q = 'hello';
      // 解析爲json字符串格式做爲傳遞的數據格式
      var msg = JSON.stringify({
        "channel_id": req.query.channel_id,
        "book_id": req.query.book_id
      })
      // 鏈接並保持
      var ok = ch.assertQueue(q, {durable: true});
      return ok.then(function(_qok) {
        // 發送數據到消費者
        ch.sendToQueue(q, Buffer.from(msg));
        console.log(" [x] Sent '%s'", msg);
        return ch.close();
      });
    }).finally(function() { conn.close(); });
  }).catch(console.warn);
  res.send({
    code: 200,
    msg: 'success'
  });
});
命令行啓動app.js,發出請求並查看結果,{} 裏面的就是咱們發送出去的數據

圖片描述
圖片描述

消費者以及ZooKeeper

消費者,接收到發佈者傳遞過來的數據創建消息隊列,而後用Zookeeper建立臨時節點以保持隊列依次執行
var zookeeper = require('node-zookeeper-client');

// 根據標識動態引入js文件
function moduleCustomize(channel_id) {
    return require(`../${channel_id}.js`)
}

 
var client = zookeeper.createClient('127.0.0.1:2181');

async function sleep(second) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve('sleep')
    }, second)
  })
}

// 鏈接_zookeeper
client.once('connected', function () {
  console.log('Connected to the server.');
  // 創建鏈接 rabbitmp
  amqp.connect('amqp://rabbitmq:12345678@127.0.0.1:5672/').then(function(conn) {
    return conn.createChannel().then(function(ch) {
      // 與名爲 hello(由發佈者建立) 的消息隊列創建鏈接
      var ok = ch.assertQueue('hello', {durable: true});
      // 預存爲1
      ok = ok.then(function() { ch.prefetch(1); });
      ok = ok.then(function() {
        // doWork 回調函數 -> 執行接收到數據後的操做
        ch.consume('hello', doWork, {noAck: false});
      });
      return ok;
      // rabbitmq 處理
      function doWork(msg) {
          // 接收到數據
          var body = msg.content.toString();
          console.log("[x] Received '%s'", body);
          let _body = JSON.parse(body)
          let channel_book_id = _body['channel_book_id'];
          let channel_id = _body['channel_id'];
          // zookeeper 節點
          let path = `/${channel_id + "_" + channel_book_id}`
          // 鏈接_zookeeper 判斷是否存在
          client.exists(path, function (error, stat) {
              if (error) {
                  console.log(error.stack);
                  return;
              }
              if (stat) {
                console.log('Node exists.');
                // 存在則不執行,但須要將數據傳遞下去
                // ack 能夠參考 https://www.jianshu.com/p/a5f7fce67803
                ch.ack(msg);
              } else {
                  console.log('Node does not exist.');
                  // 操做完成後則釋放當前建立的臨時節點
                  client.create(path, null, zookeeper.CreateMode.EPHEMERAL, function (error) {
                    if (error) {
                        console.log('Failed to create node: %s due to: %s.', path, error);
                    } else {
                        console.log('Node: %s is successfully created.', path);
                          
                        // 根據傳入標識(如書旗就是37)動態引入js文件(抓書的操做)
                        moduleCustomize(channel_id).init(channel_book_id)

                        // 傳遞數據
                        ch.ack(msg);
                        // 釋放當前建立的臨時節點
                        client.remove(path, -1, function (error) {
                            if (error) {
                                console.log(error.stack);
                                return;
                            }
                            console.log('Node is deleted.');
                        });

                    }
                  });
                }
              console.log('任務執行完畢')
            });
      }
    });
  }).catch(console.warn);


});
 
client.connect();
命令行啓動rab_consumer.js,{} 裏面就是咱們接收到消息隊列裏面的數據了

圖片描述

4) 查看mongo結果數據

圖片描述

5) 項目地址(update分支)

相關文章
相關標籤/搜索