Scrapy 框架,持久化文件相關

持久化相關

相關文件

items.py

數據結構模板文件。定義數據屬性。html

pipelines.py

管道文件。接收數據(items),進行持久化操做。python

持久化流程

▨ 爬蟲文件爬取到數據後,將數據封裝到 items 對象 mysql

▨  items.py  用  yield 關鍵字將 items對象 提交給 pipelines.py  redis

▨ 在管道文件中的  process_item  方法中接收 item對象 進行存儲sql

▨  settings.py  配置文件中開啓管道mongodb

持久化方式

文件保存 ( 普通文件 / csv / json  )

* scrapy crawl maoyan -o maoyan.csv
* scrapy crawl maoyan -o maoyan.json

導出時須要配合 settings.py 配置導出編碼格式數據庫

FEED_EXPORT_ENCODING='utf-8'

ps:django

  scrapy1.6版本以前, 導出csv出現空行, json

    解決方法(修改源碼exporters.py)api

  路徑 :python安裝目錄的Lib\site-packages\scrapy\exporters.py

  搜索 csv ,添加 newline='' 參數

數據庫保存 ( redis / mongo / mysql )

詳見下面實例

圖片保存

官方文檔  這裏

pipline 設置

import scrapy
from scrapy.pipelines.images import ImagesPipeline


class MyImagesPipeline(ImagesPipeline):

    # 本質上就是拿到了 item 裏面的 "image_urls" 字段的 url 地址而後再次發起請求帶隊列中
    # 重寫get_media_requests方法, 指定圖片持久化的字段
    def get_media_requests(self, item, info):
        yield scrapy.Request(item['img_link'])  # 這裏指定 item 中要被圖片持久的字段

    # 此函數用於處理拿到請求後的數據時的異常以及其餘操做, 好比這裏實現的操做就是拿到 url 後保存在了 item 的 image_file_path 字段中
    # 重寫此函數能夠拿到 圖片的 地址, 用於數據庫保存等
    def item_completed(self, results, item, info):
        for ok, value in results:
            image_file_path = value["path"]
        item['image_file_path'] = image_file_path
        return item

 

配置設置

使用內置的 圖片管道, 須要在 settings.py  中設置保存路徑以及大小限制

ITEM_PIPELINES = {'xxxx.pipelines.MyImagesPipeline': 1}
import os

project_dir = os.path.abspath(os.path.dirname(__file__))
IMAGES_STORE = os.path.join(project_dir, "images")

IMAGES_MIN_HEIGHT
= 110 # 最小高 IMAGES_MIN_WIDTH = 110 # 最小寬

源碼位置

綜合實例 - 文件存入

爬蟲文件

import scrapy
from secondblood.items import SecondbloodItem
class QiubaidemoSpider(scrapy.Spider):
    name = 'qiubaiDemo'
    allowed_domains = ['www.qiushibaike.com']
    start_urls = ['http://www.qiushibaike.com/']
    def parse(self, response):
        odiv = response.xpath('//div[@id="content-left"]/div')
        for div in odiv:
            # xpath函數返回的爲列表,列表中存放的數據爲Selector類型的數據。咱們解析到的內容被封裝在了Selector對象中,須要調用extract()函數將解析的內容從Selecor中取出。
            author = div.xpath('.//div[@class="author clearfix"]//h2/text()').extract_first()
            author = author.strip('\n') # 過濾空行
            content = div.xpath('.//div[@class="content"]/span/text()').extract_first()
            content = content.strip('\n') # 過濾空行
            # 將解析到的數據封裝至items對象中
            item = SecondbloodItem()
            item['author'] = author
            item['content'] = content
            yield item # 提交item到管道文件(pipelines.py)

items文件

items.py
import scrapy
class SecondbloodItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    author = scrapy.Field() # 存儲做者
    content = scrapy.Field() # 存儲段子內容

管道文件

pipelines.py

from scrapy.exceptions import DropItem

