python-- python threadpool 的前世此生

引出

首先須要瞭解的是threadpool 的用途,他更適合於用到一些大量的短任務合集,而非一些時間長的任務,換句話說,適合大量的CPU密集型短任務,那些消耗時間較長的IO密集型長任務適合用協程去解決。html

目前,python 標準庫(特指python2.X)中的threadpool模塊是在 multiprocessing.pool.threadpool,或者multiprocessing.dummy.ThreadPool(dummy模塊是針對threading 多線程的進一步封裝)。該模塊有個缺點就是在全部線程執行完以前沒法強制退出。實現原理大同小異:實例化pool的時候會建立指定數目的線程,把task 傳給一個task-queue,線程會讀取task-queue 的task,沒有就阻塞,讀取到後就執行,並將結果交給一個result-queue。python

除了標準庫中的threadpool,還有一些使用比較多的threadpool,如下展開。git

pip 中的 ThreadPool

安裝簡單:pip install threadpool
使用以下:github

1
2
3
4
pool = ThreadPool(poolsize)   # 定義線程池,指定線程數量
requests = makeRequests(some_callable, list_of_args, callback) # 調用makeRequests建立了要開啓多線程的函數,以及函數相關參數和回調函數
[pool.putRequest(req) for req in requests] # 全部要運行多線程的請求扔進線程池
pool.wait() # 等待全部線程完成後退出

原理相似,源碼解讀能夠參考python——有一種線程池叫作本身寫的線程池 ,該博客還給出了對其的一些優化。多線程

本身定製 threadpool

根據須要的功能定製適合本身的threadpool 也是一種常見的手段,經常使用的功能好比:是否須要返回線程執行後的返回值,線程執行完以後銷燬仍是阻塞等等。如下爲本身常常用的的一個比較簡潔的threadpool,感謝@kaito-kidd提供,源碼:函數

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# coding: utf8

"""
線程池,用於高效執行某些任務。
"""

import Queue
import threading


class Task(threading.Thread):

""" 任務 """

def __init__(self, num, input_queue, output_queue, error_queue):
super(Task, self).__init__()
self.thread_name = "thread-%s" % num
self.input_queue = input_queue
self.output_queue = output_queue
self.error_queue = error_queue
self.deamon = True

def run(self):
"""run
"""
while 1:
try:
func, args = self.input_queue.get(block=False)
except Queue.Empty:
print "%s finished!" % self.thread_name
break
try:
result = func(*args)
except Exception as exc:
self.error_queue.put((func.func_name, args, str(exc)))
else:
self.output_queue.put(result)


class Pool(object):

""" 線程池 """

def __init__(self, size):
self.input_queue = Queue.Queue()
self.output_queue = Queue.Queue()
self.error_queue = Queue.Queue()
self.tasks = [
Task(i, self.input_queue, self.output_queue,
self.error_queue) for i in range(size)
]

def add_task(self, func, args):
"""添加單個任務
"""
if not isinstance(args, tuple):
raise TypeError("args must be tuple type!")
self.input_queue.put((func, args))

def add_tasks(self, tasks):
"""批量添加任務
"""
if not isinstance(tasks, list):
raise TypeError("tasks must be list type!")
for func, args in tasks:
self.add_task(func, args)

def get_results(self):
"""獲取執行結果集
"""
while not self.output_queue.empty():
print "Result: ", self.output_queue.get()

def get_errors(self):
"""獲取執行失敗的結果集
"""
while not self.error_queue.empty():
func, args, error_info = self.error_queue.get()
print "Error: func: %s, args : %s, error_info : %s" \
% (func.func_name, args, error_info)

def run(self):
"""執行
"""
for task in self.tasks:
task.start()
for task in self.tasks:
task.join()


def test(i):
"""test """
result = i * 10
return result


def main():
""" main """
pool = Pool(size=5)
pool.add_tasks([(test, (i,)) for i in range(100)])
pool.run()


if __name__ == "__main__":
main()

 

閱讀原文post

相關文章
相關標籤/搜索