前言:最近寫了一個把數據表自動導入 MySQL 的腳本,後來發現 pandas 的 to_sql() 函數也能實現類似的功能。測試該函數後發現,它無法自動準確地判斷數據類型(也有可能是我沒理解到位,有經驗的朋友請指出)。不囉嗦,直接放示例文件和代碼。
示例文件:mysql
# -*- coding: utf-8 -*-
"""
Load a tab-separated file into a MySQL table with inferred column types.

The script validates the input file, derives a column definition for every
column from the pandas dtype and the observed values, creates the table
(optionally with a primary key and indexes) and bulk-loads the file with
``LOAD DATA LOCAL INFILE``.

Created on Tue Jun 23 11:39:34 2020

@author: Ray
"""
import re
import sys
import logging
import argparse
import numpy as np
import pandas as pd
from warnings import filterwarnings

# (type name, smallest value, largest value) for signed MySQL integer types,
# ordered from narrowest to widest; anything larger falls back to BIGINT.
_SIGNED_INT_TYPES = (
    ('TINYINT', -128, 127),
    ('SMALLINT', -32768, 32767),
    ('MEDIUMINT', -8388608, 8388607),
    ('INT', -2147483648, 2147483647),
)

# (type name, largest value) for unsigned MySQL integer types.
_UNSIGNED_INT_TYPES = (
    ('TINYINT', 255),
    ('SMALLINT', 65535),
    ('MEDIUMINT', 16777215),
    ('INT', 4294967295),
)

# Upper bounds applied to the FLOAT(M, D) display width and scale.
_FLOAT_MAX_DIGITS = 255
_FLOAT_MAX_SCALE = 30


# ------------------------------ arguments ------------------------------ #
def parse_bool(text):
    """Convert a command-line word such as ``True``/``false``/``0`` to bool.

    ``type=bool`` treats every non-empty string (including ``"False"``) as
    true, so the flag is parsed explicitly instead.

    Raises:
        argparse.ArgumentTypeError: if the word is not a recognised boolean.
    """
    lowered = str(text).strip().lower()
    if lowered in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if lowered in ('', '0', 'false', 'f', 'no', 'n', 'off'):
        return False
    raise argparse.ArgumentTypeError('Expected a boolean, got {!r}'.format(text))


def build_parser():
    """Return the argument parser describing the command-line interface."""
    parser = argparse.ArgumentParser(description='Inofrmation of database table.')
    # Path of the tab-separated source file.
    parser.add_argument('-data', action='store', help='Path of data source.', required=True)
    # Target database.
    parser.add_argument('-database', help='Database name.', required=True)
    # Target table.
    parser.add_argument('-tablename', help='Table name.', required=True)
    # Whether an existing table of the same name is dropped first.
    # A bare ``-droptable`` means True; ``-droptable False`` now means False.
    parser.add_argument('-droptable', help='If exists table, drop table', required=False,
                        default=False, nargs='?', const=True, type=parse_bool)
    # Primary key column(s).
    parser.add_argument('-primaryKey', nargs='+',
                        help='PrimaryKey column.(Example format: ID or ID Pos)',
                        required=False, default=None)
    # Single-column indexes (none by default).
    parser.add_argument('-index', nargs='+',
                        help='Index column.(Example format: MAF or MAF Pos)',
                        required=False, default=None)
    # Composite indexes (none by default).
    parser.add_argument('-index_union',
                        help='Pass index_union dictionary.(Example format: MAF-Pos or MAF-Pos,Chrom-Gene)',
                        required=False, default=None)
    # Host name or IP address.
    parser.add_argument('-host', help='Host name.', required=True)
    parser.add_argument('-user', help='User name.', required=True)
    parser.add_argument('-password', help='Password', required=True)
    return parser


# ------------------------------ validation ----------------------------- #
def header_has_duplicates(path):
    """Return True when the first line of *path* repeats a column name.

    The raw header is inspected because pandas renames duplicated columns
    while reading, which would hide the problem.
    """
    with open(path, 'r', encoding='utf-8') as handle:
        names = handle.readline().strip().split('\t')
    return len(set(names)) != len(names)


