# Multi-threaded pull of Impala data to local CSV files (多線程拉取impala數據到本地)

import threading
import time
from impala.dbapi import connect
import pandas as pd
import os


class MyThread(threading.Thread):
    """Worker thread that runs ``run_task`` with a fixed set of arguments.

    :param threadid: caller-chosen identifier, stored as ``threadID``
    :param name: thread name; also used in the start/exit log lines
    :param day: value forwarded to ``run_task`` as its ``day`` argument
    :param min: value forwarded to ``run_task`` as its ``min`` argument
    """

    def __init__(self, threadid, name, day, min):
        super().__init__()
        self.threadID, self.day, self.min = threadid, day, min
        # Assigned after the base initialiser so it goes through the
        # ``name`` property that ``threading.Thread`` provides.
        self.name = name

    def run(self):
        """Log the start, execute ``run_task``, then log the exit."""
        label = self.name
        print("開始線程:" + label)
        run_task(label, self.day, self.min)
        print("退出線程:" + label)


def run_task(thread_name, day, min):
    """Fetch and store every hour of ``day`` for one minute bucket.

    Hours are processed in descending order (23 down to 0). Each hour is
    passed to ``query_and_write`` as a two-digit, zero-padded string. When
    ``query_and_write`` raises, the error is printed and the same hour is
    retried after a 10 second pause; the retry is unbounded, so a
    persistent failure keeps this function looping on that hour.

    :param thread_name: label printed in the per-hour progress line
    :param day: value forwarded to ``query_and_write`` as the date
    :param min: value forwarded to ``query_and_write`` as the minute part
    :return: None
    """
    for hour in range(23, -1, -1):
        # Always a two-digit string so every hour is passed with the same type.
        _hour = "{:02d}".format(hour)

        # Keep retrying the same hour until it succeeds.
        while True:
            try:
                query_and_write(day, _hour, min)
            except Exception as e:
                print(str(day) + str(_hour) + '-----need--rey--try-' + str(e))
                time.sleep(10)
            else:
                break

        # Short pause between successful hours.
        time.sleep(2)
        print("%s: %s" % (thread_name, time.ctime(time.time())))


def impala_conn_exec(sql):
    """Run ``sql`` on a fresh Impala connection and return all rows.

    A new connection is opened for every call. The cursor and the
    connection are both closed before returning, including when the query
    or the fetch raises.

    :param sql: SQL text to execute
    :return: whatever ``cursor.fetchall()`` returns for the query
    """
    conn = connect(host='*.*.*.*', port=21050)
    try:
        cur = conn.cursor()
        try:
            cur.execute(sql)
            # Fetch before closing: the rows are read from the open cursor.
            return cur.fetchall()
        finally:
            cur.close()
    finally:
        conn.close()


def query_and_write(_date, _hour, _min):
    """Build the SQL for one time slice, run it, and write the rows to CSV.

    The query selects rows whose ``date`` equals ``_date`` and whose first
    15 characters of ``cast(time as string)`` equal
    ``"<_date> <_hour>:<_min>"``. The rows are written to
    ``<_date>/data_<_date>-<_hour>-<_min>.csv``; the directory is created
    if it does not exist yet.

    :param _date: date string; used in the SQL filter and as the output
        directory name
    :param _hour: hour part of the time prefix
    :param _min: minute part of the time prefix (presumably the tens digit
        of the minute, given the 15-character prefix; confirm against the
        format of the ``time`` column)
    :return: None
    """
    # The values are interpolated straight into the SQL text; callers in this
    # file only pass values generated by the script itself.
    sql = "select * " \
          "from 表名 m where date= '{0}' " \
          "and substr(cast(time as string),1,15) = '{0} {1}:{2}'/*SA(production)*/".format(_date, _hour, _min)
    print(sql)
    result = impala_conn_exec(sql)
    # The frame is built from the row data only.
    df = pd.DataFrame(data=result)

    path = _date
    # Several worker threads share one directory per day; exist_ok avoids the
    # check-then-create race where two threads both try to create it.
    os.makedirs(path, exist_ok=True)

    file_name = "data_{0}-{1}-{2}.csv".format(_date, _hour, _min)
    df.to_csv(os.path.join(path, file_name), encoding="utf-8", index=False)


# Loop over the days of the date range; the interval is half-open:
# [start day, end day).
for j in range(5, 7):
    # Zero-pad the day so the date is always of the form YYYY-MM-DD.
    daterange = "2020-02-{:02d}".format(j)
    print(daterange)

    # Create one worker thread per minute value 0-5 for this day.
    threads = []
    for i in range(0, 6):
        thread_label = "thread{:02d}".format(i)
        # Give each thread its own id (the loop index).
        threads.append(MyThread(i, thread_label, daterange, i))
    try:
        # Start the threads one second apart.
        for thread in threads:
            # The ``daemon`` attribute replaces the deprecated setDaemon().
            thread.daemon = True
            thread.start()
            time.sleep(1)

        # Wait for every thread of this day before moving to the next day.
        for thread in threads:
            thread.join()

    except Exception as e:
        print(e)

print("退出主線程")
# Related articles (相關文章)
# Related tags / search (相關標籤/搜索)