Crawling JD.com Building-Materials Data with WebMagic + RabbitMQ + ES

     This crawler targets JD.com's building-materials data. While crawling JD we found no anti-crawler countermeasures in place, so the process went fairly smoothly.

Why WebMagic:

  • As a lightweight Java crawler framework, WebMagic greatly reduces crawler development time.

Why an MQ (this project uses RabbitMQ, but any other MQ would work):

  • It decouples the modules, making each spider independent of the others.
  • It makes the project robust: whether the project stops deliberately or involuntarily (a power failure, say), it can pick up where it left off simply by re-reading the data in the MQ.
  • It splits up the business logic, so each module stays simple and the code is easy to write.

Why ES:

  • Convenient searching later on
  • Business requirements

 

Overall project architecture:

 

 There are several spiders here: the spiders in the earlier tiers each handle a different module of data and push the processed results into the MQ for the next-tier spider to consume.

 The final pages crawled are the product detail pages, so the last-tier spider stores the detail data in ES once it has been crawled.
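Between tiers, pages travel through the MQ as JSON-serialized CrawlerPage messages. The post never shows CrawlerPage itself; a minimal hypothetical version, consistent with the methods the code below calls on it (translateRequest(), getCurrentUrl()), might look like this:

package com.chinaredstar.jc.core.page;

import us.codecraft.webmagic.Request;

/**
 * Hypothetical sketch of the CrawlerPage message DTO; the real class is not
 * shown in the post. It only needs to round-trip between a WebMagic Request
 * and a JSON-serializable object.
 */
public class CrawlerPage {

    /** URL this message asks the next-tier spider to crawl */
    private String currentUrl;

    public CrawlerPage() {
    }

    public CrawlerPage(String currentUrl) {
        this.currentUrl = currentUrl;
    }

    public String getCurrentUrl() {
        return currentUrl;
    }

    public void setCurrentUrl(String currentUrl) {
        this.currentUrl = currentUrl;
    }

    /** Rebuild a WebMagic Request from a message read off the queue. */
    public Request translateRequest() {
        return new Request(currentUrl);
    }
}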

 spider1 handles the JD building-materials home page:

 

  spider2 handles the JD pagination bar:

 

 spider3 handles the JD product list:

 spider4 handles the product detail page:

 

From the architecture diagram above, we can see that every spider needs a connection to the MQ. The first-tier spider does not consume from the MQ, and the last-tier spider does not produce to it; every spider in between both consumes from and produces to the MQ.

We therefore bind a consumer and a producer to each spider; the framework looks like this:

         After the pipeline has extracted the data we want, it records the crawled URL in Redis; a URL that has already been crawled is skipped instead of being saved again. (A sketch of such a Redis helper follows.)
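RedisUtil is not shown in the post either; assuming a plain Jedis client (host and port are placeholders), the two set helpers the pipeline relies on could be as small as this:

package com.chinaredstar.jc.core.util;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * Hypothetical sketch of the RedisUtil helper used for URL de-duplication;
 * the real implementation is not shown in the post.
 */
public class RedisUtil {

    private static final JedisPool POOL = new JedisPool("127.0.0.1", 6379);

    /** true if the set named by key already contains this URL */
    public static boolean sContain(String key, String url) {
        try (Jedis jedis = POOL.getResource()) {
            return jedis.sismember(key, url);
        }
    }

    /** record a crawled URL in the set named by key */
    public static void sAdd(String key, String url) {
        try (Jedis jedis = POOL.getResource()) {
            jedis.sadd(key, url);
        }
    }
}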

 

  WebMagic is a well-designed crawler framework with good extensibility, so we only had to extend it slightly.

First, the extended Spider code:

 

 

package com.chinaredstar.jc.core.spider;

import com.chinaredstar.jc.core.page.CrawlerPage;
import com.chinaredstar.jc.crawler.consumer.Consumer;
import com.chinaredstar.jc.infras.utils.JSONUtil;
import com.rabbitmq.client.QueueingConsumer;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Spider;
import us.codecraft.webmagic.SpiderListener;
import us.codecraft.webmagic.processor.PageProcessor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


/**
 * @author zhuangj
 * @date 2017/12/1
 */
public class JcSpider extends Spider {

    /**
     * Queue consumer that feeds data to this spider
     */
    private Consumer consumer;

    /**
     * Name of the queue this spider consumes from
     */
    private String consumerQueueName;

    /**
     * Tracks in-flight deliveries so each MQ message can be acked once its
     * request has been processed (accessed from multiple threads)
     */
    private Map<String, QueueingConsumer.Delivery> ackMap = new ConcurrentHashMap<>();

    /**
     * Number of messages left in the queue
     */
    private Integer messageNum = 0;

    /**
     * Parent spider; a child may only stop after its parent has stopped
     */
    private JcSpider parentSpider;


    public JcSpider(PageProcessor pageProcessor) {
        super(pageProcessor);
        // keep the spider alive even when its request queue is momentarily empty
        exitWhenComplete = false;
    }

    public Consumer getConsumer() {
        return consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }

    public JcSpider getParentSpider() {
        return parentSpider;
    }

    public void setParentSpider(JcSpider parentSpider) {
        this.parentSpider = parentSpider;
    }


    public Integer getMessageNum() {
        return messageNum;
    }

    public void setMessageNum(Integer messageNum) {
        this.messageNum = messageNum;
    }

    public String getConsumerQueueName() {
        return consumerQueueName;
    }

    public void setConsumerQueueName(String consumerQueueName) {
        this.consumerQueueName = consumerQueueName;
    }

    @Override
    protected void initComponent() {
        super.initComponent();
        this.setSpiderListeners(new ArrayList<>());
        this.requestMessageListen();
//        this.startConsumer(consumerQueueName);
    }

    public void startConsumer(String queueName) {
        if (consumer == null) {
            this.exitWhenComplete = true;
            return;
        }
        logger.info("queueName:{},startConsumer", queueName);
        JcSpider jcSpider = this;
        Runnable myRunnable = () -> {
            try {
                messageNum = consumer.getQueueMsgNum(queueName);
                Status parentStatus = parentSpider != null ? parentSpider.getStatus() : Status.Stopped;

                while (!parentStatus.equals(Status.Stopped) || messageNum > 0) {
                    if (!jcSpider.getStatus().equals(Status.Running)) {
                        Thread.sleep(500);
                    }
                    QueueingConsumer.Delivery delivery = consumer.getDeliveryMessage(queueName);
                    String message = new String(delivery.getBody());
                    CrawlerPage crawlerPage = JSONUtil.toObject(message, CrawlerPage.class);
                    Request request = crawlerPage.translateRequest();

                    // remember the delivery so it can be acked when the request completes
                    ackMap.put(request.getUrl(), delivery);
                    jcSpider.addRequest(request);
                    messageNum = consumer.getQueueMsgNum(queueName);
                    if (messageNum == 0) {
                        Thread.sleep(500);
                    }
                    // re-read the parent's status each round so the loop can end once it stops
                    parentStatus = parentSpider != null ? parentSpider.getStatus() : Status.Stopped;
                }
                logger.info("spider:{},consumer stop", getUUID());
                if (parentSpider != null) {
                    logger.info("parentStatus:{}", parentSpider.getStatus().name());
                }
                logger.info("messageNum:{}", messageNum);
                // parent stopped and queue empty: let the spider exit once its requests finish
                Thread.sleep(2000);
                this.exitWhenComplete = true;
            } catch (Exception e) {
                logger.error("consumer loop failed", e);
            }
        };
        Thread thread = new Thread(myRunnable);
        thread.start();
    }


    /**
     * Register a listener that acks the MQ message when a request finishes,
     * whether it succeeded or failed
     */
    private void requestMessageListen(){
        this.getSpiderListeners().add(new SpiderListener() {
            @Override
            public void onSuccess(Request request) {
                ackMq(request);
            }
            @Override
            public void onError(Request request) {
                ackMq(request);
            }
        });
    }

    public  void ackMq(Request request){
        try {
            QueueingConsumer.Delivery delivery=ackMap.get(request.getUrl());
            if(delivery!=null){
                consumer.ackMessage(delivery);
                ackMap.remove(request.getUrl());
            }
        } catch (IOException e) {
            logger.error("failed to ack message for url {}", request.getUrl(), e);
        }
    }


}

 

 

 

On top of the original WebMagic Spider we add an asynchronous consumer (the Consumer class wraps RabbitMQ's consume operations; it is simple enough that its code was omitted, but a sketch follows this list). Its jobs:

  • Read the pending messages from the MQ and add the pages to be crawled to the spider's request list.
  • Track every message it reads; once the spider has finished processing a message, return an ack to the MQ to mark the message as successfully consumed.
  • Read the number of messages remaining in the queue, which serves as one of the conditions for shutting the spider down.
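Since the wrapper is referenced throughout JcSpider, here is a minimal sketch of what it might look like, assuming amqp-client 4.x (where QueueingConsumer still exists, matching the code above); the real implementation may differ:

package com.chinaredstar.jc.crawler.consumer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;

/**
 * Hypothetical sketch of the Consumer wrapper; the post omits the real code.
 */
public class Consumer {

    private final Channel channel;
    private final QueueingConsumer queueingConsumer;

    public Consumer(Channel channel, String queueName) throws IOException {
        this.channel = channel;
        this.queueingConsumer = new QueueingConsumer(channel);
        // autoAck=false: messages stay unacked until ackMessage() is called
        channel.basicConsume(queueName, false, queueingConsumer);
    }

    /**
     * Number of ready messages left in the queue (the queueName parameter is
     * kept only to match the call sites in JcSpider above).
     */
    public int getQueueMsgNum(String queueName) throws IOException {
        return channel.queueDeclarePassive(queueName).getMessageCount();
    }

    /** Block until the next delivery arrives. */
    public QueueingConsumer.Delivery getDeliveryMessage(String queueName) throws InterruptedException {
        return queueingConsumer.nextDelivery();
    }

    /** Ack a single processed message back to the broker. */
    public void ackMessage(QueueingConsumer.Delivery delivery) throws IOException {
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}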

       

We also give each spider a reference to a parent spider. Its role:

  • Together with the consumer's remaining-message count, it decides when to shut the spider down: if the parent spider does not exist or has stopped, the current spider has finished consuming, and the queue has no messages left, then the current spider can stop.

 

Depending on its tier, a spider gets either an MqPipeline or an EsPipeline, which pushes the processed data into the MQ or into ES:

 

The MqPipeline code:

package com.chinaredstar.jc.core.pipeline;

import com.chinaredstar.jc.core.page.CrawlerPage;
import com.chinaredstar.jc.core.util.RedisUtil;
import com.chinaredstar.jc.crawler.producer.Producer;
import com.chinaredstar.jc.infras.utils.JSONUtil;
import org.apache.commons.collections4.CollectionUtils;
import us.codecraft.webmagic.ResultItems;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.pipeline.Pipeline;

import java.io.IOException;
import java.util.List;

/**
 * Pipeline that publishes extracted next-tier pages to the message queue
 * @author lenovo
 * @date 2017/12/1
 */
public class MqPipeline implements Pipeline {

    private Producer producer;

    public MqPipeline(Producer producer) {
        this.producer = producer;
    }

    public Producer getProducer() {
        return producer;
    }

    public void setProducer(Producer producer) {
        this.producer = producer;
    }

    @Override
    public void process(ResultItems resultItems, Task task) {

        try {
            List<CrawlerPage> crawlerPageList= resultItems.get("nextPageList");
            if(CollectionUtils.isEmpty(crawlerPageList)) {
                return;
            }
            String key=task.getUUID();

            for (CrawlerPage page : crawlerPageList) {
                // skip any URL that has already been crawled (deduplicated via a Redis set)
                String url = page.getCurrentUrl();
                if (RedisUtil.sContain(key, url)) {
                    continue;
                }
                producer.basicPublish(JSONUtil.toJSonString(page));
                RedisUtil.sAdd(key, url);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

 

 

And the EsPipeline:

package com.chinaredstar.jc.core.pipeline;

import com.chinaredstar.jc.core.es.EsConnectionPool;
import com.chinaredstar.jc.infras.utils.json.JsonFormatter;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentType;
import us.codecraft.webmagic.ResultItems;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.pipeline.Pipeline;

import java.io.IOException;
import java.net.UnknownHostException;

/**
 * Created by zhuangj on 2017/12/1.
 */
public class EsPipeline implements Pipeline {

    private String taskName;

    private EsConnectionPool pool=new EsConnectionPool(3);

    public EsPipeline(String taskName) {
        this.taskName=taskName;
    }

    @Override
    public void process(ResultItems resultItems, Task task) {
        try {
            TransportClient client = pool.get();
            this.createIndex(client, taskName);
            this.insertData(client, taskName, "crawler", null, JsonFormatter.toJsonAsString(resultItems.getAll()));
            pool.returnToPool(client);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Create the index if it does not exist yet
     */
    private boolean createIndex(TransportClient client,String index) {
        try {
            if (isIndexExist(client,index)) {
                return true;
            }
            client.admin().indices().create(new CreateIndexRequest(index)).actionGet();
            return true;
        }catch (Exception e){
            return false;
        }
    }


    /**
     * Check whether the index exists
     * @param index index name
     * @return true if the index exists
     */
    private boolean isIndexExist(TransportClient client,String index) throws UnknownHostException, InterruptedException {
        IndicesExistsRequest request = new IndicesExistsRequest(index);
        IndicesExistsResponse response = client.admin().indices().exists(request).actionGet();
        return response.isExists();
    }


    /**
     * Insert a document
     *
     * @param index index name
     * @param type  mapping type
     * @param id    document id (null lets ES auto-generate one)
     * @param data  JSON payload
     * @throws IOException
     */
    private BulkResponse insertData(TransportClient client, String index, String type, String id, String data) throws IOException, InterruptedException {
        // BulkRequestBuilder batches multiple JSON documents into a single request
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        IndexRequestBuilder requestBuilder = client.prepareIndex(index, type, id).setSource(data, XContentType.JSON);
        bulkRequest.add(requestBuilder);
        // execute the bulk insert against ES and wait for the response
        return bulkRequest.execute().actionGet();
    }

}

The EsPipeline above creates an EsConnectionPool, reusing ES connections via pooling to speed up writes into ES.

(We later found that the ES client handles requests asynchronously and is multi-threaded internally, so pooling is unnecessary here: a singleton client is enough, e.g. the sketch below.)
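A minimal singleton along those lines, assuming an Elasticsearch 6.x transport client (cluster name, host and port are placeholders; adjust for your ES version):

package com.chinaredstar.jc.core.es;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Hypothetical singleton holder for the ES client. The TransportClient is
 * thread-safe and handles requests asynchronously, so one shared instance
 * is enough.
 */
public final class EsClientHolder {

    private static volatile TransportClient client;

    private EsClientHolder() {
    }

    public static TransportClient get() throws UnknownHostException {
        if (client == null) {
            synchronized (EsClientHolder.class) {
                if (client == null) {
                    Settings settings = Settings.builder()
                            .put("cluster.name", "elasticsearch")
                            .build();
                    client = new PreBuiltTransportClient(settings)
                            .addTransportAddress(new TransportAddress(
                                    InetAddress.getByName("127.0.0.1"), 9300));
                }
            }
        }
        return client;
    }
}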

 

 

 In the business-layer code, we create the spiders, recursively creating each child spider:

package com.chinaredstar.jc.crawler.biz.service.impl;

import com.chinaredstar.jc.core.downloader.JcHttpClientDownloader;
import com.chinaredstar.jc.core.page.CrawlerPage;
import com.chinaredstar.jc.core.pipeline.EsPipeline;
import com.chinaredstar.jc.core.pipeline.MqPipeline;
import com.chinaredstar.jc.core.processor.jd.JdProcessorLevelEnum;
import com.chinaredstar.jc.core.spider.JcSpider;
import com.chinaredstar.jc.crawler.biz.service.IJdService;
import com.chinaredstar.jc.crawler.channel.MqChannel;
import com.chinaredstar.jc.crawler.common.MqConnectionFactory;
import com.chinaredstar.jc.crawler.consumer.Consumer;
import com.chinaredstar.jc.crawler.exchange.DefaultExchange;
import com.chinaredstar.jc.crawler.exchange.Exchange;
import com.chinaredstar.jc.crawler.producer.Producer;
import org.springframework.stereotype.Service;
import us.codecraft.webmagic.processor.PageProcessor;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;

/**
 *
 * JD.com crawl service
 * @author zhuangj
 * @date 2017/11/29
 */
@Service
public class JdServiceImpl implements IJdService {

    @Override
    public List<CrawlerPage> startSpider(String url, String taskName, Integer maxLevel) throws IOException, TimeoutException, InterruptedException {
        // createJcSpider recursively creates all child tiers, so a single call
        // starting at level 1 is enough (looping over levels here would create
        // duplicate spiders on the same queues)
        createJcSpider(url, taskName, maxLevel, 1, null);
        return null;
    }

    @Override
    public List<CrawlerPage> startSpider(String url, String taskName, Integer level, Integer maxLevel) throws IOException, TimeoutException, InterruptedException {
        createJcSpider(url, taskName, maxLevel, level, null);
        return null;
    }



    private void createJcSpider(String url, String taskName, Integer maxLevel, Integer level, JcSpider parentSpider) throws IOException, TimeoutException, InterruptedException {
        PageProcessor pageProcessor = JdProcessorLevelEnum.getProcessorByLevel(level);
        JcSpider jcSpider = new JcSpider(pageProcessor);
        jcSpider.setUUID(taskName + level);
        jcSpider.setDownloader(new JcHttpClientDownloader());
        jcSpider.setParentSpider(parentSpider);

        String producerQueueName = taskName + level;
        String consumerQueueName = taskName + (level - 1);

        MqChannel mqChannelProducer = createMqChannel(producerQueueName);
        Producer producer = createProducer(mqChannelProducer);

        MqChannel mqChannelConsumer = createMqChannel(consumerQueueName);
        Consumer consumer = createConsumer(mqChannelConsumer);
        jcSpider.setConsumer(consumer);
        jcSpider.setConsumerQueueName(consumerQueueName);

        // the last tier writes straight to ES, so it does not produce to the MQ
        if (level < maxLevel) {
            jcSpider.addPipeline(new MqPipeline(producer));
        } else {
            jcSpider.addPipeline(new EsPipeline(taskName));
        }

        // the first tier does not read from the MQ, so it needs no consumer
        if (level == 1) {
            jcSpider.addUrl(url);
            jcSpider.setConsumer(null);
        }

        jcSpider.thread(10).start();
        jcSpider.startConsumer(consumerQueueName);

        // create the child spider
        if (level < maxLevel) {
            // give the parent spider and its consumer a moment to start
            Thread.sleep(2000);
            createJcSpider(url, taskName, maxLevel, level + 1, jcSpider);
        }
    }






    /**
     * Create a producer
     * @param mqChannel
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    private Producer createProducer(MqChannel mqChannel) throws IOException, TimeoutException {
        return mqChannel.createProducer();
    }

    /**
     * Create a consumer
     * @param mqChannel
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    private Consumer createConsumer(MqChannel mqChannel) throws IOException, TimeoutException {
        return mqChannel.createConsumer();
    }

    /**
     * Create a connection channel bound to the given queue
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    private MqChannel createMqChannel(String queueName) throws IOException, TimeoutException {
        MqChannel mqChannel = MqConnectionFactory.getConnectionChannel();
        Exchange exchange = new DefaultExchange(mqChannel.getChannel(), queueName);
        mqChannel.setExchange(exchange);
        return mqChannel;
    }

}
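As a usage sketch, assuming a Spring Boot entry point, kicking off a four-tier crawl might look like this (the entry URL and task name are made up for illustration):

import com.chinaredstar.jc.crawler.biz.service.IJdService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/**
 * Hypothetical entry point; the real project's bootstrap is not shown.
 * The entry URL and task name below are placeholders.
 */
@Component
public class JdCrawlerRunner implements CommandLineRunner {

    @Autowired
    private IJdService jdService;

    @Override
    public void run(String... args) throws Exception {
        // tiers: 1 home page -> 2 pagination bar -> 3 product list -> 4 product detail
        jdService.startSpider("https://www.jd.com/", "jd-jc", 4);
    }
}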

 

Finally, start the project and run it end to end:

RabbitMQ creates the corresponding queues and data flows through them; the unacked-message issue has not been solved yet.

 

Data stored in ES:

Since the parsing logic is fairly simple, only part of the data was extracted.

 

If anything here is off, corrections are welcome.