def validate_frame(frame):
    """Check the loaded data against the import rules.

    Returns:
        ``None`` when the data is acceptable, otherwise a
        ``(logging level, message)`` tuple describing the first problem.
    """
    if len(frame.columns) < 2:
        return logging.DEBUG, "列太少"

    # Report the first row (1-based) that contains a missing value.
    null_rows = np.flatnonzero(frame.isnull().any(axis=1).to_numpy())
    if null_rows.size:
        return logging.ERROR, "The NULL in the {}".format(int(null_rows[0]) + 1)

    # If a MAF column exists, no value may exceed 0.5.
    if 'MAF' in frame.columns and frame['MAF'].max() > 0.5:
        return logging.ERROR, "MAF must be less than 0.5"

    # If a Gene column exists, its values must not contain ':'.
    if 'Gene' in frame.columns:
        has_colon = frame['Gene'].astype(str).str.contains(':', regex=False)
        bad_rows = np.flatnonzero(has_colon.to_numpy())
        if bad_rows.size:
            return logging.ERROR, "The bug in the {} line of Gene column".format(int(bad_rows[0]) + 1)
    return None


# ------------------------------ data types ----------------------------- #
def _signed_int_type(low, high):
    """Return the narrowest signed integer type holding both *low* and *high*."""
    width = max(len(str(low)), len(str(high)))
    for name, floor, ceiling in _SIGNED_INT_TYPES:
        if floor <= low and high <= ceiling:
            return '{}({})'.format(name, width)
    return 'BIGINT({})'.format(width)


def int_fun(x):
    """Return the signed MySQL integer type (with display width) for value *x*."""
    x = int(x)
    return _signed_int_type(x, x)


def int_fun_unsigned(x):
    """Return the unsigned MySQL integer type for value *x*.

    The result keeps its trailing space, e.g. ``'TINYINT(3) UNSIGNED '``.
    """
    x = int(x)
    width = len(str(x))
    for name, ceiling in _UNSIGNED_INT_TYPES:
        if x <= ceiling:
            return '{}({}) UNSIGNED '.format(name, width)
    return 'BIGINT({}) UNSIGNED '.format(width)


def integer_column_type(series):
    """Return the integer type for a whole column.

    A column with any negative value is signed and sized from both its
    minimum and maximum; otherwise it is unsigned and sized from its maximum.
    """
    low, high = int(series.min()), int(series.max())
    if low < 0:
        return _signed_int_type(low, high)
    return int_fun_unsigned(high).strip()


def float_fun(values):
    """Return a ``FLOAT (M, D)`` type wide enough for every value in *values*.

    Args:
        values: iterable of floats (e.g. a pandas Series).

    D is the longest fractional part plus two, M is the longest integer part
    (sign included) plus D. Values are rendered positionally so that numbers
    Python would print in exponent notation are measured correctly.
    Missing and non-finite values are ignored; if nothing remains, a plain
    ``FLOAT`` is returned.
    """
    whole_len = frac_len = 0
    measured = False
    for value in np.asarray(values):
        if pd.isnull(value) or not np.isfinite(value):
            continue
        whole, _, frac = np.format_float_positional(value, trim='0').partition('.')
        whole_len = max(whole_len, len(whole))
        frac_len = max(frac_len, len(frac))
        measured = True
    if not measured:
        return 'FLOAT'
    scale = min(frac_len + 2, _FLOAT_MAX_SCALE)
    digits = min(whole_len + scale, _FLOAT_MAX_DIGITS)
    return 'FLOAT ({m}, {d})'.format(m=digits, d=scale)


def text_column_type(series):
    """Return a ``VARCHAR(n)`` type sized to the longest value of the column."""
    lengths = series.astype(str).str.len()
    longest = int(lengths.max()) if len(lengths) else 0
    return 'VARCHAR({})'.format(max(longest, 1))


# ------------------------------ statements ----------------------------- #
def quote_identifier(name):
    """Return *name* as a backtick-quoted MySQL identifier."""
    return '`{}`'.format(str(name).replace('`', '``'))


def column_definitions(frame, primary_key=None):
    """Build one column definition per column of *frame*.

    Args:
        frame: the loaded pandas DataFrame.
        primary_key: optional list of primary key column names; those columns
            are declared ``NOT NULL``, all others ``DEFAULT NULL``.

    Returns:
        List of strings such as ``"`ID` TINYINT(1) UNSIGNED NOT NULL"``.
    """
    key_columns = set(primary_key or ())
    definitions = []
    for column in frame.columns:
        series = frame[column]
        if series.empty:
            sql_type = text_column_type(series)
        elif pd.api.types.is_integer_dtype(series.dtype):
            sql_type = integer_column_type(series)
        elif pd.api.types.is_float_dtype(series.dtype):
            sql_type = float_fun(series)
        else:
            sql_type = text_column_type(series)
        nullability = 'NOT NULL' if column in key_columns else 'DEFAULT NULL'
        definitions.append('{} {} {}'.format(quote_identifier(column), sql_type, nullability))
    return definitions


