python 多進程與多線程配合拷貝文件目錄

版本一:使用 shutil 進行多進程拷貝

 1 # -*- coding: utf-8 -*-
 2 # @author: Tele
 3 # @Time : 2019/04/02 下午 3:09
 4 # 待改進:
 5 # 1.拷貝邏輯使用原生的io
 6 # 2.針對大文件在進程內部實現多線程方式進行拷貝
 7 
 8 
 9 import time  10 import re  11 import os  12 import shutil  13 import multiprocessing  14 
 15 
 16 # 遍歷文件夾
 17 def walk_file(file):  18     file_list = list()  19     for root, dirs, files in os.walk(file):  20         # 遍歷文件
 21         for f in files:  22  file_list.append(f)  23     return file_list  24 
 25 
 26 # 計算文件數量
 27 def get_file_count(dir):  28     return len(walk_file(dir))  29 
 30 
 31 def copy(src, target, queue):  32     target_number = 1
 33     if os.path.isdir(src):  34         target_number = get_file_count(src)  35  shutil.copytree(src, target)  36     else:  37  shutil.copyfile(src, target)  38     # 將拷貝完成的文件數量放入隊列中
 39  queue.put(target_number)  40 
 41 
 42 def copy_dir(src, desc):  43     total_number = get_file_count(src)  44     # 分隔符檢測
 45     src = check_separator(src)  46     desc = check_separator(desc)  47     # print("src:",src)
 48     # print("desc:",desc)
 49 
 50     file_dir_list = [src + "/" + i for i in os.listdir(src)]  51     if os.path.exists(desc):  52  shutil.rmtree(desc)  53     pool = multiprocessing.Pool(3)  54 
 55     # 建立隊列
 56     queue = multiprocessing.Manager().Queue()  57 
 58     # 一個文件/目錄開啓一個進程去拷貝
 59     for f_name in file_dir_list:  60         target = desc + "/" + f_name[index_list("/", f_name)[1] + 1:]  61         # print(target)
 62         # 建立target目錄
 63         parent_path = os.path.split(target)[0]  64         if not os.path.exists(parent_path):  65  os.makedirs(parent_path)  66         pool.apply_async(copy, args=(f_name, target, queue,))  67 
 68     start = time.time()  69  pool.close()  70     # pool.join()
 71     count = 0  72     while True:  73         count += queue.get()  74         # 格式化輸出時兩個%輸出一個%,不換行,每次定位到行首,實現覆蓋
 75         print("\r拷貝進度爲 %.2f %%" % (count * 100 / total_number), end="")  76         if count >= total_number:  77             break
 78     end = time.time()  79     print()  80     print("耗時-----", (end - start), "s")  81 
 82 
 83 # 查找指定字符出現的所有索引位置
 84 def index_list(c, s):  85     return [i.start() for i in re.finditer(c, s)]  86 
 87 
 88 # 檢測目錄結尾是否有 "/"
 89 def check_separator(path):  90     if path.rindex("/") == len(path) - 1:  91         return path[0:path.rindex("/")]  92     return path  93 
 94 
def main():
    # Demo entry point with hard-coded source/destination roots.
    copy_dir("f:/ftp_mypc/", "e:/ftp_mypc/")


if __name__ == '__main__':
    main()

這樣做仍然有些小問題:對於大文件,可以在進程內部採用多線程的方式進行拷貝。可以看到,使用 shutil 進行拷貝時我們沒有辦法實現字節切割,於是有了下面的版本二。

版本二:多進程 + 進程內多線程拷貝

 1 # -*- coding: utf-8 -*-
 2 # @author: Tele
 3 # @Time : 2019/04/02 下午 3:09
 4 # 使用多進程拷貝文件夾,對於大文件進程內部又使用了多線程進行拷貝
 5 # 使用進程池實現多進程時,使用的消息隊列要使用multiprocessing.Manager().Queue()建立
 6 
 7 import time  8 import re  9 import os  10 import shutil  11 import multiprocessing  12 import math  13 from concurrent.futures import ThreadPoolExecutor, wait  14 
