使用異步的twisted框架寫入數據

1.twisted框架介紹

  • Twisted是用Python實現的基於事件驅動的網絡引擎框架;html

  • Twisted支持許多常見的傳輸及應用層協議,包括TCP、UDP、SSL/TLS、HTTP、IMAP、SSH、IRC以及FTP。就像Python同樣,Twisted也具備「內置池」(batteries-included)的特色。Twisted對於其支持的全部協議都帶有客戶端和服務器實現,同時附帶有基於命令行的工具,使得配置和部署產品級的Twisted應用變得很是方便。mysql

  • 官網地址:https://twistedmatrix.com/trac/sql

2.MySQL數據庫信息保存到settings文件中

  • 首先咱們須要把MySQL數據庫中的配置信息保存到settings文件中,如:MYSQL_HOST = 'localhost'的形式;
MYSQL_HOST = 'localhost'
MYSQL_USER = 'xkd'
MYSQL_PASSWORD = '123456'
MYSQL_DATABASE = 'item_database'
MYSQL_PORT = 3306
MYSQL_OPTIONAL = dict(
    USE_UNICODE = True,
    CHARSET = 'utf8',
)
  • 而後從settings文件中將這些信息導入到pipeline.py文件中使用;
from .settings import MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE, MYSQL_PORT, MYSQL_OPTIONAL
class MysqlPipeline:
    def __init__(self):
        self.conn = MySQLdb.connect(host=MYSQL_HOST, user=MYSQL_USER, password=MYSQL_PASSWORD, database=MYSQL_DATABASE, use_unicode=MYSQL_OPTIONAL.get('USE_UNICODE'), charset=MYSQL_OPTIONAL.get('CHARSET'))
        self.cursor = self.conn.cursor()
    def process_item(self, item, spider):
        sql = 'insert into item(title, image_url, date, image_path, url, url_id)' \
              'values (%s, %s, %s, %s, %s, %s)'
        date = item['date']
        self.cursor.execute(sql, args=(item['title'], item['image_url'], date, item['image_path'], item['url'], item['url_id']))
        self.conn.commit()
        return item
    def spider_closed(self, spider):
        self.cursor.close()
        self.conn.close()

3.建立異步Pipeline寫入數據庫

  • 首先建立一個用於異步寫入數據的AIOMysqlItemPipeline類,而後在這個類的初始化方法中建立一個pool鏈接池;數據庫

  • 而後在from_settings()方法中獲取settings文件中的數據庫配置信息,並將配置信息存入一個字典中。使用Twisted中的adbapi獲取數據庫鏈接池對象,使用前須要導入adbapi,如:from twisted.enterprise import adbapi。使用時須要用到ConnectionPool鏈接池:pool=adbapi.ConnectionPool('MySQLdb',**params),參數MySQLdb是使用的數據庫引擎的名字,params就是要傳遞的數據庫配置信息;api

  • 接着在process_item()方法中使用數據庫鏈接池對象進行數據庫操做,自動傳遞cursor對象到數據庫操做方法runInteraction()的第一個參數(自定義方法)如:ret=self.connection_pool.runInteraction(self.mysql_insert,item)服務器

  • 還能夠設置出錯時的回調方法,自動傳遞出錯消息對象failure到錯誤處理方法的第一個參數(自定義方法)如:ret.addErrback(self.error_callback)網絡

  • 最後記得修改settings文件中的ITEM_PIPELINES配置,如:'XKD_Dribbble_Spider.pipelines.AIOMysqlItemPipeline': 2框架

from twisted.enterprise import adbapi
import MySQLdb.cursors
class AIOMysqlItemPipeline:
    def __init__(self, pool):
        self.connection_pool = pool
    # 1:調用類方法
    @classmethod
    def from_settings(cls, settings):
        connkw = {
            'host': MYSQL_HOST,
            'user': MYSQL_USER,
            'password': MYSQL_PASSWORD,
            'db': MYSQL_DATABASE,
            'port': MYSQL_PORT,
            'use_unicode': MYSQL_OPTIONAL.get('USE_UNICODE'),
            'charset': MYSQL_OPTIONAL.get('CHARSET'),
            'cursorclass': MySQLdb.cursors.DictCursor,
        }
        pool = adbapi.ConnectionPool('MySQLdb', **connkw)
        return cls(pool)
    # 2:執行process_item
    def process_item(self, item, spider):
        ret = self.connection_pool.runInteraction(self.mysql_insert, item)
        ret.addErrback(self.error_callback)
    def mysql_insert(self, cursor, item):
        sql = 'insert into item(title, image_url, date, image_path, url, url_id)' \
              'values (%s, %s, %s, %s, %s, %s)'
        date = item['date']
        cursor.execute(sql, args=(item['title'], item['image_url'], date, item['image_path'], item['url'], item['url_id']))
    def error_callback(self, error):
        print('insert_error =========== {}'.format(error))
修改settings文件
ITEM_PIPELINES = {
   # 'XKD_Dribbble_Spider.pipelines.XkdDribbbleSpiderPipeline': 300,
   # 當items.py模塊yield以後,默認就是下載image_url的頁面
   'XKD_Dribbble_Spider.pipelines.ImagePipeline': 1,
   'XKD_Dribbble_Spider.pipelines.AIOMysqlItemPipeline': 2,
}

參考:https://www.9xkd.com/user/plan-view.html?id=1784587600異步

相關文章
相關標籤/搜索