Version 1: multi-process copy using shutil
# -*- coding: utf-8 -*-
# @author: Tele
# @Time : 2019/04/02 3:09 PM
# To improve:
# 1. Use raw I/O for the copy logic
# 2. For large files, copy with multiple threads inside each process


import time
import re
import os
import shutil
import multiprocessing


# Walk the directory tree and collect file names
def walk_file(file):
    file_list = list()
    for root, dirs, files in os.walk(file):
        for f in files:
            file_list.append(f)
    return file_list


# Count the files under a directory
def get_file_count(dir):
    return len(walk_file(dir))


def copy(src, target, queue):
    target_number = 1
    if os.path.isdir(src):
        target_number = get_file_count(src)
        shutil.copytree(src, target)
    else:
        shutil.copyfile(src, target)
    # Report the number of files copied
    queue.put(target_number)


def copy_dir(src, desc):
    total_number = get_file_count(src)
    # Strip trailing separators
    src = check_separator(src)
    desc = check_separator(desc)
    # print("src:", src)
    # print("desc:", desc)

    file_dir_list = [src + "/" + i for i in os.listdir(src)]
    if os.path.exists(desc):
        shutil.rmtree(desc)
    pool = multiprocessing.Pool(3)

    # Queue for progress reports; with a Pool it must come from a Manager
    queue = multiprocessing.Manager().Queue()

    # One process per top-level file/directory
    for f_name in file_dir_list:
        # Everything after the second "/" of the source path
        # (fits the f:/dir/ layout used in main())
        target = desc + "/" + f_name[index_list("/", f_name)[1] + 1:]
        # print(target)
        # Make sure the target's parent directory exists
        parent_path = os.path.split(target)[0]
        if not os.path.exists(parent_path):
            os.makedirs(parent_path)
        pool.apply_async(copy, args=(f_name, target, queue))

    start = time.time()
    pool.close()
    # pool.join()
    count = 0
    while True:
        count += queue.get()
        # %% prints a literal %; no newline, and "\r" returns to the start
        # of the line so each print overwrites the previous one
        print("\rcopy progress %.2f %%" % (count * 100 / total_number), end="")
        if count >= total_number:
            break
    end = time.time()
    print()
    print("elapsed -----", (end - start), "s")


# All indices at which character c occurs in s
def index_list(c, s):
    return [i.start() for i in re.finditer(c, s)]


# Strip a trailing "/" from the path, if any
def check_separator(path):
    if path.endswith("/"):
        return path[:-1]
    return path


def main():
    copy_dir("f:/ftp_mypc/", "e:/ftp_mypc/")


if __name__ == '__main__':
    main()
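As an aside, the progress display in copy_dir is a reusable pattern on its own: each worker puts the number of items it finished onto a shared queue, and the parent loops on queue.get(), redrawing one line with "\r". A minimal self-contained sketch of just that pattern (fake_copy and the sleep are stand-ins, not part of the original code):

import multiprocessing
import time


def fake_copy(n, queue):
    time.sleep(0.1)  # stand-in for real copy work
    queue.put(n)     # report n items finished


if __name__ == "__main__":
    total = 10
    pool = multiprocessing.Pool(3)
    queue = multiprocessing.Manager().Queue()
    for _ in range(total):
        pool.apply_async(fake_copy, args=(1, queue))
    pool.close()

    done = 0
    while done < total:
        done += queue.get()  # blocks until some worker reports
        print("\rprogress %.2f %%" % (done * 100 / total), end="")
    print()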
This still has a few small problems. For large files we would like to use multiple threads inside each process, but as you can see, shutil gives us no way to split a copy by byte ranges, which leads to version 2 below.
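The byte splitting that shutil lacks is straightforward with plain file objects: pre-size the destination, then have each worker seek to its own offset and copy only its slice. A minimal sketch of the idea, with copy_range as a hypothetical helper (version 2 builds the same mechanism into copy_file_thread):

import os


def copy_range(src_path, dst_path, offset, length, buffer=64 * 1024):
    # Copy the bytes in [offset, offset + length) to the same offset
    # in the destination; each worker gets its own file handles.
    with open(src_path, "rb") as rs, open(dst_path, "r+b") as ws:
        rs.seek(offset)
        ws.seek(offset)
        remaining = length
        while remaining > 0:
            content = rs.read(min(buffer, remaining))
            if not content:
                break
            ws.write(content)
            remaining -= len(content)

# The destination must already exist with the right size before workers
# seek into it, e.g.:
#   with open(dst_path, "wb") as f:
#       f.truncate(os.path.getsize(src_path))

Giving each worker its own pair of handles like this would also avoid any locking; the version below instead shares one handle pair per file and serializes its threads with a lock.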
Version 2: raw I/O, with multi-threaded copying of large files
# -*- coding: utf-8 -*-
# @author: Tele
# @Time : 2019/04/02 3:09 PM
# Copy a directory tree with multiple processes; large files are copied
# with multiple threads inside each process.
# When the processes come from a pool, the message queue must be created
# via multiprocessing.Manager().Queue()

import time
import re
import os
import shutil
import multiprocessing
import math
from concurrent.futures import ThreadPoolExecutor, wait

# Threshold for a "large" file: 209715200 bytes (200 MB)
MAX_SINGLE_FILE_SIZE = 209715200
mutex = multiprocessing.Lock()
executor = ThreadPoolExecutor(max_workers=3)


# Walk the directory tree and collect file names
def walk_file(file):
    file_list = list()
    for root, dirs, files in os.walk(file):
        for f in files:
            file_list.append(f)

        # Count empty directories as well
        for d in dirs:
            if len(os.listdir(os.path.join(root, d))) == 0:
                file_list.append(d)
    return file_list


# Count the files under a directory
def get_file_count(dir):
    return len(walk_file(dir))


def copy(src, target, queue):
    target_number = 1
    buffer = 1024
    if os.path.isdir(src):
        target_number = get_file_count(src)
        # Map source paths to the destination by swapping the drive letter;
        # this assumes the destination mirrors the source path, as in main()
        drive = os.path.splitdrive(target)[0]
        for root, dirs, files in os.walk(src):
            for f in files:
                dst = drive + os.path.splitdrive(os.path.join(root, f))[1]
                copy_single_file(buffer, os.path.join(root, f), dst)
            # Recreate empty directories
            for d in dirs:
                dst = drive + os.path.splitdrive(os.path.join(root, d))[1]
                if not os.path.exists(dst):
                    os.makedirs(dst)
    else:
        copy_single_file(buffer, src, target)
    # Report the number of files copied
    queue.put(target_number)


# Copy a single file
def copy_single_file(buffer, src, target):
    file_size = os.path.getsize(src)
    rs = open(src, "rb")

    # Make sure the parent directories exist
    parent_path = os.path.split(target)[0]
    if not os.path.exists(parent_path):
        os.makedirs(parent_path)

    ws = open(target, "wb")
    if file_size <= MAX_SINGLE_FILE_SIZE:
        # Small files: plain buffered read/write
        while True:
            content = rs.read(buffer)
            ws.write(content)
            if len(content) == 0:
                break
        ws.flush()
    else:
        # Bytes each thread copies: 52428800 (50 MB)
        PER_THREAD_SIZE = 52428800
        # Build one task per 50 MB slice and submit them to the thread pool
        chunk_count = math.ceil(file_size / PER_THREAD_SIZE)
        task_list = list()
        for i in range(chunk_count):
            byte_size = PER_THREAD_SIZE
            # The last thread copies whatever is left
            if i == chunk_count - 1:
                byte_size = file_size - i * PER_THREAD_SIZE
            start = i * PER_THREAD_SIZE
            t = executor.submit(copy_file_thread, start, byte_size, rs, ws)
            task_list.append(t)
        wait(task_list)
    if rs:
        rs.close()
    if ws:
        ws.close()


# Thread worker: copy byte_size bytes starting at offset start.
# The threads share one read handle and one write handle, so the lock is
# held for the whole slice; the slices are copied one after another.
def copy_file_thread(start, byte_size, rs, ws):
    mutex.acquire()
    buffer = 1024
    count = 0
    rs.seek(start)
    ws.seek(start)
    while True:
        if count + buffer <= byte_size:
            content = rs.read(buffer)
            count += len(content)
            write(content, ws)
        else:
            content = rs.read(byte_size - count)
            count += len(content)
            write(content, ws)
            break
    # global total_count
    # total_count += byte_size
    # print("\rcopy progress %.2f %%" % (total_count * 100 / file_size), end="")
    mutex.release()


def write(content, ws):
    ws.write(content)
    ws.flush()


def copy_dir(src, desc):
    # Total number of files to copy (empty directories included)
    total_number = get_file_count(src)
    # Strip trailing separators
    src = check_separator(src)
    desc = check_separator(desc)
    # print("src:", src)
    # print("desc:", desc)

    file_dir_list = [src + "/" + i for i in os.listdir(src)]
    if os.path.exists(desc):
        shutil.rmtree(desc)

    # Process pool
    pool = multiprocessing.Pool(3)

    # Queue for progress reports
    queue = multiprocessing.Manager().Queue()

    # One process per top-level file/directory
    for f_name in file_dir_list:
        target = os.path.splitdrive(desc)[0] + "/" + os.path.splitdrive(f_name)[1]
        # target = desc + "/" + f_name[index_list("/", f_name)[1] + 1:]
        # print(target)
        # Make sure the target's parent directory exists
        parent_path = os.path.split(target)[0]
        if not os.path.exists(parent_path):
            os.makedirs(parent_path)
        pool.apply_async(copy, args=(f_name, target, queue))

    start = time.time()
    pool.close()
    # pool.join()
    count = 0
    while True:
        count += queue.get()
        # %% prints a literal %; "\r" overwrites the same line each time
        print("\rcopy progress %.2f %%" % (count * 100 / total_number), end="")
        if count >= total_number:
            break

    executor.shutdown()
    end = time.time()
    print()
    print("elapsed -----", (end - start), "s")


# All indices at which character c occurs in s
def index_list(c, s):
    return [i.start() for i in re.finditer(c, s)]


# Strip a trailing "/" from the path, if any
def check_separator(path):
    if path.endswith("/"):
        return path[:-1]
    return path


def main():
    copy_dir("f:/ftp_mypc/", "e:/ftp_mypc/")


if __name__ == '__main__':
    main()
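A side note on the header comment: the reason a pool needs multiprocessing.Manager().Queue() is that Pool pickles each task's arguments, and a plain multiprocessing.Queue() cannot be pickled, while the manager's queue is a picklable proxy. A minimal sketch to verify this:

import multiprocessing


def worker(q):
    q.put(1)


if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        q = multiprocessing.Manager().Queue()  # proxy object; safe to pass to pool workers
        r = pool.apply_async(worker, args=(q,))
        r.wait()
        print(q.get())  # -> 1
        # Swapping in multiprocessing.Queue() makes r.get() raise
        # RuntimeError: Queue objects should only be shared between
        # processes through inheritance.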