用 Python 執行 SQL 來驗證數據是否準時導入了目標庫
目前業務當中有不少場景是從其他廠商那裏抽取數據到我們自己的數據庫。
這樣就會出現數據沒有同步過來的問題。有時候是我們自己的同步任務失敗,有時候是廠商的數據沒有及時生成。
爲防止這種情況對後續數據處理帶來不良影響,所以寫了一個定時任務去檢查數據源的數據是否已準時抽取過來。
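一、首先是數據庫的配置(與腳本同目錄的 prod.conf:[mysql] 段包含 host、user、passwd、database、retry_count,[logpath] 段包含 data_check_sql_logpath)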
二、要驗證的 SQL 和錯誤提示的說明用 || 分隔,每行一條,格式爲「SQL||說明」;SQL 的第一列應返回當天數據的條數。
三、執行 SQL 並判斷結果的 Python 腳本
我這裏只是樣例,可以根據自己的實際情況對 SQL 和 Python 進行定製。
import os
import sys
import time
import logging.config
import ConfigParser
from datetime import datetime, timedelta

import MySQLdb
# import pandas as pd
from interval import Interval, IntervalSet

# Python 2 only: site.py removes sys.setdefaultencoding at startup, so the
# module has to be reloaded before the default encoding can be set to UTF-8.
reload(sys)
sys.setdefaultencoding("utf-8")
# Read the MySQL connection settings and the log directory from prod.conf,
# looked up in sys.path[0] (the directory of the running script).
config = ConfigParser.ConfigParser()
config.read(sys.path[0] + '/prod.conf')
host = config.get('mysql', 'host')
user = config.get('mysql', 'user')
passwd = config.get('mysql', 'passwd')
database = config.get('mysql', 'database')
# getint, not get: the retry loop compares this value with an integer
# counter. As a string that comparison is always true on Python 2, so the
# retries would never stop.
retry_count = config.getint('mysql', 'retry_count')
data_check_sql_logpath = config.get('logpath', 'data_check_sql_logpath')
# Local-time date stamp, e.g. 2020-01-31.
curr_date_str = time.strftime('%Y-%m-%d', time.localtime())
# Root logger; records from DEBUG upwards are passed to the handlers.
logger = logging.getLogger('')
logger.setLevel(logging.DEBUG)

# Output layout shared by both handlers.
formatter = logging.Formatter('%(asctime)s - %(lineno)d - %(name)s - %(levelname)s - %(message)s')

# File handler: one log file per day under the configured log directory.
fileHandler = logging.FileHandler(data_check_sql_logpath + '/data_check_sql_' + curr_date_str + '.log')
fileHandler.setLevel(logging.DEBUG)
fileHandler.setFormatter(formatter)

# Console handler: sends the same records to the console stream.
consoleHandler = logging.StreamHandler()
consoleHandler.setLevel(logging.DEBUG)
consoleHandler.setFormatter(formatter)

# Attach the handlers to the logger (file first, then console).
logger.addHandler(fileHandler)
logger.addHandler(consoleHandler)

# Module-level connection; assigned by db_select() and pinged by db_retry().
conn = None
def db_conn():
    """
    Open a MySQL connection from the module-level settings.

    When the first attempt raises, the error is logged and the result of
    db_retry() is returned instead.

    Parameters
    ----------

    Returns
    -------
    conn : Connection
        The object returned by MySQLdb.connect(), or whatever db_retry()
        returns after a failed first attempt.
    """
    try:
        connection = MySQLdb.connect(host, user, passwd, database, charset='utf8')
    except Exception as err:
        logger.error(repr(err))
        return db_retry()
    else:
        return connection
def db_retry():
    """
    Try to recover a MySQL connection.

    The module-level ``conn`` is pinged first (True is passed as the
    argument to ping()). If the ping raises, up to ``retry_count`` new
    connections are attempted.

    Parameters
    ----------

    Returns
    -------
    conn : Connection or None
        The existing connection if the ping succeeded, a new connection if
        one of the retries succeeded, otherwise None.
    """
    try:
        conn.ping(True)
        return conn
    except Exception as e:
        # Also reached while conn is still None, i.e. before any connection
        # has been established.
        logger.error(e)
        logger.error('Mysql connection is closed, Now retry connect...')

    # retry_count may arrive as a string from the config file. Comparing a
    # string with the integer counter is always true on Python 2 (and a
    # TypeError on Python 3), so coerce it to get a bounded number of tries.
    max_retries = int(retry_count)
    for attempt in range(1, max_retries + 1):
        try:
            logger.debug('Retry times is %i' % attempt)
            return MySQLdb.connect(host, user, passwd, database, charset='utf8')
        except Exception as e:
            logger.error(repr(e))
    return None
def db_select(sql_file="/ddhome/usr/bin/python/data_check_sql/data_check_sql.txt"):
    """
    Run every check listed in *sql_file* and fail if any check finds no data.

    Each non-blank line of the file has the form ``SQL||description``. The
    SQL is executed and the first column of the first result row is taken
    as the row count; a count of zero or less marks the check as failed.

    Parameters
    ----------
    sql_file : str
        Path of the check file. Defaults to the path the script has always
        used.

    Returns
    -------
    no return

    Raises
    ------
    AssertionError
        If at least one check found no data; the message lists the
        descriptions of the failed checks.
    """
    global conn

    conn = db_conn()
    if conn is None:
        # NOTE(review): returning here lets the script finish without an
        # error although no check ran - confirm this is intended.
        logger.error('Still cannot connect mysql after %s times retry.' % retry_count)
        return None

    failed = []
    cur = conn.cursor()
    try:
        # "with" closes the check file even if a query raises.
        with open(sql_file) as check_file:
            for line in check_file:
                line = line.strip()
                if not line:
                    # Tolerate blank lines (e.g. a trailing newline).
                    continue
                parts = line.split('||')
                sql = parts[0].strip()
                # Strip so the description carries no newline into the log;
                # a missing description is allowed.
                msg = parts[1].strip() if len(parts) > 1 else ''
                logger.debug('--------------------------')
                logger.debug(':' + sql)
                cur.execute(sql)
                row = cur.fetchone()
                # No row at all, or a NULL value, counts as "no data".
                count = row[0] if row and row[0] is not None else 0
                if count <= 0:
                    # Fall back to the SQL text so a check without a
                    # description still counts as a failure.
                    failed.append(msg or sql)
                    logger.error(msg + '沒有抽取到當天的數據!')
                else:
                    logger.debug(msg + '抽取到當天的數據:' + str(count) + '條')
                logger.debug('--------------------------')
        conn.commit()
    finally:
        cur.close()

    # Raise explicitly instead of using "assert", which is removed when
    # Python runs with -O; the exception type stays AssertionError.
    if failed:
        raise AssertionError(', '.join(failed))
# Script entry point: run all checks. db_select() raises AssertionError when
# a check found no data for the day; it is not caught here, so it ends the
# script with an error.
if __name__ == "__main__":
    db_select()
四、執行該 Python 腳本。我是把它加到 Jenkins 裏面進行調度的:
python /ddhome/usr/bin/python/data_check_sql/data_check_sql.py