class SecondbloodPipeline(object):

    def __init__(self,path):
        self.f = None
        self.path = path    
        # 寫入文件的路徑參數 ,放在 setting 中了。
        # 經過 from_crawler 來拿到 path 

    @classmethod
    def from_crawler(cls, crawler): 
        """
        初始化時候,用於建立pipeline對象
        """
        print('File.from_crawler')
        path = crawler.settings.get('HREF_FILE_PATH') 
        return cls(path)

    def open_spider(self,spider):
        """
        爬蟲開始執行時,調用 
        用於 文件的打開
        """
        # if spider.name == "chouti":  # spider參數 用於篩選個性化定製 
        print('File.open_spider')
        self.f = open(self.path,'a+')

    def process_item(self, item, spider):
        # f = open('xx.log','a+')
        # f.write(item['href']+'\n')
        # f.close() 
        # 這樣寫太low了,每次都要打開關閉文件
        # 所以選擇 將 文件操做繞開每次循環。
        print('File',item['author'])
        print('File',item['content'])
        self.f.write(item['author'] + ':' + item['content'] + '\n')
        
        # return item      # 交給下一個pipeline的process_item方法
        raise DropItem()# 後續的 pipeline的process_item方法再也不執行

    def close_spider(self,spider):
        """
        爬蟲關閉時,被調用
        用於 文件的關閉 
        """
        print('File.close_spider')
        self.f.close()

 注意:pipeline 是全部爬蟲公用,若是想要給某個爬蟲定製須要使用spider參數本身進行處理

ps:

數據的處理固然能夠寫入 數據庫,或者 redis 以下實例

# -*- coding: utf-8 -*-
# Define your item pipelines here
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html

#導入數據庫的類
import pymysql
 
class QiubaiproPipelineByMysql(object):
    conn = None  #mysql的鏈接對象聲明
    cursor = None#mysql遊標對象聲明
    def open_spider(self,spider):
        print('開始爬蟲')
        #連接數據庫
        self.conn = pymysql.Connect(host='127.0.0.1',port=3306,user='root',password='123456',db='qiubai')
    
    #編寫向數據庫中存儲數據的相關代碼
    def process_item(self, item, spider):
        #1.連接數據庫
        #2.執行sql語句
        sql = 'insert into qiubai values("%s","%s")'%(item['author'],item['content'])
        self.cursor = self.conn.cursor()
        #執行事務
        try:
            self.cursor.execute(sql)
            self.conn.commit()
        except Exception as e:
            print(e)
            self.conn.rollback()
        return item
 
    def close_spider(self,spider):
        print('爬蟲結束')
        self.cursor.close()
        self.conn.close()
MySQL 的數據處理
# -*- coding: utf-8 -*-
# Define your item pipelines here
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
 
import redis
class QiubaiproPipelineByRedis(object):
    conn = None
    def open_spider(self,spider):
        print('開始爬蟲')
        #建立連接對象
        self.conn = redis.Redis(host='127.0.0.1',port=6379)
 
    def process_item(self, item, spider):
        dict = {
            'author':item['author'],
            'content':item['content']
        }
        #寫入redis中
        self.conn.lpush('data', dict)
        return item
redis 的數據處理

配置文件

settings.py
#開啓管道
 
ITEM_PIPELINES = {
    'secondblood.pipelines.SecondbloodPipeline': 300, # 300表示爲優先級,值越小優先級越高
}

優先級順序 

能夠寫多個Pipeline類彼此優先級惟一以標識順序

  一、若是優先級高的 Pipelineprocess_item 返回一個具體值或者None,會自動傳給下一個 piplineprocess_item,

  二、若是隻想讓第一個 Pipeline 執行,那得讓第一個 piplineprocess_item 拋出異常 raise DropItem()

  三、能夠用 spider.name == '爬蟲名'  來控制哪些爬蟲用哪些 pipeline

綜合實例 - pymongo

'''
#一、settings.py
HOST="127.0.0.1"
PORT=27017
USER="root"
PWD="123"
DB="amazon"
TABLE="goods"

'''
from scrapy.exceptions import DropItem
from pymongo import MongoClient

