因爲python的GIL致使在處理計算密集型任務時,會比單進程處理還要慢,最好的方法是使用多進程來進行處理,每一個進程處理任務的一部分。html
import signal from multiprocessing import Process import os import time # 啓動進程數設置爲4 sub_process_count = 4 # 待處理任務列表 all_task_list = [x for x in range(20)] # 子進程的SIGTERM信號處理函數 def on_sub_process_sigterm(signo, frame): global sub_task_id global run_flag # 收到SIGTERM信號後修改運行標誌,子進程處理函數循環將會退出,最後子進程安全退出 run_flag = False # 子進程的處理函數 def sub_process_func(task_id): global sub_task_id global run_flag sub_task_id = task_id run_flag = True pid = os.getpid() print('[{}][{}] 子進程已啓動'.format(sub_task_id, pid)) for task in all_task_list[task_id::sub_process_count]: #使用python的list的切片和步長屬性,確保每一個進程處理任務列表中的一部分 if not run_flag: print('[{}][{}] 子進程收到退出標誌 '.format(sub_task_id, pid)) break print('[{}][{}] 子進程正在處理任務 {} '.format(sub_task_id, pid, task)) time.sleep(10) print('[{}][{}] 子進程安全退出'.format(sub_task_id, pid)) # 主進程的SIGTERM處理函數 def on_main_process_sigterm(signo, frame): global sub_process_list # 給全部子進程發送SIGTERM信號 for (task_id, p) in sub_process_list: p.terminate() if __name__ == '__main__': #存儲全部的子進程 global sub_process_list sub_process_list = [] # 註冊SIGTERM信號捕捉函數,來管理子進程的退出 signal.signal(signal.SIGTERM, on_main_process_sigterm) # 啓動子進程 for task_id in range(sub_process_count): p = Process(target=sub_process_func, args=(task_id,)) p.start() sub_process_list.append( (task_id, p) ) print('全部子進程已經啓動...') print('輸入下面的命令來退出全部進程:\nkill -SIGTERM {}'.format(os.getpid())) # 等待子進程安全退出 for (task_id, p) in sub_process_list: p.join() print('主進程安全退出') import signal from multiprocessing import Process import os import time # 啓動進程數設置爲4 sub_process_count = 4 # 待處理任務列表 all_task_list = [x for x in range(20)] # 子進程的SIGTERM信號處理函數 def on_sub_process_sigterm(signo, frame): global sub_task_id global run_flag # 收到SIGTERM信號後修改運行標誌,子進程處理函數循環將會退出,最後子進程安全退出 run_flag = False # 子進程的處理函數 def sub_process_func(task_id): global sub_task_id global run_flag sub_task_id = task_id run_flag = True pid = os.getpid() print('[{}][{}] 子進程已啓動'.format(sub_task_id, pid)) for task in all_task_list[task_id::sub_process_count]: #使用python的list的切片和步長屬性,確保每一個進程處理任務列表中的一部分 if not run_flag: print('[{}][{}] 子進程收到退出標誌 '.format(sub_task_id, pid)) break print('[{}][{}] 子進程正在處理任務 {} '.format(sub_task_id, pid, task)) time.sleep(10) print('[{}][{}] 子進程安全退出'.format(sub_task_id, pid)) # 主進程的SIGTERM處理函數 def on_main_process_sigterm(signo, frame): global sub_process_list # 給全部子進程發送SIGTERM信號 for (task_id, p) in sub_process_list: p.terminate() if __name__ == '__main__': #存儲全部的子進程 global sub_process_list sub_process_list = [] # 註冊SIGTERM信號捕捉函數,來管理子進程的退出 signal.signal(signal.SIGTERM, on_main_process_sigterm) # 啓動子進程 for task_id in range(sub_process_count): p = Process(target=sub_process_func, args=(task_id,)) p.start() sub_process_list.append( (task_id, p) ) print('全部子進程已經啓動...') print('輸入下面的命令來退出全部進程:\nkill -SIGTERM {}'.format(os.getpid())) # 等待子進程安全退出 for (task_id, p) in sub_process_list: p.join() print('主進程安全退出')
sub_process_count = 4 all_task_list = [x for x in range(20)] def sub_process_func(task_id): task_list = [] for task in all_task_list[task_id::sub_process_count]: # print('{}: {}'.format(task_id, task)) task_list.append(task) return task_list if __name__ == '__main__': all_task_list2 = [] for task_id in range(sub_process_count): task_list = sub_process_func(task_id) all_task_list2 += task_list assert len(all_task_list2) == len(all_task_list) #任務數量是一致的 assert set(all_task_list2) == set(all_task_list) #任務長度也是一致的