Scrapy框架中的Pipeline組件

簡介

在下圖中能夠看到items.py與pipeline.py,其中items是用來定義抓取內容的實體;pipeline則是用來處理抓取的item的管道
2018-05-20_21-21-40.png
Item管道的主要責任是負責處理有蜘蛛從網頁中抽取的Item,他的主要任務是清晰、驗證和存儲數據。當頁面被蜘蛛解析後,將被髮送到Item管道,並通過幾個特定的次序處理數據。每一個Item管道的組件都是有一個簡單的方法組成的Python類。獲取了Item並執行方法,同時還須要肯定是否須要在Item管道中繼續執行下一步或是直接丟棄掉不處理。簡而言之,就是經過spider爬取的數據都會經過這個pipeline處理,能夠在pipeline中不進行操做或者執行相關對數據的操做。html

管道的功能

1.清理HTML數據
2.驗證解析到的數據(檢查Item是否包含必要的字段)
3.檢查是不是重複數據(若是重複就刪除)
4.將解析到的數據存儲到數據庫中python

Pipeline中的操做

process_item(item, spider)
每個item管道組件都會調用該方法,而且必須返回一個item對象實例或raise DropItem異常。被丟掉的item將不會在管道組件進行執行。此方法有兩個參數,一個是item,即要處理的Item對象,另外一個參數是spider,即爬蟲。
此外,咱們也能夠在類中實現如下方法
open_spider(spider)當spider執行的時候將調用該方法
close_spider(spider)當spider關閉的時候將調用該方法mysql

定製本身的Pipeline組件:

1.生成json數據sql

class JsonWithEncodingPipeline(object):
    def __init__(self):
        self.file=codecs.open('article.json', 'w', encoding="utf-8")
    def process_item(self, item, spider):
        lines=json.dumps(dict(item), ensure_ascii=False) + '\n'
        self.file.write(lines)
        return item
    def spider_closed(self, spider):
        self.file.close()

2.操做mysql關係數據庫數據庫

class MysqlPipeline(object):
    def __init__(self):
        self.conn=MySQLdb.connect('localhost', 'root', '*****', 'article_spider', charset="utf8", use_unicode=True)
        self.cursor=self.conn.cursor()

    def process_item(self, item, spider):
        insert_sql="""
            insert into article_items(title, url, url_object_id , create_date)
            VALUES(%s, %s, %s, %s)
        """
        self.cursor.execute(insert_sql, (item["title"], item["url"], item['url_object_id'], item["create_date"]))
        self.conn.commit()

3.異步操做mysql關係數據庫json

# 異步處理關係數據庫
class MysqlTwistedPipline(object):
    def __init__(self, dbpool):
        self.dbpool=dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms=dict(
            host=settings["MYSQL_HOST"],    #這裏要在settings中事先定義好
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            charset="utf8",
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool=adbapi.ConnectPool("MySQLdb", **dbparms)

        return cls(dbpool)

    def process_item(self, item, spider):
    # 使用twisted將mysql插入變成異步執行
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error)


    def handle_error(self, failure, item, spider):
       #處理異步插入的異常
        print(failure)

    def do_insert(self, cursor, item):
        #執行具體的插入
        insert_sql = """
                    insert into article_items(title, url, url_object_id , create_date)
                    VALUES(%s, %s, %s, %s)
                """
        self.cursor.execute(insert_sql, (item["title"], item["url"], item['url_object_id'], item["create_date"]))

4.數據去重api

from scrapy.exceptions import DropItem

class DuplicatesPipeline(object):

    def __init__(self):
        self.ids_seen = set()

    def process_item(self, item, spider):
        if item['id'] in self.ids_seen:
            raise DropItem("Duplicate item found: %s" % item)
        else:
            self.ids_seen.add(item['id'])
            return item

使用組件

# Configure item pipelines
# See https://doc.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
   # 'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
  # 'scrapy.pipelines.images.ImagesPipeline': 1,
   'ArticleSpider.pipelines.MysqlPipeline': 1,
   # 'ArticleSpider.pipelines.JsonExporterPipeline': 2,
   # 'ArticleSpider.pipelines.ArticleImagePipeline': 1
}

每一個pipeline後面有一個數值,這個數組的範圍是0-1000,這個數值是這些在pipeline中定義的類的優先級,越小越優先。
在異步處理數據庫的時候會傳遞一個參數爲後面的操做進行初始化,process_item()函數其實是將處理的操做傳回給這個_init__。數組

相關文章
相關標籤/搜索