多線程 Threading Multiprocessing(Python)

多線程是加速程序計算的有效方式,Python的多線程模塊threading上手快速簡單,學習莫煩多線程教程動手操做了一遍,這裏記錄一下。python

1 Threading

1.1 添加線程

import threading
#獲取已激活的線程數
print(threading.active_count())  #1

#查看全部線程信息
print(threading.enumerate())  #[<_MainThread(MainThread, started 18496)>]

#查看如今正在運行的線程
print(threading.current_thread()) #<_MainThread(MainThread, started 18496)>


import threading

def thread_job():
    print('This is a thread of %s' % threading.current_thread())

def runMain():
    thread = threading.Thread(target=thread_job,) #定義線程
    thread.start()  #線程開始工做

if __name__ == '__main__':
    runMain()

#輸出
This is a thread of <Thread(Thread-1, started 12324)>

1.2 join功能

不加join功能,線程任務還未完成便輸出all done。git

import threading
import time

def thread_job():
    print('T1 start.\n')
    for i in range(10):
        time.sleep(0.1)  #任務間隔0.1秒
    print('T1 finish.\n')

add_thread = threading.Thread(target=thread_job, name='T1')
add_thread.start()

print('all done.\n')
#輸出
T1 start.
all done.
T1 finish.

若要遵循順序,在啓動線程後調用join , 使用join控制多個線程的執行順序,效果以下。github

import threading
import time

def thread_job():
    print('T1 start.\n')
    for i in range(10):
        time.sleep(0.1)  #任務間隔0.1秒
    print('T1 finish.\n')

add_thread = threading.Thread(target=thread_job, name='T1')
add_thread.start()
add_thread.join()

print('all done.\n')
#輸出
T1 start.
T1 finish.
all done.

1.3 存儲進程結果Queue

將數據列表中的數據傳入,使用四個線程處理,將結果保存在Queue中,線程執行完後,從Queue中獲取存儲的結果多線程

#導入線程 隊列的標準模塊
import threading
import time
from queue import Queue

定義一個被多線程調用的函數:函數的參數時一個列表l和一個隊列q,函數的功能是對列表的每一個元素進行平方計算,將結果保存在隊列中app

def job(l,q):
    for i in range(len(l)):
        l[i] = l[i]**2
    q.put(l)

定義一個多線程函數:在多線程函數中定義一個Queue ,用來保存返回值 ,代替return ,定義一個多線程列表 ,初始化一個多維數據列表async

def mulithreading():
    q = Queue() #q中存放返回值 代替return的返回值
    threads = []
    data = [[1,2,3],[3,4,5],[4,4,4],[5,5,5]]

在多線程函數中定義四個線程,啓動線程,將每一個線程添加到多線程的列表中函數

for i in range(4): #定義四個線程
        t = threading.Thread(target=job,args=(data[i],q))
        t.start()
        threads.append(t) #把每一個線程append到線程列表中

分別join四個線程到主線程學習

for thread in threads:
        thread.join()

定義一個空列表results 將四個線程運行後保存在隊列中的結果返回給resultsui

results = []
    for _ in range(4):
        results.append(q.get()) #q.get()按順序從q中拿出一個值
    print(results)

完整代碼:線程

#導入線程 隊列的標準模塊
import threading
import time
from queue import Queue

#定義一個被多線程調用的函數
def job(l,q):
    for i in range(len(l)):
        l[i] = l[i]**2
    q.put(l)
#定義一個多線程函數
def mulithreading():
    q = Queue() #q中存放返回值 代替return的返回值
    threads = []
    data = [[1,2,3],[3,4,5],[4,4,4],[5,5,5]]
    for i in range(4): #定義四個線程
        t = threading.Thread(target=job,args=(data[i],q))
        t.start()
        threads.append(t) #把每一個線程append到線程列表中
    #分別join四個線程到主線程
    for thread in threads:
        thread.join()
    #定義一個空列表results 將四個線程運行後保存在隊列中的結果返回給results
    results = []
    for _ in range(4):
        results.append(q.get()) #q.get()按順序從q中拿出一個值
    print(results)
if __name__ == '__main__':
    mulithreading()