# Per-file size threshold: files larger than 200 MB (209715200 bytes)
# are copied with multiple threads inside the worker process.
MAX_SINGLE_FILE_SIZE = 209715200
# Serialises the shared read/write file handles across the copy threads.
mutex = multiprocessing.Lock()
# Thread pool used by copy_single_file for large-file slice copies.
executor = ThreadPoolExecutor(max_workers=3)
 20 
 21 # 遍歷文件夾
 22 def walk_file(file):  23     file_list = list()  24     for root, dirs, files in os.walk(file):  25         # 遍歷文件
 26         for f in files:  27  file_list.append(f)  28 
 29         # 空文件夾處理
 30         for d in dirs:  31             if len(os.listdir(os.path.join(root, d))) == 0:  32  file_list.append(d)  33     return file_list  34 
 35 
 36 # 計算文件數量
 37 def get_file_count(dir):  38     return len(walk_file(dir))  39 
 40 
 41 def copy(src, target, queue):  42     target_number = 1
 43     buffer = 1024
 44     # 文件夾
 45     if os.path.isdir(src):  46         target_number = get_file_count(src)  47         for root, dirs, files in os.walk(src):  48             # 遍歷文件
 49             for f in files:  50                 drive = os.path.splitdrive(target)[0]  51                 target = drive + os.path.splitdrive(os.path.join(root, f))[1]  52  copy_single_file(buffer, os.path.join(root, f), target)  53             # 空文件夾
 54             for d in dirs:  55                 drive = os.path.splitdrive(target)[0]  56                 target = drive + os.path.splitdrive(os.path.join(root, d))[1]  57                 # 檢查文件的層級目錄
 58                 if not os.path.exists(target):  59  os.makedirs(target)  60     else:  61  copy_single_file(buffer, src, target)  62     # 將拷貝完成的文件數量放入隊列中
 63  queue.put(target_number)  64 
 65 
 66 # 拷貝單文件
 67 def copy_single_file(buffer, src, target):  68     file_size = os.path.getsize(src)  69     rs = open(src, "rb")  70 
 71     # 檢查文件的層級目錄
 72     parent_path = os.path.split(target)[0]  73     if not os.path.exists(parent_path):  74  os.makedirs(parent_path)  75 
 76     ws = open(target, "wb")  77     # 小文件直接讀寫
 78     if file_size <= MAX_SINGLE_FILE_SIZE:  79         while True:  80             content = rs.read(buffer)  81  ws.write(content)  82             if len(content) == 0:  83                 break
 84  ws.flush()  85     else:  86         # 設置每一個線程拷貝的字節數 50M
 87         PER_THREAD_SIZE = 52428800
 88         # 構造參數並執行
 89         task_list = list()  90         for i in range(math.ceil(file_size / PER_THREAD_SIZE)):  91             byte_size = PER_THREAD_SIZE  92             # 最後一個線程拷貝的字節數應該是取模
 93             if i == math.ceil(file_size / PER_THREAD_SIZE) - 1:  94                 byte_size = file_size % PER_THREAD_SIZE  95             start = i * PER_THREAD_SIZE + i  96             t = executor.submit(copy_file_thread, start, byte_size, rs, ws)  97  task_list.append(t)  98  wait(task_list)  99     if rs: 100  rs.close() 101     if ws: 102  ws.close() 103 
104 
# Copy *byte_size* bytes starting at offset *start* from rs to ws.
# The handles are shared between threads, so the module-level *mutex*
# serialises each slice copy — the paired seek() calls must be atomic.
def copy_file_thread(start, byte_size, rs, ws):
    # "with" guarantees the lock is released even if a read/write
    # raises (the original acquire()/release() pair leaked the lock on
    # any exception, deadlocking the remaining threads).
    with mutex:
        buffer = 1024
        count = 0
        rs.seek(start)
        ws.seek(start)
        while True:
            if count + buffer <= byte_size:
                content = rs.read(buffer)
                count += len(content)
                write(content, ws)
            else:
                # Tail of the slice: byte_size % buffer bytes remain.
                content = rs.read(byte_size % buffer)
                count += len(content)
                write(content, ws)
                break
127 
def write(content, ws):
    """Flush-through write: push *content* to *ws* and sync the buffer."""
    ws.write(content)
    ws.flush()
132 
def copy_dir(src, desc):
    """Copy directory *src* with a process pool, reporting progress.

    NOTE(review): each target path keeps the *source* directory layout
    and takes only the drive letter from *desc* — the directory part of
    *desc* is otherwise ignored.  On non-Windows systems splitdrive()
    returns an empty drive, so targets collapse onto the source paths.
    Confirm this is only run on Windows with mirrored drive layouts.
    """
    # Total number of entries to copy (empty folders included).
    total_number = get_file_count(src)
    # Trailing-separator normalisation.
    src = check_separator(src)
    desc = check_separator(desc)
    # print("src:",src)
    # print("desc:",desc)

    file_dir_list = [src + "/" + i for i in os.listdir(src)]
    # Start from a clean destination.
    if os.path.exists(desc):
        shutil.rmtree(desc)

    # Process pool: three workers.
    pool = multiprocessing.Pool(3)

    # A Manager queue is required so the pool workers can share it.
    queue = multiprocessing.Manager().Queue()

    # One worker task per top-level file/directory.
    for f_name in file_dir_list:
        # Destination drive + source path without its drive.
        target = os.path.splitdrive(desc)[0] + "/" + os.path.splitdrive(f_name)[1]
        # target = desc + "/" + f_name[index_list("/", f_name)[1] + 1:]
        # print(target)
        # Create the parent directory of the target.
        parent_path = os.path.split(target)[0]
        if not os.path.exists(parent_path):
            os.makedirs(parent_path)
        pool.apply_async(copy, args=(f_name, target, queue))

    start = time.time()
    pool.close()
    # pool.join()
    count = 0
    # Draining the queue doubles as the wait for all workers.
    while True:
        count += queue.get()
        # "%%" prints a literal %; "\r" rewrites the line in place for
        # an updating progress display.
        print("\r當前進度爲 %.2f %%" % (count * 100 / total_number), end="")
        if count >= total_number:
            break

    # NOTE(review): this shuts down the parent process's executor; the
    # workers each hold their own copy, so this is bookkeeping only.
    executor.shutdown()
    end = time.time()
    print()
    print("耗時-----", (end - start), "s")
179 
# Find all indices where the literal string *c* appears in *s*.
def index_list(c, s):
    # Escape *c* so it is matched literally; passing it straight to
    # re.finditer broke for regex metacharacters (".", "\\", "+", ...).
    return [i.start() for i in re.finditer(re.escape(c), s)]
184 
def check_separator(path):
    """Return *path* without a single trailing "/" (if it has one).

    The original called path.rindex("/"), which raised ValueError for
    any path that contained no "/" at all; endswith() is safe.
    """
    if path.endswith("/"):
        return path[:-1]
    return path
191 
def main():
    # Demo entry point with hard-coded source/destination roots.
    copy_dir("f:/ftp_mypc/", "e:/ftp_mypc/")


if __name__ == '__main__':
    main()
相關文章
相關標籤/搜索