class MongoPipeline(object):
    '''二、把解析好的item對象作一個持久化,保存到數據庫中'''
    def __init__(self,db,collection,host,port,user,pwd):
        self.db = db
        self.collection = collection  #文檔(表)
        self.host = host
        self.port = port
        self.user = user
        self.pwd = pwd

    @classmethod
    def from_crawler(cls,crawler):
        '''一、Scrapy會先經過getattr判斷咱們是否自定義了from_crawler,有則調它來完
        成實例化'''
        db = crawler.settings.get("DB")
        collection = crawler.settings.get("COLLECTION")
        host = crawler.settings.get("HOST")
        port = crawler.settings.get("PORT")
        user = crawler.settings.get("USER")
        pwd = crawler.settings.get("PWD")
        return cls(db,collection,host,port,user,pwd)   #cls是當前的類,類加括號執行__init__方法

    def open_spider(self,spider):
        '''三、爬蟲剛啓動時執行一次'''
        print('==============>爬蟲程序剛剛啓動')
        self.client = MongoClient('mongodb://%s:%s@%s:%s'%(
            self.user,
            self.pwd,
            self.host,
            self.port
        ))

    def close_spider(self,spider):
        '''五、關閉爬蟲程序'''
        print('==============>爬蟲程序運行完畢')
        self.client.close()

    def process_item(self, item, spider):
        '''四、操做並執行持久化'''
        # return表示會被後續的pipeline繼續處理
        d = dict(item)
        if all(d.values()):
            self.client[self.db][self.collection].save(d)   #保存到數據庫
        return item
        # 表示將item丟棄,不會被後續pipeline處理
        # raise DropItem()



class FilePipeline(object):
    def __init__(self, file_path):
        self.file_path=file_path

    @classmethod
    def from_crawler(cls, crawler):
        """
        Scrapy會先經過getattr判斷咱們是否自定義了from_crawler,有則調它來完
        成實例化
        """
        file_path = crawler.settings.get('FILE_PATH')


        return cls(file_path)

    def open_spider(self, spider):
        """
        爬蟲剛啓動時執行一次
        """
        print('==============>爬蟲程序剛剛啓動')
        self.fileobj=open(self.file_path,'w',encoding='utf-8')

    def close_spider(self, spider):
        """
        爬蟲關閉時執行一次
        """
        print('==============>爬蟲程序運行完畢')
        self.fileobj.close()

    def process_item(self, item, spider):
        # 操做並進行持久化

        # return表示會被後續的pipeline繼續處理
        d = dict(item)
        if all(d.values()):
            self.fileobj.write(r"%s\n" %str(d))

        return item

        # 表示將item丟棄,不會被後續pipeline處理
        # raise DropItem()

綜合實例 - mysql 存入

此示例是使用了 pymysql 的方式

可使用 ORM 的方式, 詳情可使用 scrapy-djangoitem 模塊完成

同步存入

class MysqlPipeline(object):
    # 採用同步的機制寫入mysql
    def __init__(self):
        self.conn = MySQLdb.connect('192.168.0.106', 'root', 'root', 'article_spider', charset="utf8", use_unicode=True)
        self.cursor = self.conn.cursor()

    def process_item(self, item, spider):
        insert_sql = """
            insert into jobbole_article(title, url, create_date, fav_nums)
            VALUES (%s, %s, %s, %s)
        """
        self.cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))
        self.conn.commit()

異步存入

import pymysql
from twisted.enterprise import adbapi


class MysqlTwistedPipline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms = dict(
            host=settings["MYSQL_HOST"],
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            charset='utf8',
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)

        return cls(dbpool)

    def process_item(self, item, spider):
        # 使用twisted將mysql插入變成異步執行
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider)  # 處理異常

    def handle_error(self, failure, item, spider):
        # 處理異步插入的異常
        print(failure)

    def do_insert(self, cursor, item):
        # 執行具體的插入
        # 根據不一樣的item 構建不一樣的sql語句並插入到mysql中
        insert_sql, params = item.get_insert_sql()
        cursor.execute(insert_sql, params)
相關文章
相關標籤/搜索