1 # -*- coding: utf-8 -*-
2 # @author: Tele
3 # @Time : 2019/04/04 下午 12:25
4 # 多線程方式拷貝單個文件
5 import threading 6 import os 7 import math 8
9 rs = open("F:/ftp_mypc/a.flv", "rb") 10 # 62919061 60MB
11 file_size = os.path.getsize("F:/ftp_mypc/a.flv") 12 if os.path.exists("f:/b/b.flv"): 13 os.remove("f:/b/b.flv") 14 ws = open("f:/b/b.flv", "ab") 15 mutex = threading.Lock() 16 total_count = 0 17
18
19 def copy(start, byte_size): 20 # print(threading.current_thread().getName())
21 mutex.acquire() 22 buffer = 1024
23 count = 0 24 rs.seek(start) 25 ws.seek(start) 26 while True: 27 if count + buffer <= byte_size: 28 content = rs.read(buffer) 29 count += len(content) 30 write(content) 31 else: 32 content = rs.read(byte_size % buffer) 33 count += len(content) 34 write(content) 35 break
36 global total_count 37 total_count += byte_size 38 print("\r拷貝進度爲%.2f %%" % (total_count * 100 / file_size), end="") 39 mutex.release() 40
41
42 def write(content): 43 ws.write(content) 44 ws.flush() 45
46
47 def main(): 48 # 每一個線程拷貝的字節大小
49 per_thread_size = 30000000
50 for i in range(math.ceil(file_size / per_thread_size)): 51 byte_size = per_thread_size 52 if i == math.ceil(file_size / per_thread_size) - 1: 53 byte_size = file_size % per_thread_size 54 start = i * per_thread_size + i 55 t = threading.Thread(target=copy, args=(start, byte_size)) 56 t.start() 57
58 # t1 = threading.Thread(target=copy, args=(0, 30000000))
59 # t2 = threading.Thread(target=copy, args=(30000001, 30000000))
60 # t3 = threading.Thread(target=copy, args=(60000002, 2919061))
61 # t1.start()
62 # t2.start()
63 # t3.start()
64
65 # 子線程都結束後,釋放資源
66 if threading.activeCount() == 1: 67 if ws: 68 ws.close() 69 if rs: 70 rs.close() 71
72
73 if __name__ == '__main__': 74 main()
使用線程池:多線程
1 # -*- coding: utf-8 -*-
2 # @author: Tele
3 # @Time : 2019/04/04 下午 12:25
4 # 多線程方式拷貝單個文件,使用concurrent.futures.ThreadPoolExecutor線程池
5 import threading 6 import os 7 import math 8 from concurrent.futures import ThreadPoolExecutor, wait 9
10 rs = open("F:/ftp_mypc/a.flv", "rb") 11 # 62919061 60MB
12 file_size = os.path.getsize("F:/ftp_mypc/a.flv") 13 if os.path.exists("f:/b/b.flv"): 14 os.remove("f:/b/b.flv") 15 ws = open("f:/b/b.flv", "ab") 16 mutex = threading.Lock() 17 total_count = 0 18
19
20 def copy(start, byte_size): 21 # print(threading.current_thread().getName())
22 mutex.acquire() 23 buffer = 1024
24 count = 0 25 rs.seek(start) 26 ws.seek(start) 27 while True: 28 if count + buffer <= byte_size: 29 content = rs.read(buffer) 30 count += len(content) 31 write(content) 32 else: 33 content = rs.read(byte_size % buffer) 34 count += len(content) 35 write(content) 36 break
37 global total_count 38 total_count += byte_size 39 print("\r拷貝進度爲%.2f %%" % (total_count * 100 / file_size), end="") 40 mutex.release() 41
42
43 def write(content): 44 ws.write(content) 45 ws.flush() 46
47
48 def main(): 49 # 建立線程池
50 executor = ThreadPoolExecutor(max_workers=3) 51
52 # 構造參數列表
53 params_list = list() 54 per_thread_size = 30000000
55 for i in range(math.ceil(file_size / per_thread_size)): 56 byte_size = per_thread_size 57 if i == math.ceil(file_size / per_thread_size) - 1: 58 byte_size = file_size % per_thread_size 59 start = i * per_thread_size + i 60 params_list.append((start, byte_size)) 61
62 all_task = [executor.submit(copy, *params) for params in params_list] 63 # 等待任務完成
64 wait(all_task) 65 if ws: 66 ws.close() 67 if rs: 68 rs.close() 69
70
71 if __name__ == '__main__': 72 main()