def table_constraints(columns, primary_key=None, index=None, index_union=None):
    """Build the PRIMARY KEY / KEY clauses of the CREATE TABLE statement.

    Args:
        columns: column names of the data, in file order.
        primary_key: optional list of primary key column names.
        index: optional list of columns that each get their own index.
        index_union: optional string of composite indexes, columns joined by
            ``-`` and indexes separated by ``,`` (e.g. ``"MAF-Pos,Chrom-Gene"``).

    Returns:
        List of clauses (possibly empty).

    Raises:
        ValueError: if a requested column does not exist in *columns*.
    """
    columns = list(columns)
    available = set(columns)

    def require(names, role):
        missing = [str(name) for name in names if name not in available]
        if missing:
            raise ValueError('Unknown {} column(s): {}'.format(role, ', '.join(missing)))

    clauses = []
    if primary_key:
        require(primary_key, 'primary key')
        key_list = ','.join(quote_identifier(col) for col in columns if col in primary_key)
        clauses.append('PRIMARY KEY ({})'.format(key_list))
    if index:
        require(index, 'index')
        clauses.extend('KEY {0} ({0})'.format(quote_identifier(col))
                       for col in columns if col in index)
    if index_union:
        for group in index_union.split(','):
            members = group.split('-')
            require(members, 'index_union')
            member_list = ','.join(quote_identifier(member) for member in members)
            clauses.append('KEY {} ({})'.format(quote_identifier(group), member_list))
    return clauses


def create_table_sql(table, definitions, constraints=()):
    """Return the CREATE TABLE statement for *table*."""
    body = ','.join(list(definitions) + list(constraints))
    return 'CREATE TABLE IF NOT EXISTS {}({}) ENGINE=MyISAM DEFAULT CHARSET=utf8;'.format(
        quote_identifier(table), body)


# -------------------------------- driver ------------------------------- #
def main(argv=None):
    """Validate the input file, create the table and load the data.

    Args:
        argv: argument list; defaults to ``sys.argv[1:]``.

    Exits with status 1 when the input is rejected.
    """
    # Imported here so the helpers above stay importable without the driver.
    import pymysql

    # Error log.
    logging.basicConfig(filename='load.log',
                        level=logging.DEBUG,
                        format="%(levelname)s %(asctime)s:%(name)s:%(message)s",
                        datefmt='%Y-%m-%d',
                        filemode='w')
    # Surface MySQL warnings as exceptions so they can be handled below.
    filterwarnings("error", category=pymysql.Warning)

    args = build_parser().parse_args(argv)

    # Reject files whose header repeats a column name.
    if header_has_duplicates(args.data):
        logging.warning("Duplicate column names.")
        sys.exit(1)

    data = pd.read_table(args.data, sep='\t')
    problem = validate_frame(data)
    if problem is not None:
        level, message = problem
        logging.log(level, message)
        sys.exit(1)

    try:
        definitions = column_definitions(data, args.primaryKey)
        constraints = table_constraints(data.columns, args.primaryKey, args.index, args.index_union)
    except ValueError as err:
        logging.error("%s", err)
        sys.exit(1)

    database = quote_identifier(args.database)
    table = quote_identifier(args.tablename)
    create_sql = create_table_sql(args.tablename, definitions, constraints)
    # The file path is sent as a bound parameter. The driver %-formats the
    # statement when parameters are given, so '%' in identifiers is doubled.
    load_sql = ("LOAD DATA LOCAL INFILE %s INTO TABLE "
                + "{}.{}".format(database, table).replace('%', '%%')
                + " FIELDS TERMINATED BY '\\t' IGNORE 1 LINES")

    # local_infile allows importing a file from the client machine.
    wide_connect = pymysql.connect(host=args.host,
                                   user=args.user,
                                   password=args.password,
                                   port=3306,
                                   charset='utf8',
                                   local_infile=True)
    try:
        wide_cursor = wide_connect.cursor()
        try:
            wide_cursor.execute("USE {};".format(database))
            if args.droptable:
                try:
                    wide_cursor.execute("DROP TABLE IF EXISTS {};".format(table))
                except pymysql.Warning:
                    logging.debug("不存在{}表".format(args.tablename))
            wide_cursor.execute(create_sql)
            try:
                wide_cursor.execute(load_sql, (args.data,))
            except pymysql.Warning:
                # A warning during the load discards the table.
                logging.warning("Duplicate primarikey.")
                wide_cursor.execute("drop table {}".format(table))
            wide_connect.commit()
        finally:
            wide_cursor.close()
    finally:
        wide_connect.close()


if __name__ == "__main__":
    main()
代碼還有不少不成熟的地方,大家在使用過程中如果發現錯誤,請指出。
在學習的過程中借鑑了這篇文章:https://blog.csdn.net/weixin_38153458/article/details/85289433?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase