python--再看並行之協程線程進程

一、gevent協程適合I/O密集,不適合CPU密集。

三、gevent協程沒法發揮多核優點,事實上,協程只是以單線程的方式在運行。

三、子程序就是協程的一種特例

項目實際應用

from gevent import monkey
from gevent.monkey import patch_all
from gevent.pool import Pool
import time

def lr_classify():
    print('lr_classify')
    
def rf_classify():
    print('rf_classify')
    
def svm_classify():
    print('svm_classify')

def decisionTree_classify():
    print('decisionTree_classify')
    
def gbt_classify():
    print('gbt_classify')
    
def naive_bayes_classify():
    print('naive_bayes_classify')

model_dict = {'logistic':lr_classify
              ,'random_forest':rf_classify
              ,'linear_svm':svm_classify
              ,'decision_tree':decisionTree_classify
              ,'gbt':gbt_classify
              ,'naive_bayes':naive_bayes_classify}

def get_task(names):
    return [task for name,task in model_dict.items() if name in names]
#     return model_dict.get(name,'no such model')


start = time.time()

model_list = ['logistic','gbt', 'naive_bayes', 'linear_svm', 'decision_tree', 'random_forest']

names = model_dict.keys()&(set(model_list))

tasks = get_task(names)
p = Pool(6)

# jobs =  [i for i in range(10)]
for task in tasks:
    p.spawn(task)
# for task in tasks:
#     p.apply_async(task,(None,))
#     p.spawn
p.join()
end = time.time()
print('cost {} seconds in total...'.format((end-start)))

#輸出結果以下:
lr_classify
svm_classify
gbt_classify
naive_bayes_classify
decisionTree_classify
rf_classify
cost 0.0018892288208007812 seconds in total...

線程進程協程比較

# 多線程
import threading
import time

def loop_5(interval):
    for i in range(5):
        print('loop_5: ',i)
        time.sleep(interval)

def loop_10(interval):
    for i in range(10):
        print('loop_10: ',i)
        time.sleep(interval)
        
