import threading import time from impala.dbapi import connect import pandas as pd import os class MyThread(threading.Thread): def __init__(self, threadid, name, day, min): threading.Thread.__init__(self) self.threadID = threadid self.name = name self.day = day self.min = min def run(self): print("開始線程:" + self.name) run_task(self.name, self.day, self.min) print("退出線程:" + self.name) def run_task(thread_name, day, min): hour = 24 while hour: if hour < 11: _hour = '0' + str(hour - 1) else: _hour = hour - 1 try: query_and_write(day, _hour, min) except Exception as e: print(str(day) + str(_hour) + '-----need--rey--try-' + str(e)) time.sleep(10) continue time.sleep(2) print("%s: %s" % (thread_name, time.ctime(time.time()))) hour -= 1 def impala_conn_exec(sql): conn = connect(host='*.*.*.*', port=21050) cur = conn.cursor() cur.execute(sql) cur.close result = cur.fetchall() return result def query_and_write(_date, _hour, _min): """ 初始化sql,調用查詢並寫入CSV :param _date: :param _time: :return: """ sql = "select * " \ "from 表名 m where date= '{0}' " \ "and substr(cast(time as string),1,15) = '{0} {1}:{2}'/*SA(production)*/".format(_date, _hour, _min) print(sql) result = impala_conn_exec(sql) # result = [] df = pd.DataFrame(data=result) path = _date if os.path.exists(path): pass else: os.mkdir(path) file_name = "data_{0}-{1}-{2}.csv".format(_date, _hour, _min) df.to_csv(path + '/' + file_name, encoding="utf-8", index=False) # 按日期範圍循環 遍歷區間開閉示意: [起始日期,截止日期) for j in range(5, 7): daterange = "2020-02" if j < 10: daterange = daterange + "-0" + str(j) else: daterange = daterange + "-" + str(j) print(daterange) # 建立新線程 threads = [] for i in range(0, 6): if i < 10: id = "thread0" + str(i) else: id = "thread" + str(i) threadName = MyThread(1, id, daterange, i) threads.append(threadName) try: # 啓動線程 for thread in threads: thread.setDaemon(True) thread.start() time.sleep(1) # 等待全部線程結束 for thread in threads: thread.join() except Exception as e: print(e) j += 1 print("退出主線程")