#輸出
[[1, 4, 9], [9, 16, 25], [16, 16, 16], [25, 25, 25]]

1.4 GIL 不必定有效率

python 的多線程 threading 有時候並非特別理想. 最主要的緣由是就是, Python 的設計上, 有一個必要的環節, 就是 Global Interpreter Lock (GIL). 這個東西讓 Python 仍是一次性只能處理一個東西.

import threading
from queue import Queue
import copy
import time

def job(l, q):
    res = sum(l)
    q.put(res)

def multithreading(l):
    q = Queue()
    threads = []
    for i in range(4):
        t = threading.Thread(target=job, args=(copy.copy(l), q), name='T%i' % i)
        t.start()
        threads.append(t)
    [t.join() for t in threads]
    total = 0
    for _ in range(4):
        total += q.get()
    print(total)

def normal(l):
    total = sum(l)
    print(total)

if __name__ == '__main__':
    l = list(range(10000000))
    s_t = time.time()
    normal(l*4)
    print('normal: ',time.time()-s_t)
    s_t = time.time()
    multithreading(l)
    print('multithreading: ', time.time()-s_t)
#輸出
199999980000000
normal:  1.7343778610229492
199999980000000
multithreading:  2.218825340270996

程序 threading 和 Normal 運行了同樣屢次的運算. 可是咱們發現 threading 卻沒有快多少, 按理來講, 咱們預期會要快3-4倍, 由於有創建4個線程, 可是並無. 這就是其中的 GIL 在做怪.

1.5 線程鎖

不使用鎖

import threading

def job1():  #全局變量A的值每次加1,循環10次
    global A
    for i in range(10):
        A += 1
        print('job1',A)

def job2(): #全局變量A的值每次加10,循環10次
    global A
    for i in range(10):
        A += 10
        print('job2',A)

