最近我在用梯度降低算法繪製神經網絡的數據時,遇到了一些算法性能的問題。梯度降低算法的代碼以下(僞代碼):python
1
2
3
|
def
gradient_descent
(
)
:
# the gradient descent code
plotly
.
write
(
X
,
Y
)
|
通常來講,當網絡請求 plot.ly 繪圖時會阻塞等待返回,因而也會影響到其餘的梯度降低函數的執行速度。git
一種解決辦法是每調用一次 plotly.write 函數就開啓一個新的線程,可是這種方法感受不是很好。 我不想用一個像 cerely(一種分佈式任務隊列)同樣大而全的任務隊列框架,由於框架對於個人這點需求來講過重了,而且個人繪圖也並不須要 redis 來持久化數據。程序員
那用什麼辦法解決呢?我在 python 中寫了一個很小的任務隊列,它能夠在一個單獨的線程中調用 plotly.write函數。下面是程序代碼。github
1
2
3
4
5
|
from
threading
import
Thread
import
Queue
import
time
class
TaskQueue
(
Queue
.
Queue
)
:
|
首先咱們繼承 Queue.Queue 類。從 Queue.Queue 類能夠繼承 get 和 put 方法,以及隊列的行爲。web
1
2
3
4
|
def
__init__
(
self
,
num_workers
=
1
)
:
Queue
.
Queue
.
__init__
(
self
)
self
.
num_workers
=
num_workers
self
.
start_workers
(
)
|
初始化的時候,咱們能夠不用考慮工做線程的數量。redis
1
2
3
4
|
def
add_task
(
self
,
task
,
*
args
,
*
*
kwargs
)
:
args
=
args
or
(
)
kwargs
=
kwargs
or
{
}
self
.
put
(
(
task
,
args
,
kwargs
)
)
|
咱們把 task, args, kwargs 以元組的形式存儲在隊列中。*args 能夠傳遞數量不等的參數,**kwargs 能夠傳遞命名參數。算法
1
2
3
4
5
|
def
start_workers
(
self
)
:
for
i
in
range
(
self
.
num_workers
)
:
t
=
Thread
(
target
=
self
.
worker
)
t
.
daemon
=
True
t
.
start
(
)
|
咱們爲每一個 worker 建立一個線程,而後在後臺刪除。數據庫
下面是 worker 函數的代碼:編程
1
2
3
4
5
6
|
def
worker
(
self
)
:
while
True
:
tupl
=
self
.
get
(
)
item
,
args
,
kwargs
=
self
.
get
(
)
item
(
*
args
,
*
*
kwargs
)
self
.
task_done
(
)
|
worker 函數獲取隊列頂端的任務,並根據輸入參數運行,除此以外,沒有其餘的功能。下面是隊列的代碼:緩存
咱們能夠經過下面的代碼測試:
1
2
3
4
5
6
7
8
9
10
11
12
|
def
blokkah
(
*
args
,
*
*
kwargs
)
:
time
.
sleep
(
5
)
print
「
Blokkah
mofo
!」
q
=
TaskQueue
(
num_workers
=
5
)
for
item
in
range
(
1
)
:
q
.
add_task
(
blokkah
)
q
.
join
(
)
# wait for all the tasks to finish.
print
「
All
done
!」
|
Blokkah 是咱們要作的任務名稱。隊列已經緩存在內存中,而且沒有執行不少任務。下面的步驟是把主隊列當作單獨的進程來運行,這樣主程序退出以及執行數據庫持久化時,隊列任務不會中止運行。可是這個例子很好地展現瞭如何從一個很簡單的小任務寫成像工做隊列這樣複雜的程序。
1
2
3
|
def
gradient_descent
(
)
:
# the gradient descent code
queue
.
add_task
(
plotly
.
write
,
x
=
X
,
y
=
Y
)
|
修改以後,個人梯度降低算法工做效率彷佛更高了。若是你很感興趣的話,能夠參考下面的代碼。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
from
threading
import
Thread
import
Queue
import
time
class
TaskQueue
(
Queue
.
Queue
)
:
def
__init__
(
self
,
num_workers
=
1
)
:
Queue
.
Queue
.
__init__
(
self
)
self
.
num_workers
=
num_workers
self
.
start_workers
(
)
def
add_task
(
self
,
task
,
*
args
,
*
*
kwargs
)
:
args
=
args
or
(
)
kwargs
=
kwargs
or
{
}
self
.
put
(
(
task
,
args
,
kwargs
)
)
def
start_workers
(
self
)
:
for
i
in
range
(
self
.
num_workers
)
:
t
=
Thread
(
target
=
self
.
worker
)
t
.
daemon
=
True
t
.
start
(
)
def
worker
(
self
)
:
while
True
:
tupl
=
self
.
get
(
)
item
,
args
,
kwargs
=
self
.
get
(
)
item
(
*
args
,
*
*
kwargs
)
self
.
task_done
(
)
def
tests
(
)
:
def
blokkah
(
*
args
,
*
*
kwargs
)
:
time
.
sleep
(
5
)
print
"Blokkah mofo!"
q
=
TaskQueue
(
num_workers
=
5
)
for
item
in
range
(
10
)
:
q
.
add_task
(
blokkah
)
q
.
join
(
)
# block until all tasks are done
print
"All done!"
if
__name__
==
"__main__"
:
tests
(
)
|
問啊-一鍵呼叫程序員答題神器,牛人一對一服務,開發者編程必備官方網站:www.wenaaa.com
QQ羣290551701 彙集不少互聯網精英,技術總監,架構師,項目經理!開源技術研究,歡迎業內人士,大牛及新手有志於從事IT行業人員進入!