【sqlalchemy】automap自動映射mysql數據庫

項目需求... 直接上碼mysql

  1. ORM文件

忘記說了,mysqldb的配置settings文件,要放在ORM文件同級目錄下。

# -*- coding:utf-8 -*-
import os, sys
print(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from sqlalchemy import create_engine, update, Table, Column, Integer, String, MetaData, ForeignKey
from sqlalchemy.orm import Session
from sqlalchemy.ext.automap import automap_base

import pymysql
pymysql.install_as_MySQLdb()

from ..mytools.settings import mysqldb
import json, time


class MysqlSaveData(object):
    """
    mysql數據庫,自動映射工具類;
    配置文件在當前工具類文件統計目錄下的settings中的mysqdb
    mysql:
        'master_host': 主數據庫ip
        'slave_host': 從數據庫ip
        'master_port': 主數據庫端口
        'slave_port': 從數據庫端口
        'master_db': 主,所需映射數據庫名稱
        'slave_db': 從,所需映射數據庫名稱
        'master_user_name': 主數據庫用戶名
        'slave_user_name': 從數據庫用戶名
        'master_password': 主數據庫密碼
        'slave_password': 從數據庫密碼
    """
    # 主數據庫
    master_engine_str = "mysql+pymysql://" + mysqldb['master_user_name'] + ":" + mysqldb['master_password'] + "@" + \
                        mysqldb['master_host'] + ":" + str(mysqldb['master_port']) + "/" + mysqldb[
                            'master_db'] + "?charset=utf8mb4"
    master_engine = create_engine(master_engine_str, echo=False)  # echo: 是否打印映射時的sql語句
    master_session = Session(master_engine)
    master_Base = automap_base()
    master_Base.prepare(master_engine, reflect=True)

    # 從數據庫
    slave_engine_str = "mysql+pymysql://" + mysqldb['slave_user_name'] + ":" + mysqldb['slave_password'] + "@" + \
                       mysqldb['slave_host'] + ":" + str(mysqldb['slave_port']) + "/" + mysqldb[
                           'slave_db'] + "?charset=utf8mb4"
    slave_engine = create_engine(slave_engine_str, echo=False)  # echo: 是否打印映射時的sql語句
    slave_session = Session(slave_engine)
    slave_Base = automap_base()
    slave_Base.prepare(slave_engine, reflect=True)

    def __init__(self, table_name):  # table_name
        """
        建立表對象
        :param table_name:  表名稱
        """
        self.table_name = table_name
        self.master_table = self.master_Base.classes.__getitem__(key=table_name)
        # print(self.master_table)
        self.slave_table = self.slave_Base.classes.__getitem__(key=table_name)

    def create_master_connection(self):
        """
        建立主數據庫鏈接對象
        :return:
        """

    def create_slave_connection(self, table_name):
        """
        新建從數據庫鏈接對象
        :return:
        """
        # 從數據庫
        slave_engine = create_engine(self.slave_engine_str, echo=False)  # echo: 是否打印映射時的sql語句
        slave_session = Session(slave_engine)
        slave_Base = automap_base()
        slave_Base.prepare(slave_engine, reflect=True)
        slave_table = slave_Base.classes.__getitem__(key=table_name)

        return slave_table, slave_session

    def _update(self, dict_v, item, **kwargs):
        """
        更新數據庫數據;
        如需更新數據,繼承該類爲父類,複寫此_update()方法

        :param kwargs:  查詢條件;k:v形式;k爲表內字段名稱
        :param dict_v:  所需更新的數據,字典類型
        :param item:  爬蟲爬取的數據,字典類型
        :return:
        """
        pass

    def update_commit(self):
        """
        更新數據專用提交接口
        :return:
        """
        try:
            self.master_session.commit()
        except Exception as e:
            print('更新失敗')
            print(e)
            self.master_session.rollback()
        else:
            print('更新成功')
        finally:
            self.master_session.close()

    def _add(self, dict_v):
        """
        新數據添加
        :return:
        """
        data = self.master_table(
            **dict_v
        )
        self.master_session.add(data)
        try:
            self.master_session.commit()
        except Exception as e:
            print('插入失敗')
            print(e)
            self.master_session.rollback()
        else:
            print('插入成功')
        finally:
            self.master_session.close()

    def _find(self, **kwargs):
        """
        單條查詢
        :param kwargs:  查詢條件;k:v形式;k爲表內字段名稱
        :return:
        """
        result = self.slave_session.query(self.slave_table).filter_by(**kwargs).first()
        return result

    def process_data_and_save(self, dict_v, item=None, **kwargs):
        """
        映射mysql已有數據庫表結構
        :param dict_v:  表內字段名稱的複製,字典類型
        :param kwargs: 查詢條件,k:v形式; k爲表內字段名稱
        :return:
        """
        if not self._find(**kwargs):
            # 添加新數據
            self._add(dict_v)
        else:
            # 更新數據庫數據
            self._update(dict_v, item,**kwargs)

        # 建立提交數據後的新從機鏈接對象
        slave_table, slave_session = self.create_slave_connection(self.table_name)
        # 返回當前插入成功數據id
        result = slave_session.query(slave_table).filter_by(**kwargs).first().id
        return result


if __name__ == '__main__':
    item = {}  # 須要插入的數據
	
	# 表與數據字段映射關係;不設置默認爲None
	# 注意:若是字段的值爲字符串之外數據,須要使用json轉換,記得設置「ensure_ascii=False」
    dict_v = dict(
        content=item["content_list"] if isinstance(item['content_list'], str) else json.dumps(item['content_list'],                                  ensure_ascii=False),
        newstime=item["publish_date"],
        title=item["title"],
        sitename="",
        author=item["authro"],
        introduction=item["intro"]
        )

    m = MysqlSaveData(table_name='a_community_news')

    ids = m.process_data_and_save(dict_v=dict_v, title=item['title'])

    print(ids)

時間倉促,也沒有na那麼多的廢話,多跑兩下,基本都清楚了sql

相關文章
相關標籤/搜索