數據結構模板文件。定義數據屬性。html
管道文件。接收數據(items),進行持久化操做。python
▨ 爬蟲文件爬取到數據後,將數據封裝到 items 對象 mysql
▨ items.py 用 yield 關鍵字將 items對象 提交給 pipelines.py redis
▨ 在管道文件中的 process_item 方法中接收 item對象 進行存儲sql
▨ settings.py 配置文件中開啓管道mongodb
* scrapy crawl maoyan -o maoyan.csv
* scrapy crawl maoyan -o maoyan.json
導出時須要配合 settings.py 配置導出編碼格式數據庫
FEED_EXPORT_ENCODING='utf-8'
ps:django
scrapy1.6版本以前, 導出csv出現空行, json
解決方法(修改源碼exporters.py)api
路徑 :python安裝目錄的Lib\site-packages\scrapy\exporters.py
搜索 csv ,添加 newline='' 參數
詳見下面實例
import scrapy from scrapy.pipelines.images import ImagesPipeline class MyImagesPipeline(ImagesPipeline): # 本質上就是拿到了 item 裏面的 "image_urls" 字段的 url 地址而後再次發起請求帶隊列中 # 重寫get_media_requests方法, 指定圖片持久化的字段 def get_media_requests(self, item, info): yield scrapy.Request(item['img_link']) # 這裏指定 item 中要被圖片持久的字段 # 此函數用於處理拿到請求後的數據時的異常以及其餘操做, 好比這裏實現的操做就是拿到 url 後保存在了 item 的 image_file_path 字段中 # 重寫此函數能夠拿到 圖片的 地址, 用於數據庫保存等 def item_completed(self, results, item, info): for ok, value in results: image_file_path = value["path"] item['image_file_path'] = image_file_path return item
使用內置的 圖片管道, 須要在 settings.py 中設置保存路徑以及大小限制
ITEM_PIPELINES = {'xxxx.pipelines.MyImagesPipeline': 1}
import os project_dir = os.path.abspath(os.path.dirname(__file__)) IMAGES_STORE = os.path.join(project_dir, "images")
IMAGES_MIN_HEIGHT = 110 # 最小高 IMAGES_MIN_WIDTH = 110 # 最小寬
import scrapy from secondblood.items import SecondbloodItem class QiubaidemoSpider(scrapy.Spider): name = 'qiubaiDemo' allowed_domains = ['www.qiushibaike.com'] start_urls = ['http://www.qiushibaike.com/'] def parse(self, response): odiv = response.xpath('//div[@id="content-left"]/div') for div in odiv: # xpath函數返回的爲列表,列表中存放的數據爲Selector類型的數據。咱們解析到的內容被封裝在了Selector對象中,須要調用extract()函數將解析的內容從Selecor中取出。 author = div.xpath('.//div[@class="author clearfix"]//h2/text()').extract_first() author = author.strip('\n') # 過濾空行 content = div.xpath('.//div[@class="content"]/span/text()').extract_first() content = content.strip('\n') # 過濾空行 # 將解析到的數據封裝至items對象中 item = SecondbloodItem() item['author'] = author item['content'] = content yield item # 提交item到管道文件(pipelines.py)
import scrapy class SecondbloodItem(scrapy.Item): # define the fields for your item here like: # name = scrapy.Field() author = scrapy.Field() # 存儲做者 content = scrapy.Field() # 存儲段子內容
pipelines.py
from scrapy.exceptions import DropItem class SecondbloodPipeline(object): def __init__(self,path): self.f = None self.path = path # 寫入文件的路徑參數 ,放在 setting 中了。 # 經過 from_crawler 來拿到 path @classmethod def from_crawler(cls, crawler): """ 初始化時候,用於建立pipeline對象 """ print('File.from_crawler') path = crawler.settings.get('HREF_FILE_PATH') return cls(path) def open_spider(self,spider): """ 爬蟲開始執行時,調用 用於 文件的打開 """ # if spider.name == "chouti": # spider參數 用於篩選個性化定製 print('File.open_spider') self.f = open(self.path,'a+') def process_item(self, item, spider): # f = open('xx.log','a+') # f.write(item['href']+'\n') # f.close() # 這樣寫太low了,每次都要打開關閉文件 # 所以選擇 將 文件操做繞開每次循環。 print('File',item['author']) print('File',item['content']) self.f.write(item['author'] + ':' + item['content'] + '\n') # return item # 交給下一個pipeline的process_item方法 raise DropItem()# 後續的 pipeline的process_item方法再也不執行 def close_spider(self,spider): """ 爬蟲關閉時,被調用 用於 文件的關閉 """ print('File.close_spider') self.f.close()
注意:pipeline 是全部爬蟲公用,若是想要給某個爬蟲定製須要使用spider參數本身進行處理
ps:
數據的處理固然能夠寫入 數據庫,或者 redis 以下實例
# -*- coding: utf-8 -*- # Define your item pipelines here # Don't forget to add your pipeline to the ITEM_PIPELINES setting # See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html #導入數據庫的類 import pymysql class QiubaiproPipelineByMysql(object): conn = None #mysql的鏈接對象聲明 cursor = None#mysql遊標對象聲明 def open_spider(self,spider): print('開始爬蟲') #連接數據庫 self.conn = pymysql.Connect(host='127.0.0.1',port=3306,user='root',password='123456',db='qiubai') #編寫向數據庫中存儲數據的相關代碼 def process_item(self, item, spider): #1.連接數據庫 #2.執行sql語句 sql = 'insert into qiubai values("%s","%s")'%(item['author'],item['content']) self.cursor = self.conn.cursor() #執行事務 try: self.cursor.execute(sql) self.conn.commit() except Exception as e: print(e) self.conn.rollback() return item def close_spider(self,spider): print('爬蟲結束') self.cursor.close() self.conn.close()
# -*- coding: utf-8 -*- # Define your item pipelines here # Don't forget to add your pipeline to the ITEM_PIPELINES setting # See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html import redis class QiubaiproPipelineByRedis(object): conn = None def open_spider(self,spider): print('開始爬蟲') #建立連接對象 self.conn = redis.Redis(host='127.0.0.1',port=6379) def process_item(self, item, spider): dict = { 'author':item['author'], 'content':item['content'] } #寫入redis中 self.conn.lpush('data', dict) return item
#開啓管道 ITEM_PIPELINES = { 'secondblood.pipelines.SecondbloodPipeline': 300, # 300表示爲優先級,值越小優先級越高 }
能夠寫多個Pipeline類彼此優先級惟一以標識順序
一、若是優先級高的 Pipeline 的 process_item 返回一個具體值或者None,會自動傳給下一個 pipline 的process_item,
二、若是隻想讓第一個 Pipeline 執行,那得讓第一個 pipline 的 process_item 拋出異常 raise DropItem()
三、能夠用 spider.name == '爬蟲名' 來控制哪些爬蟲用哪些 pipeline
''' #一、settings.py HOST="127.0.0.1" PORT=27017 USER="root" PWD="123" DB="amazon" TABLE="goods" ''' from scrapy.exceptions import DropItem from pymongo import MongoClient class MongoPipeline(object): '''二、把解析好的item對象作一個持久化,保存到數據庫中''' def __init__(self,db,collection,host,port,user,pwd): self.db = db self.collection = collection #文檔(表) self.host = host self.port = port self.user = user self.pwd = pwd @classmethod def from_crawler(cls,crawler): '''一、Scrapy會先經過getattr判斷咱們是否自定義了from_crawler,有則調它來完 成實例化''' db = crawler.settings.get("DB") collection = crawler.settings.get("COLLECTION") host = crawler.settings.get("HOST") port = crawler.settings.get("PORT") user = crawler.settings.get("USER") pwd = crawler.settings.get("PWD") return cls(db,collection,host,port,user,pwd) #cls是當前的類,類加括號執行__init__方法 def open_spider(self,spider): '''三、爬蟲剛啓動時執行一次''' print('==============>爬蟲程序剛剛啓動') self.client = MongoClient('mongodb://%s:%s@%s:%s'%( self.user, self.pwd, self.host, self.port )) def close_spider(self,spider): '''五、關閉爬蟲程序''' print('==============>爬蟲程序運行完畢') self.client.close() def process_item(self, item, spider): '''四、操做並執行持久化''' # return表示會被後續的pipeline繼續處理 d = dict(item) if all(d.values()): self.client[self.db][self.collection].save(d) #保存到數據庫 return item # 表示將item丟棄,不會被後續pipeline處理 # raise DropItem() class FilePipeline(object): def __init__(self, file_path): self.file_path=file_path @classmethod def from_crawler(cls, crawler): """ Scrapy會先經過getattr判斷咱們是否自定義了from_crawler,有則調它來完 成實例化 """ file_path = crawler.settings.get('FILE_PATH') return cls(file_path) def open_spider(self, spider): """ 爬蟲剛啓動時執行一次 """ print('==============>爬蟲程序剛剛啓動') self.fileobj=open(self.file_path,'w',encoding='utf-8') def close_spider(self, spider): """ 爬蟲關閉時執行一次 """ print('==============>爬蟲程序運行完畢') self.fileobj.close() def process_item(self, item, spider): # 操做並進行持久化 # return表示會被後續的pipeline繼續處理 d = dict(item) if all(d.values()): self.fileobj.write(r"%s\n" %str(d)) return item # 表示將item丟棄,不會被後續pipeline處理 # raise DropItem()
此示例是使用了 pymysql 的方式
可使用 ORM 的方式, 詳情可使用 scrapy-djangoitem 模塊完成
class MysqlPipeline(object): # 採用同步的機制寫入mysql def __init__(self): self.conn = MySQLdb.connect('192.168.0.106', 'root', 'root', 'article_spider', charset="utf8", use_unicode=True) self.cursor = self.conn.cursor() def process_item(self, item, spider): insert_sql = """ insert into jobbole_article(title, url, create_date, fav_nums) VALUES (%s, %s, %s, %s) """ self.cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"])) self.conn.commit()
import pymysql from twisted.enterprise import adbapi class MysqlTwistedPipline(object): def __init__(self, dbpool): self.dbpool = dbpool @classmethod def from_settings(cls, settings): dbparms = dict( host=settings["MYSQL_HOST"], db=settings["MYSQL_DBNAME"], user=settings["MYSQL_USER"], passwd=settings["MYSQL_PASSWORD"], charset='utf8', cursorclass=MySQLdb.cursors.DictCursor, use_unicode=True, ) dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms) return cls(dbpool) def process_item(self, item, spider): # 使用twisted將mysql插入變成異步執行 query = self.dbpool.runInteraction(self.do_insert, item) query.addErrback(self.handle_error, item, spider) # 處理異常 def handle_error(self, failure, item, spider): # 處理異步插入的異常 print(failure) def do_insert(self, cursor, item): # 執行具體的插入 # 根據不一樣的item 構建不一樣的sql語句並插入到mysql中 insert_sql, params = item.get_insert_sql() cursor.execute(insert_sql, params)