python 多進程和協程配合使用

1、需求分析

有一批key已經寫入到3個txt文件中,每個txt文件有30萬行記錄。
如今須要讀取這些txt文件,判斷key是否在數據倉庫中。(redis或者mysql)python

爲空的記錄,須要寫入到日誌文件中!mysql

 

任務分工

1. 使用多進程技術,每個進程讀取一個txt文件redis

2. 使用協程技術,批量讀取txt文件記錄。好比一次性讀取 2000條記錄sql

 

注意:打開文件操做,最好在一個進程中,重複打開文件,會形成系統資源浪費!多線程

 

2、完整代碼

#!/usr/bin/env python3
# coding: utf-8
"""
多線程和協程配合使用示例
"""

import os
import time
from gevent import monkey;monkey.patch_all()
from gevent.pool import Pool
from functools import partial
from multiprocessing import Process

COROUTINE_NUMBER = 2000  # 協程池數量
pool = Pool(COROUTINE_NUMBER)  # 使用協程池

# 模擬數據倉庫,測試數據
data_dict = {"1":"x1","3":"x3","5":"x5","7":"x7","9":"x9"}

class TestProgram(object):  # 測試程序
    def __init__(self):
        self.BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # 項目根目錄

    def write_log(self,number, content, colour='white', skip=False):
        """
        寫入日誌文件
        :param content: 寫入內容
        :param colour: 顏色
        :param skip: 是否跳過打印時間
        :return:
        """
        # 顏色代碼
        colour_dict = {
            'red': 31,  # 紅色
            'green': 32,  # 綠色
            'yellow': 33,  # 黃色
            'blue': 34,  # 藍色
            'purple_red': 35,  # 紫紅色
            'bluish_blue': 36,  # 淺藍色
            'white': 37,  # 白色
        }
        choice = colour_dict.get(colour)  # 選擇顏色

        path = os.path.join(self.BASE_DIR, "output_%s.log" % number)  # 日誌文件
        with open(path, mode='a+', encoding='utf-8') as f:
            if skip is False:  # 不跳過打印時間時
                content = time.strftime('%Y-%m-%d %H:%M:%S') + ' ' + content

            info = "\033[1;{};1m{}\033[0m".format(choice, content)
            print(info)
            f.write(content + "\n")

    def has_null(self, key, number):
        """
        輸出key
        :param key: 鍵值
        :param number: 文件標記
        :return: bool
        """
        key = key.strip()
        if not data_dict.get(key):
            self.write_log(number,"錯誤,{} 記錄爲空".format(key),"red")
            return False

        print(key)
        return True

    def read_file(self, number):
        """
        讀取文件
        :param number: 文件標記
        :return:
        """
        file_name = os.path.join(self.BASE_DIR, "data", "%s.txt" % number)
        # print(file_name)
        self.write_log(number, "開始讀取文件 {}".format(file_name),"green")
        with open(file_name, encoding='utf-8') as f:
            # 使用協程池,執行任務。語法: pool.map(func,iterator)
            # partial使用偏函數傳遞參數
            # 注意:has_null第一個參數,必須是迭代器遍歷的值
            pool.map(partial(self.has_null, number=number), f)

        self.write_log(number, "結束文件讀取 {} 完成".format(file_name),"green")
        return True

    def run(self, number):
        """
        讀取指定的文件,判斷每個key是否爲空
        :param number:
        :return:
        """
        startime = time.time()  # 開始時間

        # 清空日誌
        path = os.path.join(self.BASE_DIR, "output_%s.log" % number)  # 日誌文件
        with open(path, mode='w') as f:
            pass

        self.read_file(number)

        endtime = time.time()
        take_time = endtime - startime

        if take_time < 1:  # 判斷不足1秒時
            take_time = 1  # 設置爲1秒
        # 計算花費時間
        m, s = divmod(take_time, 60)
        h, m = divmod(m, 60)

        self.write_log(number, "%s.txt 花費時間 %02d:%02d:%02d" % (number,h, m, s),"green")

    def main(self):
        """
        使用多線程執行程序
        :return:
        """
        # 文件標記列表
        file_list = ["7001", "7002", "7003"]

        p_lst = []  # 線程列表
        for i in file_list:
            # self.run(i)
            p = Process(target=self.run, args=(i,))  # 子進程調用函數
            p.start()  # 啓動子進程
            p_lst.append(p)  # 將全部進程寫入列表中

        for p in p_lst: p.join()  # 檢測p是否結束,若是沒有結束就阻塞直到結束,不然不阻塞


TestProgram().main()  # 啓動主程序,它會開啓3個進程。
View Code

 

執行輸出:app

相關文章
相關標籤/搜索