python-自動判斷數據類型,並將數據導入到MySQL中

前言:最近寫了一個自動導入mysql中數據表的腳本,後來發現有一個to_sql()函數也能夠實現相似的功能,測試了該函數後發現,沒法實現對數據類型的自動準確判斷。(有多是我沒理解到位,有經驗的朋友請指出)不羅嗦,直接放示例文件和代碼。
示例文件:[WL75TR@EL9@5OOY6~_EBBO.pngmysql

# -*- coding: utf-8 -*-
"""
Created on Tue Jun 23 11:39:34 2020

@author: Ray
"""


import re
import sys
import pymysql
import logging
import argparse
import numpy as np
import pandas as pd
from warnings import filterwarnings



#--------------------------------------------------------------------------報錯日誌-----------------------------------------------------------------------------------------------#

logging.basicConfig(filename = 'load.log', level=logging.DEBUG, format = "%(levelname)s %(asctime)s:%(name)s:%(message)s", datefmt = '%Y-%m-%d', filemode = 'w')
filterwarnings("error", category = pymysql.Warning) # 捕獲mysql錯誤信息。


#--------------------------------------------------------------------------參數設置-----------------------------------------------------------------------------------------------#

parser = argparse.ArgumentParser(description = 'Inofrmation of database table.')
parser.add_argument('-data', action = 'store', help = 'Path of data source.', required = True) # 指定數據源或數據地址
parser.add_argument('-database', help = 'Database name.', required = True) # 指定存儲得庫名
parser.add_argument('-tablename', help = 'Table name.', required = True) # 指定存儲的表名
parser.add_argument('-droptable', help = 'If exists table, drop table', required = False, default = False, type = bool) # 若是建表時存在相同表名是否刪除
parser.add_argument('-primaryKey', nargs = '+', help = 'PrimaryKey column.(Example format: ID or ID Pos)', required = False, default = None) # 設定的主鍵
parser.add_argument('-index', nargs = '+', help = 'Index column.(Example format: MAF or MAF Pos)', required = False, default = None) # 指定索引字段(默認爲空)
parser.add_argument('-index_union', help = 'Pass index_union dictionary.(Example format: MAF-Pos or MAF-Pos,Chrom-Gene)', required = False, default = None) # 指定索引字段(默認爲空)
parser.add_argument('-host', help = 'Host name.', required = True) # 主機名或IP地址
parser.add_argument('-user', help = 'User name.', required = True) # 用戶名
parser.add_argument('-password', help = 'Password', required = True) # 密碼
args = parser.parse_args()    
wide_connect = None
wide_cursor = None
typeList = [] # 建表語句
priDict = dict() # 定義主鍵()
        


#--------------------------------------------------------------------------數據導入-----------------------------------------------------------------------------------------------#

# 判斷原始數據是否含有重複名
with open(args.data, 'r') as data:
    for line in data:
        li = line.strip().split('\t')
        break
        
set_lst = set(li)
        
if len(set_lst) == len(li):
    pass    
else:
    logging.warning("Duplicate column names.")
    sys.exit()
    
data = pd.read_table(args.data, sep = '\t')
if len(data.columsn) < 2:
    logging.debug("列太少")
    sys.exit()

# 數據規範
if data.isnull().any().any() == True: # 判斷是否存在null值
    for line, j in zip(data.values, range(len(data.values))): # 輸出null的位置
        for i in line:
            if pd.isnull(i):
                logging.error("The NULL in the {}".format(j + 1))
                sys.exit()  
 
if data.isna().any().any() == True:
    for line, j in zip(data.values, range(len(data.values))): # 輸出null的位置
        for i in line:
            if pd.isna(i):
                logging.error("The NaN in the {}".format(j + 1))
                sys.exit()

if set(['MAF']).issubset(data.columns): # 判斷是否存在MAF列,判斷是否全小於0.5
    MAF_max = max(i for i in data['MAF']) 
    if MAF_max > 0.5:
        logging.error("MAF must be less than 0.5")
        sys.exit()

if set(['Gene']).issubset(data.columns): # 若是用戶數據有Gene列,判斷其格式
    for i ,j in zip(data['Gene'], range(len(data['Gene']))):
        if re.search(':', i) != None:
            logging.error("The bug in the {} line of Gene column".format(j + 1))
            sys.exit()
        else:
            continue



#--------------------------------------------------------------------------MySQL連接----------------------------------------------------------------------------------------------#

wide_connect = pymysql.connect(host = args.host,
                               user = args.user,
                               password = args.password,
                               port = 3306, charset = 'utf8',
                               local_infile = True) # ***loacl_infile容許導入本地文件***
wide_cursor = wide_connect.cursor()



#--------------------------------------------------------------------------數據類型-----------------------------------------------------------------------------------------------#
      
# 針對整數型數值定義其格式-----------------------------------------------------------------------------------------------------------------------------------------------------------
int_fun = lambda x: 'TINYINT({})'.format(len(str(x))) if -128 <= x <= 127\
            else 'SMALLINT({})'.format(len(str(x))) if -32768<= x <= 32767\
            else 'MEDIUMINT({})'.format(len(str(x))) if -8388608 <= x <= 8388607\
            else 'INT({})'.format(len(str(x))) if -2147483648 <= x <= 2147483647\
            else 'BIGINT({})'.format(len(str(x)))

int_fun_unsigned = lambda x: 'TINYINT({}) UNSIGNED '.format(len(str(x))) if x <= 255\
            else 'SMALLINT({}) UNSIGNED '.format(len(str(x))) if x <= 65535\
            else 'MEDIUMINT({}) UNSIGNED '.format(len(str(x))) if x <= 16777215\
            else 'INT({}) UNSIGNED '.format(len(str(x))) if x <= 4294967295\
            else 'BIGINT({}) UNSIGNED '.format(len(str(x)))
        
              
# 針對浮點型數據定義其格式-----------------------------------------------------------------------------------------------------------------------------------------------------------
def float_fun(column):
    flo = [str(i).split('.') for i in data[column] if pd.notnull(i)]
    m = max([len(i[0]) for i in flo if len(i) >= 2]) # 整數位最大長度
    d = min([len(i[1]) for i in flo if len(i) >= 2]) # 小數位最大長度
    return 'FLOAT ({m}, {d})'.format(m = m + d + 2, d = d + 2)



#--------------------------------------------------------------------------建表語句-----------------------------------------------------------------------------------------------#
typeList = []
for column in data.columns:
    # 若是字段是整數型
    if data[column].dtypes in [np.dtype('int16'), np.dtype('int32'), np.dtype('int64'), np.dtype('uint16'), np.dtype('uint32'), np.dtype('uint64')]:
        if -1 in np.sign(data[column]):
            int_max = data[column].max() # 整數長度
            typeList.append("`{}`".format(column) + int_fun(int_max) + "DEFAULT NULL")
            continue
        else:
            int_max = data[column].max() # 整數長度
            typeList.append("`{}`".format(column) + int_fun_unsigned(int_max) + "DEFAULT NULL")
            continue
    # 若是字段是浮點型
    if data[column].dtype in [np.dtype('float16'), np.dtype('float32'), np.dtype('float64')]:
        typeList.append("`{}`".format(column) + float_fun(column) + "DEFAULT NULL")
        # 若是字段是文本
    else:
        str_len = max([len(i) for i in data[column] if pd.notnull(i)])
        typeList.append("`{}`".format(column) + "VARCHAR({})".format(str_len) + "DEFAULT NULL")
    
# 若是定義了主鍵,則該字段默認不爲空----------------------------------------------------------------------------------------------------------------------------------------------------
for column, c in zip(data.columns, range(len(data.columns))):
    if column in args.primaryKey:
        typeList[c] = typeList[c].replace('DEFAULT NULL', 'NOT NULL')
        priDict[column] = column
                    


#----------------------------------------------------------------------------建表-------------------------------------------------------------------------------------------------#

# 指定主鍵(咱們通常定義ID爲主鍵)
if args.primaryKey != None:
    if len(args.primaryKey) == 1: # 若是定義一個鍵是主鍵
        primaryKey_statement = ["PRIMARY KEY (`{}`)".format(col) for col in data.columns if col in args.primaryKey][0]
    else:
        primaryKey_statement = "PRIMARY KEY ({})".format(','.join(['`' + col + '`'for col in data.columns if col in args.primaryKey]))
        

# 指定索引
if args.index != None and args.index_union != None: # 若是建立聯合索引,在將args.index中的每一個元素單首創建索引的同時,將args.index_union中的每個元素建立索引
    index_statement = ','.join(["KEY `{}` (`{}`)".format(col, col) for col in data.columns if col in args.index])
    index_statement =  index_statement + ',' + ','.join(["KEY `{}` ({})".format(index, ','.join('`' + k + '`' for k in index.split('-'))) for index in args.index_union.split(',')])
elif args.index != None and args.index_union == None: # 若是不建立聯合索引,將self.key中的每一個元素建立索引
    index_statement = ','.join(["KEY `{}` (`{}`)".format(col, col) for col in data.columns if col in args.index])
elif args.index == None and args.index_union != None:
    index_statement = ','.join(["KEY `{}` ({})".format(index, ','.join('`' + k + '`' for k in index.split('-'))) for index in args.index_union.split(',')])
    
# 執行建表語句----------------------------------------------------------------------------------------------------------------------------------------------------------------------
wide_cursor.execute("USE {};".format(args.database)) # 選擇數據庫

try:
    if args.droptable: # 是否刪除已經存在的表
        wide_cursor.execute("DROP TABLE IF EXISTS {};".format(args.tablename))
except pymysql.Warning:
    logging.debug("不存在{}表".format(args.tablename))
    
if 'primaryKey_statement' in dir() and 'index_statement' in dir(): 
    wide_cursor.execute("CREATE TABLE IF NOT EXISTS `{table_}`(\
                        {data_demand_statement_},\
                        {primary_key_},\
                        {index_statement_}) ENGINE=MyISAM DEFAULT CHARSET=utf8;".\
                        format(table_ = args.tablename, # 表名
                               data_demand_statement_ = ','.join(typeList), # 字段格式語句
                               primary_key_ = primaryKey_statement, # 主鍵語句
                               index_statement_ = index_statement # 索引語句
                               )
                        )
elif 'primaryKey_statement' not in dir() and 'index_statement' in dir():
    wide_cursor.execute("CREATE TABLE IF NOT EXISTS `{table_}`(\
                        {data_demand_statement_},\
                        {index_statement_}) ENGINE=MyISAM DEFAULT CHARSET=utf8;".\
                        format(table_ = args.tablename, # 表名
                               data_demand_statement_ = ','.join(typeList), # 字段格式語句
                               index_statement_ = index_statement # 索引語句
                               )
                        )
elif 'primaryKey_statement' in dir() and 'index_statement' not in dir():
    wide_cursor.execute("CREATE TABLE IF NOT EXISTS `{table_}`(\
                        {data_demand_statement_},\
                        {primary_key_}) ENGINE=MyISAM DEFAULT CHARSET=utf8;".\
                        format(table_ = args.tablename, # 表名
                               data_demand_statement_ = ','.join(typeList), # 字段格式語句
                               primary_key_ = primaryKey_statement # 索引語句
                               )
                        )
else:
     wide_cursor.execute("CREATE TABLE IF NOT EXISTS `{table_}`(\
                        {data_demand_statement_}) ENGINE=MyISAM DEFAULT CHARSET=utf8;".\
                        format(table_ = args.tablename, # 表名
                               data_demand_statement_ = ','.join(typeList), # 字段格式語句
                               )
                        )


    
        
#----------------------------------------------------------------------------存儲-------------------------------------------------------------------------------------------------#
wide_cursor.execute("USE {}".format(args.database))

# 導入數據表語句------------------------------------------------------------------------------------------------------------------------------------------------------------
try:
    wide_cursor.execute("LOAD DATA LOCAL INFILE '{}' INTO TABLE {}.{} FIELDS TERMINATED BY '\\t' IGNORE 1 LINES".format(args.data, args.database, args.tablename))
except pymysql.Warning:
    logging.warning("Duplicate primarikey.")
    wide_cursor.execute("drop table {}".format(args.tablename))
    
    
wide_connect.commit() # 數據提交
wide_cursor.close() # 關閉連接

代碼有不少不成熟的地方,你們在使用過程當中發現有錯誤請指出。
在學習的過程當中借鑑了這篇文章。(https://blog.csdn.net/weixin_38153458/article/details/85289433?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecasesql

相關文章
相關標籤/搜索