在下圖中能夠看到items.py與pipeline.py,其中items是用來定義抓取內容的實體;pipeline則是用來處理抓取的item的管道
Item管道的主要責任是負責處理有蜘蛛從網頁中抽取的Item,他的主要任務是清晰、驗證和存儲數據。當頁面被蜘蛛解析後,將被髮送到Item管道,並通過幾個特定的次序處理數據。每一個Item管道的組件都是有一個簡單的方法組成的Python類。獲取了Item並執行方法,同時還須要肯定是否須要在Item管道中繼續執行下一步或是直接丟棄掉不處理。簡而言之,就是經過spider爬取的數據都會經過這個pipeline處理,能夠在pipeline中不進行操做或者執行相關對數據的操做。html
1.清理HTML數據
2.驗證解析到的數據(檢查Item是否包含必要的字段)
3.檢查是不是重複數據(若是重複就刪除)
4.將解析到的數據存儲到數據庫中python
process_item(item, spider)
每個item管道組件都會調用該方法,而且必須返回一個item對象實例或raise DropItem異常。被丟掉的item將不會在管道組件進行執行。此方法有兩個參數,一個是item,即要處理的Item對象,另外一個參數是spider,即爬蟲。
此外,咱們也能夠在類中實現如下方法
open_spider(spider)
當spider執行的時候將調用該方法
close_spider(spider)
當spider關閉的時候將調用該方法mysql
1.生成json數據sql
class JsonWithEncodingPipeline(object): def __init__(self): self.file=codecs.open('article.json', 'w', encoding="utf-8") def process_item(self, item, spider): lines=json.dumps(dict(item), ensure_ascii=False) + '\n' self.file.write(lines) return item def spider_closed(self, spider): self.file.close()
2.操做mysql關係數據庫數據庫
class MysqlPipeline(object): def __init__(self): self.conn=MySQLdb.connect('localhost', 'root', '*****', 'article_spider', charset="utf8", use_unicode=True) self.cursor=self.conn.cursor() def process_item(self, item, spider): insert_sql=""" insert into article_items(title, url, url_object_id , create_date) VALUES(%s, %s, %s, %s) """ self.cursor.execute(insert_sql, (item["title"], item["url"], item['url_object_id'], item["create_date"])) self.conn.commit()
3.異步操做mysql關係數據庫json
# 異步處理關係數據庫 class MysqlTwistedPipline(object): def __init__(self, dbpool): self.dbpool=dbpool @classmethod def from_settings(cls, settings): dbparms=dict( host=settings["MYSQL_HOST"], #這裏要在settings中事先定義好 db=settings["MYSQL_DBNAME"], user=settings["MYSQL_USER"], passwd=settings["MYSQL_PASSWORD"], charset="utf8", cursorclass=MySQLdb.cursors.DictCursor, use_unicode=True, ) dbpool=adbapi.ConnectPool("MySQLdb", **dbparms) return cls(dbpool) def process_item(self, item, spider): # 使用twisted將mysql插入變成異步執行 query = self.dbpool.runInteraction(self.do_insert, item) query.addErrback(self.handle_error) def handle_error(self, failure, item, spider): #處理異步插入的異常 print(failure) def do_insert(self, cursor, item): #執行具體的插入 insert_sql = """ insert into article_items(title, url, url_object_id , create_date) VALUES(%s, %s, %s, %s) """ self.cursor.execute(insert_sql, (item["title"], item["url"], item['url_object_id'], item["create_date"]))
4.數據去重api
from scrapy.exceptions import DropItem class DuplicatesPipeline(object): def __init__(self): self.ids_seen = set() def process_item(self, item, spider): if item['id'] in self.ids_seen: raise DropItem("Duplicate item found: %s" % item) else: self.ids_seen.add(item['id']) return item
# Configure item pipelines # See https://doc.scrapy.org/en/latest/topics/item-pipeline.html ITEM_PIPELINES = { # 'ArticleSpider.pipelines.ArticlespiderPipeline': 300, # 'scrapy.pipelines.images.ImagesPipeline': 1, 'ArticleSpider.pipelines.MysqlPipeline': 1, # 'ArticleSpider.pipelines.JsonExporterPipeline': 2, # 'ArticleSpider.pipelines.ArticleImagePipeline': 1 }
每一個pipeline後面有一個數值,這個數組的範圍是0-1000,這個數值是這些在pipeline中定義的類的優先級,越小越優先。
在異步處理數據庫的時候會傳遞一個參數爲後面的操做進行初始化,process_item()函數其實是將處理的操做傳回給這個_init__。數組