if __name__ == '__main__':
    A = 0
    t1 = threading.Thread(target=job1)
    t2 = threading.Thread(target=job2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

#輸出 打印結果很是混亂
job1 1
job1 2
job1 3
job1 4
job2 14
job1 15
job2 25
job1 26
job2 36
job1 37
job2 47
job1 48
job2 58
job1 59
job2 69
job1 70
job2 80
job2 90
job2 100
job2 110

使用鎖
lock在不一樣線程使用同一共享內存時,可以確保線程之間互不影響,使用lock的方法是, 在每一個線程執行運算修改共享內存以前,執行lock.acquire()將共享內存上鎖, 確保當前線程執行時,內存不會被其餘線程訪問,執行運算完畢後,使用lock.release()將鎖打開, 保證其餘的線程可使用該共享內存。

import threading

def job1():
    global A;lock = threading.Lock()
    lock.acquire()
    for i in range(10):
        A += 1
        print('job1',A)
    lock.release()

def job2():
    global A;lock = threading.Lock()
    lock.acquire()
    for i in range(10):
        A += 10
        print('job2',A)
    lock.release()

if __name__ == '__main__':
    A = 0
    t1 = threading.Thread(target=job1)
    t2 = threading.Thread(target=job2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

#輸出  使用lock後 執行完一個線程後再執行另外一個線程。使用lock和不使用lock,最後打印輸出的結果是不一樣的。
job1 1
job1 2
job1 3
job1 4
job1 5
job1 6
job1 7
job1 8
job1 9
job1 10
job2 20
job2 30
job2 40
job2 50
job2 60
job2 70
job2 80
job2 90
job2 100
job2 110

2 Multiprocessing

多進程 Multiprocessing 和多線程 threading 相似, 都是在 python 中用來並行運算的。不過既然有了 threading, 爲何 Python 還要出一個 multiprocessing 呢? 由於要用來彌補 threading 的一些劣勢, 好比在 threading 教程中提到的GIL, python 把 multiprocessing 和 threading 的使用方法作的幾乎差很少,使用多線程發揮電腦多核系統的威力。

2.1添加Process

#導入線程進程標準模塊
import multiprocessing as mp
import threading as td

#定義一個被線程和進程調用的函數
def job(a,d):
    print('AA')

#建立線程和進程
t1=td.Thread(target=job,args=(1,2))
p1=mp.Process(target=job,args=(1,2))

#啓動線程和進程
t1.start()
p1.start()

#鏈接線程和進程
t1.join()
p1.join()

#能夠看出線程和進程的使用方式類似

完整代碼

#導入線程進程標準模塊
import multiprocessing as mp
import threading as td

#定義一個被進程調用的函數
def job(a,d):
    print('AA')

if __name__ == '__main__':
    p1 = mp.Process(target=job, args=(1, 2)) #建立進程
    p1.start() #啓動進程
    p1.join()  #鏈接進程
#輸出
AA

2.2 存儲進程輸出 Queue

Queue的功能是將每一個核或線程的運算結果放在隊裏中, 等到每一個線程或核運行完畢後再從隊列中取出結果, 繼續加載運算。由於多線程調用的函數不能有返回值, 因此使用Queue存儲多個線程運算的結果。
定義一個被多線程調用的函數,q 就像一個隊列,用來保存每次函數運行的結果

#定義一個多線程調用函數
def job(q): #注:該函數沒有返回值
    res = 0
    for i in range(1000):
        res += i+i**2+i**3
    q.put(res) #queue

定義兩個線程函數,用來處理同一個任務, args 的參數只要一個值的時候,參數後面須要加一個逗號,表示args是可迭代的,後面可能還有別的參數,不加逗號會出錯

p1 = mp.Process(target=job,args=(q,))
p2 = mp.Process(target=job,args=(q,))

完整代碼實現

import multiprocessing as mp

#定義一個多線程調用函數
def job(q): #注:該函數沒有返回值
    res = 0
    for i in range(1000):
        res += i+i**2+i**3
    q.put(res) #queue
if __name__ == '__main__':
    q=mp.Queue()  #定義一個多線程隊列 存儲結果
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start() #啓動線程  分兩批處理
    p2.start()
    p1.join() #鏈接線程
    p2.join()
    res1=q.get() #分兩批輸出 將結果分別保存
    res2=q.get()
    print(res1+res2)
#輸出
499667166000

2.3效率對比

對比下多進程,多線程和什麼都不作時的消耗時間,看看哪一種方式更有效率。

import multiprocessing as mp
def job(q):
    res=0
    for i in range(1000000):
        res += i + i**2 + i**3
    q.put(res)

#因爲多進程是多核運算 多進程代碼命名爲multicore()
def multicore():
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print('multicore:',res1 + res2)

#建立多線程
import threading as td
def multithread():
    q = mp.Queue() # thread可放入process一樣的queue中
    t1 = td.Thread(target=job, args=(q,))
    t2 = td.Thread(target=job, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print('multithread:', res1 + res2)

#建立普通函數
def normal():
    res = 0
    for _ in range(2):
        for i in range(1000000):
            res += i + i**2 + i**3
    print('normal:', res)

import time
if __name__ == '__main__':
    st = time.time()
    normal()
    st1 = time.time()
    print('normal time:', st1 - st)
    multithread()
    st2 = time.time()
    print('multithread time:', st2 - st1)
    multicore()
    print('multicore time:', time.time() - st2)

#輸出
normal: 499999666667166666000000
normal time: 1.6875250339508057
multithread: 499999666667166666000000
multithread time: 3.1562907695770264
multicore: 499999666667166666000000
multicore time: 1.0937612056732178

此次運行時間依然是:多進程 < 普通 < 多線程。 發現多核/多進程最快,說明在同時間運行了多個任務。 而多線程的運行時間竟然比什麼都不作的程序還要慢一點,說明多線程仍是有短板。

2.4 進程池Pool

進程池就是將所要運行的東西,放到池子裏,Python會自行解決多進程的問題
2.4.1 進程池Pool()和map()

#定義一個Pool
pool = mp.Pool()

有了池子以後,就可讓池子對應某一個函數,向池子裏丟數據,池子就會返回函數返回的值。 Pool和以前的Process的不一樣點是丟向Pool的函數有返回值,而Process的沒有返回值。
接下來用map()獲取結果,在map()中須要放入函數和須要迭代運算的值,而後它會自動分配給CPU核,返回結果 res = pool.map(job, range(10))

import multiprocessing as mp

def job(x):
    return x*x

def multicore():
    pool = mp.Pool()
    res = pool.map(job,range(10))
    print(res)

if __name__ == '__main__':
    multicore()

#輸出
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

2.4.2 自定義核數量
怎麼知道Pool是否真的調用了多個核呢?能夠把迭代次數增大些,而後打開CPU負載看下CPU運行狀況
打開CPU負載:活動監視器 > CPU > CPU負載(單擊一下便可)
Pool默認大小是CPU的核數,咱們也能夠經過在Pool中傳入processes參數便可自定義須要的核數量,

def multicore():
    pool = mp.Pool(processes=3) # 定義CPU核數量爲3
    res = pool.map(job, range(10))
    print(res)

2.4.3 apply_async()
Pool除了map()外,還有能夠返回結果的方式,就是apply_async()。apply_async()中只能傳遞一個值,它只會放入一個核進行運算,可是傳入值時要注意是可迭代的,因此在傳入值後須要加逗號, 同時須要用get()方法獲取返回值

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get得到結果
    print(res.get())

#運行結果
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4

2.4.4 apply_async()輸出多個結果
在apply_async()中多傳入幾個值
res = pool.apply_async(job, (2,3,4,)) #報錯 TypeError: job() takes exactly 1 argument (3 given) 即apply_async()只能輸入一組參數。
將apply_async() 放入迭代器中,定義一個新的multi_res
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
取出值時須要一個一個取出來
print([res.get() for res in multi_res])
合併代碼

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get得到結果
    print(res.get())
    # 迭代器,i=0時apply一次,i=1時apply一次等等
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    # 從迭代器中取出
    print([res.get() for res in multi_res])
#運行結果

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]  # map()
4
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # multi_res
  • Pool默認調用是CPU的核數,傳入processes參數可自定義CPU核數
  • map() 放入迭代參數,返回多個結果
  • apply_async()只能放入一組參數,並返回一個結果,若是想獲得map()的效果須要經過迭代

2.5 共享內存 shared memory

2.5.1 Shared Value
使用Value數據存儲在一個共享的內存表中。

import multiprocessing as mp

value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)

其中d和i參數用來設置數據類型的,d表示一個雙精浮點類型,i表示一個帶符號的整型
2.5.2 Shared Array
在Python的mutiprocessing中,有還有一個Array類,能夠和共享內存交互,來實如今進程之間共享數據
array = mp.Array('i', [1, 2, 3, 4])
這裏的Array和numpy中的不一樣,它只能是一維的,不能是多維的。一樣和Value 同樣,須要定義數據形式,不然會報錯。

2.6 進程鎖

2.6.1 不加鎖

import multiprocessing as mp

def job(x):
    return x*x

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get得到結果
    print(res.get())
    # 迭代器,i=0時apply一次,i=1時apply一次等等
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    # 從迭代器中取出
    print([res.get() for res in multi_res])

if __name__ == '__main__':
    multicore()
#輸出
1
4
5
8
9
12
13
16
17
20

上面代碼中定義了一個共享變量v,兩個進程均可以對它進行操做。 在job()中咱們想讓v每隔0.1秒輸出一次累加num的結果,可是在兩個進程p1和p2 中設定了不一樣的累加值。因此來看下這兩個進程是否會出現衝突。
2.6.2 加鎖

import multiprocessing as mp
import time

def job(v, num, l):
    l.acquire() # 鎖住
    for _ in range(5):
        time.sleep(0.1)
        v.value += num # 獲取共享內存
        print(v.value)
    l.release() # 釋放

def multicore():
    l = mp.Lock() # 定義一個進程鎖
    v = mp.Value('i', 0) # 定義共享內存
    p1 = mp.Process(target=job, args=(v,1,l)) # 須要將lock傳入
    p2 = mp.Process(target=job, args=(v,3,l))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()
#運行一下,看看是否還會出現搶佔資源的狀況

1
2
3
4
5
8
11
14
17
20

運行結果顯示,進程鎖保證了進程p1的完整運行,而後才進行了進程p2的運行

相關文章
相關標籤/搜索