出於項目需求……廢話不多說,直接上代碼(MySQL):
# -*- coding:utf-8 -*-
import os
import sys
from urllib.parse import quote_plus

# Debug output: prints this file's directory at import time.
print(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from sqlalchemy import create_engine, update, Table, Column, Integer, String, MetaData, ForeignKey
from sqlalchemy.orm import Session
from sqlalchemy.ext.automap import automap_base
import pymysql

pymysql.install_as_MySQLdb()

# NOTE(review): this is a relative import, so the module must be imported as
# part of a package (e.g. ``python -m <package>.<module>``); running the file
# directly as a script will fail here.
from ..mytools.settings import mysqldb
import json
import time


def _mysql_url(role):
    """Build the SQLAlchemy URL for the ``role`` ('master' or 'slave') entries of ``mysqldb``.

    The user name and password are percent-encoded with ``quote_plus`` so that
    characters such as ``@``, ``:``, ``/`` or ``#`` in credentials cannot be
    mistaken for URL delimiters. Non-string values (e.g. a numeric password)
    are converted with ``str`` first.
    """
    return "mysql+pymysql://{user}:{password}@{host}:{port}/{db}?charset=utf8mb4".format(
        user=quote_plus(str(mysqldb[role + '_user_name'])),
        password=quote_plus(str(mysqldb[role + '_password'])),
        host=mysqldb[role + '_host'],
        port=mysqldb[role + '_port'],
        db=mysqldb[role + '_db'],
    )


class MysqlSaveData(object):
    """
    Helper that maps an existing MySQL table with SQLAlchemy automap and
    inserts/updates rows in it.

    Writes (``_add`` / ``update_commit``) go through the master database.
    Reads (``_find``) go through the slave database.

    Connection settings come from the ``mysqldb`` dict imported from
    ``..mytools.settings``:
        'master_host':      master database host
        'slave_host':       slave database host
        'master_port':      master database port
        'slave_port':       slave database port
        'master_db':        master database (schema) to map
        'slave_db':         slave database (schema) to map
        'master_user_name': master database user name
        'slave_user_name':  slave database user name
        'master_password':  master database password
        'slave_password':   slave database password

    NOTE: the engines, sessions and automap bases below are class attributes.
    They are created, and the schemas reflected, when the class body executes
    (at import time), and every instance shares the same two sessions.
    """

    # Master database
    master_engine_str = _mysql_url('master')
    master_engine = create_engine(master_engine_str, echo=False)  # echo: whether to log the SQL emitted while mapping
    master_session = Session(master_engine)
    master_Base = automap_base()
    master_Base.prepare(master_engine, reflect=True)

    # Slave database
    slave_engine_str = _mysql_url('slave')
    slave_engine = create_engine(slave_engine_str, echo=False)  # echo: whether to log the SQL emitted while mapping
    slave_session = Session(slave_engine)
    slave_Base = automap_base()
    slave_Base.prepare(slave_engine, reflect=True)

    def __init__(self, table_name):
        """
        Look up the mapped class for ``table_name`` in both databases.

        :param table_name: table name; it must be mapped in both the master and
                           the slave automap base, otherwise a KeyError is raised.
        """
        self.table_name = table_name
        self.master_table = self.master_Base.classes[table_name]
        self.slave_table = self.slave_Base.classes[table_name]

    def create_master_connection(self):
        """
        Placeholder for creating a master-database connection object.

        Not implemented: it does nothing and returns None.
        """

    def create_slave_connection(self, table_name):
        """
        Create a brand-new slave engine and session and reflect the schema again.

        Every call builds its own engine. Callers are responsible for closing the
        returned session and disposing its bind (``slave_session.get_bind()``);
        otherwise the engine's connection pool is leaked.

        :param table_name: table to look up in the freshly reflected schema
        :return: (mapped class for ``table_name``, new Session bound to the new engine)
        """
        # Slave database
        slave_engine = create_engine(self.slave_engine_str, echo=False)  # echo: whether to log the SQL emitted while mapping
        slave_session = Session(slave_engine)
        slave_Base = automap_base()
        slave_Base.prepare(slave_engine, reflect=True)
        slave_table = slave_Base.classes[table_name]
        return slave_table, slave_session

    def _update(self, dict_v, item, **kwargs):
        """
        Hook for updating an existing row; the base implementation does nothing.

        To update data, subclass this class and override ``_update()``.
        ``update_commit()`` is available to commit the changes.

        :param dict_v: new column values, as a dict
        :param item: the raw crawled record, as a dict
        :param kwargs: lookup conditions as column=value pairs
        :return: None
        """
        pass

    def update_commit(self):
        """
        Commit pending changes on the master session (intended for updates).

        On failure the error is printed and the transaction is rolled back.
        The master session is always closed afterwards.
        """
        try:
            self.master_session.commit()
        except Exception as e:
            print('更新失敗')
            print(e)
            self.master_session.rollback()
        else:
            print('更新成功')
        finally:
            self.master_session.close()

    def _add(self, dict_v):
        """
        Insert a new row built from ``dict_v`` into the master table and commit.

        On failure the error is printed and the transaction is rolled back.
        The master session is always closed afterwards.

        :param dict_v: column=value mapping passed to the mapped class constructor
        """
        data = self.master_table(**dict_v)
        self.master_session.add(data)
        try:
            self.master_session.commit()
        except Exception as e:
            print('插入失敗')
            print(e)
            self.master_session.rollback()
        else:
            print('插入成功')
        finally:
            self.master_session.close()

    def _find(self, **kwargs):
        """
        Return the first slave row matching ``kwargs``, or None.

        :param kwargs: lookup conditions as column=value pairs
        :return: mapped instance or None
        """
        result = self.slave_session.query(self.slave_table).filter_by(**kwargs).first()
        return result

    def process_data_and_save(self, dict_v, item=None, **kwargs):
        """
        Insert ``dict_v`` if no row matches ``kwargs``, otherwise call ``_update``.
        Then return the row's id as seen by the slave.

        :param dict_v: column=value mapping for the table, as a dict
        :param item: raw crawled record, forwarded to ``_update``
        :param kwargs: lookup conditions as column=value pairs
        :return: the ``id`` column of the first matching slave row. Returns None
                 if the slave has no matching row, e.g. because the insert failed
                 or replication has not caught up yet.
        """
        if not self._find(**kwargs):
            # Insert new data
            self._add(dict_v)
        else:
            # Update existing data
            self._update(dict_v, item, **kwargs)

        # _find() left the shared slave session inside an open transaction.
        # Under a snapshot isolation level (presumably InnoDB's default
        # REPEATABLE READ), that transaction would not see the write above,
        # which is why a fresh engine used to be built here. Closing the session
        # ends the transaction so the next query starts a new one. This reuses
        # the pooled engine instead of creating (and leaking) a new one per call.
        self.slave_session.close()
        try:
            row = self.slave_session.query(self.slave_table).filter_by(**kwargs).first()
            return row.id if row is not None else None
        finally:
            # Release the connection; only a plain id value leaves this method.
            self.slave_session.close()


if __name__ == '__main__':
    # The crawled record to insert. Fill it in before running: with an empty
    # dict the lookups below raise KeyError.
    item = {}
    # Column -> value mapping for the table; columns omitted here are left unset.
    # NOTE: values that are not strings must be serialized with json.dumps;
    # remember to pass ensure_ascii=False.
    dict_v = dict(
        content=item["content_list"] if isinstance(item['content_list'], str) else json.dumps(item['content_list'], ensure_ascii=False),
        newstime=item["publish_date"],
        title=item["title"],
        sitename="",
        author=item["authro"],  # NOTE(review): key is spelled "authro"; confirm it matches the crawler's item
        introduction=item["intro"]
    )
    m = MysqlSaveData(table_name='a_community_news')
    ids = m.process_data_and_save(dict_v=dict_v, title=item['title'])
    print(ids)
時間倉促,就不多說廢話了,把代碼多跑幾遍,基本就都清楚了。(SQL)