if __name__ == '__main__':
    print('start...:')
    start = time.time()
    threads = []
    tasks = [loop_5,loop_10]
    for task in tasks:
        t = threading.Thread(target=task,args=(1,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    end = time.time()
    print('end...and cost {} seconds in total...'.format((end-start)))

# 輸出以下
start...:
loop_5: loop_10:   00

loop_5: loop_10:  1
 1
loop_10:  2
loop_5:  2
loop_10:  3
loop_5:  3
loop_10:  4
loop_5:  4
loop_10:  5
loop_10:  6
loop_10:  7
loop_10:  8
loop_10:  9
end...and cost 10.02077603340149 seconds in total...

#多進程

from multiprocessing  import Pool
import time

def loop_5(interval):
    for i in range(5):
        print('loop_5: ',i)
        time.sleep(interval)
        
def loop_10(interval):
    for i in range(10):
        print('loop_10: ',i)
        time.sleep(interval)
        
if __name__ == '__main__':
    print('start ...')
    start = time.time()
    p = Pool(2)
    tasks = [loop_5,loop_10]
    for task in tasks:
        p.apply_async(task,args=(1,))
    p.close()
    p.join()    
    end = time.time()
    print('end...cost {} seconds in total...'.format((end-start)))

# 然而,發現多進程仍然耗費 10 秒左右,不難理解啊,由於loop_5雖然跟10交替跑,可是仍是要等待10跑完纔會主進程結束啊。。。
start ...
loop_5:  0
loop_10:  0
loop_5:  1
loop_10:  1
loop_5:  2
loop_10:  2
loop_5:  3
loop_10:  3
loop_5:  4
loop_10:  4
loop_10:  5
loop_10:  6
loop_10:  7
loop_10:  8
loop_10:  9
end...cost 10.231749534606934 seconds in total...

### 不使用進程池

from multiprocessing  import Process
import time

def loop_5(interval):
    for i in range(5):
        print('loop_5: ',i)
        time.sleep(interval)
        
def loop_10(interval):
    for i in range(10):
        print('loop_10: ',i)
        time.sleep(interval)
        
if __name__ == '__main__':
    print('start ...')
    start = time.time()
    tasks = [loop_5,loop_10]
    processes = []
    for task in tasks:
        p = Process(target=task,args=(1,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    end = time.time()
    print('end...cost {} seconds in total...'.format((end-start)))

#輸出結果以下:
start ...
loop_5:  0
loop_10:  0
loop_5:  1
loop_10:  1
loop_5:  2
loop_10:  2
loop_5:  3
loop_10:  3
loop_5:  4
loop_10:  4
loop_10:  5
loop_10:  6
loop_10:  7
loop_10:  8
loop_10:  9
end...cost 10.139782667160034 seconds in total...

協程進行文件複製

from gevent.pool import Pool
import time

def copy_file(src,target):
    with open(src,'r') as fr:
        with open(target,'w') as fw:
            for line in fr:
                fw.write(line)
                
if __name__ == '__main__':
    print('start...')
    start = time.time()
    p = Pool(6)
    
    args = [('./test.py','./test2.py'),('./test.py','./test3.py')]
    
    for arg in args:
        p.spawn(copy_file,*arg)
    p.join()
    end = time.time()
    print('cost {} seconds in total...'.format((end-start)))

多線程進行文件複製

import threading
import asyncio
import time
def copy(src,tar):
    print('{} start...'.format(threading.current_thread().name))
    with open(src,'rb') as binFileInputStream:
        with open(tar,'wb') as binFileOutputStream:
            binFileOutputStream.write(binFileInputStream.read())
    time.sleep(10)
    print('{} end...'.format(threading.current_thread().name))
    
threads = []
args = [('./test.py','./hella.py'),('./diabetes.csv','./diabetes.py')]
for i,arg in enumerate(args):
    t = threading.Thread(target=copy,args=arg,name='thread-{}'.format(i))
    threads.append(t)
for thread in threads:
    thread.start()
    
for thread in threads:
    thread.join()

多進程

from multiprocessing import Pool
import time
import random
import os

def copy_file_multiprocess(src,target):
    with open(src,'rb') as fr:
        with open(target,'wb') as fw:
            print('{} start to copy file...'.format(os.getpid()))
            fw.write(fr.read())
            time.sleep(random.random()*3)
            print('pid {} finished...'.format(os.getpid()))

if __name__ == '__main__':
    p = Pool(3)
    args = [('./config.txt','./babao/config.txt'),('./config.txt','./babao/config2.txt'),('./config.txt','./babao/config3.txt')]
    for i in range(3):
        p.apply_async(copy_file_multiprocess,args[i])
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All suprocesses finished!')

擴展知識

Python多線程編程時常常會用到join()和setDaemon()方法,基本用法以下:

join([time]): 等待至線程停止。這阻塞調用線程直至線程的join() 方法被調用停止-正常退出或者拋出未處理的異常-或者是可選的超時發生。
setDaemon,將該線程標記爲守護線程或用戶線程
 
一、join ()方法:主線程A中,建立了子線程B,而且在主線程A中調用了B.join(),那麼,主線程A會在調用的地方等待,直到子線程B完成操做後,才能夠接着往下執行,那麼在調用這個線程時可使用被調用線程的join方法。
原型:join([timeout]),裏面的參數時可選的,表明線程運行的最大時間,即若是超過這個時間,無論這個此線程有沒有執行完畢都會被回收,而後主線程或函數都會接着執行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import threading
import time
 
 
class MyThread(threading.Thread):
    def __init__(self, id):
        threading.Thread.__init__(self)
        self.id = id
 
    def run(self):
        x = 0
        time.sleep(10)
        print(self.id)
        print('線程結束:'+str(time.time()))
 
if __name__ == "__main__":
    t1 = MyThread(999)
    print('線程開始:'+str(time.time()))
    t1.start()
    print('主線程打印開始:'+str(time.time()))
    for i in range(5):
        print(i)
    time.sleep(2)
    print('主線程打印結束:' + str(time.time()))
線程開始:1497534590.2784667
主線程打印開始:1497534590.2794669
0
1
2
3
4
主線程打印結束:1497534592.279581
999
線程結束:1497534600.2800388

從打印結果可知,線程t1 start後,主線程並無等線程t1運行結束後再執行,而是在線程執行的同時,執行了後面的語句。

 

如今,把join()方法加到啓動線程後面(其餘代碼不變)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading
import time
 
 
class MyThread(threading.Thread):
    def __init__(self, id):
        threading.Thread.__init__(self)
        self.id = id
 
    def run(self):
        x = 0
        time.sleep(10)
        print(self.id)
        print('線程結束:'+str(time.time()))
 
if __name__ == "__main__":
    t1 = MyThread(999)
    print('線程開始:'+str(time.time()))
    t1.start()
    t1.join()
    print('主線程打印開始:'+str(time.time()))
    for i in range(5):
        print(i)
    time.sleep(2)
    print('主線程打印結束:' + str(time.time()))
線程開始:1497535176.5019968
999
線程結束:1497535186.5025687
主線程打印開始:1497535186.5025687
0
1
2
3
4
主線程打印結束:1497535188.5026832

線程t1 start後,主線程停在了join()方法處,等子線程t1結束後,主線程繼續執行join後面的語句。 

 

二、setDaemon()方法。主線程A中,建立了子線程B,而且在主線程A中調用了B.setDaemon(),這個的意思是,把主線程A設置爲守護線程,這時候,要是主線程A執行結束了,就無論子線程B是否完成,一併和主線程A退出.這就是setDaemon方法的含義,這基本和join是相反的。此外,還有個要特別注意的:必須在start() 方法調用以前設置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import threading
import time
 
 
class MyThread(threading.Thread):
    def __init__(self, id):
        threading.Thread.__init__(self)
        self.id = id
 
    def run(self):
        x = 0
        time.sleep(10)
        print(self.id)
        print("This is:" + self.getName()) # 獲取線程名稱
        print('線程結束:' + str(time.time()))
 
if __name__ == "__main__":
    t1 = MyThread(999)
    print('線程開始:'+str(time.time()))
    t1.setDaemon(True)
    t1.start()
    print('主線程打印開始:'+str(time.time()))
    for i in range(5):
        print(i)
    time.sleep(2)
    print('主線程打印結束:' + str(time.time())) 
線程開始:1497536678.8509264
主線程打印開始:1497536678.8509264
0
1
2
3
4
主線程打印結束:1497536680.8510408

 

t1.setDaemon(True)的操做,將子線程設置爲了守護線程。根據setDaemon()方法的含義,父線程打印內容後便結束了,無論子線程是否執行完畢了。

若是在線程啓動前沒有加t1.setDaemon(True),輸出結果爲:

線程開始:1497536865.3215919
主線程打印開始:1497536865.3215919
0
1
2
3
4
主線程打印結束:1497536867.3217063
999
This is:Thread-1
線程結束:1497536875.3221638

程序運行中,執行一個主線程,若是主線程又建立一個子線程,主線程和子線程就分兵兩路,分別運行,那麼當主線程完成想退出時,會檢驗子線程是否完成,若是子線程未完成,則主線程會等待子線程完成後再退出;

有時咱們須要的是,子線程運行完,才繼續運行主線程,這時就能夠用join方法(在線程啓動後面);

可是有時候咱們須要的是,只要主線程完成了,無論子線程是否完成,都要和主線程一塊兒退出,這時就能夠用setDaemon方法(在線程啓動前面)。

這裏一個坑 (pool.map 中不能使用匿名函數), 就是 lambda 沒法被 pickle,可見匿名函數並非處處通用 ,正確的 下面再貼出

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
from functools import wraps,reduce
import threading as td
import multiprocessing as mp
from multiprocessing import Queue
from multiprocessing import Pool
import time

def time_count(func):
    @wraps(func)
    def inner(*args,**kw):
        start = time.time()
        result = func(*args,**kw)
        end = time.time()
        num = kw.get('num','default')
        print('func: {}-{} cost {:.2f}s\n'.format(func.__name__,num,end-start))# 這裏能夠插入日誌
        return result
    return inner

@time_count
def multicore(data,q,**kw):
    q.put(reduce(lambda x,y:x+y,data))

@time_count
def multi_thread(data,q,**kw):
    q.put(reduce(lambda x, y: x + y, data))

@time_count
def main_process():
    q = Queue()
    datas = [range(10**5),range(10**6),range(10**7),range(10**8)]
    processes = []
    for i,data in enumerate(datas):
        p = mp.Process(target=multicore,name='process-{}'.format(i),args=(data,q),kwargs={'num':i})
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

    sum = 0
    for n in range(len(datas)):
        sum += q.get()
    print('main_process sum result: {}\n'.format(sum))

@time_count
def main_thread():
    q = Queue()
    datas = [range(10 ** 5), range(10 ** 6), range(10 ** 7), range(10 ** 8)]
    threads = []
    for i,data in enumerate(datas):
        t = td.Thread(target=multi_thread,name='thread-{}'.format(i),args=(data,q),kwargs={'num':i})
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    sum = 0
    for n in range(len(datas)):
        sum += q.get()
    print('main_thread sum result: {}\n'.format(sum))

def another_multi_process_sub(data):
    return reduce(lambda x, y: x + y, data)

@time_count
def another_multi_process():
    pool = Pool(processes=2)
    datas = [range(10 ** 5), range(10 ** 6), range(10 ** 7), range(10 ** 8)]
    results = []
    for data in datas:
        res = pool.map(lambda data:lambda x,y:x+y,(data,))
        results.extend(res)
    sum = reduce(lambda x,y:x+y,results)
    print('another_multi_process sum result: {}'.format(sum))

@time_count
def multi_process_pool():
    pool = Pool(processes=2)
    datas = [range(10 ** 5), range(10 ** 6), range(10 ** 7), range(10 ** 8)]
    dataset= []
    sum = reduce(lambda x,y:x+y,[ pool.apply_async(another_multi_process_sub,(data,)).get() for data in datas])
    print('multi_process_pool sum result: {}'.format(sum))

def main():
    # main_process()
    # main_thread()
    another_multi_process()
    # multi_process_pool()

@time_count
def single_process():
    datas = [range(10 ** 5), range(10 ** 6), range(10 ** 7), range(10 ** 8)]
    sum = 0
    for data in datas:
        sum += reduce(lambda x,y:x+y,data)
    print('\n single process sum result: {}'.format(sum))

if __name__ == '__main__':
    main()
    single_process()

對比多進程 多線程 效率 , 總結 優先使用順序 (多進程+協程>多進程 >= 單進程多線程 >=單進程單線程)

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
from functools import wraps,reduce
import threading as td
import multiprocessing as mp
from multiprocessing import Queue
from multiprocessing import Pool
import time

def time_count(func):
    @wraps(func)
    def inner(*args,**kw):
        start = time.time()
        result = func(*args,**kw)
        end = time.time()
        num = kw.get('num','default')
        print('func: {}-{} cost {:.2f}s\n'.format(func.__name__,num,end-start))# 這裏能夠插入日誌
        return result
    return inner

@time_count
def multicore(data,q,**kw):
    q.put(reduce(lambda x,y:x+y,data))

@time_count
def multi_thread(data,q,**kw):
    q.put(reduce(lambda x, y: x + y, data))

@time_count
def main_process():
    q = Queue()
    datas = [range(10**5),range(10**6),range(10**7),range(10**8)]
    processes = []
    for i,data in enumerate(datas):
        p = mp.Process(target=multicore,name='process-{}'.format(i),args=(data,q),kwargs={'num':i})
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

    sum = 0
    for n in range(len(datas)):
        sum += q.get()
    print('main_process sum result: {}\n'.format(sum))

@time_count
def main_thread():
    q = Queue()
    datas = [range(10 ** 5), range(10 ** 6), range(10 ** 7), range(10 ** 8)]
    threads = []
    for i,data in enumerate(datas):
        t = td.Thread(target=multi_thread,name='thread-{}'.format(i),args=(data,q),kwargs={'num':i})
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    sum = 0
    for n in range(len(datas)):
        sum += q.get()
    print('main_thread sum result: {}\n'.format(sum))

def another_multi_process_sub(data):
    return reduce(lambda x, y: x + y, data)

@time_count
def another_multi_process():
    pool = Pool(processes=2)
    datas = [range(10 ** 5), range(10 ** 6), range(10 ** 7), range(10 ** 8)]
    results = []
    for data in datas:
        res = pool.map(another_multi_process_sub,(data,))
        results.extend(res)
    sum = reduce(lambda x,y:x+y,results)
    print('another_multi_process sum result: {}'.format(sum))

@time_count
def multi_process_pool():
    pool = Pool(processes=2)
    datas = [range(10 ** 5), range(10 ** 6), range(10 ** 7), range(10 ** 8)]
    dataset= []
    sum = reduce(lambda x,y:x+y,[ pool.apply_async(another_multi_process_sub,(data,)).get() for data in datas])
    print('multi_process_pool sum result: {}'.format(sum))

def main():
    main_process()
    main_thread()
    another_multi_process()
    multi_process_pool()

@time_count
def single_process():
    datas = [range(10 ** 5), range(10 ** 6), range(10 ** 7), range(10 ** 8)]
    sum = 0
    for data in datas:
        sum += reduce(lambda x,y:x+y,data)
    print('\n single process sum result: {}'.format(sum))

if __name__ == '__main__':
    main()
    single_process()

下面結果 可能跟我電腦有其餘進程沒有關閉有關係,只是貼出一下,這個結果仁者見仁智者見智

# 結果以下 :

func: multicore-0 cost 0.06s

func: multicore-1 cost 0.50s

func: multicore-2 cost 3.51s

func: multicore-3 cost 27.56s

main_process sum result: 5050504944450000

func: main_process-default cost 28.36s

func: multi_thread-0 cost 0.08s

func: multi_thread-1 cost 0.67s

func: multi_thread-2 cost 5.00s

func: multi_thread-3 cost 25.31s

main_thread sum result: 5050504944450000

func: main_thread-default cost 25.36s

another_multi_process sum result: 5050504944450000
func: another_multi_process-default cost 24.31s

multi_process_pool sum result: 5050504944450000
func: multi_process_pool-default cost 25.84s


single process sum result: 5050504944450000
func: single_process-default cost 25.58s

多核cpu 共享內存 ,進程通訊安全問題, p1, p2 都有可能 搶到資源,一個搶到 一頓執行完事兒,第二個才能接着執行

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'

import multiprocessing as mp
import time
'''
進程間共享內存,加鎖
'''

def add_num(v,num,num_lock):
    with num_lock:
        for _ in range(10):
            v.value+=num
            time.sleep(0.1)
            print(v.value)

if __name__ == '__main__':
    v = mp.Value('i',0)  ### 還有一個 mp.Array 
    num_lock = mp.Lock()
    p1 = mp.Process(target=add_num,args=(v,1,num_lock))
    p2 = mp.Process(target=add_num,args=(v,3,num_lock))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

線程 操做共享變量 安全 加鎖

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
import threading

def thread_job1():
    print('current threading... {}'.format(threading.current_thread()))
    global A,lock
    lock.acquire()
    for i in range(10):
        A+=1
        print('jb1 A value: {}'.format(A))
    lock.release()

def thread_job2():
    print('current threading... {}'.format(threading.current_thread()))
    global A,lock
    lock.acquire()
    for j in range(20):
        A+=5
        print('job2 value of A: {}'.format(A))
    lock.release()

def main():
    global A,lock
    A = 0
    lock = threading.Lock()

    t1 = threading.Thread(target=thread_job1,name='thread-1')
    t2 = threading.Thread(target=thread_job2,name='thread-2')
    t1.start()
    t2.start()
    t1.join()
    t2.join()

if __name__ == '__main__':
    main()

多線程

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
import threading
from threading import Thread
from queue import Queue

def thread_job(q,l=None):
    print(threading.current_thread())
    for i in range(len(l)):
        l[i] = l[i]**2
    q.put(l)

def multithreading(datas=None):
    q = Queue()
    threads = [] # 線程 列表
    # print(datas)
    for i,data in enumerate(datas):
        print(data)
        thread = Thread(target=thread_job,name='thread{}'.format(i),args=(q,data))
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    result = []

    for _ in range(len(datas)):
        result.append(q.get())
    print(result)

if __name__ == '__main__':
    datas = [[1,2],[3,4],[5,6],[7,8]]
    multithreading(datas)
相關文章
相關標籤/搜索