從零系列--node爬蟲利用進程池寫數據

一、主進程

const http = require('http');
const fs = require('fs');
const cheerio = require('cheerio');
const request = require('request');
const makePool = require('./pooler')
// Job dispatcher returned by the pool factory for the './worker' module;
// called as runJob(job, callback).
const runJob = makePool('./worker')
// NOTE(review): not read anywhere in the visible code (gen() declares its own `i`).
var i = 0;
// Initial URL: the listing page that is passed to getUrl().
var url = "http://xxx.com/articles/"; 
// Placeholder for the generator that drives the crawl; getUrl() replaces it
// with gen(urls) once the listing page has been parsed.
let g = '';
/**
 * Fetches a single article page, or skips it when no URL is given.
 *
 * This function is invoked from inside the generator `g` (as the operand of a
 * `yield`), so the generator is still running while it executes. Resuming it
 * synchronously from here would throw "Generator is already running"; the
 * skip is therefore deferred until the generator has suspended.
 *
 * @param {string} x - URL of the page to fetch; falsy values are skipped.
 */
function fetchPage(x) {
  console.log(x);
  if (!x) {
    // Nothing to download: move on to the next URL on a later tick.
    setImmediate(() => g.next());
    return;
  }
  startRequest(x);
}


/**
 * Downloads one article page and queues its entries for saving.
 *
 * Every `<p>` under `.content` that contains a `<strong>` element is turned
 * into a record (title, link, description, timestamp, 1-based paragraph
 * position) and handed to the worker pool through runJob(). Once the page has
 * been processed, or the transfer fails, the module-level generator `g` is
 * advanced so the next URL gets fetched.
 *
 * @param {string} x - URL of the page to download.
 * @returns {object} The request object returned by http.get().
 */
function startRequest(x) {
    // The request and the response each have an error handler below; this
    // guard makes sure the generator advances only once per page, whichever
    // of 'end' / 'error' fires.
    let advanced = false;
    function advance() {
        if (advanced) {
            return;
        }
        advanced = true;
        g.next();
    }

    // Issue a GET request with the http module.
    return http.get(x, function (res) {
        let html = '';            // accumulates the whole HTML document
        res.setEncoding('utf-8'); // decode chunks as UTF-8 text

        // Collect the body chunk by chunk.
        res.on('data', function (chunk) {
            html += chunk;
        });

        // A broken response stream must not stall the crawl.
        res.on('error', function (err) {
            console.log(err);
            advance();
        });

        // The whole document has arrived: parse it and extract the entries.
        res.on('end', function () {
            const $ = cheerio.load(html);
            const time = new Date();

            $('.content p').each((index, item) => {
                const $item = $(item);
                if (!$item.find('strong').length) {
                    return; // not an entry paragraph; continue with the next one
                }

                // Title and link must be read before the child elements are
                // removed to isolate the description text.
                const title = $item.find('strong').text().trim();
                const link = $($item.children('a').get(0)).attr('href');
                $item.children().remove();

                const fex_item = {
                    title: title,
                    time: time,
                    link: link,
                    des: $item.text(),
                    i: index + 1 // 1-based position of the paragraph on the page
                };

                runJob(fex_item, (err, data) => {
                    if (err) {
                        console.error('get link error');
                        return;
                    }
                    console.log('get link ok');
                });
            });

            advance();
        });
    }).on('error', function (err) {
        console.log(err);
        advance();
    });
}
/**
 * Generator that walks `urls` in order, calling fetchPage() on each entry and
 * yielding whatever that call returns. The entry count is read once, before
 * the first URL is processed.
 *
 * @param {Array} urls - Page URLs to visit.
 */
function* gen(urls){
  const total = urls.length;
  let cursor = 0;
  while (cursor < total) {
    yield fetchPage(urls[cursor]);
    cursor += 1;
  }
}

/**
 * Downloads the listing page, collects the article links found in
 * `.articles .post-list li`, and starts the crawl by creating the
 * module-level generator `g` over those links and advancing it once.
 *
 * @param {string} x - URL of the listing page.
 */
function getUrl(x){
    // Issue a GET request with the http module.
    http.get(x, function (res) {
      let html = '';            // accumulates the whole HTML document
      res.setEncoding('utf-8'); // decode chunks as UTF-8 text

      // Collect the body chunk by chunk.
      res.on('data', function (chunk) {
          html += chunk;
      });

      // The whole document has arrived: parse it and extract the links.
      res.on('end', function () {
        const $ = cheerio.load(html);
        const urls = [];

        $('.articles .post-list li').each(function (index, item) {
          // Check the href itself: once concatenated with the host prefix the
          // string is always truthy, even when the attribute is missing.
          const href = $($(item).children('a').get(0)).attr('href');
          if (href) {
            urls.push('http://xxxx.com' + href);
          }
        });

        // Start processing the collected article pages.
        g = gen(urls);
        g.next();
      });

  }).on('error', function (err) {
      console.log(err);
  });
}

// Entry point: fetch the listing page; getUrl() starts the crawl once it is parsed.
getUrl(url)

二、建立進程池服務器

const cp = require('child_process')
const cpus = require('os').cpus().length;

module.exports =  function pooler(workModule){
  let awaiting = [],readyPool = [],poolSize = 0;
  return function doWork(job,cb){
    if(!readyPool.length&&poolSize>cpus)
      return awaiting.push([doWork,job,cb])

    let child = readyPool.length ? readyPool.shift():(poolSize++,cp.fork(workModule))
    let cbTriggered = false;
    child.removeAllListeners()
    .once('error',function(err){
      if(!cbTriggered){
        cb(err)
        cbTriggered = true
      }
      child.kill()
    })
    .once('eixt',function(){
      if(!cbTriggered)
      cb(new Error('childe exited with code:'+code))
      poolSize--;
      let childIdx = readyPool.indexOf(child)
      if(childIdx > -1)readyPool.splice(childIdx,1)
    })
    .once('message',function(msg){
      cb(null,msg)
      cbTriggered = true
      readyPool.push(child)
      if(awaiting.length)setImmediate.apply(null,awaiting.shift())
    })
    .send(job)
  }
}

三、工作進程接收消息並處理內容

const fs = require('fs')
/**
 * Turns a scraped title into a single file-name component by replacing path
 * separators, so the title cannot point outside the output directory.
 *
 * @param {*} title - Title taken from the job.
 * @returns {string} The title as a string with '/' and '\' replaced by '_'.
 */
function toSafeFileName(title) {
  return String(title).replace(/[\\/]/g, '_');
}

// Worker entry point: writes each received job to its own text file and
// replies 'finish' to the parent once the write has completed.
process.on('message',function(job){
  const content = 'TITLE:'+job.title+'\n' + 'LINK:'+job.link + '\n DES:'+job.des+'\n SAVE-TIME:'+job.time

  fs.writeFile('../xx/data/' + toSafeFileName(job.title) + '.txt', content, 'utf-8', function (err) {
      if (err) {
          console.log(err);
      }
      // Reply only after the write has settled, so the parent does not hand
      // this worker another job while the file is still being written.
      // 'finish' is sent on failure as well to keep one reply per job.
      process.send('finish')
  });
})
相關文章
相關標籤/搜索